hat.drivers.iec60870.encodings.iec101

IEC 60870-5-101 messages

  1"""IEC 60870-5-101 messages"""
  2
  3from hat.drivers.iec60870.encodings.iec101.common import (
  4    AsduTypeError,
  5    CauseSize,
  6    AsduAddressSize,
  7    IoAddressSize,
  8    TimeSize,
  9    Time,
 10    time_from_datetime,
 11    time_to_datetime,
 12    OriginatorAddress,
 13    AsduAddress,
 14    IoAddress,
 15    OtherCauseType,
 16    AsduType,
 17    CauseType,
 18    Cause,
 19    QualityType,
 20    IndicationQuality,
 21    MeasurementQuality,
 22    CounterQuality,
 23    ProtectionQuality,
 24    Quality,
 25    FreezeCode,
 26    SingleValue,
 27    DoubleValue,
 28    RegulatingValue,
 29    StepPositionValue,
 30    BitstringValue,
 31    NormalizedValue,
 32    ScaledValue,
 33    FloatingValue,
 34    BinaryCounterValue,
 35    ProtectionValue,
 36    ProtectionStartValue,
 37    ProtectionCommandValue,
 38    StatusValue,
 39    IoElement_M_SP_NA,
 40    IoElement_M_SP_TA,
 41    IoElement_M_DP_NA,
 42    IoElement_M_DP_TA,
 43    IoElement_M_ST_NA,
 44    IoElement_M_ST_TA,
 45    IoElement_M_BO_NA,
 46    IoElement_M_BO_TA,
 47    IoElement_M_ME_NA,
 48    IoElement_M_ME_TA,
 49    IoElement_M_ME_NB,
 50    IoElement_M_ME_TB,
 51    IoElement_M_ME_NC,
 52    IoElement_M_ME_TC,
 53    IoElement_M_IT_NA,
 54    IoElement_M_IT_TA,
 55    IoElement_M_EP_TA,
 56    IoElement_M_EP_TB,
 57    IoElement_M_EP_TC,
 58    IoElement_M_PS_NA,
 59    IoElement_M_ME_ND,
 60    IoElement_M_SP_TB,
 61    IoElement_M_DP_TB,
 62    IoElement_M_ST_TB,
 63    IoElement_M_BO_TB,
 64    IoElement_M_ME_TD,
 65    IoElement_M_ME_TE,
 66    IoElement_M_ME_TF,
 67    IoElement_M_IT_TB,
 68    IoElement_M_EP_TD,
 69    IoElement_M_EP_TE,
 70    IoElement_M_EP_TF,
 71    IoElement_C_SC_NA,
 72    IoElement_C_DC_NA,
 73    IoElement_C_RC_NA,
 74    IoElement_C_SE_NA,
 75    IoElement_C_SE_NB,
 76    IoElement_C_SE_NC,
 77    IoElement_C_BO_NA,
 78    IoElement_M_EI_NA,
 79    IoElement_C_IC_NA,
 80    IoElement_C_CI_NA,
 81    IoElement_C_RD_NA,
 82    IoElement_C_CS_NA,
 83    IoElement_C_TS_NA,
 84    IoElement_C_RP_NA,
 85    IoElement_C_CD_NA,
 86    IoElement_P_ME_NA,
 87    IoElement_P_ME_NB,
 88    IoElement_P_ME_NC,
 89    IoElement_P_AC_NA,
 90    IoElement_F_FR_NA,
 91    IoElement_F_SR_NA,
 92    IoElement_F_SC_NA,
 93    IoElement_F_LS_NA,
 94    IoElement_F_AF_NA,
 95    IoElement_F_SG_NA,
 96    IoElement_F_DR_TA,
 97    IoElement,
 98    IO,
 99    ASDU)
100from hat.drivers.iec60870.encodings.iec101.encoder import (
101    Encoder,
102    asdu_type_time_sizes,
103    decode_cause,
104    encode_cause,
105    decode_cause_type,
106    encode_cause_type,
107    decode_io_element,
108    encode_io_element,
109    decode_quality,
110    encode_quality,
111    decode_step_position_value,
112    encode_step_position_value,
113    decode_bitstring_value,
114    encode_bitstring_value,
115    decode_normalized_value,
116    encode_normalized_value,
117    decode_scaled_value,
118    encode_scaled_value,
119    decode_floating_value,
120    encode_floating_value,
121    decode_binary_counter_value,
122    encode_binary_counter_value,
123    decode_protection_start_value,
124    encode_protection_start_value,
125    decode_protection_command_value,
126    encode_protection_command_value,
127    decode_status_value,
128    encode_status_value)
129
130
131__all__ = ['AsduTypeError',
132           'CauseSize',
133           'AsduAddressSize',
134           'IoAddressSize',
135           'TimeSize',
136           'Time',
137           'time_from_datetime',
138           'time_to_datetime',
139           'OriginatorAddress',
140           'AsduAddress',
141           'IoAddress',
142           'OtherCauseType',
143           'AsduType',
144           'CauseType',
145           'Cause',
146           'QualityType',
147           'IndicationQuality',
148           'MeasurementQuality',
149           'CounterQuality',
150           'ProtectionQuality',
151           'Quality',
152           'FreezeCode',
153           'SingleValue',
154           'DoubleValue',
155           'RegulatingValue',
156           'StepPositionValue',
157           'BitstringValue',
158           'NormalizedValue',
159           'ScaledValue',
160           'FloatingValue',
161           'BinaryCounterValue',
162           'ProtectionValue',
163           'ProtectionStartValue',
164           'ProtectionCommandValue',
165           'StatusValue',
166           'IoElement_M_SP_NA',
167           'IoElement_M_SP_TA',
168           'IoElement_M_DP_NA',
169           'IoElement_M_DP_TA',
170           'IoElement_M_ST_NA',
171           'IoElement_M_ST_TA',
172           'IoElement_M_BO_NA',
173           'IoElement_M_BO_TA',
174           'IoElement_M_ME_NA',
175           'IoElement_M_ME_TA',
176           'IoElement_M_ME_NB',
177           'IoElement_M_ME_TB',
178           'IoElement_M_ME_NC',
179           'IoElement_M_ME_TC',
180           'IoElement_M_IT_NA',
181           'IoElement_M_IT_TA',
182           'IoElement_M_EP_TA',
183           'IoElement_M_EP_TB',
184           'IoElement_M_EP_TC',
185           'IoElement_M_PS_NA',
186           'IoElement_M_ME_ND',
187           'IoElement_M_SP_TB',
188           'IoElement_M_DP_TB',
189           'IoElement_M_ST_TB',
190           'IoElement_M_BO_TB',
191           'IoElement_M_ME_TD',
192           'IoElement_M_ME_TE',
193           'IoElement_M_ME_TF',
194           'IoElement_M_IT_TB',
195           'IoElement_M_EP_TD',
196           'IoElement_M_EP_TE',
197           'IoElement_M_EP_TF',
198           'IoElement_C_SC_NA',
199           'IoElement_C_DC_NA',
200           'IoElement_C_RC_NA',
201           'IoElement_C_SE_NA',
202           'IoElement_C_SE_NB',
203           'IoElement_C_SE_NC',
204           'IoElement_C_BO_NA',
205           'IoElement_M_EI_NA',
206           'IoElement_C_IC_NA',
207           'IoElement_C_CI_NA',
208           'IoElement_C_RD_NA',
209           'IoElement_C_CS_NA',
210           'IoElement_C_TS_NA',
211           'IoElement_C_RP_NA',
212           'IoElement_C_CD_NA',
213           'IoElement_P_ME_NA',
214           'IoElement_P_ME_NB',
215           'IoElement_P_ME_NC',
216           'IoElement_P_AC_NA',
217           'IoElement_F_FR_NA',
218           'IoElement_F_SR_NA',
219           'IoElement_F_SC_NA',
220           'IoElement_F_LS_NA',
221           'IoElement_F_AF_NA',
222           'IoElement_F_SG_NA',
223           'IoElement_F_DR_TA',
224           'IoElement',
225           'IO',
226           'ASDU',
227           'Encoder',
228           'asdu_type_time_sizes',
229           'decode_cause',
230           'encode_cause',
231           'decode_cause_type',
232           'encode_cause_type',
233           'decode_io_element',
234           'encode_io_element',
235           'decode_quality',
236           'encode_quality',
237           'decode_step_position_value',
238           'encode_step_position_value',
239           'decode_bitstring_value',
240           'encode_bitstring_value',
241           'decode_normalized_value',
242           'encode_normalized_value',
243           'decode_scaled_value',
244           'encode_scaled_value',
245           'decode_floating_value',
246           'encode_floating_value',
247           'decode_binary_counter_value',
248           'encode_binary_counter_value',
249           'decode_protection_start_value',
250           'encode_protection_start_value',
251           'decode_protection_command_value',
252           'encode_protection_command_value',
253           'decode_status_value',
254           'encode_status_value']
class AsduTypeError(builtins.Exception):
class AsduTypeError(Exception):
    """Dedicated exception class; a plain `Exception` subclass without
    additional state.

    NOTE(review): presumably raised for unsupported ASDU types - confirm at
    the raise sites, which are not part of this definition.
    """

Common base class for all non-exit exceptions.

class CauseSize(enum.Enum):
class CauseSize(enum.Enum):
    """Selectable size of a cause: ``ONE`` (1) or ``TWO`` (2).

    NOTE(review): presumably the number of bytes used to encode the cause -
    confirm against the encoder.
    """
    ONE = 1
    TWO = 2
ONE = <CauseSize.ONE: 1>
TWO = <CauseSize.TWO: 2>
class AsduAddressSize(enum.Enum):
class AsduAddressSize(enum.Enum):
    """Selectable size of an ASDU address: ``ONE`` (1) or ``TWO`` (2).

    NOTE(review): presumably the number of bytes used to encode the ASDU
    address - confirm against the encoder.
    """
    ONE = 1
    TWO = 2
ONE = <AsduAddressSize.ONE: 1>
TWO = <AsduAddressSize.TWO: 2>
class IoAddressSize(enum.Enum):
class IoAddressSize(enum.Enum):
    """Selectable size of an IO address: ``ONE`` (1), ``TWO`` (2) or
    ``THREE`` (3).

    NOTE(review): presumably the number of bytes used to encode the IO
    address - confirm against the encoder.
    """
    ONE = 1
    TWO = 2
    THREE = 3
ONE = <IoAddressSize.ONE: 1>
TWO = <IoAddressSize.TWO: 2>
THREE = <IoAddressSize.THREE: 3>
class TimeSize(enum.Enum):
class TimeSize(enum.Enum):
    """Size variants of `Time`: 2, 3, 4 or 7.

    The size determines which `Time` fields are available (see the field
    documentation of `Time`).

    NOTE(review): presumably the encoded length in bytes - confirm against
    the encoder.
    """
    TWO = 2
    THREE = 3
    FOUR = 4
    SEVEN = 7
TWO = <TimeSize.TWO: 2>
THREE = <TimeSize.THREE: 3>
FOUR = <TimeSize.FOUR: 4>
SEVEN = <TimeSize.SEVEN: 7>
class Time(typing.NamedTuple):
class Time(typing.NamedTuple):
    """Time value whose set of available fields depends on `size`.

    `milliseconds` is always present; the remaining fields are documented
    as available only for some sizes and are typed as optional.  Fields
    that are not available for the given size are expected to be ``None``.
    """
    size: TimeSize
    milliseconds: int
    """milliseconds in range [0, 59999]"""
    invalid: bool | None
    """available for size THREE, FOUR, SEVEN"""
    minutes: int | None
    """available for size THREE, FOUR, SEVEN (minutes in range [0, 59])"""
    summer_time: bool | None
    """available for size FOUR, SEVEN"""
    hours: int | None
    """available for size FOUR, SEVEN (hours in range [0, 23])"""
    day_of_week: int | None
    """available for size SEVEN (day_of_week in range [1, 7])"""
    day_of_month: int | None
    """available for size SEVEN (day_of_month in range [1, 31])"""
    months: int | None
    """available for size SEVEN (months in range [1, 12])"""
    years: int | None
    """available for size SEVEN (years in range [0, 99])"""

Time(size, milliseconds, invalid, minutes, summer_time, hours, day_of_week, day_of_month, months, years)

Time( size: TimeSize, milliseconds: int, invalid: bool | None, minutes: int | None, summer_time: bool | None, hours: int | None, day_of_week: int | None, day_of_month: int | None, months: int | None, years: int | None)

Create new instance of Time(size, milliseconds, invalid, minutes, summer_time, hours, day_of_week, day_of_month, months, years)

size: TimeSize

Alias for field number 0

milliseconds: int

milliseconds in range [0, 59999]

invalid: bool | None

available for size THREE, FOUR, SEVEN

minutes: int | None

available for size THREE, FOUR, SEVEN (minutes in range [0, 59])

summer_time: bool | None

available for size FOUR, SEVEN

hours: int | None

available for size FOUR, SEVEN (hours in range [0, 23])

day_of_week: int | None

available for size SEVEN (day_of_week in range [1, 7])

day_of_month: int | None

available for size SEVEN (day_of_month in range [1, 31])

months: int | None

available for size SEVEN (months in range [1, 12])

years: int | None

available for size SEVEN (years in range [0, 99])

def time_from_datetime( dt: datetime.datetime, invalid: bool = False) -> Time:
def time_from_datetime(dt: datetime.datetime,
                       invalid: bool = False
                       ) -> Time:
    """Create Time from datetime.datetime

    The result always has size ``TimeSize.SEVEN``.

    Microseconds of `dt` are rounded to the nearest millisecond, which can
    carry over into the next second.  The rounded instant is then broken
    down with ``time.localtime``, so every field - including `summer_time`,
    taken from ``tm_isdst`` - is expressed in the local time zone of the
    running process.  `years` keeps only the last two digits of the year
    and `day_of_week` is ``tm_wday + 1``.

    Note that ``dt.timestamp()`` interprets a naive `dt` as local time.

    """
    # round first, so that a carry (e.g. 999.6 ms) reaches the seconds
    whole_milliseconds = round(dt.microsecond / 1000)
    rounded = (dt.replace(microsecond=0) +
               datetime.timedelta(milliseconds=whole_milliseconds))

    fields = time.localtime(rounded.timestamp())

    # seconds and sub-second part share the single `milliseconds` field
    milliseconds = fields.tm_sec * 1000 + rounded.microsecond // 1000

    return Time(size=TimeSize.SEVEN,
                milliseconds=milliseconds,
                invalid=invalid,
                minutes=fields.tm_min,
                summer_time=bool(fields.tm_isdst),
                hours=fields.tm_hour,
                day_of_week=fields.tm_wday + 1,
                day_of_month=fields.tm_mday,
                months=fields.tm_mon,
                years=fields.tm_year % 100)

Create Time from datetime.datetime

def time_to_datetime(t: Time) -> datetime.datetime:
 95def time_to_datetime(t: Time
 96                     ) -> datetime.datetime:
 97    """Convert Time to datetime.datetime"""
 98    # TODO document edge cases (local time, os implementation, ...)
 99    # TODO support TimeSize.FOUR
100    if t.size == TimeSize.TWO:
101        local_now = datetime.datetime.now()
102        local_dt = local_now.replace(
103            second=int(t.milliseconds / 1000),
104            microsecond=(t.milliseconds % 1000) * 1000)
105
106        local_seconds = local_now.second + local_now.microsecond / 1_000_000
107        t_seconds = t.milliseconds / 1_000
108
109        if abs(local_seconds - t_seconds) > 30:
110            if local_seconds < t_seconds:
111                local_dt = local_dt - datetime.timedelta(minutes=1)
112
113            else:
114                local_dt = local_dt + datetime.timedelta(minutes=1)
115
116    elif t.size == TimeSize.THREE:
117        local_now = datetime.datetime.now()
118        local_dt = local_now.replace(
119            minute=t.minutes,
120            second=int(t.milliseconds / 1000),
121            microsecond=(t.milliseconds % 1000) * 1000)
122
123        local_minutes = (local_now.minute +
124                         local_now.second / 60 +
125                         local_now.microsecond / 60_000_000)
126        t_minutes = t.minutes + t.milliseconds / 60_000
127
128        if abs(local_minutes - t_minutes) > 30:
129            if local_minutes < t_minutes:
130                local_dt = local_dt - datetime.timedelta(hours=1)
131
132            else:
133                local_dt = local_dt + datetime.timedelta(hours=1)
134
135    elif t.size == TimeSize.SEVEN:
136        local_dt = datetime.datetime(
137            year=2000 + t.years if t.years < 70 else 1900 + t.years,
138            month=t.months,
139            day=t.day_of_month,
140            hour=t.hours,
141            minute=t.minutes,
142            second=int(t.milliseconds / 1000),
143            microsecond=(t.milliseconds % 1000) * 1000,
144            fold=not t.summer_time)
145
146    else:
147        raise ValueError('unsupported time size')
148
149    return local_dt.astimezone(tz=datetime.timezone.utc)

Convert Time to datetime.datetime

OriginatorAddress = <class 'int'>
AsduAddress = <class 'int'>
IoAddress = <class 'int'>
OtherCauseType = <class 'int'>
class AsduType(enum.Enum):
class AsduType(enum.Enum):
    """ASDU type identifiers together with their numeric codes.

    NOTE(review): member names appear to follow the IEC 60870-5-101 type
    identification mnemonics; consult the standard for their meaning.
    """
    M_SP_NA = 1
    M_SP_TA = 2
    M_DP_NA = 3
    M_DP_TA = 4
    M_ST_NA = 5
    M_ST_TA = 6
    M_BO_NA = 7
    M_BO_TA = 8
    M_ME_NA = 9
    M_ME_TA = 10
    M_ME_NB = 11
    M_ME_TB = 12
    M_ME_NC = 13
    M_ME_TC = 14
    M_IT_NA = 15
    M_IT_TA = 16
    M_EP_TA = 17
    M_EP_TB = 18
    M_EP_TC = 19
    M_PS_NA = 20
    M_ME_ND = 21
    M_SP_TB = 30
    M_DP_TB = 31
    M_ST_TB = 32
    M_BO_TB = 33
    M_ME_TD = 34
    M_ME_TE = 35
    M_ME_TF = 36
    M_IT_TB = 37
    M_EP_TD = 38
    M_EP_TE = 39
    M_EP_TF = 40
    C_SC_NA = 45
    C_DC_NA = 46
    C_RC_NA = 47
    C_SE_NA = 48
    C_SE_NB = 49
    C_SE_NC = 50
    C_BO_NA = 51
    M_EI_NA = 70
    C_IC_NA = 100
    C_CI_NA = 101
    C_RD_NA = 102
    C_CS_NA = 103
    C_TS_NA = 104
    C_RP_NA = 105
    C_CD_NA = 106
    P_ME_NA = 110
    P_ME_NB = 111
    P_ME_NC = 112
    P_AC_NA = 113
    F_FR_NA = 120
    F_SR_NA = 121
    F_SC_NA = 122
    F_LS_NA = 123
    F_AF_NA = 124
    F_SG_NA = 125
    F_DR_TA = 126
M_SP_NA = <AsduType.M_SP_NA: 1>
M_SP_TA = <AsduType.M_SP_TA: 2>
M_DP_NA = <AsduType.M_DP_NA: 3>
M_DP_TA = <AsduType.M_DP_TA: 4>
M_ST_NA = <AsduType.M_ST_NA: 5>
M_ST_TA = <AsduType.M_ST_TA: 6>
M_BO_NA = <AsduType.M_BO_NA: 7>
M_BO_TA = <AsduType.M_BO_TA: 8>
M_ME_NA = <AsduType.M_ME_NA: 9>
M_ME_TA = <AsduType.M_ME_TA: 10>
M_ME_NB = <AsduType.M_ME_NB: 11>
M_ME_TB = <AsduType.M_ME_TB: 12>
M_ME_NC = <AsduType.M_ME_NC: 13>
M_ME_TC = <AsduType.M_ME_TC: 14>
M_IT_NA = <AsduType.M_IT_NA: 15>
M_IT_TA = <AsduType.M_IT_TA: 16>
M_EP_TA = <AsduType.M_EP_TA: 17>
M_EP_TB = <AsduType.M_EP_TB: 18>
M_EP_TC = <AsduType.M_EP_TC: 19>
M_PS_NA = <AsduType.M_PS_NA: 20>
M_ME_ND = <AsduType.M_ME_ND: 21>
M_SP_TB = <AsduType.M_SP_TB: 30>
M_DP_TB = <AsduType.M_DP_TB: 31>
M_ST_TB = <AsduType.M_ST_TB: 32>
M_BO_TB = <AsduType.M_BO_TB: 33>
M_ME_TD = <AsduType.M_ME_TD: 34>
M_ME_TE = <AsduType.M_ME_TE: 35>
M_ME_TF = <AsduType.M_ME_TF: 36>
M_IT_TB = <AsduType.M_IT_TB: 37>
M_EP_TD = <AsduType.M_EP_TD: 38>
M_EP_TE = <AsduType.M_EP_TE: 39>
M_EP_TF = <AsduType.M_EP_TF: 40>
C_SC_NA = <AsduType.C_SC_NA: 45>
C_DC_NA = <AsduType.C_DC_NA: 46>
C_RC_NA = <AsduType.C_RC_NA: 47>
C_SE_NA = <AsduType.C_SE_NA: 48>
C_SE_NB = <AsduType.C_SE_NB: 49>
C_SE_NC = <AsduType.C_SE_NC: 50>
C_BO_NA = <AsduType.C_BO_NA: 51>
M_EI_NA = <AsduType.M_EI_NA: 70>
C_IC_NA = <AsduType.C_IC_NA: 100>
C_CI_NA = <AsduType.C_CI_NA: 101>
C_RD_NA = <AsduType.C_RD_NA: 102>
C_CS_NA = <AsduType.C_CS_NA: 103>
C_TS_NA = <AsduType.C_TS_NA: 104>
C_RP_NA = <AsduType.C_RP_NA: 105>
C_CD_NA = <AsduType.C_CD_NA: 106>
P_ME_NA = <AsduType.P_ME_NA: 110>
P_ME_NB = <AsduType.P_ME_NB: 111>
P_ME_NC = <AsduType.P_ME_NC: 112>
P_AC_NA = <AsduType.P_AC_NA: 113>
F_FR_NA = <AsduType.F_FR_NA: 120>
F_SR_NA = <AsduType.F_SR_NA: 121>
F_SC_NA = <AsduType.F_SC_NA: 122>
F_LS_NA = <AsduType.F_LS_NA: 123>
F_AF_NA = <AsduType.F_AF_NA: 124>
F_SG_NA = <AsduType.F_SG_NA: 125>
F_DR_TA = <AsduType.F_DR_TA: 126>
class CauseType(enum.Enum):
class CauseType(enum.Enum):
    """Named cause types together with their numeric codes.

    Codes without a member here (e.g. 14-19, 42, 43) cannot be represented
    by this enum; `Cause.type` additionally accepts `OtherCauseType`.
    """
    UNDEFINED = 0
    PERIODIC = 1
    BACKGROUND_SCAN = 2
    SPONTANEOUS = 3
    INITIALIZED = 4
    REQUEST = 5
    ACTIVATION = 6
    ACTIVATION_CONFIRMATION = 7
    DEACTIVATION = 8
    DEACTIVATION_CONFIRMATION = 9
    ACTIVATION_TERMINATION = 10
    REMOTE_COMMAND = 11
    LOCAL_COMMAND = 12
    FILE_TRANSFER = 13
    INTERROGATED_STATION = 20
    INTERROGATED_GROUP01 = 21
    INTERROGATED_GROUP02 = 22
    INTERROGATED_GROUP03 = 23
    INTERROGATED_GROUP04 = 24
    INTERROGATED_GROUP05 = 25
    INTERROGATED_GROUP06 = 26
    INTERROGATED_GROUP07 = 27
    INTERROGATED_GROUP08 = 28
    INTERROGATED_GROUP09 = 29
    INTERROGATED_GROUP10 = 30
    INTERROGATED_GROUP11 = 31
    INTERROGATED_GROUP12 = 32
    INTERROGATED_GROUP13 = 33
    INTERROGATED_GROUP14 = 34
    INTERROGATED_GROUP15 = 35
    INTERROGATED_GROUP16 = 36
    INTERROGATED_COUNTER = 37
    INTERROGATED_COUNTER01 = 38
    INTERROGATED_COUNTER02 = 39
    INTERROGATED_COUNTER03 = 40
    INTERROGATED_COUNTER04 = 41
    UNKNOWN_TYPE = 44
    UNKNOWN_CAUSE = 45
    UNKNOWN_ASDU_ADDRESS = 46
    UNKNOWN_IO_ADDRESS = 47
UNDEFINED = <CauseType.UNDEFINED: 0>
PERIODIC = <CauseType.PERIODIC: 1>
BACKGROUND_SCAN = <CauseType.BACKGROUND_SCAN: 2>
SPONTANEOUS = <CauseType.SPONTANEOUS: 3>
INITIALIZED = <CauseType.INITIALIZED: 4>
REQUEST = <CauseType.REQUEST: 5>
ACTIVATION = <CauseType.ACTIVATION: 6>
ACTIVATION_CONFIRMATION = <CauseType.ACTIVATION_CONFIRMATION: 7>
DEACTIVATION = <CauseType.DEACTIVATION: 8>
DEACTIVATION_CONFIRMATION = <CauseType.DEACTIVATION_CONFIRMATION: 9>
ACTIVATION_TERMINATION = <CauseType.ACTIVATION_TERMINATION: 10>
REMOTE_COMMAND = <CauseType.REMOTE_COMMAND: 11>
LOCAL_COMMAND = <CauseType.LOCAL_COMMAND: 12>
FILE_TRANSFER = <CauseType.FILE_TRANSFER: 13>
INTERROGATED_STATION = <CauseType.INTERROGATED_STATION: 20>
INTERROGATED_GROUP01 = <CauseType.INTERROGATED_GROUP01: 21>
INTERROGATED_GROUP02 = <CauseType.INTERROGATED_GROUP02: 22>
INTERROGATED_GROUP03 = <CauseType.INTERROGATED_GROUP03: 23>
INTERROGATED_GROUP04 = <CauseType.INTERROGATED_GROUP04: 24>
INTERROGATED_GROUP05 = <CauseType.INTERROGATED_GROUP05: 25>
INTERROGATED_GROUP06 = <CauseType.INTERROGATED_GROUP06: 26>
INTERROGATED_GROUP07 = <CauseType.INTERROGATED_GROUP07: 27>
INTERROGATED_GROUP08 = <CauseType.INTERROGATED_GROUP08: 28>
INTERROGATED_GROUP09 = <CauseType.INTERROGATED_GROUP09: 29>
INTERROGATED_GROUP10 = <CauseType.INTERROGATED_GROUP10: 30>
INTERROGATED_GROUP11 = <CauseType.INTERROGATED_GROUP11: 31>
INTERROGATED_GROUP12 = <CauseType.INTERROGATED_GROUP12: 32>
INTERROGATED_GROUP13 = <CauseType.INTERROGATED_GROUP13: 33>
INTERROGATED_GROUP14 = <CauseType.INTERROGATED_GROUP14: 34>
INTERROGATED_GROUP15 = <CauseType.INTERROGATED_GROUP15: 35>
INTERROGATED_GROUP16 = <CauseType.INTERROGATED_GROUP16: 36>
INTERROGATED_COUNTER = <CauseType.INTERROGATED_COUNTER: 37>
INTERROGATED_COUNTER01 = <CauseType.INTERROGATED_COUNTER01: 38>
INTERROGATED_COUNTER02 = <CauseType.INTERROGATED_COUNTER02: 39>
INTERROGATED_COUNTER03 = <CauseType.INTERROGATED_COUNTER03: 40>
INTERROGATED_COUNTER04 = <CauseType.INTERROGATED_COUNTER04: 41>
UNKNOWN_TYPE = <CauseType.UNKNOWN_TYPE: 44>
UNKNOWN_CAUSE = <CauseType.UNKNOWN_CAUSE: 45>
UNKNOWN_ASDU_ADDRESS = <CauseType.UNKNOWN_ASDU_ADDRESS: 46>
UNKNOWN_IO_ADDRESS = <CauseType.UNKNOWN_IO_ADDRESS: 47>
class Cause(typing.NamedTuple):
class Cause(typing.NamedTuple):
    """Cause descriptor made of a cause type, two flags and an originator
    address.

    `type` is either a `CauseType` member or an `OtherCauseType` value -
    NOTE(review): presumably a raw code that has no `CauseType` member;
    confirm against the cause decoder.
    """
    type: CauseType | OtherCauseType
    is_negative_confirm: bool
    is_test: bool
    originator_address: OriginatorAddress

Cause(type, is_negative_confirm, is_test, originator_address)

Cause( type: CauseType | int, is_negative_confirm: bool, is_test: bool, originator_address: int)

Create new instance of Cause(type, is_negative_confirm, is_test, originator_address)

type: CauseType | int

Alias for field number 0

is_negative_confirm: bool

Alias for field number 1

is_test: bool

Alias for field number 2

originator_address: int

Alias for field number 3

class QualityType(enum.Enum):
class QualityType(enum.Enum):
    """Discriminator for the four quality kinds.

    Member names correspond to `IndicationQuality`, `MeasurementQuality`,
    `CounterQuality` and `ProtectionQuality`.
    """
    INDICATION = 0
    MEASUREMENT = 1
    COUNTER = 2
    PROTECTION = 3
INDICATION = <QualityType.INDICATION: 0>
MEASUREMENT = <QualityType.MEASUREMENT: 1>
COUNTER = <QualityType.COUNTER: 2>
PROTECTION = <QualityType.PROTECTION: 3>
class IndicationQuality(typing.NamedTuple):
class IndicationQuality(typing.NamedTuple):
    """Quality flags used by indication values (see e.g.
    `IoElement_M_SP_NA`, `IoElement_M_DP_NA`)."""
    invalid: bool
    not_topical: bool
    substituted: bool
    blocked: bool

IndicationQuality(invalid, not_topical, substituted, blocked)

IndicationQuality(invalid: bool, not_topical: bool, substituted: bool, blocked: bool)

Create new instance of IndicationQuality(invalid, not_topical, substituted, blocked)

invalid: bool

Alias for field number 0

not_topical: bool

Alias for field number 1

substituted: bool

Alias for field number 2

blocked: bool

Alias for field number 3

class MeasurementQuality(typing.NamedTuple):
class MeasurementQuality(typing.NamedTuple):
    """Quality flags used by measurement-like values.

    Same flags as `IndicationQuality` plus `overflow`.
    """
    invalid: bool
    not_topical: bool
    substituted: bool
    blocked: bool
    overflow: bool

MeasurementQuality(invalid, not_topical, substituted, blocked, overflow)

MeasurementQuality( invalid: bool, not_topical: bool, substituted: bool, blocked: bool, overflow: bool)

Create new instance of MeasurementQuality(invalid, not_topical, substituted, blocked, overflow)

invalid: bool

Alias for field number 0

not_topical: bool

Alias for field number 1

substituted: bool

Alias for field number 2

blocked: bool

Alias for field number 3

overflow: bool

Alias for field number 4

class CounterQuality(typing.NamedTuple):
class CounterQuality(typing.NamedTuple):
    """Quality information of a counter: three flags and a sequence
    number."""
    invalid: bool
    adjusted: bool
    overflow: bool
    sequence: int
    """sequence in range [0, 31]"""

CounterQuality(invalid, adjusted, overflow, sequence)

CounterQuality(invalid: bool, adjusted: bool, overflow: bool, sequence: int)

Create new instance of CounterQuality(invalid, adjusted, overflow, sequence)

invalid: bool

Alias for field number 0

adjusted: bool

Alias for field number 1

overflow: bool

Alias for field number 2

sequence: int

sequence in range [0, 31]

class ProtectionQuality(typing.NamedTuple):
class ProtectionQuality(typing.NamedTuple):
    """Quality flags used by protection values.

    Same flags as `IndicationQuality` plus `time_invalid`.
    """
    invalid: bool
    not_topical: bool
    substituted: bool
    blocked: bool
    time_invalid: bool

ProtectionQuality(invalid, not_topical, substituted, blocked, time_invalid)

ProtectionQuality( invalid: bool, not_topical: bool, substituted: bool, blocked: bool, time_invalid: bool)

Create new instance of ProtectionQuality(invalid, not_topical, substituted, blocked, time_invalid)

invalid: bool

Alias for field number 0

not_topical: bool

Alias for field number 1

substituted: bool

Alias for field number 2

blocked: bool

Alias for field number 3

time_invalid: bool

Alias for field number 4

class FreezeCode(enum.Enum):
class FreezeCode(enum.Enum):
    """Freeze codes ``READ`` (0), ``FREEZE`` (1), ``FREEZE_AND_RESET`` (2)
    and ``RESET`` (3).

    NOTE(review): presumably the freeze qualifier of counter interrogation
    - confirm against the IO element that uses it.
    """
    READ = 0
    FREEZE = 1
    FREEZE_AND_RESET = 2
    RESET = 3
READ = <FreezeCode.READ: 0>
FREEZE = <FreezeCode.FREEZE: 1>
FREEZE_AND_RESET = <FreezeCode.FREEZE_AND_RESET: 2>
RESET = <FreezeCode.RESET: 3>
class SingleValue(enum.Enum):
class SingleValue(enum.Enum):
    """Two-state value: ``OFF`` (0) or ``ON`` (1)."""
    OFF = 0
    ON = 1
OFF = <SingleValue.OFF: 0>
ON = <SingleValue.ON: 1>
class DoubleValue(enum.Enum):
class DoubleValue(enum.Enum):
    """Double data value

    ``FAULT`` stands for value 3, which the protocol defines as
    *INDETERMINATE*. It is named differently here so that it is easier to
    tell apart from ``INTERMEDIATE`` (value 0).

    """
    INTERMEDIATE = 0
    OFF = 1
    ON = 2
    FAULT = 3

DoubleDataValue

FAULT stands for value 3, defined in the protocol as INDETERMINATE. This is in order to make it more distinguishable from INTERMEDIATE.

INTERMEDIATE = <DoubleValue.INTERMEDIATE: 0>
OFF = <DoubleValue.OFF: 1>
ON = <DoubleValue.ON: 2>
FAULT = <DoubleValue.FAULT: 3>
class RegulatingValue(enum.Enum):
class RegulatingValue(enum.Enum):
    """Regulating direction: ``LOWER`` (1) or ``HIGHER`` (2)."""
    LOWER = 1
    HIGHER = 2
LOWER = <RegulatingValue.LOWER: 1>
HIGHER = <RegulatingValue.HIGHER: 2>
class StepPositionValue(typing.NamedTuple):
class StepPositionValue(typing.NamedTuple):
    """Step position: a bounded integer value and a `transient` flag."""
    value: int
    """value in range [-64, 63]"""
    transient: bool

StepPositionValue(value, transient)

StepPositionValue(value: int, transient: bool)

Create new instance of StepPositionValue(value, transient)

value: int

value in range [-64, 63]

transient: bool

Alias for field number 1

class BitstringValue(typing.NamedTuple):
class BitstringValue(typing.NamedTuple):
    """Bitstring carried as raw bytes."""
    value: util.Bytes
    """bitstring encoded as 4 bytes"""

BitstringValue(value,)

BitstringValue(value: bytes | bytearray | memoryview)

Create new instance of BitstringValue(value,)

value: bytes | bytearray | memoryview

bitstring encoded as 4 bytes

class NormalizedValue(typing.NamedTuple):
class NormalizedValue(typing.NamedTuple):
    """Normalized value expressed as a float in a half-open unit range."""
    value: float
    """value in range [-1.0, 1.0)"""

NormalizedValue(value,)

NormalizedValue(value: float)

Create new instance of NormalizedValue(value,)

value: float

value in range [-1.0, 1.0)

class ScaledValue(typing.NamedTuple):
class ScaledValue(typing.NamedTuple):
    """Scaled value expressed as an integer in the signed 16 bit range."""
    value: int
    """value in range [-2^15, 2^15-1]"""

ScaledValue(value,)

ScaledValue(value: int)

Create new instance of ScaledValue(value,)

value: int

value in range [-2^15, 2^15-1]

class FloatingValue(typing.NamedTuple):
class FloatingValue(typing.NamedTuple):
    """Floating point value.

    NOTE(review): no range is documented here; the precision actually
    preserved depends on the encoder - confirm there.
    """
    value: float

FloatingValue(value,)

FloatingValue(value: float)

Create new instance of FloatingValue(value,)

value: float

Alias for field number 0

class BinaryCounterValue(typing.NamedTuple):
class BinaryCounterValue(typing.NamedTuple):
    """Counter reading expressed as an integer in the signed 32 bit
    range."""
    value: int
    """value in range [-2^31, 2^31-1]"""

BinaryCounterValue(value,)

BinaryCounterValue(value: int)

Create new instance of BinaryCounterValue(value,)

value: int

value in range [-2^31, 2^31-1]

class ProtectionValue(enum.Enum):
class ProtectionValue(enum.Enum):
    """Protection state: ``OFF`` (1) or ``ON`` (2).

    Note that the numbering starts at 1, unlike `SingleValue`.
    """
    OFF = 1
    ON = 2
OFF = <ProtectionValue.OFF: 1>
ON = <ProtectionValue.ON: 2>
class ProtectionStartValue(typing.NamedTuple):
class ProtectionStartValue(typing.NamedTuple):
    """Set of six boolean protection start flags.

    NOTE(review): presumably `l1`-`l3` are the phases, `ie` the earth
    current and `reverse` the direction - confirm against
    IEC 60870-5-101.
    """
    general: bool
    l1: bool
    l2: bool
    l3: bool
    ie: bool
    reverse: bool

ProtectionStartValue(general, l1, l2, l3, ie, reverse)

ProtectionStartValue(general: bool, l1: bool, l2: bool, l3: bool, ie: bool, reverse: bool)

Create new instance of ProtectionStartValue(general, l1, l2, l3, ie, reverse)

general: bool

Alias for field number 0

l1: bool

Alias for field number 1

l2: bool

Alias for field number 2

l3: bool

Alias for field number 3

ie: bool

Alias for field number 4

reverse: bool

Alias for field number 5

class ProtectionCommandValue(typing.NamedTuple):
class ProtectionCommandValue(typing.NamedTuple):
    """Set of four boolean protection command flags.

    NOTE(review): presumably `l1`-`l3` are the phases - confirm against
    IEC 60870-5-101.
    """
    general: bool
    l1: bool
    l2: bool
    l3: bool

ProtectionCommandValue(general, l1, l2, l3)

ProtectionCommandValue(general: bool, l1: bool, l2: bool, l3: bool)

Create new instance of ProtectionCommandValue(general, l1, l2, l3)

general: bool

Alias for field number 0

l1: bool

Alias for field number 1

l2: bool

Alias for field number 2

l3: bool

Alias for field number 3

class StatusValue(typing.NamedTuple):
class StatusValue(typing.NamedTuple):
    """Two parallel lists of 16 booleans each.

    NOTE(review): presumably ``change[i]`` reports whether ``value[i]``
    has changed - confirm against the status value decoder.
    """
    value: list[bool]
    """value length is 16"""
    change: list[bool]
    """change length is 16"""

StatusValue(value, change)

StatusValue(value: list[bool], change: list[bool])

Create new instance of StatusValue(value, change)

value: list[bool]

value length is 16

change: list[bool]

change length is 16

class IoElement_M_SP_NA(typing.NamedTuple):
class IoElement_M_SP_NA(typing.NamedTuple):
    """IO element holding a `SingleValue` and an `IndicationQuality`.

    NOTE(review): named after `AsduType.M_SP_NA`; presumably the element
    layout of that ASDU type - confirm against the encoder.
    """
    value: SingleValue
    quality: IndicationQuality

IoElement_M_SP_NA(value, quality)

IoElement_M_SP_NA( value: SingleValue, quality: IndicationQuality)

Create new instance of IoElement_M_SP_NA(value, quality)

value: SingleValue

Alias for field number 0

quality: IndicationQuality

Alias for field number 1

class IoElement_M_SP_TA(typing.NamedTuple):
class IoElement_M_SP_TA(typing.NamedTuple):
    """IO element holding a `SingleValue` and an `IndicationQuality`.

    NOTE(review): named after `AsduType.M_SP_TA`; presumably the element
    layout of that ASDU type - confirm against the encoder.
    """
    value: SingleValue
    quality: IndicationQuality

IoElement_M_SP_TA(value, quality)

IoElement_M_SP_TA( value: SingleValue, quality: IndicationQuality)

Create new instance of IoElement_M_SP_TA(value, quality)

value: SingleValue

Alias for field number 0

quality: IndicationQuality

Alias for field number 1

class IoElement_M_DP_NA(typing.NamedTuple):
class IoElement_M_DP_NA(typing.NamedTuple):
    """IO element holding a `DoubleValue` and an `IndicationQuality`.

    NOTE(review): named after `AsduType.M_DP_NA`; presumably the element
    layout of that ASDU type - confirm against the encoder.
    """
    value: DoubleValue
    quality: IndicationQuality

IoElement_M_DP_NA(value, quality)

IoElement_M_DP_NA( value: DoubleValue, quality: IndicationQuality)

Create new instance of IoElement_M_DP_NA(value, quality)

value: DoubleValue

Alias for field number 0

quality: IndicationQuality

Alias for field number 1

class IoElement_M_DP_TA(typing.NamedTuple):
class IoElement_M_DP_TA(typing.NamedTuple):
    """IO element holding a `DoubleValue` and an `IndicationQuality`.

    NOTE(review): named after `AsduType.M_DP_TA`; presumably the element
    layout of that ASDU type - confirm against the encoder.
    """
    value: DoubleValue
    quality: IndicationQuality

IoElement_M_DP_TA(value, quality)

IoElement_M_DP_TA( value: DoubleValue, quality: IndicationQuality)

Create new instance of IoElement_M_DP_TA(value, quality)

value: DoubleValue

Alias for field number 0

quality: IndicationQuality

Alias for field number 1

class IoElement_M_ST_NA(typing.NamedTuple):
class IoElement_M_ST_NA(typing.NamedTuple):
    """IO element holding a `StepPositionValue` and a `MeasurementQuality`.

    NOTE(review): named after `AsduType.M_ST_NA`; presumably the element
    layout of that ASDU type - confirm against the encoder.
    """
    value: StepPositionValue
    quality: MeasurementQuality

IoElement_M_ST_NA(value, quality)

IoElement_M_ST_NA( value: StepPositionValue, quality: MeasurementQuality)

Create new instance of IoElement_M_ST_NA(value, quality)

value: StepPositionValue

Alias for field number 0

quality: MeasurementQuality

Alias for field number 1

class IoElement_M_ST_TA(typing.NamedTuple):
class IoElement_M_ST_TA(typing.NamedTuple):
    """IO element holding a `StepPositionValue` and a `MeasurementQuality`.

    NOTE(review): named after `AsduType.M_ST_TA`; presumably the element
    layout of that ASDU type - confirm against the encoder.
    """
    value: StepPositionValue
    quality: MeasurementQuality

IoElement_M_ST_TA(value, quality)

IoElement_M_ST_TA( value: StepPositionValue, quality: MeasurementQuality)

Create new instance of IoElement_M_ST_TA(value, quality)

value: StepPositionValue

Alias for field number 0

quality: MeasurementQuality

Alias for field number 1

class IoElement_M_BO_NA(typing.NamedTuple):
class IoElement_M_BO_NA(typing.NamedTuple):
    """IO element holding a `BitstringValue` and a `MeasurementQuality`.

    NOTE(review): named after `AsduType.M_BO_NA`; presumably the element
    layout of that ASDU type - confirm against the encoder.
    """
    value: BitstringValue
    quality: MeasurementQuality

IoElement_M_BO_NA(value, quality)

IoElement_M_BO_NA( value: BitstringValue, quality: MeasurementQuality)

Create new instance of IoElement_M_BO_NA(value, quality)

value: BitstringValue

Alias for field number 0

quality: MeasurementQuality

Alias for field number 1

class IoElement_M_BO_TA(typing.NamedTuple):
class IoElement_M_BO_TA(typing.NamedTuple):
    """IO element holding a `BitstringValue` and a `MeasurementQuality`.

    NOTE(review): named after `AsduType.M_BO_TA`; presumably the element
    layout of that ASDU type - confirm against the encoder.
    """
    value: BitstringValue
    quality: MeasurementQuality

IoElement_M_BO_TA(value, quality)

IoElement_M_BO_TA( value: BitstringValue, quality: MeasurementQuality)

Create new instance of IoElement_M_BO_TA(value, quality)

value: BitstringValue

Alias for field number 0

quality: MeasurementQuality

Alias for field number 1

class IoElement_M_ME_NA(typing.NamedTuple):
class IoElement_M_ME_NA(typing.NamedTuple):
    """IO element of ASDU type M_ME_NA: normalized value with quality."""
    value: NormalizedValue
    quality: MeasurementQuality

IoElement_M_ME_NA(value, quality)

IoElement_M_ME_NA( value: NormalizedValue, quality: MeasurementQuality)

Create new instance of IoElement_M_ME_NA(value, quality)

Alias for field number 0

Alias for field number 1

class IoElement_M_ME_TA(typing.NamedTuple):
class IoElement_M_ME_TA(typing.NamedTuple):
    """IO element of ASDU type M_ME_TA: normalized value with quality."""
    value: NormalizedValue
    quality: MeasurementQuality

IoElement_M_ME_TA(value, quality)

IoElement_M_ME_TA( value: NormalizedValue, quality: MeasurementQuality)

Create new instance of IoElement_M_ME_TA(value, quality)

Alias for field number 0

Alias for field number 1

class IoElement_M_ME_NB(typing.NamedTuple):
class IoElement_M_ME_NB(typing.NamedTuple):
    """IO element of ASDU type M_ME_NB: scaled value with quality."""
    value: ScaledValue
    quality: MeasurementQuality

IoElement_M_ME_NB(value, quality)

IoElement_M_ME_NB( value: ScaledValue, quality: MeasurementQuality)

Create new instance of IoElement_M_ME_NB(value, quality)

value: ScaledValue

Alias for field number 0

Alias for field number 1

class IoElement_M_ME_TB(typing.NamedTuple):
class IoElement_M_ME_TB(typing.NamedTuple):
    """IO element of ASDU type M_ME_TB: scaled value with quality."""
    value: ScaledValue
    quality: MeasurementQuality

IoElement_M_ME_TB(value, quality)

IoElement_M_ME_TB( value: ScaledValue, quality: MeasurementQuality)

Create new instance of IoElement_M_ME_TB(value, quality)

value: ScaledValue

Alias for field number 0

Alias for field number 1

class IoElement_M_ME_NC(typing.NamedTuple):
class IoElement_M_ME_NC(typing.NamedTuple):
    """IO element of ASDU type M_ME_NC: floating value with quality."""
    value: FloatingValue
    quality: MeasurementQuality

IoElement_M_ME_NC(value, quality)

IoElement_M_ME_NC( value: FloatingValue, quality: MeasurementQuality)

Create new instance of IoElement_M_ME_NC(value, quality)

value: FloatingValue

Alias for field number 0

Alias for field number 1

class IoElement_M_ME_TC(typing.NamedTuple):
class IoElement_M_ME_TC(typing.NamedTuple):
    """IO element of ASDU type M_ME_TC: floating value with quality."""
    value: FloatingValue
    quality: MeasurementQuality

IoElement_M_ME_TC(value, quality)

IoElement_M_ME_TC( value: FloatingValue, quality: MeasurementQuality)

Create new instance of IoElement_M_ME_TC(value, quality)

value: FloatingValue

Alias for field number 0

Alias for field number 1

class IoElement_M_IT_NA(typing.NamedTuple):
class IoElement_M_IT_NA(typing.NamedTuple):
    """IO element of ASDU type M_IT_NA: binary counter with quality."""
    value: BinaryCounterValue
    quality: CounterQuality

IoElement_M_IT_NA(value, quality)

IoElement_M_IT_NA( value: BinaryCounterValue, quality: CounterQuality)

Create new instance of IoElement_M_IT_NA(value, quality)

Alias for field number 0

quality: CounterQuality

Alias for field number 1

class IoElement_M_IT_TA(typing.NamedTuple):
class IoElement_M_IT_TA(typing.NamedTuple):
    """IO element of ASDU type M_IT_TA: binary counter with quality."""
    value: BinaryCounterValue
    quality: CounterQuality

IoElement_M_IT_TA(value, quality)

IoElement_M_IT_TA( value: BinaryCounterValue, quality: CounterQuality)

Create new instance of IoElement_M_IT_TA(value, quality)

Alias for field number 0

quality: CounterQuality

Alias for field number 1

class IoElement_M_EP_TA(typing.NamedTuple):
class IoElement_M_EP_TA(typing.NamedTuple):
    """IO element of ASDU type M_EP_TA.

    Protection value with protection quality and an elapsed time.

    """
    value: ProtectionValue
    quality: ProtectionQuality
    elapsed_time: int
    """elapsed_time in range [0, 65535]"""

IoElement_M_EP_TA(value, quality, elapsed_time)

IoElement_M_EP_TA( value: ProtectionValue, quality: ProtectionQuality, elapsed_time: int)

Create new instance of IoElement_M_EP_TA(value, quality, elapsed_time)

Alias for field number 0

Alias for field number 1

elapsed_time: int

elapsed_time in range [0, 65535]

class IoElement_M_EP_TB(typing.NamedTuple):
class IoElement_M_EP_TB(typing.NamedTuple):
    """IO element of ASDU type M_EP_TB.

    Protection start value with protection quality and a duration time.

    """
    value: ProtectionStartValue
    quality: ProtectionQuality
    duration_time: int
    """duration_time in range [0, 65535]"""

IoElement_M_EP_TB(value, quality, duration_time)

IoElement_M_EP_TB( value: ProtectionStartValue, quality: ProtectionQuality, duration_time: int)

Create new instance of IoElement_M_EP_TB(value, quality, duration_time)

Alias for field number 0

Alias for field number 1

duration_time: int

duration_time in range [0, 65535]

class IoElement_M_EP_TC(typing.NamedTuple):
class IoElement_M_EP_TC(typing.NamedTuple):
    """IO element of ASDU type M_EP_TC.

    Protection command value with protection quality and an operating time.

    """
    value: ProtectionCommandValue
    quality: ProtectionQuality
    operating_time: int
    """operating_time in range [0, 65535]"""

IoElement_M_EP_TC(value, quality, operating_time)

IoElement_M_EP_TC( value: ProtectionCommandValue, quality: ProtectionQuality, operating_time: int)

Create new instance of IoElement_M_EP_TC(value, quality, operating_time)

Alias for field number 0

Alias for field number 1

operating_time: int

operating_time in range [0, 65535]

class IoElement_M_PS_NA(typing.NamedTuple):
class IoElement_M_PS_NA(typing.NamedTuple):
    """IO element of ASDU type M_PS_NA: status value with quality."""
    value: StatusValue
    quality: MeasurementQuality

IoElement_M_PS_NA(value, quality)

IoElement_M_PS_NA( value: StatusValue, quality: MeasurementQuality)

Create new instance of IoElement_M_PS_NA(value, quality)

value: StatusValue

Alias for field number 0

Alias for field number 1

class IoElement_M_ME_ND(typing.NamedTuple):
class IoElement_M_ME_ND(typing.NamedTuple):
    """IO element of ASDU type M_ME_ND: normalized value, no quality."""
    value: NormalizedValue

IoElement_M_ME_ND(value,)

IoElement_M_ME_ND(value: NormalizedValue)

Create new instance of IoElement_M_ME_ND(value,)

Alias for field number 0

class IoElement_M_SP_TB(typing.NamedTuple):
class IoElement_M_SP_TB(typing.NamedTuple):
    """IO element of ASDU type M_SP_TB: single value with quality."""
    value: SingleValue
    quality: IndicationQuality

IoElement_M_SP_TB(value, quality)

IoElement_M_SP_TB( value: SingleValue, quality: IndicationQuality)

Create new instance of IoElement_M_SP_TB(value, quality)

value: SingleValue

Alias for field number 0

Alias for field number 1

class IoElement_M_DP_TB(typing.NamedTuple):
class IoElement_M_DP_TB(typing.NamedTuple):
    """IO element of ASDU type M_DP_TB: double value with quality."""
    value: DoubleValue
    quality: IndicationQuality

IoElement_M_DP_TB(value, quality)

IoElement_M_DP_TB( value: DoubleValue, quality: IndicationQuality)

Create new instance of IoElement_M_DP_TB(value, quality)

value: DoubleValue

Alias for field number 0

Alias for field number 1

class IoElement_M_ST_TB(typing.NamedTuple):
class IoElement_M_ST_TB(typing.NamedTuple):
    """IO element of ASDU type M_ST_TB: step position with quality."""
    value: StepPositionValue
    quality: MeasurementQuality

IoElement_M_ST_TB(value, quality)

IoElement_M_ST_TB( value: StepPositionValue, quality: MeasurementQuality)

Create new instance of IoElement_M_ST_TB(value, quality)

Alias for field number 0

Alias for field number 1

class IoElement_M_BO_TB(typing.NamedTuple):
class IoElement_M_BO_TB(typing.NamedTuple):
    """IO element of ASDU type M_BO_TB: bitstring with quality."""
    value: BitstringValue
    quality: MeasurementQuality

IoElement_M_BO_TB(value, quality)

IoElement_M_BO_TB( value: BitstringValue, quality: MeasurementQuality)

Create new instance of IoElement_M_BO_TB(value, quality)

Alias for field number 0

Alias for field number 1

class IoElement_M_ME_TD(typing.NamedTuple):
class IoElement_M_ME_TD(typing.NamedTuple):
    """IO element of ASDU type M_ME_TD: normalized value with quality."""
    value: NormalizedValue
    quality: MeasurementQuality

IoElement_M_ME_TD(value, quality)

IoElement_M_ME_TD( value: NormalizedValue, quality: MeasurementQuality)

Create new instance of IoElement_M_ME_TD(value, quality)

Alias for field number 0

Alias for field number 1

class IoElement_M_ME_TE(typing.NamedTuple):
class IoElement_M_ME_TE(typing.NamedTuple):
    """IO element of ASDU type M_ME_TE: scaled value with quality."""
    value: ScaledValue
    quality: MeasurementQuality

IoElement_M_ME_TE(value, quality)

IoElement_M_ME_TE( value: ScaledValue, quality: MeasurementQuality)

Create new instance of IoElement_M_ME_TE(value, quality)

value: ScaledValue

Alias for field number 0

Alias for field number 1

class IoElement_M_ME_TF(typing.NamedTuple):
class IoElement_M_ME_TF(typing.NamedTuple):
    """IO element of ASDU type M_ME_TF: floating value with quality."""
    value: FloatingValue
    quality: MeasurementQuality

IoElement_M_ME_TF(value, quality)

IoElement_M_ME_TF( value: FloatingValue, quality: MeasurementQuality)

Create new instance of IoElement_M_ME_TF(value, quality)

value: FloatingValue

Alias for field number 0

Alias for field number 1

class IoElement_M_IT_TB(typing.NamedTuple):
class IoElement_M_IT_TB(typing.NamedTuple):
    """IO element of ASDU type M_IT_TB: binary counter with quality."""
    value: BinaryCounterValue
    quality: CounterQuality

IoElement_M_IT_TB(value, quality)

IoElement_M_IT_TB( value: BinaryCounterValue, quality: CounterQuality)

Create new instance of IoElement_M_IT_TB(value, quality)

Alias for field number 0

quality: CounterQuality

Alias for field number 1

class IoElement_M_EP_TD(typing.NamedTuple):
class IoElement_M_EP_TD(typing.NamedTuple):
    """IO element of ASDU type M_EP_TD.

    Protection value with protection quality and an elapsed time.

    """
    value: ProtectionValue
    quality: ProtectionQuality
    elapsed_time: int
    """elapsed_time in range [0, 65535]"""

IoElement_M_EP_TD(value, quality, elapsed_time)

IoElement_M_EP_TD( value: ProtectionValue, quality: ProtectionQuality, elapsed_time: int)

Create new instance of IoElement_M_EP_TD(value, quality, elapsed_time)

Alias for field number 0

Alias for field number 1

elapsed_time: int

elapsed_time in range [0, 65535]

class IoElement_M_EP_TE(typing.NamedTuple):
class IoElement_M_EP_TE(typing.NamedTuple):
    """IO element of ASDU type M_EP_TE.

    Protection start value with protection quality and a duration time.

    """
    value: ProtectionStartValue
    quality: ProtectionQuality
    duration_time: int
    """duration_time in range [0, 65535]"""

IoElement_M_EP_TE(value, quality, duration_time)

IoElement_M_EP_TE( value: ProtectionStartValue, quality: ProtectionQuality, duration_time: int)

Create new instance of IoElement_M_EP_TE(value, quality, duration_time)

Alias for field number 0

Alias for field number 1

duration_time: int

duration_time in range [0, 65535]

class IoElement_M_EP_TF(typing.NamedTuple):
class IoElement_M_EP_TF(typing.NamedTuple):
    """IO element of ASDU type M_EP_TF.

    Protection command value with protection quality and an operating time.

    """
    value: ProtectionCommandValue
    quality: ProtectionQuality
    operating_time: int
    """operating_time in range [0, 65535]"""

IoElement_M_EP_TF(value, quality, operating_time)

IoElement_M_EP_TF( value: ProtectionCommandValue, quality: ProtectionQuality, operating_time: int)

Create new instance of IoElement_M_EP_TF(value, quality, operating_time)

Alias for field number 0

Alias for field number 1

operating_time: int

operating_time in range [0, 65535]

class IoElement_C_SC_NA(typing.NamedTuple):
class IoElement_C_SC_NA(typing.NamedTuple):
    """IO element of ASDU type C_SC_NA.

    Single value together with a select flag and a qualifier.

    """
    value: SingleValue
    select: bool
    qualifier: int
    """qualifier in range [0, 31]"""

IoElement_C_SC_NA(value, select, qualifier)

IoElement_C_SC_NA( value: SingleValue, select: bool, qualifier: int)

Create new instance of IoElement_C_SC_NA(value, select, qualifier)

value: SingleValue

Alias for field number 0

select: bool

Alias for field number 1

qualifier: int

qualifier in range [0, 31]

class IoElement_C_DC_NA(typing.NamedTuple):
class IoElement_C_DC_NA(typing.NamedTuple):
    """IO element of ASDU type C_DC_NA.

    Double value together with a select flag and a qualifier.

    """
    value: DoubleValue
    select: bool
    qualifier: int
    """qualifier in range [0, 31]"""

IoElement_C_DC_NA(value, select, qualifier)

IoElement_C_DC_NA( value: DoubleValue, select: bool, qualifier: int)

Create new instance of IoElement_C_DC_NA(value, select, qualifier)

value: DoubleValue

Alias for field number 0

select: bool

Alias for field number 1

qualifier: int

qualifier in range [0, 31]

class IoElement_C_RC_NA(typing.NamedTuple):
class IoElement_C_RC_NA(typing.NamedTuple):
    """IO element of ASDU type C_RC_NA.

    Regulating value together with a select flag and a qualifier.

    """
    value: RegulatingValue
    select: bool
    qualifier: int
    """qualifier in range [0, 31]"""

IoElement_C_RC_NA(value, select, qualifier)

IoElement_C_RC_NA( value: RegulatingValue, select: bool, qualifier: int)

Create new instance of IoElement_C_RC_NA(value, select, qualifier)

Alias for field number 0

select: bool

Alias for field number 1

qualifier: int

qualifier in range [0, 31]

class IoElement_C_SE_NA(typing.NamedTuple):
class IoElement_C_SE_NA(typing.NamedTuple):
    """IO element of ASDU type C_SE_NA: normalized value and select flag."""
    value: NormalizedValue
    select: bool

IoElement_C_SE_NA(value, select)

IoElement_C_SE_NA( value: NormalizedValue, select: bool)

Create new instance of IoElement_C_SE_NA(value, select)

Alias for field number 0

select: bool

Alias for field number 1

class IoElement_C_SE_NB(typing.NamedTuple):
class IoElement_C_SE_NB(typing.NamedTuple):
    """IO element of ASDU type C_SE_NB: scaled value and select flag."""
    value: ScaledValue
    select: bool

IoElement_C_SE_NB(value, select)

IoElement_C_SE_NB( value: ScaledValue, select: bool)

Create new instance of IoElement_C_SE_NB(value, select)

value: ScaledValue

Alias for field number 0

select: bool

Alias for field number 1

class IoElement_C_SE_NC(typing.NamedTuple):
class IoElement_C_SE_NC(typing.NamedTuple):
    """IO element of ASDU type C_SE_NC: floating value and select flag."""
    value: FloatingValue
    select: bool

IoElement_C_SE_NC(value, select)

IoElement_C_SE_NC( value: FloatingValue, select: bool)

Create new instance of IoElement_C_SE_NC(value, select)

value: FloatingValue

Alias for field number 0

select: bool

Alias for field number 1

class IoElement_C_BO_NA(typing.NamedTuple):
class IoElement_C_BO_NA(typing.NamedTuple):
    """IO element of ASDU type C_BO_NA: bitstring value only."""
    value: BitstringValue

IoElement_C_BO_NA(value,)

IoElement_C_BO_NA(value: BitstringValue)

Create new instance of IoElement_C_BO_NA(value,)

Alias for field number 0

class IoElement_M_EI_NA(typing.NamedTuple):
class IoElement_M_EI_NA(typing.NamedTuple):
    """IO element of ASDU type M_EI_NA: parameter change flag and cause."""
    param_change: bool
    cause: int
    """cause in range [0, 127]"""

IoElement_M_EI_NA(param_change, cause)

IoElement_M_EI_NA(param_change: bool, cause: int)

Create new instance of IoElement_M_EI_NA(param_change, cause)

param_change: bool

Alias for field number 0

cause: int

cause in range [0, 127]

class IoElement_C_IC_NA(typing.NamedTuple):
class IoElement_C_IC_NA(typing.NamedTuple):
    """IO element of ASDU type C_IC_NA: single integer qualifier."""
    qualifier: int
    """qualifier in range [0, 255]"""

IoElement_C_IC_NA(qualifier,)

IoElement_C_IC_NA(qualifier: int)

Create new instance of IoElement_C_IC_NA(qualifier,)

qualifier: int

qualifier in range [0, 255]

class IoElement_C_CI_NA(typing.NamedTuple):
class IoElement_C_CI_NA(typing.NamedTuple):
    """IO element of ASDU type C_CI_NA: request number and freeze code."""
    request: int
    """request in range [0, 63]"""
    freeze: FreezeCode

IoElement_C_CI_NA(request, freeze)

IoElement_C_CI_NA( request: int, freeze: FreezeCode)

Create new instance of IoElement_C_CI_NA(request, freeze)

request: int

request in range [0, 63]

freeze: FreezeCode

Alias for field number 1

class IoElement_C_RD_NA(typing.NamedTuple):
class IoElement_C_RD_NA(typing.NamedTuple):
    """IO element of ASDU type C_RD_NA; it declares no fields."""
    pass

IoElement_C_RD_NA()

IoElement_C_RD_NA()

Create new instance of IoElement_C_RD_NA()

class IoElement_C_CS_NA(typing.NamedTuple):
class IoElement_C_CS_NA(typing.NamedTuple):
    """IO element of ASDU type C_CS_NA: single time value."""
    time: Time
    """time size is SEVEN"""

IoElement_C_CS_NA(time,)

IoElement_C_CS_NA(time: Time)

Create new instance of IoElement_C_CS_NA(time,)

time: Time

time size is SEVEN

class IoElement_C_TS_NA(typing.NamedTuple):
class IoElement_C_TS_NA(typing.NamedTuple):
    """IO element of ASDU type C_TS_NA; it declares no fields."""
    pass

IoElement_C_TS_NA()

IoElement_C_TS_NA()

Create new instance of IoElement_C_TS_NA()

class IoElement_C_RP_NA(typing.NamedTuple):
class IoElement_C_RP_NA(typing.NamedTuple):
    """IO element of ASDU type C_RP_NA: single integer qualifier."""
    qualifier: int
    """qualifier in range [0, 255]"""

IoElement_C_RP_NA(qualifier,)

IoElement_C_RP_NA(qualifier: int)

Create new instance of IoElement_C_RP_NA(qualifier,)

qualifier: int

qualifier in range [0, 255]

class IoElement_C_CD_NA(typing.NamedTuple):
class IoElement_C_CD_NA(typing.NamedTuple):
    """IO element of ASDU type C_CD_NA: single integer time value."""
    time: int
    """time in range [0, 65535]"""

IoElement_C_CD_NA(time,)

IoElement_C_CD_NA(time: int)

Create new instance of IoElement_C_CD_NA(time,)

time: int

time in range [0, 65535]

class IoElement_P_ME_NA(typing.NamedTuple):
class IoElement_P_ME_NA(typing.NamedTuple):
    """IO element of ASDU type P_ME_NA: normalized value and qualifier."""
    value: NormalizedValue
    qualifier: int
    """qualifier in range [0, 255]"""

IoElement_P_ME_NA(value, qualifier)

IoElement_P_ME_NA( value: NormalizedValue, qualifier: int)

Create new instance of IoElement_P_ME_NA(value, qualifier)

Alias for field number 0

qualifier: int

qualifier in range [0, 255]

class IoElement_P_ME_NB(typing.NamedTuple):
class IoElement_P_ME_NB(typing.NamedTuple):
    """IO element of ASDU type P_ME_NB: scaled value and qualifier."""
    value: ScaledValue
    qualifier: int
    """qualifier in range [0, 255]"""

IoElement_P_ME_NB(value, qualifier)

IoElement_P_ME_NB( value: ScaledValue, qualifier: int)

Create new instance of IoElement_P_ME_NB(value, qualifier)

value: ScaledValue

Alias for field number 0

qualifier: int

qualifier in range [0, 255]

class IoElement_P_ME_NC(typing.NamedTuple):
class IoElement_P_ME_NC(typing.NamedTuple):
    """IO element of ASDU type P_ME_NC: floating value and qualifier."""
    value: FloatingValue
    qualifier: int
    """qualifier in range [0, 255]"""

IoElement_P_ME_NC(value, qualifier)

IoElement_P_ME_NC( value: FloatingValue, qualifier: int)

Create new instance of IoElement_P_ME_NC(value, qualifier)

value: FloatingValue

Alias for field number 0

qualifier: int

qualifier in range [0, 255]

class IoElement_P_AC_NA(typing.NamedTuple):
class IoElement_P_AC_NA(typing.NamedTuple):
    """IO element of ASDU type P_AC_NA: single integer qualifier."""
    qualifier: int
    """qualifier in range [0, 255]"""

IoElement_P_AC_NA(qualifier,)

IoElement_P_AC_NA(qualifier: int)

Create new instance of IoElement_P_AC_NA(qualifier,)

qualifier: int

qualifier in range [0, 255]

class IoElement_F_FR_NA(typing.NamedTuple):
class IoElement_F_FR_NA(typing.NamedTuple):
    """IO element of ASDU type F_FR_NA: file name, length and ready flag."""
    file_name: int
    """file_name in range [0, 65535]"""
    file_length: int
    """file_length in range [0, 16777215]"""
    ready: bool

IoElement_F_FR_NA(file_name, file_length, ready)

IoElement_F_FR_NA(file_name: int, file_length: int, ready: bool)

Create new instance of IoElement_F_FR_NA(file_name, file_length, ready)

file_name: int

file_name in range [0, 65535]

file_length: int

file_length in range [0, 16777215]

ready: bool

Alias for field number 2

class IoElement_F_SR_NA(typing.NamedTuple):
class IoElement_F_SR_NA(typing.NamedTuple):
    """IO element of ASDU type F_SR_NA.

    File name, section name, section length and a ready flag.

    """
    file_name: int
    """file_name in range [0, 65535]"""
    section_name: int
    """section_name in range [0, 255]"""
    section_length: int
    """section_length in range [0, 16777215]"""
    ready: bool

IoElement_F_SR_NA(file_name, section_name, section_length, ready)

IoElement_F_SR_NA(file_name: int, section_name: int, section_length: int, ready: bool)

Create new instance of IoElement_F_SR_NA(file_name, section_name, section_length, ready)

file_name: int

file_name in range [0, 65535]

section_name: int

section_name in range [0, 255]

section_length: int

section_length in range [0, 16777215]

ready: bool

Alias for field number 3

class IoElement_F_SC_NA(typing.NamedTuple):
class IoElement_F_SC_NA(typing.NamedTuple):
    """IO element of ASDU type F_SC_NA: file/section name and qualifier."""
    file_name: int
    """file_name in range [0, 65535]"""
    section_name: int
    """section_name in range [0, 255]"""
    qualifier: int
    """qualifier in range [0, 255]"""

IoElement_F_SC_NA(file_name, section_name, qualifier)

IoElement_F_SC_NA(file_name: int, section_name: int, qualifier: int)

Create new instance of IoElement_F_SC_NA(file_name, section_name, qualifier)

file_name: int

file_name in range [0, 65535]

section_name: int

section_name in range [0, 255]

qualifier: int

qualifier in range [0, 255]

class IoElement_F_LS_NA(typing.NamedTuple):
class IoElement_F_LS_NA(typing.NamedTuple):
    """IO element of ASDU type F_LS_NA.

    File name, section name, last qualifier and a checksum.

    """
    file_name: int
    """file_name in range [0, 65535]"""
    section_name: int
    """section_name in range [0, 255]"""
    last_qualifier: int
    """last_qualifier in range [0, 255]"""
    checksum: int
    """checksum in range [0, 255]"""

IoElement_F_LS_NA(file_name, section_name, last_qualifier, checksum)

IoElement_F_LS_NA( file_name: int, section_name: int, last_qualifier: int, checksum: int)

Create new instance of IoElement_F_LS_NA(file_name, section_name, last_qualifier, checksum)

file_name: int

file_name in range [0, 65535]

section_name: int

section_name in range [0, 255]

last_qualifier: int

last_qualifier in range [0, 255]

checksum: int

checksum in range [0, 255]

class IoElement_F_AF_NA(typing.NamedTuple):
class IoElement_F_AF_NA(typing.NamedTuple):
    """IO element of ASDU type F_AF_NA: file/section name and qualifier."""
    file_name: int
    """file_name in range [0, 65535]"""
    section_name: int
    """section_name in range [0, 255]"""
    qualifier: int
    """qualifier in range [0, 255]"""

IoElement_F_AF_NA(file_name, section_name, qualifier)

IoElement_F_AF_NA(file_name: int, section_name: int, qualifier: int)

Create new instance of IoElement_F_AF_NA(file_name, section_name, qualifier)

file_name: int

file_name in range [0, 65535]

section_name: int

section_name in range [0, 255]

qualifier: int

qualifier in range [0, 255]

class IoElement_F_SG_NA(typing.NamedTuple):
class IoElement_F_SG_NA(typing.NamedTuple):
    """IO element of ASDU type F_SG_NA: file/section name and segment bytes."""
    file_name: int
    """file_name in range [0, 65535]"""
    section_name: int
    """section_name in range [0, 255]"""
    segment: util.Bytes

IoElement_F_SG_NA(file_name, section_name, segment)

IoElement_F_SG_NA( file_name: int, section_name: int, segment: bytes | bytearray | memoryview)

Create new instance of IoElement_F_SG_NA(file_name, section_name, segment)

file_name: int

file_name in range [0, 65535]

section_name: int

section_name in range [0, 255]

segment: bytes | bytearray | memoryview

Alias for field number 2

class IoElement_F_DR_TA(typing.NamedTuple):
class IoElement_F_DR_TA(typing.NamedTuple):
    """IO element of ASDU type F_DR_TA.

    File name and length, three boolean status flags and a creation time.

    """
    file_name: int
    """file_name in range [0, 65535]"""
    file_length: int
    """file_length in range [0, 16777215]"""
    more_follows: bool
    is_directory: bool
    transfer_active: bool
    creation_time: Time

IoElement_F_DR_TA(file_name, file_length, more_follows, is_directory, transfer_active, creation_time)

IoElement_F_DR_TA( file_name: int, file_length: int, more_follows: bool, is_directory: bool, transfer_active: bool, creation_time: Time)

Create new instance of IoElement_F_DR_TA(file_name, file_length, more_follows, is_directory, transfer_active, creation_time)

file_name: int

file_name in range [0, 65535]

file_length: int

file_length in range [0, 16777215]

more_follows: bool

Alias for field number 2

is_directory: bool

Alias for field number 3

transfer_active: bool

Alias for field number 4

creation_time: Time

Alias for field number 5

class IO(typing.NamedTuple):
class IO(typing.NamedTuple):
    """Information object: address, its IO elements and an optional time."""
    address: IoAddress
    elements: list[IoElement]
    time: Time | None

IO(address, elements, time)

address: int

Alias for field number 0

time: Time | None

Alias for field number 2

class ASDU(typing.NamedTuple):
class ASDU(typing.NamedTuple):
    """ASDU: type, cause, ASDU address and list of information objects."""
    type: AsduType
    cause: Cause
    address: AsduAddress
    ios: list[IO]

ASDU(type, cause, address, ios)

ASDU( type: AsduType, cause: Cause, address: int, ios: list[IO])

Create new instance of ASDU(type, cause, address, ios)

type: AsduType

Alias for field number 0

cause: Cause

Alias for field number 1

address: int

Alias for field number 2

ios: list[IO]

Alias for field number 3

class Encoder:
class Encoder:
    """IEC 60870-5-101 ASDU encoder/decoder.

    Adapter around the generic `encoder.Encoder`. The wrapped encoder is
    handed integer ASDU types and integer causes, while this class accepts
    and returns `common.ASDU` instances.

    Args:
        cause_size: cause size used for cause encoding/decoding
        asdu_address_size: ASDU address size passed to the wrapped encoder
        io_address_size: IO address size passed to the wrapped encoder
        max_asdu_size: value exposed through `max_asdu_size` (only stored
            by this class)

    """

    def __init__(self,
                 cause_size: common.CauseSize,
                 asdu_address_size: common.AsduAddressSize,
                 io_address_size: common.IoAddressSize,
                 max_asdu_size: int = 252):
        self._max_asdu_size = max_asdu_size
        self._cause_size = cause_size
        self._encoder = encoder.Encoder(
            cause_size=cause_size,
            asdu_address_size=asdu_address_size,
            io_address_size=io_address_size,
            asdu_type_time_sizes=asdu_type_time_sizes,
            inverted_sequence_bit=False,
            decode_io_element_cb=decode_io_element,
            encode_io_element_cb=encode_io_element)

    @property
    def max_asdu_size(self) -> int:
        """Maximum ASDU size given at construction"""
        return self._max_asdu_size

    @property
    def cause_size(self) -> common.CauseSize:
        """Cause size reported by the wrapped encoder"""
        return self._encoder.cause_size

    @property
    def asdu_address_size(self) -> common.AsduAddressSize:
        """ASDU address size reported by the wrapped encoder"""
        return self._encoder.asdu_address_size

    @property
    def io_address_size(self) -> common.IoAddressSize:
        """IO address size reported by the wrapped encoder"""
        return self._encoder.io_address_size

    def decode_asdu(self,
                    asdu_bytes: util.Bytes
                    ) -> tuple[common.ASDU, util.Bytes]:
        """Decode a single ASDU from `asdu_bytes`.

        Returns the decoded ASDU and the second item returned by the
        wrapped encoder (the remaining bytes).

        """
        raw_asdu, remaining = self._encoder.decode_asdu(asdu_bytes)

        # keyword arguments are evaluated in order: type, cause, address, ios
        decoded = common.ASDU(
            type=_decode_asdu_type(raw_asdu.type),
            cause=decode_cause(raw_asdu.cause, self._cause_size),
            address=raw_asdu.address,
            ios=[common.IO(address=raw_io.address,
                           elements=raw_io.elements,
                           time=raw_io.time)
                 for raw_io in raw_asdu.ios])
        return decoded, remaining

    def encode_asdu(self, asdu: common.ASDU) -> util.Bytes:
        """Encode `asdu` with the wrapped encoder and return its bytes."""
        raw_asdu = encoder.common.ASDU(
            type=asdu.type.value,
            cause=encode_cause(asdu.cause, self._cause_size),
            address=asdu.address,
            ios=[encoder.common.IO(address=src_io.address,
                                   elements=src_io.elements,
                                   time=src_io.time)
                 for src_io in asdu.ios])

        return self._encoder.encode_asdu(raw_asdu)
Encoder( cause_size: CauseSize, asdu_address_size: AsduAddressSize, io_address_size: IoAddressSize, max_asdu_size: int = 252)
14    def __init__(self,
15                 cause_size: common.CauseSize,
16                 asdu_address_size: common.AsduAddressSize,
17                 io_address_size: common.IoAddressSize,
18                 max_asdu_size: int = 252):
19        self._cause_size = cause_size
20        self._max_asdu_size = max_asdu_size
21        self._encoder = encoder.Encoder(
22            cause_size=cause_size,
23            asdu_address_size=asdu_address_size,
24            io_address_size=io_address_size,
25            asdu_type_time_sizes=asdu_type_time_sizes,
26            inverted_sequence_bit=False,
27            decode_io_element_cb=decode_io_element,
28            encode_io_element_cb=encode_io_element)
max_asdu_size: int
    @property
    def max_asdu_size(self) -> int:
        """Maximum ASDU size given at construction"""
        return self._max_asdu_size
cause_size: CauseSize
    @property
    def cause_size(self) -> common.CauseSize:
        """Cause size reported by the wrapped encoder"""
        return self._encoder.cause_size
asdu_address_size: AsduAddressSize
    @property
    def asdu_address_size(self) -> common.AsduAddressSize:
        """ASDU address size reported by the wrapped encoder"""
        return self._encoder.asdu_address_size
io_address_size: IoAddressSize
    @property
    def io_address_size(self) -> common.IoAddressSize:
        """IO address size reported by the wrapped encoder"""
        return self._encoder.io_address_size
def decode_asdu( self, asdu_bytes: bytes | bytearray | memoryview) -> tuple[ASDU, bytes | bytearray | memoryview]:
46    def decode_asdu(self,
47                    asdu_bytes: util.Bytes
48                    ) -> tuple[common.ASDU, util.Bytes]:
49        asdu, rest = self._encoder.decode_asdu(asdu_bytes)
50
51        asdu_type = _decode_asdu_type(asdu.type)
52
53        cause = decode_cause(asdu.cause, self._cause_size)
54        address = asdu.address
55        ios = [common.IO(address=io.address,
56                         elements=io.elements,
57                         time=io.time)
58               for io in asdu.ios]
59
60        asdu = common.ASDU(type=asdu_type,
61                           cause=cause,
62                           address=address,
63                           ios=ios)
64        return asdu, rest
def encode_asdu( self, asdu: ASDU) -> bytes | bytearray | memoryview:
66    def encode_asdu(self, asdu: common.ASDU) -> util.Bytes:
67        asdu_type = asdu.type.value
68        cause = encode_cause(asdu.cause, self._cause_size)
69        address = asdu.address
70        ios = [encoder.common.IO(address=io.address,
71                                 elements=io.elements,
72                                 time=io.time)
73               for io in asdu.ios]
74
75        asdu = encoder.common.ASDU(type=asdu_type,
76                                   cause=cause,
77                                   address=address,
78                                   ios=ios)
79
80        return self._encoder.encode_asdu(asdu)
asdu_type_time_sizes = {2: <TimeSize.THREE: 3>, 4: <TimeSize.THREE: 3>, 6: <TimeSize.THREE: 3>, 8: <TimeSize.THREE: 3>, 10: <TimeSize.THREE: 3>, 12: <TimeSize.THREE: 3>, 14: <TimeSize.THREE: 3>, 16: <TimeSize.THREE: 3>, 17: <TimeSize.THREE: 3>, 18: <TimeSize.THREE: 3>, 19: <TimeSize.THREE: 3>, 30: <TimeSize.SEVEN: 7>, 31: <TimeSize.SEVEN: 7>, 32: <TimeSize.SEVEN: 7>, 33: <TimeSize.SEVEN: 7>, 34: <TimeSize.SEVEN: 7>, 35: <TimeSize.SEVEN: 7>, 36: <TimeSize.SEVEN: 7>, 37: <TimeSize.SEVEN: 7>, 38: <TimeSize.SEVEN: 7>, 39: <TimeSize.SEVEN: 7>, 40: <TimeSize.SEVEN: 7>}
def decode_cause( cause: int, cause_size: CauseSize) -> Cause:
def decode_cause(cause: int,
                 cause_size: common.CauseSize
                 ) -> common.Cause:
    """Decode integer `cause` into `common.Cause`.

    Bits 0-5 hold the cause type, bit 6 the negative confirm flag and
    bit 7 the test flag. With `CauseSize.TWO`, bits from 8 upwards hold the
    originator address; with `CauseSize.ONE` the originator address is 0.

    Raises:
        ValueError: if `cause_size` is neither ONE nor TWO

    """
    decoded_type = decode_cause_type(cause & 0x3F)
    negative_confirm = (cause & 0x40) != 0
    test = (cause & 0x80) != 0

    if cause_size == common.CauseSize.TWO:
        originator = cause >> 8

    elif cause_size == common.CauseSize.ONE:
        originator = 0

    else:
        raise ValueError('unsupported cause size')

    return common.Cause(type=decoded_type,
                        is_negative_confirm=negative_confirm,
                        is_test=test,
                        originator_address=originator)
def encode_cause( cause: Cause, cause_size: CauseSize) -> int:
def encode_cause(cause: common.Cause,
                 cause_size: common.CauseSize
                 ) -> int:
    """Encode `cause` as an integer.

    The test flag is stored in bit 7, the negative confirm flag in bit 6
    and the encoded cause type in the low bits. With `CauseSize.TWO` the
    originator address is added shifted left by 8 bits; with
    `CauseSize.ONE` it is not included.

    Raises:
        ValueError: if `cause_size` is neither ONE nor TWO

    """
    flags = 0
    if cause.is_test:
        flags |= 0x80
    if cause.is_negative_confirm:
        flags |= 0x40
    encoded = flags | encode_cause_type(cause.type)

    if cause_size == common.CauseSize.ONE:
        return encoded

    if cause_size != common.CauseSize.TWO:
        raise ValueError('unsupported cause size')

    return encoded | (cause.originator_address << 8)
def decode_cause_type( value: int) -> CauseType | int:
def decode_cause_type(value: int
                      ) -> common.CauseType | common.OtherCauseType:
    """Convert `value` to `common.CauseType` when it is a known member.

    Values rejected by `common.CauseType` with ``ValueError`` are returned
    unchanged.

    """
    try:
        return common.CauseType(value)
    except ValueError:
        return value
def encode_cause_type(cause_type: common.CauseType | common.OtherCauseType
                      ) -> int:
    """Return the numeric value of a cause type.

    `common.CauseType` members are reduced to their ``value``; anything
    else is returned as given.

    """
    if isinstance(cause_type, common.CauseType):
        return cause_type.value
    return cause_type
def decode_io_element(io_bytes: util.Bytes,
                      asdu_type: int
                      ) -> tuple[common.IoElement, util.Bytes]:
    """Decode one IO element of the given ASDU type.

    Only the element itself is consumed from `io_bytes`; for time-tagged
    ASDU types no time tag is decoded here (the ``C_CS_NA`` and
    ``F_DR_TA`` elements, which embed a seven-octet time, are the
    exceptions).

    Args:
        io_bytes: bytes starting at the element to decode
        asdu_type: numeric ASDU type identifier

    Returns:
        decoded element and the bytes remaining after it

    Raises:
        ValueError: if the decoded ASDU type is handled by no branch

    """
    asdu_type = _decode_asdu_type(asdu_type)

    # Indication values share their octet with the quality bits, so the
    # value is read first and `decode_quality` then consumes the octet.
    if asdu_type == common.AsduType.M_SP_NA:
        value = common.SingleValue(io_bytes[0] & 1)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.INDICATION)

        element = common.IoElement_M_SP_NA(value=value,
                                           quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_SP_TA:
        value = common.SingleValue(io_bytes[0] & 1)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.INDICATION)

        element = common.IoElement_M_SP_TA(value=value,
                                           quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_DP_NA:
        value = common.DoubleValue(io_bytes[0] & 3)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.INDICATION)

        element = common.IoElement_M_DP_NA(value=value,
                                           quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_DP_TA:
        value = common.DoubleValue(io_bytes[0] & 3)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.INDICATION)

        element = common.IoElement_M_DP_TA(value=value,
                                           quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_ST_NA:
        value, io_bytes = decode_step_position_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.MEASUREMENT)

        element = common.IoElement_M_ST_NA(value=value,
                                           quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_ST_TA:
        value, io_bytes = decode_step_position_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.MEASUREMENT)

        element = common.IoElement_M_ST_TA(value=value,
                                           quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_BO_NA:
        value, io_bytes = decode_bitstring_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.MEASUREMENT)

        element = common.IoElement_M_BO_NA(value=value,
                                           quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_BO_TA:
        value, io_bytes = decode_bitstring_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.MEASUREMENT)

        # the element class must match the ASDU type (M_BO_TA, not M_BO_NA)
        element = common.IoElement_M_BO_TA(value=value,
                                           quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_ME_NA:
        value, io_bytes = decode_normalized_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.MEASUREMENT)

        element = common.IoElement_M_ME_NA(value=value,
                                           quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_ME_TA:
        value, io_bytes = decode_normalized_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.MEASUREMENT)

        element = common.IoElement_M_ME_TA(value=value,
                                           quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_ME_NB:
        value, io_bytes = decode_scaled_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.MEASUREMENT)

        element = common.IoElement_M_ME_NB(value=value,
                                           quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_ME_TB:
        value, io_bytes = decode_scaled_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.MEASUREMENT)

        element = common.IoElement_M_ME_TB(value=value,
                                           quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_ME_NC:
        value, io_bytes = decode_floating_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.MEASUREMENT)

        element = common.IoElement_M_ME_NC(value=value,
                                           quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_ME_TC:
        value, io_bytes = decode_floating_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.MEASUREMENT)

        element = common.IoElement_M_ME_TC(value=value,
                                           quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_IT_NA:
        value, io_bytes = decode_binary_counter_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.COUNTER)

        element = common.IoElement_M_IT_NA(value=value,
                                           quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_IT_TA:
        value, io_bytes = decode_binary_counter_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.COUNTER)

        element = common.IoElement_M_IT_TA(value=value,
                                           quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_EP_TA:
        value = common.ProtectionValue(io_bytes[0] & 0x03)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.PROTECTION)
        elapsed_time = int.from_bytes(io_bytes[:2], 'little')
        io_bytes = io_bytes[2:]

        element = common.IoElement_M_EP_TA(value=value,
                                           quality=quality,
                                           elapsed_time=elapsed_time)
        return element, io_bytes

    if asdu_type == common.AsduType.M_EP_TB:
        value, io_bytes = decode_protection_start_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.PROTECTION)
        duration_time = int.from_bytes(io_bytes[:2], 'little')
        io_bytes = io_bytes[2:]

        element = common.IoElement_M_EP_TB(value=value,
                                           quality=quality,
                                           duration_time=duration_time)
        return element, io_bytes

    if asdu_type == common.AsduType.M_EP_TC:
        value, io_bytes = decode_protection_command_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.PROTECTION)
        operating_time = int.from_bytes(io_bytes[:2], 'little')
        io_bytes = io_bytes[2:]

        element = common.IoElement_M_EP_TC(value=value,
                                           quality=quality,
                                           operating_time=operating_time)
        return element, io_bytes

    if asdu_type == common.AsduType.M_PS_NA:
        value, io_bytes = decode_status_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.MEASUREMENT)

        element = common.IoElement_M_PS_NA(value=value,
                                           quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_ME_ND:
        # normalized value without a quality descriptor
        value, io_bytes = decode_normalized_value(io_bytes)

        element = common.IoElement_M_ME_ND(value=value)
        return element, io_bytes

    if asdu_type == common.AsduType.M_SP_TB:
        value = common.SingleValue(io_bytes[0] & 1)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.INDICATION)

        element = common.IoElement_M_SP_TB(value=value,
                                           quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_DP_TB:
        value = common.DoubleValue(io_bytes[0] & 3)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.INDICATION)

        element = common.IoElement_M_DP_TB(value=value,
                                           quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_ST_TB:
        value, io_bytes = decode_step_position_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.MEASUREMENT)

        element = common.IoElement_M_ST_TB(value=value,
                                           quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_BO_TB:
        value, io_bytes = decode_bitstring_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.MEASUREMENT)

        element = common.IoElement_M_BO_TB(value=value,
                                           quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_ME_TD:
        value, io_bytes = decode_normalized_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.MEASUREMENT)

        element = common.IoElement_M_ME_TD(value=value,
                                           quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_ME_TE:
        value, io_bytes = decode_scaled_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.MEASUREMENT)

        element = common.IoElement_M_ME_TE(value=value,
                                           quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_ME_TF:
        value, io_bytes = decode_floating_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.MEASUREMENT)

        element = common.IoElement_M_ME_TF(value=value,
                                           quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_IT_TB:
        value, io_bytes = decode_binary_counter_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.COUNTER)

        element = common.IoElement_M_IT_TB(value=value,
                                           quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_EP_TD:
        value = common.ProtectionValue(io_bytes[0] & 0x03)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.PROTECTION)
        elapsed_time = int.from_bytes(io_bytes[:2], 'little')
        io_bytes = io_bytes[2:]

        element = common.IoElement_M_EP_TD(value=value,
                                           quality=quality,
                                           elapsed_time=elapsed_time)
        return element, io_bytes

    if asdu_type == common.AsduType.M_EP_TE:
        value, io_bytes = decode_protection_start_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.PROTECTION)
        duration_time = int.from_bytes(io_bytes[:2], 'little')
        io_bytes = io_bytes[2:]

        element = common.IoElement_M_EP_TE(value=value,
                                           quality=quality,
                                           duration_time=duration_time)
        return element, io_bytes

    if asdu_type == common.AsduType.M_EP_TF:
        value, io_bytes = decode_protection_command_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.PROTECTION)
        operating_time = int.from_bytes(io_bytes[:2], 'little')
        io_bytes = io_bytes[2:]

        element = common.IoElement_M_EP_TF(value=value,
                                           quality=quality,
                                           operating_time=operating_time)
        return element, io_bytes

    # Command octet: value in the low bits, qualifier in bits 2-6 and the
    # select flag in bit 7.
    if asdu_type == common.AsduType.C_SC_NA:
        value = common.SingleValue(io_bytes[0] & 1)
        select = bool(io_bytes[0] & 0x80)
        qualifier = (io_bytes[0] >> 2) & 0x1F
        io_bytes = io_bytes[1:]

        element = common.IoElement_C_SC_NA(value=value,
                                           select=select,
                                           qualifier=qualifier)
        return element, io_bytes

    if asdu_type == common.AsduType.C_DC_NA:
        value = common.DoubleValue(io_bytes[0] & 3)
        select = bool(io_bytes[0] & 0x80)
        qualifier = (io_bytes[0] >> 2) & 0x1F
        io_bytes = io_bytes[1:]

        element = common.IoElement_C_DC_NA(value=value,
                                           select=select,
                                           qualifier=qualifier)
        return element, io_bytes

    if asdu_type == common.AsduType.C_RC_NA:
        value = common.RegulatingValue(io_bytes[0] & 3)
        select = bool(io_bytes[0] & 0x80)
        qualifier = (io_bytes[0] >> 2) & 0x1F
        io_bytes = io_bytes[1:]

        element = common.IoElement_C_RC_NA(value=value,
                                           select=select,
                                           qualifier=qualifier)
        return element, io_bytes

    if asdu_type == common.AsduType.C_SE_NA:
        value, io_bytes = decode_normalized_value(io_bytes)
        select = bool(io_bytes[0] & 0x80)
        io_bytes = io_bytes[1:]

        element = common.IoElement_C_SE_NA(value=value,
                                           select=select)
        return element, io_bytes

    if asdu_type == common.AsduType.C_SE_NB:
        value, io_bytes = decode_scaled_value(io_bytes)
        select = bool(io_bytes[0] & 0x80)
        io_bytes = io_bytes[1:]

        element = common.IoElement_C_SE_NB(value=value,
                                           select=select)
        return element, io_bytes

    if asdu_type == common.AsduType.C_SE_NC:
        value, io_bytes = decode_floating_value(io_bytes)
        select = bool(io_bytes[0] & 0x80)
        io_bytes = io_bytes[1:]

        element = common.IoElement_C_SE_NC(value=value,
                                           select=select)
        return element, io_bytes

    if asdu_type == common.AsduType.C_BO_NA:
        value, io_bytes = decode_bitstring_value(io_bytes)

        element = common.IoElement_C_BO_NA(value=value)
        return element, io_bytes

    if asdu_type == common.AsduType.M_EI_NA:
        param_change = bool(io_bytes[0] & 0x80)
        cause = io_bytes[0] & 0x7F
        io_bytes = io_bytes[1:]

        element = common.IoElement_M_EI_NA(param_change=param_change,
                                           cause=cause)
        return element, io_bytes

    if asdu_type == common.AsduType.C_IC_NA:
        qualifier, io_bytes = io_bytes[0], io_bytes[1:]

        element = common.IoElement_C_IC_NA(qualifier=qualifier)
        return element, io_bytes

    if asdu_type == common.AsduType.C_CI_NA:
        request = io_bytes[0] & 0x3F
        freeze = common.FreezeCode(io_bytes[0] >> 6)
        io_bytes = io_bytes[1:]

        element = common.IoElement_C_CI_NA(request=request,
                                           freeze=freeze)
        return element, io_bytes

    if asdu_type == common.AsduType.C_RD_NA:
        # element carries no data - nothing is consumed
        element = common.IoElement_C_RD_NA()
        return element, io_bytes

    if asdu_type == common.AsduType.C_CS_NA:
        time = encoder.decode_time(io_bytes[:7], common.TimeSize.SEVEN)
        io_bytes = io_bytes[7:]

        element = common.IoElement_C_CS_NA(time=time)
        return element, io_bytes

    if asdu_type == common.AsduType.C_TS_NA:
        # two octets are skipped without inspecting their content
        io_bytes = io_bytes[2:]

        element = common.IoElement_C_TS_NA()
        return element, io_bytes

    if asdu_type == common.AsduType.C_RP_NA:
        qualifier, io_bytes = io_bytes[0], io_bytes[1:]

        element = common.IoElement_C_RP_NA(qualifier=qualifier)
        return element, io_bytes

    if asdu_type == common.AsduType.C_CD_NA:
        time = int.from_bytes(io_bytes[:2], 'little')
        io_bytes = io_bytes[2:]

        element = common.IoElement_C_CD_NA(time=time)
        return element, io_bytes

    if asdu_type == common.AsduType.P_ME_NA:
        value, io_bytes = decode_normalized_value(io_bytes)
        qualifier, io_bytes = io_bytes[0], io_bytes[1:]

        element = common.IoElement_P_ME_NA(value=value,
                                           qualifier=qualifier)
        return element, io_bytes

    if asdu_type == common.AsduType.P_ME_NB:
        value, io_bytes = decode_scaled_value(io_bytes)
        qualifier, io_bytes = io_bytes[0], io_bytes[1:]

        element = common.IoElement_P_ME_NB(value=value,
                                           qualifier=qualifier)
        return element, io_bytes

    if asdu_type == common.AsduType.P_ME_NC:
        value, io_bytes = decode_floating_value(io_bytes)
        qualifier, io_bytes = io_bytes[0], io_bytes[1:]

        element = common.IoElement_P_ME_NC(value=value,
                                           qualifier=qualifier)
        return element, io_bytes

    if asdu_type == common.AsduType.P_AC_NA:
        qualifier, io_bytes = io_bytes[0], io_bytes[1:]

        element = common.IoElement_P_AC_NA(qualifier=qualifier)
        return element, io_bytes

    if asdu_type == common.AsduType.F_FR_NA:
        file_name = int.from_bytes(io_bytes[:2], 'little')
        file_length = int.from_bytes(io_bytes[2:5], 'little')
        ready = bool(io_bytes[5] & 0x80)
        io_bytes = io_bytes[6:]

        element = common.IoElement_F_FR_NA(file_name=file_name,
                                           file_length=file_length,
                                           ready=ready)
        return element, io_bytes

    if asdu_type == common.AsduType.F_SR_NA:
        file_name = int.from_bytes(io_bytes[:2], 'little')
        section_name = io_bytes[2]
        section_length = int.from_bytes(io_bytes[3:6], 'little')
        ready = bool(io_bytes[6] & 0x80)
        io_bytes = io_bytes[7:]

        element = common.IoElement_F_SR_NA(file_name=file_name,
                                           section_name=section_name,
                                           section_length=section_length,
                                           ready=ready)
        return element, io_bytes

    if asdu_type == common.AsduType.F_SC_NA:
        file_name = int.from_bytes(io_bytes[:2], 'little')
        section_name = io_bytes[2]
        qualifier = io_bytes[3]
        io_bytes = io_bytes[4:]

        element = common.IoElement_F_SC_NA(file_name=file_name,
                                           section_name=section_name,
                                           qualifier=qualifier)
        return element, io_bytes

    if asdu_type == common.AsduType.F_LS_NA:
        file_name = int.from_bytes(io_bytes[:2], 'little')
        section_name = io_bytes[2]
        last_qualifier = io_bytes[3]
        checksum = io_bytes[4]
        io_bytes = io_bytes[5:]

        element = common.IoElement_F_LS_NA(file_name=file_name,
                                           section_name=section_name,
                                           last_qualifier=last_qualifier,
                                           checksum=checksum)
        return element, io_bytes

    if asdu_type == common.AsduType.F_AF_NA:
        file_name = int.from_bytes(io_bytes[:2], 'little')
        section_name = io_bytes[2]
        qualifier = io_bytes[3]
        io_bytes = io_bytes[4:]

        element = common.IoElement_F_AF_NA(file_name=file_name,
                                           section_name=section_name,
                                           qualifier=qualifier)
        return element, io_bytes

    if asdu_type == common.AsduType.F_SG_NA:
        file_name = int.from_bytes(io_bytes[:2], 'little')
        section_name = io_bytes[2]
        # the segment is prefixed with its length in a single octet
        length = io_bytes[3]
        segment = io_bytes[4:4+length]
        io_bytes = io_bytes[4+length:]

        element = common.IoElement_F_SG_NA(file_name=file_name,
                                           section_name=section_name,
                                           segment=segment)
        return element, io_bytes

    if asdu_type == common.AsduType.F_DR_TA:
        file_name = int.from_bytes(io_bytes[:2], 'little')
        file_length = int.from_bytes(io_bytes[2:5], 'little')
        more_follows = bool(io_bytes[5] & 0x20)
        is_directory = bool(io_bytes[5] & 0x40)
        transfer_active = bool(io_bytes[5] & 0x80)
        creation_time = encoder.decode_time(io_bytes[6:13],
                                            common.TimeSize.SEVEN)
        io_bytes = io_bytes[13:]

        element = common.IoElement_F_DR_TA(file_name=file_name,
                                           file_length=file_length,
                                           more_follows=more_follows,
                                           is_directory=is_directory,
                                           transfer_active=transfer_active,
                                           creation_time=creation_time)
        return element, io_bytes

    raise ValueError('unsupported ASDU type')
def encode_io_element(element: common.IoElement,
                      asdu_type: int
                      ) -> typing.Iterable[int]:
    """Yield the octets that encode a single IO element.

    The encoding is selected by the class of `element`; `asdu_type` is
    only passed through `_decode_asdu_type` and its result is not used
    to choose the encoding.  Element classes that share an identical
    encoding are handled by a common branch.

    Args:
        element: IO element to encode
        asdu_type: numeric ASDU type identifier

    Yields:
        encoded octets, in transmission order

    Raises:
        ValueError: if `element` is not an instance of a handled class

    """
    # NOTE(review): presumably rejects unsupported type identifiers -
    # confirm against `_decode_asdu_type`
    _decode_asdu_type(asdu_type)

    if isinstance(element, (common.IoElement_M_SP_NA,
                            common.IoElement_M_SP_TA,
                            common.IoElement_M_DP_NA,
                            common.IoElement_M_DP_TA,
                            common.IoElement_M_SP_TB,
                            common.IoElement_M_DP_TB)):
        # value and quality share a single octet
        quality_bits = util.first(encode_quality(element.quality))
        yield element.value.value | quality_bits

    elif isinstance(element, (common.IoElement_M_ST_NA,
                              common.IoElement_M_ST_TA,
                              common.IoElement_M_ST_TB)):
        yield from encode_step_position_value(element.value)
        yield from encode_quality(element.quality)

    elif isinstance(element, (common.IoElement_M_BO_NA,
                              common.IoElement_M_BO_TA,
                              common.IoElement_M_BO_TB)):
        yield from encode_bitstring_value(element.value)
        yield from encode_quality(element.quality)

    elif isinstance(element, (common.IoElement_M_ME_NA,
                              common.IoElement_M_ME_TA,
                              common.IoElement_M_ME_TD)):
        yield from encode_normalized_value(element.value)
        yield from encode_quality(element.quality)

    elif isinstance(element, (common.IoElement_M_ME_NB,
                              common.IoElement_M_ME_TB,
                              common.IoElement_M_ME_TE)):
        yield from encode_scaled_value(element.value)
        yield from encode_quality(element.quality)

    elif isinstance(element, (common.IoElement_M_ME_NC,
                              common.IoElement_M_ME_TC,
                              common.IoElement_M_ME_TF)):
        yield from encode_floating_value(element.value)
        yield from encode_quality(element.quality)

    elif isinstance(element, (common.IoElement_M_IT_NA,
                              common.IoElement_M_IT_TA,
                              common.IoElement_M_IT_TB)):
        yield from encode_binary_counter_value(element.value)
        yield from encode_quality(element.quality)

    elif isinstance(element, (common.IoElement_M_EP_TA,
                              common.IoElement_M_EP_TD)):
        quality_bits = util.first(encode_quality(element.quality))
        yield element.value.value | quality_bits
        yield from element.elapsed_time.to_bytes(2, 'little')

    elif isinstance(element, (common.IoElement_M_EP_TB,
                              common.IoElement_M_EP_TE)):
        yield from encode_protection_start_value(element.value)
        yield from encode_quality(element.quality)
        yield from element.duration_time.to_bytes(2, 'little')

    elif isinstance(element, (common.IoElement_M_EP_TC,
                              common.IoElement_M_EP_TF)):
        yield from encode_protection_command_value(element.value)
        yield from encode_quality(element.quality)
        yield from element.operating_time.to_bytes(2, 'little')

    elif isinstance(element, common.IoElement_M_PS_NA):
        yield from encode_status_value(element.value)
        yield from encode_quality(element.quality)

    elif isinstance(element, common.IoElement_M_ME_ND):
        yield from encode_normalized_value(element.value)

    elif isinstance(element, (common.IoElement_C_SC_NA,
                              common.IoElement_C_DC_NA,
                              common.IoElement_C_RC_NA)):
        # command octet: value | qualifier in bits 2-6 | select in bit 7
        select_bit = 0x80 if element.select else 0
        qualifier_bits = (element.qualifier & 0x1F) << 2
        yield element.value.value | select_bit | qualifier_bits

    elif isinstance(element, common.IoElement_C_SE_NA):
        yield from encode_normalized_value(element.value)
        yield (0x80 if element.select else 0)

    elif isinstance(element, common.IoElement_C_SE_NB):
        yield from encode_scaled_value(element.value)
        yield (0x80 if element.select else 0)

    elif isinstance(element, common.IoElement_C_SE_NC):
        yield from encode_floating_value(element.value)
        yield (0x80 if element.select else 0)

    elif isinstance(element, common.IoElement_C_BO_NA):
        yield from encode_bitstring_value(element.value)

    elif isinstance(element, common.IoElement_M_EI_NA):
        change_bit = 0x80 if element.param_change else 0x00
        yield change_bit | (element.cause & 0x7F)

    elif isinstance(element, (common.IoElement_C_IC_NA,
                              common.IoElement_C_RP_NA,
                              common.IoElement_P_AC_NA)):
        # a lone qualifier octet
        yield element.qualifier & 0xFF

    elif isinstance(element, common.IoElement_C_CI_NA):
        yield ((element.freeze.value << 6) |
               (element.request & 0x3F))

    elif isinstance(element, common.IoElement_C_RD_NA):
        # nothing to encode
        pass

    elif isinstance(element, common.IoElement_C_CS_NA):
        yield from encoder.encode_time(element.time, common.TimeSize.SEVEN)

    elif isinstance(element, common.IoElement_C_TS_NA):
        # fixed two-octet pattern
        yield 0xAA
        yield 0x55

    elif isinstance(element, common.IoElement_C_CD_NA):
        yield from element.time.to_bytes(2, 'little')

    elif isinstance(element, common.IoElement_P_ME_NA):
        yield from encode_normalized_value(element.value)
        yield element.qualifier & 0xFF

    elif isinstance(element, common.IoElement_P_ME_NB):
        yield from encode_scaled_value(element.value)
        yield element.qualifier & 0xFF

    elif isinstance(element, common.IoElement_P_ME_NC):
        yield from encode_floating_value(element.value)
        yield element.qualifier & 0xFF

    elif isinstance(element, common.IoElement_F_FR_NA):
        yield from element.file_name.to_bytes(2, 'little')
        yield from element.file_length.to_bytes(3, 'little')
        yield (0x80 if element.ready else 0x00)

    elif isinstance(element, common.IoElement_F_SR_NA):
        yield from element.file_name.to_bytes(2, 'little')
        yield element.section_name & 0xFF
        yield from element.section_length.to_bytes(3, 'little')
        yield (0x80 if element.ready else 0x00)

    elif isinstance(element, (common.IoElement_F_SC_NA,
                              common.IoElement_F_AF_NA)):
        yield from element.file_name.to_bytes(2, 'little')
        yield element.section_name & 0xFF
        yield element.qualifier & 0xFF

    elif isinstance(element, common.IoElement_F_LS_NA):
        yield from element.file_name.to_bytes(2, 'little')
        yield element.section_name & 0xFF
        yield element.last_qualifier & 0xFF
        yield element.checksum & 0xFF

    elif isinstance(element, common.IoElement_F_SG_NA):
        yield from element.file_name.to_bytes(2, 'little')
        yield element.section_name & 0xFF
        # segment is preceded by its length
        yield len(element.segment)
        yield from element.segment

    elif isinstance(element, common.IoElement_F_DR_TA):
        yield from element.file_name.to_bytes(2, 'little')
        yield from element.file_length.to_bytes(3, 'little')
        status = 0x00
        if element.more_follows:
            status |= 0x20
        if element.is_directory:
            status |= 0x40
        if element.transfer_active:
            status |= 0x80
        yield status
        yield from encoder.encode_time(element.creation_time,
                                       common.TimeSize.SEVEN)

    else:
        raise ValueError('unsupported IO element')
def decode_quality( io_bytes: bytes | bytearray | memoryview, quality_type: QualityType) -> tuple[IndicationQuality | MeasurementQuality | CounterQuality | ProtectionQuality, bytes | bytearray | memoryview]:
 964def decode_quality(io_bytes: util.Bytes,
 965                   quality_type: common.QualityType
 966                   ) -> tuple[common.Quality, util.Bytes]:
 967    if quality_type == common.QualityType.INDICATION:
 968        invalid = bool(io_bytes[0] & 0x80)
 969        not_topical = bool(io_bytes[0] & 0x40)
 970        substituted = bool(io_bytes[0] & 0x20)
 971        blocked = bool(io_bytes[0] & 0x10)
 972        quality = common.IndicationQuality(invalid=invalid,
 973                                           not_topical=not_topical,
 974                                           substituted=substituted,
 975                                           blocked=blocked)
 976
 977    elif quality_type == common.QualityType.MEASUREMENT:
 978        invalid = bool(io_bytes[0] & 0x80)
 979        not_topical = bool(io_bytes[0] & 0x40)
 980        substituted = bool(io_bytes[0] & 0x20)
 981        blocked = bool(io_bytes[0] & 0x10)
 982        overflow = bool(io_bytes[0] & 0x01)
 983        quality = common.MeasurementQuality(invalid=invalid,
 984                                            not_topical=not_topical,
 985                                            substituted=substituted,
 986                                            blocked=blocked,
 987                                            overflow=overflow)
 988
 989    elif quality_type == common.QualityType.COUNTER:
 990        invalid = bool(io_bytes[0] & 0x80)
 991        adjusted = bool(io_bytes[0] & 0x40)
 992        overflow = bool(io_bytes[0] & 0x20)
 993        sequence = io_bytes[0] & 0x1F
 994        quality = common.CounterQuality(invalid=invalid,
 995                                        adjusted=adjusted,
 996                                        overflow=overflow,
 997                                        sequence=sequence)
 998
 999    elif quality_type == common.QualityType.PROTECTION:
1000        invalid = bool(io_bytes[0] & 0x80)
1001        not_topical = bool(io_bytes[0] & 0x40)
1002        substituted = bool(io_bytes[0] & 0x20)
1003        blocked = bool(io_bytes[0] & 0x10)
1004        time_invalid = bool(io_bytes[0] & 0x08)
1005        quality = common.ProtectionQuality(invalid=invalid,
1006                                           not_topical=not_topical,
1007                                           substituted=substituted,
1008                                           blocked=blocked,
1009                                           time_invalid=time_invalid)
1010
1011    else:
1012        raise ValueError('unsupported quality type')
1013
1014    return quality, io_bytes[1:]
def encode_quality( quality: IndicationQuality | MeasurementQuality | CounterQuality | ProtectionQuality) -> Iterable[int]:
def encode_quality(quality: common.Quality
                   ) -> typing.Iterable[int]:
    """Encode a quality descriptor as a single octet.

    This is a generator yielding exactly one integer; the bit layout is
    selected by the concrete type of `quality`.

    Raises:
        ValueError: when iterated, if `quality` is of an unsupported type.

    """
    if isinstance(quality, common.IndicationQuality):
        octet = 0x80 if quality.invalid else 0
        if quality.not_topical:
            octet |= 0x40
        if quality.substituted:
            octet |= 0x20
        if quality.blocked:
            octet |= 0x10
        yield octet

    elif isinstance(quality, common.MeasurementQuality):
        octet = 0x80 if quality.invalid else 0
        if quality.not_topical:
            octet |= 0x40
        if quality.substituted:
            octet |= 0x20
        if quality.blocked:
            octet |= 0x10
        if quality.overflow:
            octet |= 0x01
        yield octet

    elif isinstance(quality, common.CounterQuality):
        octet = 0x80 if quality.invalid else 0
        if quality.adjusted:
            octet |= 0x40
        if quality.overflow:
            octet |= 0x20
        # sequence number is truncated to the low five bits
        yield octet | (quality.sequence & 0x1F)

    elif isinstance(quality, common.ProtectionQuality):
        octet = 0x80 if quality.invalid else 0
        if quality.not_topical:
            octet |= 0x40
        if quality.substituted:
            octet |= 0x20
        if quality.blocked:
            octet |= 0x10
        if quality.time_invalid:
            octet |= 0x08
        yield octet

    else:
        raise ValueError('unsupported quality')
def decode_step_position_value( io_bytes: bytes | bytearray | memoryview) -> tuple[StepPositionValue, bytes | bytearray | memoryview]:
def decode_step_position_value(io_bytes: util.Bytes
                               ) -> tuple[common.StepPositionValue,
                                          util.Bytes]:
    """Decode a step position value from the first octet of `io_bytes`.

    The low seven bits hold a signed value (bit 6 acts as the sign bit)
    and bit 7 is the transient flag. Returns the decoded value together
    with the remaining bytes.

    """
    octet = io_bytes[0]
    low_bits = octet & 0x7F
    # sign-extend the 7-bit field when its top bit is set
    number = low_bits - 0x80 if octet & 0x40 else low_bits
    decoded = common.StepPositionValue(value=number,
                                       transient=bool(octet & 0x80))
    return decoded, io_bytes[1:]
def encode_step_position_value( value: StepPositionValue) -> Iterable[int]:
def encode_step_position_value(value: common.StepPositionValue
                               ) -> typing.Iterable[int]:
    """Encode a step position value as a single octet.

    Bit 7 carries the transient flag; the low seven bits carry
    `value.value` truncated to seven bits.

    """
    transient_bit = 0x80 if value.transient else 0
    yield transient_bit | (value.value & 0x7F)
def decode_bitstring_value( io_bytes: bytes | bytearray | memoryview) -> tuple[BitstringValue, bytes | bytearray | memoryview]:
def decode_bitstring_value(io_bytes: util.Bytes
                           ) -> tuple[common.BitstringValue, util.Bytes]:
    """Decode a bitstring value from the first four octets of `io_bytes`.

    The leading slice is wrapped as-is (no copy or conversion is done
    here). Returns the decoded value together with the remaining bytes.

    """
    head, rest = io_bytes[:4], io_bytes[4:]
    return common.BitstringValue(head), rest
def encode_bitstring_value( value: BitstringValue) -> Iterable[int]:
def encode_bitstring_value(value: common.BitstringValue
                           ) -> typing.Iterable[int]:
    """Encode a bitstring value as four octets.

    Yields the elements of `value.value` at indexes 0 through 3, in
    that order.

    """
    for index in range(4):
        yield value.value[index]
def decode_normalized_value( io_bytes: bytes | bytearray | memoryview) -> tuple[NormalizedValue, bytes | bytearray | memoryview]:
def decode_normalized_value(io_bytes: util.Bytes
                            ) -> tuple[common.NormalizedValue, util.Bytes]:
    """Decode a normalized value from the first two octets of `io_bytes`.

    The octets are read as a little-endian signed 16-bit integer which
    is then divided by 0x7fff. Returns the decoded value together with
    the remaining bytes.

    """
    (raw,) = struct.unpack('<h', io_bytes[:2])
    decoded = common.NormalizedValue(raw / 0x7fff)
    return decoded, io_bytes[2:]
def encode_normalized_value( value: NormalizedValue) -> Iterable[int]:
def encode_normalized_value(value: common.NormalizedValue
                            ) -> typing.Iterable[int]:
    """Encode a normalized value as two octets.

    `value.value` is multiplied by 0x7fff, rounded and packed as a
    little-endian signed 16-bit integer.

    """
    raw = round(value.value * 0x7fff)
    for octet in struct.pack('<h', raw):
        yield octet
def decode_scaled_value( io_bytes: bytes | bytearray | memoryview) -> tuple[ScaledValue, bytes | bytearray | memoryview]:
def decode_scaled_value(io_bytes: util.Bytes
                        ) -> tuple[common.ScaledValue, util.Bytes]:
    """Decode a scaled value from the first two octets of `io_bytes`.

    The octets are read as a little-endian signed 16-bit integer.
    Returns the decoded value together with the remaining bytes.

    """
    (raw,) = struct.unpack('<h', io_bytes[:2])
    return common.ScaledValue(raw), io_bytes[2:]
def encode_scaled_value( value: ScaledValue) -> Iterable[int]:
def encode_scaled_value(value: common.ScaledValue
                        ) -> typing.Iterable[int]:
    """Encode a scaled value as two octets.

    `value.value` is packed as a little-endian signed 16-bit integer.

    """
    for octet in struct.pack('<h', value.value):
        yield octet
def decode_floating_value( io_bytes: bytes | bytearray | memoryview) -> tuple[FloatingValue, bytes | bytearray | memoryview]:
def decode_floating_value(io_bytes: util.Bytes
                          ) -> tuple[common.FloatingValue, util.Bytes]:
    """Decode a floating value from the first four octets of `io_bytes`.

    The octets are read as a little-endian 32-bit float. Returns the
    decoded value together with the remaining bytes.

    """
    (number,) = struct.unpack('<f', io_bytes[:4])
    return common.FloatingValue(number), io_bytes[4:]
def encode_floating_value( value: FloatingValue) -> Iterable[int]:
def encode_floating_value(value: common.FloatingValue
                          ) -> typing.Iterable[int]:
    """Encode a floating value as four octets.

    `value.value` is packed as a little-endian 32-bit float.

    """
    for octet in struct.pack('<f', value.value):
        yield octet
def decode_binary_counter_value( io_bytes: bytes | bytearray | memoryview) -> tuple[BinaryCounterValue, bytes | bytearray | memoryview]:
def decode_binary_counter_value(io_bytes: util.Bytes
                                ) -> tuple[common.BinaryCounterValue,
                                           util.Bytes]:
    """Decode a binary counter value from the first four octets.

    The octets are read as a little-endian signed 32-bit integer.
    Returns the decoded value together with the remaining bytes.

    """
    (count,) = struct.unpack('<i', io_bytes[:4])
    return common.BinaryCounterValue(count), io_bytes[4:]
def encode_binary_counter_value( value: BinaryCounterValue) -> Iterable[int]:
def encode_binary_counter_value(value: common.BinaryCounterValue
                                ) -> typing.Iterable[int]:
    """Encode a binary counter value as four octets.

    `value.value` is packed as a little-endian signed 32-bit integer.

    """
    for octet in struct.pack('<i', value.value):
        yield octet
def decode_protection_start_value( io_bytes: bytes | bytearray | memoryview) -> tuple[ProtectionStartValue, bytes | bytearray | memoryview]:
def decode_protection_start_value(io_bytes: util.Bytes
                                  ) -> tuple[common.ProtectionStartValue,
                                             util.Bytes]:
    """Decode a protection start value from the first octet of `io_bytes`.

    Bits 0 through 5 map, in order, to the `general`, `l1`, `l2`, `l3`,
    `ie` and `reverse` flags. Returns the decoded value together with
    the remaining bytes.

    """
    octet = io_bytes[0]
    decoded = common.ProtectionStartValue(
        general=bool(octet & 0x01),
        l1=bool(octet & 0x02),
        l2=bool(octet & 0x04),
        l3=bool(octet & 0x08),
        ie=bool(octet & 0x10),
        reverse=bool(octet & 0x20))
    return decoded, io_bytes[1:]
def encode_protection_start_value( value: ProtectionStartValue) -> Iterable[int]:
def encode_protection_start_value(value: common.ProtectionStartValue
                                  ) -> typing.Iterable[int]:
    """Encode a protection start value as a single octet.

    The `general`, `l1`, `l2`, `l3`, `ie` and `reverse` flags are placed
    in bits 0 through 5 respectively.

    """
    octet = 0x01 if value.general else 0x00
    if value.l1:
        octet |= 0x02
    if value.l2:
        octet |= 0x04
    if value.l3:
        octet |= 0x08
    if value.ie:
        octet |= 0x10
    if value.reverse:
        octet |= 0x20
    yield octet
def decode_protection_command_value( io_bytes: bytes | bytearray | memoryview) -> tuple[ProtectionCommandValue, bytes | bytearray | memoryview]:
def decode_protection_command_value(io_bytes: util.Bytes
                                    ) -> tuple[common.ProtectionCommandValue,
                                               util.Bytes]:
    """Decode a protection command value from the first octet.

    Bits 0 through 3 map, in order, to the `general`, `l1`, `l2` and
    `l3` flags. Returns the decoded value together with the remaining
    bytes.

    """
    octet = io_bytes[0]
    decoded = common.ProtectionCommandValue(
        general=bool(octet & 0x01),
        l1=bool(octet & 0x02),
        l2=bool(octet & 0x04),
        l3=bool(octet & 0x08))
    return decoded, io_bytes[1:]
def encode_protection_command_value( value: ProtectionCommandValue) -> Iterable[int]:
def encode_protection_command_value(value: common.ProtectionCommandValue
                                    ) -> typing.Iterable[int]:
    """Encode a protection command value as a single octet.

    The `general`, `l1`, `l2` and `l3` flags are placed in bits 0
    through 3 respectively.

    """
    octet = 0x01 if value.general else 0x00
    if value.l1:
        octet |= 0x02
    if value.l2:
        octet |= 0x04
    if value.l3:
        octet |= 0x08
    yield octet
def decode_status_value( io_bytes: bytes | bytearray | memoryview) -> tuple[StatusValue, bytes | bytearray | memoryview]:
def decode_status_value(io_bytes: util.Bytes
                        ) -> tuple[common.StatusValue, util.Bytes]:
    """Decode a status value from the first four octets of `io_bytes`.

    Octets 0-1 are expanded into the 16 `value` flags and octets 2-3
    into the 16 `change` flags, least significant bit of the lower
    octet first. Returns the decoded value together with the remaining
    bytes.

    """
    states = [bool(io_bytes[octet_index] & (1 << bit))
              for octet_index in (0, 1)
              for bit in range(8)]
    changes = [bool(io_bytes[octet_index] & (1 << bit))
               for octet_index in (2, 3)
               for bit in range(8)]
    decoded = common.StatusValue(value=states,
                                 change=changes)
    return decoded, io_bytes[4:]
def encode_status_value( value: StatusValue) -> Iterable[int]:
def encode_status_value(value: common.StatusValue
                        ) -> typing.Iterable[int]:
    """Encode a status value as four octets.

    The 16 flags of `value.value` are packed into two octets followed
    by the 16 flags of `value.change` in two more, with flag `n` of
    each group of eight stored in bit `n` of its octet.

    """
    for flags in (value.value, value.change):
        for start in (0, 8):
            # each set flag contributes a distinct power of two
            yield sum(1 << bit
                      for bit in range(8)
                      if flags[start + bit])