hat.drivers.iec60870.encodings.iec101

IEC 60870-5-101 messages

  1"""IEC 60870-5-101 messages"""
  2
  3from hat.drivers.iec60870.encodings.iec101.common import (
  4    AsduTypeError,
  5    CauseSize,
  6    AsduAddressSize,
  7    IoAddressSize,
  8    TimeSize,
  9    Time,
 10    time_from_datetime,
 11    time_to_datetime,
 12    OriginatorAddress,
 13    AsduAddress,
 14    IoAddress,
 15    OtherCauseType,
 16    AsduType,
 17    CauseType,
 18    Cause,
 19    QualityType,
 20    IndicationQuality,
 21    MeasurementQuality,
 22    CounterQuality,
 23    ProtectionQuality,
 24    Quality,
 25    FreezeCode,
 26    SingleValue,
 27    DoubleValue,
 28    RegulatingValue,
 29    StepPositionValue,
 30    BitstringValue,
 31    NormalizedValue,
 32    ScaledValue,
 33    FloatingValue,
 34    BinaryCounterValue,
 35    ProtectionValue,
 36    ProtectionStartValue,
 37    ProtectionCommandValue,
 38    StatusValue,
 39    IoElement_M_SP_NA,
 40    IoElement_M_SP_TA,
 41    IoElement_M_DP_NA,
 42    IoElement_M_DP_TA,
 43    IoElement_M_ST_NA,
 44    IoElement_M_ST_TA,
 45    IoElement_M_BO_NA,
 46    IoElement_M_BO_TA,
 47    IoElement_M_ME_NA,
 48    IoElement_M_ME_TA,
 49    IoElement_M_ME_NB,
 50    IoElement_M_ME_TB,
 51    IoElement_M_ME_NC,
 52    IoElement_M_ME_TC,
 53    IoElement_M_IT_NA,
 54    IoElement_M_IT_TA,
 55    IoElement_M_EP_TA,
 56    IoElement_M_EP_TB,
 57    IoElement_M_EP_TC,
 58    IoElement_M_PS_NA,
 59    IoElement_M_ME_ND,
 60    IoElement_M_SP_TB,
 61    IoElement_M_DP_TB,
 62    IoElement_M_ST_TB,
 63    IoElement_M_BO_TB,
 64    IoElement_M_ME_TD,
 65    IoElement_M_ME_TE,
 66    IoElement_M_ME_TF,
 67    IoElement_M_IT_TB,
 68    IoElement_M_EP_TD,
 69    IoElement_M_EP_TE,
 70    IoElement_M_EP_TF,
 71    IoElement_C_SC_NA,
 72    IoElement_C_DC_NA,
 73    IoElement_C_RC_NA,
 74    IoElement_C_SE_NA,
 75    IoElement_C_SE_NB,
 76    IoElement_C_SE_NC,
 77    IoElement_C_BO_NA,
 78    IoElement_M_EI_NA,
 79    IoElement_C_IC_NA,
 80    IoElement_C_CI_NA,
 81    IoElement_C_RD_NA,
 82    IoElement_C_CS_NA,
 83    IoElement_C_TS_NA,
 84    IoElement_C_RP_NA,
 85    IoElement_C_CD_NA,
 86    IoElement_P_ME_NA,
 87    IoElement_P_ME_NB,
 88    IoElement_P_ME_NC,
 89    IoElement_P_AC_NA,
 90    IoElement_F_FR_NA,
 91    IoElement_F_SR_NA,
 92    IoElement_F_SC_NA,
 93    IoElement_F_LS_NA,
 94    IoElement_F_AF_NA,
 95    IoElement_F_SG_NA,
 96    IoElement_F_DR_TA,
 97    IoElement,
 98    IO,
 99    ASDU)
100from hat.drivers.iec60870.encodings.iec101.encoder import (
101    Encoder,
102    asdu_type_time_sizes,
103    decode_cause,
104    encode_cause,
105    decode_cause_type,
106    encode_cause_type,
107    decode_io_element,
108    encode_io_element,
109    decode_quality,
110    encode_quality,
111    decode_step_position_value,
112    encode_step_position_value,
113    decode_bitstring_value,
114    encode_bitstring_value,
115    decode_normalized_value,
116    encode_normalized_value,
117    decode_scaled_value,
118    encode_scaled_value,
119    decode_floating_value,
120    encode_floating_value,
121    decode_binary_counter_value,
122    encode_binary_counter_value,
123    decode_protection_start_value,
124    encode_protection_start_value,
125    decode_protection_command_value,
126    encode_protection_command_value,
127    decode_status_value,
128    encode_status_value)
129
130
131__all__ = ['AsduTypeError',
132           'CauseSize',
133           'AsduAddressSize',
134           'IoAddressSize',
135           'TimeSize',
136           'Time',
137           'time_from_datetime',
138           'time_to_datetime',
139           'OriginatorAddress',
140           'AsduAddress',
141           'IoAddress',
142           'OtherCauseType',
143           'AsduType',
144           'CauseType',
145           'Cause',
146           'QualityType',
147           'IndicationQuality',
148           'MeasurementQuality',
149           'CounterQuality',
150           'ProtectionQuality',
151           'Quality',
152           'FreezeCode',
153           'SingleValue',
154           'DoubleValue',
155           'RegulatingValue',
156           'StepPositionValue',
157           'BitstringValue',
158           'NormalizedValue',
159           'ScaledValue',
160           'FloatingValue',
161           'BinaryCounterValue',
162           'ProtectionValue',
163           'ProtectionStartValue',
164           'ProtectionCommandValue',
165           'StatusValue',
166           'IoElement_M_SP_NA',
167           'IoElement_M_SP_TA',
168           'IoElement_M_DP_NA',
169           'IoElement_M_DP_TA',
170           'IoElement_M_ST_NA',
171           'IoElement_M_ST_TA',
172           'IoElement_M_BO_NA',
173           'IoElement_M_BO_TA',
174           'IoElement_M_ME_NA',
175           'IoElement_M_ME_TA',
176           'IoElement_M_ME_NB',
177           'IoElement_M_ME_TB',
178           'IoElement_M_ME_NC',
179           'IoElement_M_ME_TC',
180           'IoElement_M_IT_NA',
181           'IoElement_M_IT_TA',
182           'IoElement_M_EP_TA',
183           'IoElement_M_EP_TB',
184           'IoElement_M_EP_TC',
185           'IoElement_M_PS_NA',
186           'IoElement_M_ME_ND',
187           'IoElement_M_SP_TB',
188           'IoElement_M_DP_TB',
189           'IoElement_M_ST_TB',
190           'IoElement_M_BO_TB',
191           'IoElement_M_ME_TD',
192           'IoElement_M_ME_TE',
193           'IoElement_M_ME_TF',
194           'IoElement_M_IT_TB',
195           'IoElement_M_EP_TD',
196           'IoElement_M_EP_TE',
197           'IoElement_M_EP_TF',
198           'IoElement_C_SC_NA',
199           'IoElement_C_DC_NA',
200           'IoElement_C_RC_NA',
201           'IoElement_C_SE_NA',
202           'IoElement_C_SE_NB',
203           'IoElement_C_SE_NC',
204           'IoElement_C_BO_NA',
205           'IoElement_M_EI_NA',
206           'IoElement_C_IC_NA',
207           'IoElement_C_CI_NA',
208           'IoElement_C_RD_NA',
209           'IoElement_C_CS_NA',
210           'IoElement_C_TS_NA',
211           'IoElement_C_RP_NA',
212           'IoElement_C_CD_NA',
213           'IoElement_P_ME_NA',
214           'IoElement_P_ME_NB',
215           'IoElement_P_ME_NC',
216           'IoElement_P_AC_NA',
217           'IoElement_F_FR_NA',
218           'IoElement_F_SR_NA',
219           'IoElement_F_SC_NA',
220           'IoElement_F_LS_NA',
221           'IoElement_F_AF_NA',
222           'IoElement_F_SG_NA',
223           'IoElement_F_DR_TA',
224           'IoElement',
225           'IO',
226           'ASDU',
227           'Encoder',
228           'asdu_type_time_sizes',
229           'decode_cause',
230           'encode_cause',
231           'decode_cause_type',
232           'encode_cause_type',
233           'decode_io_element',
234           'encode_io_element',
235           'decode_quality',
236           'encode_quality',
237           'decode_step_position_value',
238           'encode_step_position_value',
239           'decode_bitstring_value',
240           'encode_bitstring_value',
241           'decode_normalized_value',
242           'encode_normalized_value',
243           'decode_scaled_value',
244           'encode_scaled_value',
245           'decode_floating_value',
246           'encode_floating_value',
247           'decode_binary_counter_value',
248           'encode_binary_counter_value',
249           'decode_protection_start_value',
250           'encode_protection_start_value',
251           'decode_protection_command_value',
252           'encode_protection_command_value',
253           'decode_status_value',
254           'encode_status_value']
class AsduTypeError(builtins.Exception):
12class AsduTypeError(Exception):
13    pass

Common base class for all non-exit exceptions.

class CauseSize(enum.Enum):
 8class CauseSize(enum.Enum):
 9    ONE = 1
10    TWO = 2

An enumeration.

ONE = <CauseSize.ONE: 1>
TWO = <CauseSize.TWO: 2>
class AsduAddressSize(enum.Enum):
13class AsduAddressSize(enum.Enum):
14    ONE = 1
15    TWO = 2

An enumeration.

ONE = <AsduAddressSize.ONE: 1>
TWO = <AsduAddressSize.TWO: 2>
class IoAddressSize(enum.Enum):
18class IoAddressSize(enum.Enum):
19    ONE = 1
20    TWO = 2
21    THREE = 3

An enumeration.

ONE = <IoAddressSize.ONE: 1>
TWO = <IoAddressSize.TWO: 2>
THREE = <IoAddressSize.THREE: 3>
class TimeSize(enum.Enum):
24class TimeSize(enum.Enum):
25    TWO = 2
26    THREE = 3
27    FOUR = 4
28    SEVEN = 7

An enumeration.

TWO = <TimeSize.TWO: 2>
THREE = <TimeSize.THREE: 3>
FOUR = <TimeSize.FOUR: 4>
SEVEN = <TimeSize.SEVEN: 7>
class Time(typing.NamedTuple):
31class Time(typing.NamedTuple):
32    size: TimeSize
33    milliseconds: int
34    """milliseconds in range [0, 59999]"""
35    invalid: bool | None
36    """available for size THREE, FOUR, SEVEN"""
37    minutes: int | None
38    """available for size THREE, FOUR, SEVEN (minutes in range [0, 59])"""
39    summer_time: bool | None
40    """available for size FOUR, SEVEN"""
41    hours: int | None
42    """available for size FOUR, SEVEN (hours in range [0, 23])"""
43    day_of_week: int | None
44    """available for size SEVEN (day_of_week in range [1, 7])"""
45    day_of_month: int | None
46    """available for size SEVEN (day_of_month in range [1, 31])"""
47    months: int | None
48    """available for size SEVEN (months in range [1, 12])"""
49    years: int | None
50    """available for size SEVEN (years in range [0, 99])"""

Time(size, milliseconds, invalid, minutes, summer_time, hours, day_of_week, day_of_month, months, years)

Time( size: TimeSize, milliseconds: int, invalid: bool | None, minutes: int | None, summer_time: bool | None, hours: int | None, day_of_week: int | None, day_of_month: int | None, months: int | None, years: int | None)

Create new instance of Time(size, milliseconds, invalid, minutes, summer_time, hours, day_of_week, day_of_month, months, years)

size: TimeSize

Alias for field number 0

milliseconds: int

milliseconds in range [0, 59999]

invalid: bool | None

available for size THREE, FOUR, SEVEN

minutes: int | None

available for size THREE, FOUR, SEVEN (minutes in range [0, 59])

summer_time: bool | None

available for size FOUR, SEVEN

hours: int | None

available for size FOUR, SEVEN (hours in range [0, 23])

day_of_week: int | None

available for size SEVEN (day_of_week in range [1, 7])

day_of_month: int | None

available for size SEVEN (day_of_month in range [1, 31])

months: int | None

available for size SEVEN (months in range [1, 12])

years: int | None

available for size SEVEN (years in range [0, 99])

def time_from_datetime( dt: datetime.datetime, invalid: bool = False) -> Time:
def time_from_datetime(dt: datetime.datetime,
                       invalid: bool = False
                       ) -> Time:
    """Create a seven-octet Time from datetime.datetime

    The instant described by `dt` is expressed in the local time zone of the
    running process (via `time.localtime`) and its microseconds are rounded
    to the nearest millisecond.

    Args:
        dt: source instant
        invalid: value stored in the `invalid` field of the result

    Returns:
        `Time` with size `TimeSize.SEVEN` and every field populated

    """
    # TODO document edge cases (local time, os implementation, ...)
    # Round to whole milliseconds. Adding a timedelta (instead of replacing
    # the microsecond field) lets a rounded value of 1000 ms carry over into
    # the seconds.
    whole_seconds = dt.replace(microsecond=0)
    rounded_ms = round(dt.microsecond / 1000)
    dt_rounded = whole_seconds + datetime.timedelta(milliseconds=rounded_ms)
    local_time = time.localtime(dt_rounded.timestamp())

    return Time(
        size=TimeSize.SEVEN,
        milliseconds=(local_time.tm_sec * 1000 +
                      dt_rounded.microsecond // 1000),
        invalid=invalid,
        minutes=local_time.tm_min,
        # tm_isdst is -1 when DST information is unknown; only a positive
        # value means that summer time is in effect
        summer_time=local_time.tm_isdst > 0,
        hours=local_time.tm_hour,
        # struct_time counts weekdays from 0, Time counts them from 1
        day_of_week=local_time.tm_wday + 1,
        day_of_month=local_time.tm_mday,
        months=local_time.tm_mon,
        years=local_time.tm_year % 100)

Create Time from datetime.datetime

def time_to_datetime(t: Time) -> datetime.datetime:
 91def time_to_datetime(t: Time
 92                     ) -> datetime.datetime:
 93    """Convert Time to datetime.datetime"""
 94    # TODO document edge cases (local time, os implementation, ...)
 95    # TODO maybe allow diferent time size (use now for time)
 96    if t.size != TimeSize.SEVEN:
 97        raise ValueError('unsupported time size')
 98
 99    local_dt = datetime.datetime(
100        year=2000 + t.years if t.years < 70 else 1900 + t.years,
101        month=t.months,
102        day=t.day_of_month,
103        hour=t.hours,
104        minute=t.minutes,
105        second=int(t.milliseconds / 1000),
106        microsecond=(t.milliseconds % 1000) * 1000,
107        fold=not t.summer_time)
108
109    return local_dt.astimezone(tz=datetime.timezone.utc)

Convert Time to datetime.datetime

OriginatorAddress = <class 'int'>
AsduAddress = <class 'int'>
IoAddress = <class 'int'>
OtherCauseType = <class 'int'>
class AsduType(enum.Enum):
29class AsduType(enum.Enum):
30    M_SP_NA = 1
31    M_SP_TA = 2
32    M_DP_NA = 3
33    M_DP_TA = 4
34    M_ST_NA = 5
35    M_ST_TA = 6
36    M_BO_NA = 7
37    M_BO_TA = 8
38    M_ME_NA = 9
39    M_ME_TA = 10
40    M_ME_NB = 11
41    M_ME_TB = 12
42    M_ME_NC = 13
43    M_ME_TC = 14
44    M_IT_NA = 15
45    M_IT_TA = 16
46    M_EP_TA = 17
47    M_EP_TB = 18
48    M_EP_TC = 19
49    M_PS_NA = 20
50    M_ME_ND = 21
51    M_SP_TB = 30
52    M_DP_TB = 31
53    M_ST_TB = 32
54    M_BO_TB = 33
55    M_ME_TD = 34
56    M_ME_TE = 35
57    M_ME_TF = 36
58    M_IT_TB = 37
59    M_EP_TD = 38
60    M_EP_TE = 39
61    M_EP_TF = 40
62    C_SC_NA = 45
63    C_DC_NA = 46
64    C_RC_NA = 47
65    C_SE_NA = 48
66    C_SE_NB = 49
67    C_SE_NC = 50
68    C_BO_NA = 51
69    M_EI_NA = 70
70    C_IC_NA = 100
71    C_CI_NA = 101
72    C_RD_NA = 102
73    C_CS_NA = 103
74    C_TS_NA = 104
75    C_RP_NA = 105
76    C_CD_NA = 106
77    P_ME_NA = 110
78    P_ME_NB = 111
79    P_ME_NC = 112
80    P_AC_NA = 113
81    F_FR_NA = 120
82    F_SR_NA = 121
83    F_SC_NA = 122
84    F_LS_NA = 123
85    F_AF_NA = 124
86    F_SG_NA = 125
87    F_DR_TA = 126

An enumeration.

M_SP_NA = <AsduType.M_SP_NA: 1>
M_SP_TA = <AsduType.M_SP_TA: 2>
M_DP_NA = <AsduType.M_DP_NA: 3>
M_DP_TA = <AsduType.M_DP_TA: 4>
M_ST_NA = <AsduType.M_ST_NA: 5>
M_ST_TA = <AsduType.M_ST_TA: 6>
M_BO_NA = <AsduType.M_BO_NA: 7>
M_BO_TA = <AsduType.M_BO_TA: 8>
M_ME_NA = <AsduType.M_ME_NA: 9>
M_ME_TA = <AsduType.M_ME_TA: 10>
M_ME_NB = <AsduType.M_ME_NB: 11>
M_ME_TB = <AsduType.M_ME_TB: 12>
M_ME_NC = <AsduType.M_ME_NC: 13>
M_ME_TC = <AsduType.M_ME_TC: 14>
M_IT_NA = <AsduType.M_IT_NA: 15>
M_IT_TA = <AsduType.M_IT_TA: 16>
M_EP_TA = <AsduType.M_EP_TA: 17>
M_EP_TB = <AsduType.M_EP_TB: 18>
M_EP_TC = <AsduType.M_EP_TC: 19>
M_PS_NA = <AsduType.M_PS_NA: 20>
M_ME_ND = <AsduType.M_ME_ND: 21>
M_SP_TB = <AsduType.M_SP_TB: 30>
M_DP_TB = <AsduType.M_DP_TB: 31>
M_ST_TB = <AsduType.M_ST_TB: 32>
M_BO_TB = <AsduType.M_BO_TB: 33>
M_ME_TD = <AsduType.M_ME_TD: 34>
M_ME_TE = <AsduType.M_ME_TE: 35>
M_ME_TF = <AsduType.M_ME_TF: 36>
M_IT_TB = <AsduType.M_IT_TB: 37>
M_EP_TD = <AsduType.M_EP_TD: 38>
M_EP_TE = <AsduType.M_EP_TE: 39>
M_EP_TF = <AsduType.M_EP_TF: 40>
C_SC_NA = <AsduType.C_SC_NA: 45>
C_DC_NA = <AsduType.C_DC_NA: 46>
C_RC_NA = <AsduType.C_RC_NA: 47>
C_SE_NA = <AsduType.C_SE_NA: 48>
C_SE_NB = <AsduType.C_SE_NB: 49>
C_SE_NC = <AsduType.C_SE_NC: 50>
C_BO_NA = <AsduType.C_BO_NA: 51>
M_EI_NA = <AsduType.M_EI_NA: 70>
C_IC_NA = <AsduType.C_IC_NA: 100>
C_CI_NA = <AsduType.C_CI_NA: 101>
C_RD_NA = <AsduType.C_RD_NA: 102>
C_CS_NA = <AsduType.C_CS_NA: 103>
C_TS_NA = <AsduType.C_TS_NA: 104>
C_RP_NA = <AsduType.C_RP_NA: 105>
C_CD_NA = <AsduType.C_CD_NA: 106>
P_ME_NA = <AsduType.P_ME_NA: 110>
P_ME_NB = <AsduType.P_ME_NB: 111>
P_ME_NC = <AsduType.P_ME_NC: 112>
P_AC_NA = <AsduType.P_AC_NA: 113>
F_FR_NA = <AsduType.F_FR_NA: 120>
F_SR_NA = <AsduType.F_SR_NA: 121>
F_SC_NA = <AsduType.F_SC_NA: 122>
F_LS_NA = <AsduType.F_LS_NA: 123>
F_AF_NA = <AsduType.F_AF_NA: 124>
F_SG_NA = <AsduType.F_SG_NA: 125>
F_DR_TA = <AsduType.F_DR_TA: 126>
class CauseType(enum.Enum):
 90class CauseType(enum.Enum):
 91    UNDEFINED = 0
 92    PERIODIC = 1
 93    BACKGROUND_SCAN = 2
 94    SPONTANEOUS = 3
 95    INITIALIZED = 4
 96    REQUEST = 5
 97    ACTIVATION = 6
 98    ACTIVATION_CONFIRMATION = 7
 99    DEACTIVATION = 8
100    DEACTIVATION_CONFIRMATION = 9
101    ACTIVATION_TERMINATION = 10
102    REMOTE_COMMAND = 11
103    LOCAL_COMMAND = 12
104    FILE_TRANSFER = 13
105    INTERROGATED_STATION = 20
106    INTERROGATED_GROUP01 = 21
107    INTERROGATED_GROUP02 = 22
108    INTERROGATED_GROUP03 = 23
109    INTERROGATED_GROUP04 = 24
110    INTERROGATED_GROUP05 = 25
111    INTERROGATED_GROUP06 = 26
112    INTERROGATED_GROUP07 = 27
113    INTERROGATED_GROUP08 = 28
114    INTERROGATED_GROUP09 = 29
115    INTERROGATED_GROUP10 = 30
116    INTERROGATED_GROUP11 = 31
117    INTERROGATED_GROUP12 = 32
118    INTERROGATED_GROUP13 = 33
119    INTERROGATED_GROUP14 = 34
120    INTERROGATED_GROUP15 = 35
121    INTERROGATED_GROUP16 = 36
122    INTERROGATED_COUNTER = 37
123    INTERROGATED_COUNTER01 = 38
124    INTERROGATED_COUNTER02 = 39
125    INTERROGATED_COUNTER03 = 40
126    INTERROGATED_COUNTER04 = 41
127    UNKNOWN_TYPE = 44
128    UNKNOWN_CAUSE = 45
129    UNKNOWN_ASDU_ADDRESS = 46
130    UNKNOWN_IO_ADDRESS = 47

An enumeration.

UNDEFINED = <CauseType.UNDEFINED: 0>
PERIODIC = <CauseType.PERIODIC: 1>
BACKGROUND_SCAN = <CauseType.BACKGROUND_SCAN: 2>
SPONTANEOUS = <CauseType.SPONTANEOUS: 3>
INITIALIZED = <CauseType.INITIALIZED: 4>
REQUEST = <CauseType.REQUEST: 5>
ACTIVATION = <CauseType.ACTIVATION: 6>
ACTIVATION_CONFIRMATION = <CauseType.ACTIVATION_CONFIRMATION: 7>
DEACTIVATION = <CauseType.DEACTIVATION: 8>
DEACTIVATION_CONFIRMATION = <CauseType.DEACTIVATION_CONFIRMATION: 9>
ACTIVATION_TERMINATION = <CauseType.ACTIVATION_TERMINATION: 10>
REMOTE_COMMAND = <CauseType.REMOTE_COMMAND: 11>
LOCAL_COMMAND = <CauseType.LOCAL_COMMAND: 12>
FILE_TRANSFER = <CauseType.FILE_TRANSFER: 13>
INTERROGATED_STATION = <CauseType.INTERROGATED_STATION: 20>
INTERROGATED_GROUP01 = <CauseType.INTERROGATED_GROUP01: 21>
INTERROGATED_GROUP02 = <CauseType.INTERROGATED_GROUP02: 22>
INTERROGATED_GROUP03 = <CauseType.INTERROGATED_GROUP03: 23>
INTERROGATED_GROUP04 = <CauseType.INTERROGATED_GROUP04: 24>
INTERROGATED_GROUP05 = <CauseType.INTERROGATED_GROUP05: 25>
INTERROGATED_GROUP06 = <CauseType.INTERROGATED_GROUP06: 26>
INTERROGATED_GROUP07 = <CauseType.INTERROGATED_GROUP07: 27>
INTERROGATED_GROUP08 = <CauseType.INTERROGATED_GROUP08: 28>
INTERROGATED_GROUP09 = <CauseType.INTERROGATED_GROUP09: 29>
INTERROGATED_GROUP10 = <CauseType.INTERROGATED_GROUP10: 30>
INTERROGATED_GROUP11 = <CauseType.INTERROGATED_GROUP11: 31>
INTERROGATED_GROUP12 = <CauseType.INTERROGATED_GROUP12: 32>
INTERROGATED_GROUP13 = <CauseType.INTERROGATED_GROUP13: 33>
INTERROGATED_GROUP14 = <CauseType.INTERROGATED_GROUP14: 34>
INTERROGATED_GROUP15 = <CauseType.INTERROGATED_GROUP15: 35>
INTERROGATED_GROUP16 = <CauseType.INTERROGATED_GROUP16: 36>
INTERROGATED_COUNTER = <CauseType.INTERROGATED_COUNTER: 37>
INTERROGATED_COUNTER01 = <CauseType.INTERROGATED_COUNTER01: 38>
INTERROGATED_COUNTER02 = <CauseType.INTERROGATED_COUNTER02: 39>
INTERROGATED_COUNTER03 = <CauseType.INTERROGATED_COUNTER03: 40>
INTERROGATED_COUNTER04 = <CauseType.INTERROGATED_COUNTER04: 41>
UNKNOWN_TYPE = <CauseType.UNKNOWN_TYPE: 44>
UNKNOWN_CAUSE = <CauseType.UNKNOWN_CAUSE: 45>
UNKNOWN_ASDU_ADDRESS = <CauseType.UNKNOWN_ASDU_ADDRESS: 46>
UNKNOWN_IO_ADDRESS = <CauseType.UNKNOWN_IO_ADDRESS: 47>
class Cause(typing.NamedTuple):
133class Cause(typing.NamedTuple):
134    type: CauseType | OtherCauseType
135    is_negative_confirm: bool
136    is_test: bool
137    originator_address: OriginatorAddress

Cause(type, is_negative_confirm, is_test, originator_address)

Cause( type: CauseType | int, is_negative_confirm: bool, is_test: bool, originator_address: int)

Create new instance of Cause(type, is_negative_confirm, is_test, originator_address)

type: CauseType | int

Alias for field number 0

is_negative_confirm: bool

Alias for field number 1

is_test: bool

Alias for field number 2

originator_address: int

Alias for field number 3

class QualityType(enum.Enum):
140class QualityType(enum.Enum):
141    INDICATION = 0
142    MEASUREMENT = 1
143    COUNTER = 2
144    PROTECTION = 3

An enumeration.

INDICATION = <QualityType.INDICATION: 0>
MEASUREMENT = <QualityType.MEASUREMENT: 1>
COUNTER = <QualityType.COUNTER: 2>
PROTECTION = <QualityType.PROTECTION: 3>
class IndicationQuality(typing.NamedTuple):
147class IndicationQuality(typing.NamedTuple):
148    invalid: bool
149    not_topical: bool
150    substituted: bool
151    blocked: bool

IndicationQuality(invalid, not_topical, substituted, blocked)

IndicationQuality(invalid: bool, not_topical: bool, substituted: bool, blocked: bool)

Create new instance of IndicationQuality(invalid, not_topical, substituted, blocked)

invalid: bool

Alias for field number 0

not_topical: bool

Alias for field number 1

substituted: bool

Alias for field number 2

blocked: bool

Alias for field number 3

class MeasurementQuality(typing.NamedTuple):
154class MeasurementQuality(typing.NamedTuple):
155    invalid: bool
156    not_topical: bool
157    substituted: bool
158    blocked: bool
159    overflow: bool

MeasurementQuality(invalid, not_topical, substituted, blocked, overflow)

MeasurementQuality( invalid: bool, not_topical: bool, substituted: bool, blocked: bool, overflow: bool)

Create new instance of MeasurementQuality(invalid, not_topical, substituted, blocked, overflow)

invalid: bool

Alias for field number 0

not_topical: bool

Alias for field number 1

substituted: bool

Alias for field number 2

blocked: bool

Alias for field number 3

overflow: bool

Alias for field number 4

class CounterQuality(typing.NamedTuple):
162class CounterQuality(typing.NamedTuple):
163    invalid: bool
164    adjusted: bool
165    overflow: bool
166    sequence: int
167    """sequence in range [0, 31]"""

CounterQuality(invalid, adjusted, overflow, sequence)

CounterQuality(invalid: bool, adjusted: bool, overflow: bool, sequence: int)

Create new instance of CounterQuality(invalid, adjusted, overflow, sequence)

invalid: bool

Alias for field number 0

adjusted: bool

Alias for field number 1

overflow: bool

Alias for field number 2

sequence: int

sequence in range [0, 31]

class ProtectionQuality(typing.NamedTuple):
170class ProtectionQuality(typing.NamedTuple):
171    invalid: bool
172    not_topical: bool
173    substituted: bool
174    blocked: bool
175    time_invalid: bool

ProtectionQuality(invalid, not_topical, substituted, blocked, time_invalid)

ProtectionQuality( invalid: bool, not_topical: bool, substituted: bool, blocked: bool, time_invalid: bool)

Create new instance of ProtectionQuality(invalid, not_topical, substituted, blocked, time_invalid)

invalid: bool

Alias for field number 0

not_topical: bool

Alias for field number 1

substituted: bool

Alias for field number 2

blocked: bool

Alias for field number 3

time_invalid: bool

Alias for field number 4

class FreezeCode(enum.Enum):
184class FreezeCode(enum.Enum):
185    READ = 0
186    FREEZE = 1
187    FREEZE_AND_RESET = 2
188    RESET = 3

An enumeration.

READ = <FreezeCode.READ: 0>
FREEZE = <FreezeCode.FREEZE: 1>
FREEZE_AND_RESET = <FreezeCode.FREEZE_AND_RESET: 2>
RESET = <FreezeCode.RESET: 3>
class SingleValue(enum.Enum):
191class SingleValue(enum.Enum):
192    OFF = 0
193    ON = 1

An enumeration.

OFF = <SingleValue.OFF: 0>
ON = <SingleValue.ON: 1>
class DoubleValue(enum.Enum):
196class DoubleValue(enum.Enum):
197    """DoubleDataValue
198
199    `FAULT` stands for value 3, defined in the protocol as *INDETERMINATE*.
200    This is in order to make it more distinguishable from ``INTERMEDIATE``.
201
202    """
203    INTERMEDIATE = 0
204    OFF = 1
205    ON = 2
206    FAULT = 3

DoubleDataValue

FAULT represents value 3, which the protocol defines as INDETERMINATE. It is named FAULT here to make it easier to distinguish from INTERMEDIATE.

INTERMEDIATE = <DoubleValue.INTERMEDIATE: 0>
OFF = <DoubleValue.OFF: 1>
ON = <DoubleValue.ON: 2>
FAULT = <DoubleValue.FAULT: 3>
class RegulatingValue(enum.Enum):
209class RegulatingValue(enum.Enum):
210    LOWER = 1
211    HIGHER = 2

An enumeration.

LOWER = <RegulatingValue.LOWER: 1>
HIGHER = <RegulatingValue.HIGHER: 2>
class StepPositionValue(typing.NamedTuple):
214class StepPositionValue(typing.NamedTuple):
215    value: int
216    """value in range [-64, 63]"""
217    transient: bool

StepPositionValue(value, transient)

StepPositionValue(value: int, transient: bool)

Create new instance of StepPositionValue(value, transient)

value: int

value in range [-64, 63]

transient: bool

Alias for field number 1

class BitstringValue(typing.NamedTuple):
220class BitstringValue(typing.NamedTuple):
221    value: util.Bytes
222    """bitstring encoded as 4 bytes"""

BitstringValue(value,)

BitstringValue(value: bytes | bytearray | memoryview)

Create new instance of BitstringValue(value,)

value: bytes | bytearray | memoryview

bitstring encoded as 4 bytes

class NormalizedValue(typing.NamedTuple):
225class NormalizedValue(typing.NamedTuple):
226    value: float
227    """value in range [-1.0, 1.0)"""

NormalizedValue(value,)

NormalizedValue(value: float)

Create new instance of NormalizedValue(value,)

value: float

value in range [-1.0, 1.0)

class ScaledValue(typing.NamedTuple):
230class ScaledValue(typing.NamedTuple):
231    value: int
232    """value in range [-2^15, 2^15-1]"""

ScaledValue(value,)

ScaledValue(value: int)

Create new instance of ScaledValue(value,)

value: int

value in range [-2^15, 2^15-1]

class FloatingValue(typing.NamedTuple):
235class FloatingValue(typing.NamedTuple):
236    value: float

FloatingValue(value,)

FloatingValue(value: float)

Create new instance of FloatingValue(value,)

value: float

Alias for field number 0

class BinaryCounterValue(typing.NamedTuple):
239class BinaryCounterValue(typing.NamedTuple):
240    value: int
241    """value in range [-2^31, 2^31-1]"""

BinaryCounterValue(value,)

BinaryCounterValue(value: int)

Create new instance of BinaryCounterValue(value,)

value: int

value in range [-2^31, 2^31-1]

class ProtectionValue(enum.Enum):
244class ProtectionValue(enum.Enum):
245    OFF = 1
246    ON = 2

An enumeration.

OFF = <ProtectionValue.OFF: 1>
ON = <ProtectionValue.ON: 2>
class ProtectionStartValue(typing.NamedTuple):
249class ProtectionStartValue(typing.NamedTuple):
250    general: bool
251    l1: bool
252    l2: bool
253    l3: bool
254    ie: bool
255    reverse: bool

ProtectionStartValue(general, l1, l2, l3, ie, reverse)

ProtectionStartValue(general: bool, l1: bool, l2: bool, l3: bool, ie: bool, reverse: bool)

Create new instance of ProtectionStartValue(general, l1, l2, l3, ie, reverse)

general: bool

Alias for field number 0

l1: bool

Alias for field number 1

l2: bool

Alias for field number 2

l3: bool

Alias for field number 3

ie: bool

Alias for field number 4

reverse: bool

Alias for field number 5

class ProtectionCommandValue(typing.NamedTuple):
258class ProtectionCommandValue(typing.NamedTuple):
259    general: bool
260    l1: bool
261    l2: bool
262    l3: bool

ProtectionCommandValue(general, l1, l2, l3)

ProtectionCommandValue(general: bool, l1: bool, l2: bool, l3: bool)

Create new instance of ProtectionCommandValue(general, l1, l2, l3)

general: bool

Alias for field number 0

l1: bool

Alias for field number 1

l2: bool

Alias for field number 2

l3: bool

Alias for field number 3

class StatusValue(typing.NamedTuple):
265class StatusValue(typing.NamedTuple):
266    value: list[bool]
267    """value length is 16"""
268    change: list[bool]
269    """change length is 16"""

StatusValue(value, change)

StatusValue(value: list[bool], change: list[bool])

Create new instance of StatusValue(value, change)

value: list[bool]

value length is 16

change: list[bool]

change length is 16

class IoElement_M_SP_NA(typing.NamedTuple):
272class IoElement_M_SP_NA(typing.NamedTuple):
273    value: SingleValue
274    quality: IndicationQuality

IoElement_M_SP_NA(value, quality)

IoElement_M_SP_NA( value: SingleValue, quality: IndicationQuality)

Create new instance of IoElement_M_SP_NA(value, quality)

value: SingleValue

Alias for field number 0

quality: IndicationQuality

Alias for field number 1

class IoElement_M_SP_TA(typing.NamedTuple):
277class IoElement_M_SP_TA(typing.NamedTuple):
278    value: SingleValue
279    quality: IndicationQuality

IoElement_M_SP_TA(value, quality)

IoElement_M_SP_TA( value: SingleValue, quality: IndicationQuality)

Create new instance of IoElement_M_SP_TA(value, quality)

value: SingleValue

Alias for field number 0

quality: IndicationQuality

Alias for field number 1

class IoElement_M_DP_NA(typing.NamedTuple):
282class IoElement_M_DP_NA(typing.NamedTuple):
283    value: DoubleValue
284    quality: IndicationQuality

IoElement_M_DP_NA(value, quality)

IoElement_M_DP_NA( value: DoubleValue, quality: IndicationQuality)

Create new instance of IoElement_M_DP_NA(value, quality)

value: DoubleValue

Alias for field number 0

quality: IndicationQuality

Alias for field number 1

class IoElement_M_DP_TA(typing.NamedTuple):
287class IoElement_M_DP_TA(typing.NamedTuple):
288    value: DoubleValue
289    quality: IndicationQuality

IoElement_M_DP_TA(value, quality)

IoElement_M_DP_TA( value: DoubleValue, quality: IndicationQuality)

Create new instance of IoElement_M_DP_TA(value, quality)

value: DoubleValue

Alias for field number 0

quality: IndicationQuality

Alias for field number 1

class IoElement_M_ST_NA(typing.NamedTuple):
292class IoElement_M_ST_NA(typing.NamedTuple):
293    value: StepPositionValue
294    quality: MeasurementQuality

IoElement_M_ST_NA(value, quality)

IoElement_M_ST_NA( value: StepPositionValue, quality: MeasurementQuality)

Create new instance of IoElement_M_ST_NA(value, quality)

value: StepPositionValue

Alias for field number 0

quality: MeasurementQuality

Alias for field number 1

class IoElement_M_ST_TA(typing.NamedTuple):
297class IoElement_M_ST_TA(typing.NamedTuple):
298    value: StepPositionValue
299    quality: MeasurementQuality

IoElement_M_ST_TA(value, quality)

IoElement_M_ST_TA( value: StepPositionValue, quality: MeasurementQuality)

Create new instance of IoElement_M_ST_TA(value, quality)

value: StepPositionValue

Alias for field number 0

quality: MeasurementQuality

Alias for field number 1

class IoElement_M_BO_NA(typing.NamedTuple):
302class IoElement_M_BO_NA(typing.NamedTuple):
303    value: BitstringValue
304    quality: MeasurementQuality

IoElement_M_BO_NA(value, quality)

IoElement_M_BO_NA( value: BitstringValue, quality: MeasurementQuality)

Create new instance of IoElement_M_BO_NA(value, quality)

value: BitstringValue

Alias for field number 0

quality: MeasurementQuality

Alias for field number 1

class IoElement_M_BO_TA(typing.NamedTuple):
307class IoElement_M_BO_TA(typing.NamedTuple):
308    value: BitstringValue
309    quality: MeasurementQuality

IoElement_M_BO_TA(value, quality)

IoElement_M_BO_TA( value: BitstringValue, quality: MeasurementQuality)

Create new instance of IoElement_M_BO_TA(value, quality)

value: BitstringValue

Alias for field number 0

quality: MeasurementQuality

Alias for field number 1

class IoElement_M_ME_NA(typing.NamedTuple):
312class IoElement_M_ME_NA(typing.NamedTuple):
313    value: NormalizedValue
314    quality: MeasurementQuality

IoElement_M_ME_NA(value, quality)

IoElement_M_ME_NA( value: NormalizedValue, quality: MeasurementQuality)

Create new instance of IoElement_M_ME_NA(value, quality)

value: NormalizedValue

Alias for field number 0

quality: MeasurementQuality

Alias for field number 1

class IoElement_M_ME_TA(typing.NamedTuple):
317class IoElement_M_ME_TA(typing.NamedTuple):
318    value: NormalizedValue
319    quality: MeasurementQuality

IoElement_M_ME_TA(value, quality)

IoElement_M_ME_TA( value: NormalizedValue, quality: MeasurementQuality)

Create new instance of IoElement_M_ME_TA(value, quality)

value: NormalizedValue

Alias for field number 0

quality: MeasurementQuality

Alias for field number 1

class IoElement_M_ME_NB(typing.NamedTuple):
322class IoElement_M_ME_NB(typing.NamedTuple):
323    value: ScaledValue
324    quality: MeasurementQuality

IoElement_M_ME_NB(value, quality)

IoElement_M_ME_NB( value: ScaledValue, quality: MeasurementQuality)

Create new instance of IoElement_M_ME_NB(value, quality)

value: ScaledValue

Alias for field number 0

quality: MeasurementQuality

Alias for field number 1

class IoElement_M_ME_TB(typing.NamedTuple):
class IoElement_M_ME_TB(typing.NamedTuple):
    """Scaled value paired with a measurement quality.

    NOTE(review): presumably the element for ASDU type M_ME_TB; confirm.
    """
    value: ScaledValue
    quality: MeasurementQuality

IoElement_M_ME_TB(value, quality)

IoElement_M_ME_TB( value: ScaledValue, quality: MeasurementQuality)

Create new instance of IoElement_M_ME_TB(value, quality)

value: ScaledValue

Alias for field number 0

Alias for field number 1

class IoElement_M_ME_NC(typing.NamedTuple):
class IoElement_M_ME_NC(typing.NamedTuple):
    """Floating value paired with a measurement quality.

    NOTE(review): presumably the element for ASDU type M_ME_NC; confirm.
    """
    value: FloatingValue
    quality: MeasurementQuality

IoElement_M_ME_NC(value, quality)

IoElement_M_ME_NC( value: FloatingValue, quality: MeasurementQuality)

Create new instance of IoElement_M_ME_NC(value, quality)

value: FloatingValue

Alias for field number 0

Alias for field number 1

class IoElement_M_ME_TC(typing.NamedTuple):
class IoElement_M_ME_TC(typing.NamedTuple):
    """Floating value paired with a measurement quality.

    NOTE(review): presumably the element for ASDU type M_ME_TC; confirm.
    """
    value: FloatingValue
    quality: MeasurementQuality

IoElement_M_ME_TC(value, quality)

IoElement_M_ME_TC( value: FloatingValue, quality: MeasurementQuality)

Create new instance of IoElement_M_ME_TC(value, quality)

value: FloatingValue

Alias for field number 0

Alias for field number 1

class IoElement_M_IT_NA(typing.NamedTuple):
class IoElement_M_IT_NA(typing.NamedTuple):
    """Binary counter value paired with a counter quality.

    NOTE(review): presumably the element for ASDU type M_IT_NA; confirm.
    """
    value: BinaryCounterValue
    quality: CounterQuality

IoElement_M_IT_NA(value, quality)

IoElement_M_IT_NA( value: BinaryCounterValue, quality: CounterQuality)

Create new instance of IoElement_M_IT_NA(value, quality)

Alias for field number 0

quality: CounterQuality

Alias for field number 1

class IoElement_M_IT_TA(typing.NamedTuple):
class IoElement_M_IT_TA(typing.NamedTuple):
    """Binary counter value paired with a counter quality.

    NOTE(review): presumably the element for ASDU type M_IT_TA; confirm.
    """
    value: BinaryCounterValue
    quality: CounterQuality

IoElement_M_IT_TA(value, quality)

IoElement_M_IT_TA( value: BinaryCounterValue, quality: CounterQuality)

Create new instance of IoElement_M_IT_TA(value, quality)

Alias for field number 0

quality: CounterQuality

Alias for field number 1

class IoElement_M_EP_TA(typing.NamedTuple):
class IoElement_M_EP_TA(typing.NamedTuple):
    """Protection value, protection quality and an elapsed time.

    NOTE(review): presumably the element for ASDU type M_EP_TA; confirm.
    """
    value: ProtectionValue
    quality: ProtectionQuality
    elapsed_time: int
    """elapsed_time in range [0, 65535]"""

IoElement_M_EP_TA(value, quality, elapsed_time)

IoElement_M_EP_TA( value: ProtectionValue, quality: ProtectionQuality, elapsed_time: int)

Create new instance of IoElement_M_EP_TA(value, quality, elapsed_time)

Alias for field number 0

Alias for field number 1

elapsed_time: int

elapsed_time in range [0, 65535]

class IoElement_M_EP_TB(typing.NamedTuple):
class IoElement_M_EP_TB(typing.NamedTuple):
    """Protection start value, protection quality and a duration time.

    NOTE(review): presumably the element for ASDU type M_EP_TB; confirm.
    """
    value: ProtectionStartValue
    quality: ProtectionQuality
    duration_time: int
    """duration_time in range [0, 65535]"""

IoElement_M_EP_TB(value, quality, duration_time)

IoElement_M_EP_TB( value: ProtectionStartValue, quality: ProtectionQuality, duration_time: int)

Create new instance of IoElement_M_EP_TB(value, quality, duration_time)

Alias for field number 0

Alias for field number 1

duration_time: int

duration_time in range [0, 65535]

class IoElement_M_EP_TC(typing.NamedTuple):
class IoElement_M_EP_TC(typing.NamedTuple):
    """Protection command value, protection quality and an operating time.

    NOTE(review): presumably the element for ASDU type M_EP_TC; confirm.
    """
    value: ProtectionCommandValue
    quality: ProtectionQuality
    operating_time: int
    """operating_time in range [0, 65535]"""

IoElement_M_EP_TC(value, quality, operating_time)

IoElement_M_EP_TC( value: ProtectionCommandValue, quality: ProtectionQuality, operating_time: int)

Create new instance of IoElement_M_EP_TC(value, quality, operating_time)

Alias for field number 0

Alias for field number 1

operating_time: int

operating_time in range [0, 65535]

class IoElement_M_PS_NA(typing.NamedTuple):
class IoElement_M_PS_NA(typing.NamedTuple):
    """Status value paired with a measurement quality.

    NOTE(review): presumably the element for ASDU type M_PS_NA; confirm.
    """
    value: StatusValue
    quality: MeasurementQuality

IoElement_M_PS_NA(value, quality)

IoElement_M_PS_NA( value: StatusValue, quality: MeasurementQuality)

Create new instance of IoElement_M_PS_NA(value, quality)

value: StatusValue

Alias for field number 0

Alias for field number 1

class IoElement_M_ME_ND(typing.NamedTuple):
class IoElement_M_ME_ND(typing.NamedTuple):
    """Normalized value only; this element declares no quality field.

    NOTE(review): presumably the element for ASDU type M_ME_ND; confirm.
    """
    value: NormalizedValue

IoElement_M_ME_ND(value,)

IoElement_M_ME_ND(value: NormalizedValue)

Create new instance of IoElement_M_ME_ND(value,)

Alias for field number 0

class IoElement_M_SP_TB(typing.NamedTuple):
class IoElement_M_SP_TB(typing.NamedTuple):
    """Single value paired with an indication quality.

    NOTE(review): presumably the element for ASDU type M_SP_TB; confirm.
    """
    value: SingleValue
    quality: IndicationQuality

IoElement_M_SP_TB(value, quality)

IoElement_M_SP_TB( value: SingleValue, quality: IndicationQuality)

Create new instance of IoElement_M_SP_TB(value, quality)

value: SingleValue

Alias for field number 0

Alias for field number 1

class IoElement_M_DP_TB(typing.NamedTuple):
class IoElement_M_DP_TB(typing.NamedTuple):
    """Double value paired with an indication quality.

    NOTE(review): presumably the element for ASDU type M_DP_TB; confirm.
    """
    value: DoubleValue
    quality: IndicationQuality

IoElement_M_DP_TB(value, quality)

IoElement_M_DP_TB( value: DoubleValue, quality: IndicationQuality)

Create new instance of IoElement_M_DP_TB(value, quality)

value: DoubleValue

Alias for field number 0

Alias for field number 1

class IoElement_M_ST_TB(typing.NamedTuple):
class IoElement_M_ST_TB(typing.NamedTuple):
    """Step position value paired with a measurement quality.

    NOTE(review): presumably the element for ASDU type M_ST_TB; confirm.
    """
    value: StepPositionValue
    quality: MeasurementQuality

IoElement_M_ST_TB(value, quality)

IoElement_M_ST_TB( value: StepPositionValue, quality: MeasurementQuality)

Create new instance of IoElement_M_ST_TB(value, quality)

Alias for field number 0

Alias for field number 1

class IoElement_M_BO_TB(typing.NamedTuple):
class IoElement_M_BO_TB(typing.NamedTuple):
    """Bitstring value paired with a measurement quality.

    NOTE(review): presumably the element for ASDU type M_BO_TB; confirm.
    """
    value: BitstringValue
    quality: MeasurementQuality

IoElement_M_BO_TB(value, quality)

IoElement_M_BO_TB( value: BitstringValue, quality: MeasurementQuality)

Create new instance of IoElement_M_BO_TB(value, quality)

Alias for field number 0

Alias for field number 1

class IoElement_M_ME_TD(typing.NamedTuple):
class IoElement_M_ME_TD(typing.NamedTuple):
    """Normalized value paired with a measurement quality.

    NOTE(review): presumably the element for ASDU type M_ME_TD; confirm.
    """
    value: NormalizedValue
    quality: MeasurementQuality

IoElement_M_ME_TD(value, quality)

IoElement_M_ME_TD( value: NormalizedValue, quality: MeasurementQuality)

Create new instance of IoElement_M_ME_TD(value, quality)

Alias for field number 0

Alias for field number 1

class IoElement_M_ME_TE(typing.NamedTuple):
class IoElement_M_ME_TE(typing.NamedTuple):
    """Scaled value paired with a measurement quality.

    NOTE(review): presumably the element for ASDU type M_ME_TE; confirm.
    """
    value: ScaledValue
    quality: MeasurementQuality

IoElement_M_ME_TE(value, quality)

IoElement_M_ME_TE( value: ScaledValue, quality: MeasurementQuality)

Create new instance of IoElement_M_ME_TE(value, quality)

value: ScaledValue

Alias for field number 0

Alias for field number 1

class IoElement_M_ME_TF(typing.NamedTuple):
class IoElement_M_ME_TF(typing.NamedTuple):
    """Floating value paired with a measurement quality.

    NOTE(review): presumably the element for ASDU type M_ME_TF; confirm.
    """
    value: FloatingValue
    quality: MeasurementQuality

IoElement_M_ME_TF(value, quality)

IoElement_M_ME_TF( value: FloatingValue, quality: MeasurementQuality)

Create new instance of IoElement_M_ME_TF(value, quality)

value: FloatingValue

Alias for field number 0

Alias for field number 1

class IoElement_M_IT_TB(typing.NamedTuple):
class IoElement_M_IT_TB(typing.NamedTuple):
    """Binary counter value paired with a counter quality.

    NOTE(review): presumably the element for ASDU type M_IT_TB; confirm.
    """
    value: BinaryCounterValue
    quality: CounterQuality

IoElement_M_IT_TB(value, quality)

IoElement_M_IT_TB( value: BinaryCounterValue, quality: CounterQuality)

Create new instance of IoElement_M_IT_TB(value, quality)

Alias for field number 0

quality: CounterQuality

Alias for field number 1

class IoElement_M_EP_TD(typing.NamedTuple):
class IoElement_M_EP_TD(typing.NamedTuple):
    """Protection value, protection quality and an elapsed time.

    NOTE(review): presumably the element for ASDU type M_EP_TD; confirm.
    """
    value: ProtectionValue
    quality: ProtectionQuality
    elapsed_time: int
    """elapsed_time in range [0, 65535]"""

IoElement_M_EP_TD(value, quality, elapsed_time)

IoElement_M_EP_TD( value: ProtectionValue, quality: ProtectionQuality, elapsed_time: int)

Create new instance of IoElement_M_EP_TD(value, quality, elapsed_time)

Alias for field number 0

Alias for field number 1

elapsed_time: int

elapsed_time in range [0, 65535]

class IoElement_M_EP_TE(typing.NamedTuple):
class IoElement_M_EP_TE(typing.NamedTuple):
    """Protection start value, protection quality and a duration time.

    NOTE(review): presumably the element for ASDU type M_EP_TE; confirm.
    """
    value: ProtectionStartValue
    quality: ProtectionQuality
    duration_time: int
    """duration_time in range [0, 65535]"""

IoElement_M_EP_TE(value, quality, duration_time)

IoElement_M_EP_TE( value: ProtectionStartValue, quality: ProtectionQuality, duration_time: int)

Create new instance of IoElement_M_EP_TE(value, quality, duration_time)

Alias for field number 0

Alias for field number 1

duration_time: int

duration_time in range [0, 65535]

class IoElement_M_EP_TF(typing.NamedTuple):
class IoElement_M_EP_TF(typing.NamedTuple):
    """Protection command value, protection quality and an operating time.

    NOTE(review): presumably the element for ASDU type M_EP_TF; confirm.
    """
    value: ProtectionCommandValue
    quality: ProtectionQuality
    operating_time: int
    """operating_time in range [0, 65535]"""

IoElement_M_EP_TF(value, quality, operating_time)

IoElement_M_EP_TF( value: ProtectionCommandValue, quality: ProtectionQuality, operating_time: int)

Create new instance of IoElement_M_EP_TF(value, quality, operating_time)

Alias for field number 0

Alias for field number 1

operating_time: int

operating_time in range [0, 65535]

class IoElement_C_SC_NA(typing.NamedTuple):
class IoElement_C_SC_NA(typing.NamedTuple):
    """Single value together with a select flag and a qualifier.

    NOTE(review): presumably the element for ASDU type C_SC_NA; confirm.
    """
    value: SingleValue
    select: bool
    qualifier: int
    """qualifier in range [0, 31]"""

IoElement_C_SC_NA(value, select, qualifier)

IoElement_C_SC_NA( value: SingleValue, select: bool, qualifier: int)

Create new instance of IoElement_C_SC_NA(value, select, qualifier)

value: SingleValue

Alias for field number 0

select: bool

Alias for field number 1

qualifier: int

qualifier in range [0, 31]

class IoElement_C_DC_NA(typing.NamedTuple):
class IoElement_C_DC_NA(typing.NamedTuple):
    """Double value together with a select flag and a qualifier.

    NOTE(review): presumably the element for ASDU type C_DC_NA; confirm.
    """
    value: DoubleValue
    select: bool
    qualifier: int
    """qualifier in range [0, 31]"""

IoElement_C_DC_NA(value, select, qualifier)

IoElement_C_DC_NA( value: DoubleValue, select: bool, qualifier: int)

Create new instance of IoElement_C_DC_NA(value, select, qualifier)

value: DoubleValue

Alias for field number 0

select: bool

Alias for field number 1

qualifier: int

qualifier in range [0, 31]

class IoElement_C_RC_NA(typing.NamedTuple):
class IoElement_C_RC_NA(typing.NamedTuple):
    """Regulating value together with a select flag and a qualifier.

    NOTE(review): presumably the element for ASDU type C_RC_NA; confirm.
    """
    value: RegulatingValue
    select: bool
    qualifier: int
    """qualifier in range [0, 31]"""

IoElement_C_RC_NA(value, select, qualifier)

IoElement_C_RC_NA( value: RegulatingValue, select: bool, qualifier: int)

Create new instance of IoElement_C_RC_NA(value, select, qualifier)

Alias for field number 0

select: bool

Alias for field number 1

qualifier: int

qualifier in range [0, 31]

class IoElement_C_SE_NA(typing.NamedTuple):
class IoElement_C_SE_NA(typing.NamedTuple):
    """Normalized value together with a select flag.

    NOTE(review): presumably the element for ASDU type C_SE_NA; confirm.
    """
    value: NormalizedValue
    select: bool

IoElement_C_SE_NA(value, select)

IoElement_C_SE_NA( value: NormalizedValue, select: bool)

Create new instance of IoElement_C_SE_NA(value, select)

Alias for field number 0

select: bool

Alias for field number 1

class IoElement_C_SE_NB(typing.NamedTuple):
class IoElement_C_SE_NB(typing.NamedTuple):
    """Scaled value together with a select flag.

    NOTE(review): presumably the element for ASDU type C_SE_NB; confirm.
    """
    value: ScaledValue
    select: bool

IoElement_C_SE_NB(value, select)

IoElement_C_SE_NB( value: ScaledValue, select: bool)

Create new instance of IoElement_C_SE_NB(value, select)

value: ScaledValue

Alias for field number 0

select: bool

Alias for field number 1

class IoElement_C_SE_NC(typing.NamedTuple):
class IoElement_C_SE_NC(typing.NamedTuple):
    """Floating value together with a select flag.

    NOTE(review): presumably the element for ASDU type C_SE_NC; confirm.
    """
    value: FloatingValue
    select: bool

IoElement_C_SE_NC(value, select)

IoElement_C_SE_NC( value: FloatingValue, select: bool)

Create new instance of IoElement_C_SE_NC(value, select)

value: FloatingValue

Alias for field number 0

select: bool

Alias for field number 1

class IoElement_C_BO_NA(typing.NamedTuple):
class IoElement_C_BO_NA(typing.NamedTuple):
    """Bitstring value only; this element declares no other field.

    NOTE(review): presumably the element for ASDU type C_BO_NA; confirm.
    """
    value: BitstringValue

IoElement_C_BO_NA(value,)

IoElement_C_BO_NA(value: BitstringValue)

Create new instance of IoElement_C_BO_NA(value,)

Alias for field number 0

class IoElement_M_EI_NA(typing.NamedTuple):
class IoElement_M_EI_NA(typing.NamedTuple):
    """Parameter-change flag together with an integer cause.

    NOTE(review): presumably the element for ASDU type M_EI_NA; confirm.
    """
    param_change: bool
    cause: int
    """cause in range [0, 127]"""

IoElement_M_EI_NA(param_change, cause)

IoElement_M_EI_NA(param_change: bool, cause: int)

Create new instance of IoElement_M_EI_NA(param_change, cause)

param_change: bool

Alias for field number 0

cause: int

cause in range [0, 127]

class IoElement_C_IC_NA(typing.NamedTuple):
class IoElement_C_IC_NA(typing.NamedTuple):
    """Single integer qualifier.

    NOTE(review): presumably the element for ASDU type C_IC_NA; confirm.
    """
    qualifier: int
    """qualifier in range [0, 255]"""

IoElement_C_IC_NA(qualifier,)

IoElement_C_IC_NA(qualifier: int)

Create new instance of IoElement_C_IC_NA(qualifier,)

qualifier: int

qualifier in range [0, 255]

class IoElement_C_CI_NA(typing.NamedTuple):
class IoElement_C_CI_NA(typing.NamedTuple):
    """Integer request together with a freeze code.

    NOTE(review): presumably the element for ASDU type C_CI_NA; confirm.
    """
    request: int
    """request in range [0, 63]"""
    freeze: FreezeCode

IoElement_C_CI_NA(request, freeze)

IoElement_C_CI_NA( request: int, freeze: FreezeCode)

Create new instance of IoElement_C_CI_NA(request, freeze)

request: int

request in range [0, 63]

freeze: FreezeCode

Alias for field number 1

class IoElement_C_RD_NA(typing.NamedTuple):
class IoElement_C_RD_NA(typing.NamedTuple):
    """Empty element: declares no fields.

    NOTE(review): presumably the element for ASDU type C_RD_NA; confirm.
    """
    pass

IoElement_C_RD_NA()

IoElement_C_RD_NA()

Create new instance of IoElement_C_RD_NA()

class IoElement_C_CS_NA(typing.NamedTuple):
class IoElement_C_CS_NA(typing.NamedTuple):
    """Single ``Time`` value.

    NOTE(review): presumably the element for ASDU type C_CS_NA; confirm.
    """
    time: Time
    """time size is SEVEN"""

IoElement_C_CS_NA(time,)

IoElement_C_CS_NA(time: Time)

Create new instance of IoElement_C_CS_NA(time,)

time: Time

time size is SEVEN

class IoElement_C_TS_NA(typing.NamedTuple):
class IoElement_C_TS_NA(typing.NamedTuple):
    """Empty element: declares no fields.

    NOTE(review): presumably the element for ASDU type C_TS_NA; confirm.
    """
    pass

IoElement_C_TS_NA()

IoElement_C_TS_NA()

Create new instance of IoElement_C_TS_NA()

class IoElement_C_RP_NA(typing.NamedTuple):
class IoElement_C_RP_NA(typing.NamedTuple):
    """Single integer qualifier.

    NOTE(review): presumably the element for ASDU type C_RP_NA; confirm.
    """
    qualifier: int
    """qualifier in range [0, 255]"""

IoElement_C_RP_NA(qualifier,)

IoElement_C_RP_NA(qualifier: int)

Create new instance of IoElement_C_RP_NA(qualifier,)

qualifier: int

qualifier in range [0, 255]

class IoElement_C_CD_NA(typing.NamedTuple):
class IoElement_C_CD_NA(typing.NamedTuple):
    """Single integer time (a plain ``int``, not a ``Time`` instance).

    NOTE(review): presumably the element for ASDU type C_CD_NA; confirm.
    """
    time: int
    """time in range [0, 65535]"""

IoElement_C_CD_NA(time,)

IoElement_C_CD_NA(time: int)

Create new instance of IoElement_C_CD_NA(time,)

time: int

time in range [0, 65535]

class IoElement_P_ME_NA(typing.NamedTuple):
class IoElement_P_ME_NA(typing.NamedTuple):
    """Normalized value together with an integer qualifier.

    NOTE(review): presumably the element for ASDU type P_ME_NA; confirm.
    """
    value: NormalizedValue
    qualifier: int
    """qualifier in range [0, 255]"""

IoElement_P_ME_NA(value, qualifier)

IoElement_P_ME_NA( value: NormalizedValue, qualifier: int)

Create new instance of IoElement_P_ME_NA(value, qualifier)

Alias for field number 0

qualifier: int

qualifier in range [0, 255]

class IoElement_P_ME_NB(typing.NamedTuple):
class IoElement_P_ME_NB(typing.NamedTuple):
    """Scaled value together with an integer qualifier.

    NOTE(review): presumably the element for ASDU type P_ME_NB; confirm.
    """
    value: ScaledValue
    qualifier: int
    """qualifier in range [0, 255]"""

IoElement_P_ME_NB(value, qualifier)

IoElement_P_ME_NB( value: ScaledValue, qualifier: int)

Create new instance of IoElement_P_ME_NB(value, qualifier)

value: ScaledValue

Alias for field number 0

qualifier: int

qualifier in range [0, 255]

class IoElement_P_ME_NC(typing.NamedTuple):
class IoElement_P_ME_NC(typing.NamedTuple):
    """Floating value together with an integer qualifier.

    NOTE(review): presumably the element for ASDU type P_ME_NC; confirm.
    """
    value: FloatingValue
    qualifier: int
    """qualifier in range [0, 255]"""

IoElement_P_ME_NC(value, qualifier)

IoElement_P_ME_NC( value: FloatingValue, qualifier: int)

Create new instance of IoElement_P_ME_NC(value, qualifier)

value: FloatingValue

Alias for field number 0

qualifier: int

qualifier in range [0, 255]

class IoElement_P_AC_NA(typing.NamedTuple):
class IoElement_P_AC_NA(typing.NamedTuple):
    """Single integer qualifier.

    NOTE(review): presumably the element for ASDU type P_AC_NA; confirm.
    """
    qualifier: int
    """qualifier in range [0, 255]"""

IoElement_P_AC_NA(qualifier,)

IoElement_P_AC_NA(qualifier: int)

Create new instance of IoElement_P_AC_NA(qualifier,)

qualifier: int

qualifier in range [0, 255]

class IoElement_F_FR_NA(typing.NamedTuple):
class IoElement_F_FR_NA(typing.NamedTuple):
    """File name, file length and a ready flag.

    NOTE(review): presumably the element for ASDU type F_FR_NA; confirm.
    """
    file_name: int
    """file_name in range [0, 65535]"""
    file_length: int
    """file_length in range [0, 16777215]"""
    ready: bool

IoElement_F_FR_NA(file_name, file_length, ready)

IoElement_F_FR_NA(file_name: int, file_length: int, ready: bool)

Create new instance of IoElement_F_FR_NA(file_name, file_length, ready)

file_name: int

file_name in range [0, 65535]

file_length: int

file_length in range [0, 16777215]

ready: bool

Alias for field number 2

class IoElement_F_SR_NA(typing.NamedTuple):
class IoElement_F_SR_NA(typing.NamedTuple):
    """File name, section name, section length and a ready flag.

    NOTE(review): presumably the element for ASDU type F_SR_NA; confirm.
    """
    file_name: int
    """file_name in range [0, 65535]"""
    section_name: int
    """section_name in range [0, 255]"""
    section_length: int
    """section_length in range [0, 16777215]"""
    ready: bool

IoElement_F_SR_NA(file_name, section_name, section_length, ready)

IoElement_F_SR_NA(file_name: int, section_name: int, section_length: int, ready: bool)

Create new instance of IoElement_F_SR_NA(file_name, section_name, section_length, ready)

file_name: int

file_name in range [0, 65535]

section_name: int

section_name in range [0, 255]

section_length: int

section_length in range [0, 16777215]

ready: bool

Alias for field number 3

class IoElement_F_SC_NA(typing.NamedTuple):
class IoElement_F_SC_NA(typing.NamedTuple):
    """File name, section name and an integer qualifier.

    NOTE(review): presumably the element for ASDU type F_SC_NA; confirm.
    """
    file_name: int
    """file_name in range [0, 65535]"""
    section_name: int
    """section_name in range [0, 255]"""
    qualifier: int
    """qualifier in range [0, 255]"""

IoElement_F_SC_NA(file_name, section_name, qualifier)

IoElement_F_SC_NA(file_name: int, section_name: int, qualifier: int)

Create new instance of IoElement_F_SC_NA(file_name, section_name, qualifier)

file_name: int

file_name in range [0, 65535]

section_name: int

section_name in range [0, 255]

qualifier: int

qualifier in range [0, 255]

class IoElement_F_LS_NA(typing.NamedTuple):
class IoElement_F_LS_NA(typing.NamedTuple):
    """File name, section name, last qualifier and a checksum.

    NOTE(review): presumably the element for ASDU type F_LS_NA; confirm.
    """
    file_name: int
    """file_name in range [0, 65535]"""
    section_name: int
    """section_name in range [0, 255]"""
    last_qualifier: int
    """last_qualifier in range [0, 255]"""
    checksum: int
    """checksum in range [0, 255]"""

IoElement_F_LS_NA(file_name, section_name, last_qualifier, checksum)

IoElement_F_LS_NA( file_name: int, section_name: int, last_qualifier: int, checksum: int)

Create new instance of IoElement_F_LS_NA(file_name, section_name, last_qualifier, checksum)

file_name: int

file_name in range [0, 65535]

section_name: int

section_name in range [0, 255]

last_qualifier: int

last_qualifier in range [0, 255]

checksum: int

checksum in range [0, 255]

class IoElement_F_AF_NA(typing.NamedTuple):
class IoElement_F_AF_NA(typing.NamedTuple):
    """File name, section name and an integer qualifier.

    NOTE(review): presumably the element for ASDU type F_AF_NA; confirm.
    """
    file_name: int
    """file_name in range [0, 65535]"""
    section_name: int
    """section_name in range [0, 255]"""
    qualifier: int
    """qualifier in range [0, 255]"""

IoElement_F_AF_NA(file_name, section_name, qualifier)

IoElement_F_AF_NA(file_name: int, section_name: int, qualifier: int)

Create new instance of IoElement_F_AF_NA(file_name, section_name, qualifier)

file_name: int

file_name in range [0, 65535]

section_name: int

section_name in range [0, 255]

qualifier: int

qualifier in range [0, 255]

class IoElement_F_SG_NA(typing.NamedTuple):
class IoElement_F_SG_NA(typing.NamedTuple):
    """File name, section name and the raw bytes of a segment.

    NOTE(review): presumably the element for ASDU type F_SG_NA; confirm.
    """
    file_name: int
    """file_name in range [0, 65535]"""
    section_name: int
    """section_name in range [0, 255]"""
    # bytes-like payload (bytes, bytearray or memoryview)
    segment: util.Bytes

IoElement_F_SG_NA(file_name, section_name, segment)

IoElement_F_SG_NA( file_name: int, section_name: int, segment: bytes | bytearray | memoryview)

Create new instance of IoElement_F_SG_NA(file_name, section_name, segment)

file_name: int

file_name in range [0, 65535]

section_name: int

section_name in range [0, 255]

segment: bytes | bytearray | memoryview

Alias for field number 2

class IoElement_F_DR_TA(typing.NamedTuple):
class IoElement_F_DR_TA(typing.NamedTuple):
    """File name and length, three boolean flags and a creation time.

    NOTE(review): presumably the element for ASDU type F_DR_TA; confirm.
    """
    file_name: int
    """file_name in range [0, 65535]"""
    file_length: int
    """file_length in range [0, 16777215]"""
    more_follows: bool
    is_directory: bool
    transfer_active: bool
    creation_time: Time

IoElement_F_DR_TA(file_name, file_length, more_follows, is_directory, transfer_active, creation_time)

IoElement_F_DR_TA( file_name: int, file_length: int, more_follows: bool, is_directory: bool, transfer_active: bool, creation_time: Time)

Create new instance of IoElement_F_DR_TA(file_name, file_length, more_follows, is_directory, transfer_active, creation_time)

file_name: int

file_name in range [0, 65535]

file_length: int

file_length in range [0, 16777215]

more_follows: bool

Alias for field number 2

is_directory: bool

Alias for field number 3

transfer_active: bool

Alias for field number 4

creation_time: Time

Alias for field number 5

class IO(typing.NamedTuple):
class IO(typing.NamedTuple):
    """Information object: an address, a list of elements and a time."""
    address: IoAddress
    elements: list[IoElement]
    # NOTE(review): presumably None when the ASDU type has no time tag
    time: Time | None

IO(address, elements, time)

address: int

Alias for field number 0

time: Time | None

Alias for field number 2

class ASDU(typing.NamedTuple):
class ASDU(typing.NamedTuple):
    """ASDU: a type, a cause, an address and a list of information objects."""
    type: AsduType
    cause: Cause
    address: AsduAddress
    ios: list[IO]

ASDU(type, cause, address, ios)

ASDU( type: AsduType, cause: Cause, address: int, ios: list[IO])

Create new instance of ASDU(type, cause, address, ios)

type: AsduType

Alias for field number 0

cause: Cause

Alias for field number 1

address: int

Alias for field number 2

ios: list[IO]

Alias for field number 3

class Encoder:
class Encoder:
    """ASDU encoder/decoder built on top of a generic ``encoder.Encoder``.

    The wrapped encoder works with integer ASDU types and integer causes.
    This class converts those to and from ``common.ASDU`` instances.
    """

    def __init__(self,
                 cause_size: common.CauseSize,
                 asdu_address_size: common.AsduAddressSize,
                 io_address_size: common.IoAddressSize,
                 max_asdu_size: int = 252):
        """Create the encoder.

        Args:
            cause_size: size used when encoding/decoding the cause
            asdu_address_size: forwarded to the wrapped encoder
            io_address_size: forwarded to the wrapped encoder
            max_asdu_size: stored and exposed through `max_asdu_size`;
                not otherwise used by the methods of this class

        """
        self._cause_size = cause_size
        self._max_asdu_size = max_asdu_size

        # the module-level time-size table and IO element codecs are
        # handed over to the generic encoder
        self._encoder = encoder.Encoder(
            cause_size=cause_size,
            asdu_address_size=asdu_address_size,
            io_address_size=io_address_size,
            asdu_type_time_sizes=asdu_type_time_sizes,
            inverted_sequence_bit=False,
            decode_io_element_cb=decode_io_element,
            encode_io_element_cb=encode_io_element)

    @property
    def max_asdu_size(self) -> int:
        """Maximum ASDU size given at construction"""
        return self._max_asdu_size

    @property
    def cause_size(self) -> common.CauseSize:
        """Cause size as reported by the wrapped encoder"""
        return self._encoder.cause_size

    @property
    def asdu_address_size(self) -> common.AsduAddressSize:
        """ASDU address size as reported by the wrapped encoder"""
        return self._encoder.asdu_address_size

    @property
    def io_address_size(self) -> common.IoAddressSize:
        """IO address size as reported by the wrapped encoder"""
        return self._encoder.io_address_size

    def decode_asdu(self,
                    asdu_bytes: util.Bytes
                    ) -> tuple[common.ASDU, util.Bytes]:
        """Decode a single ASDU from `asdu_bytes`.

        Returns the decoded ``common.ASDU`` and the second item returned by
        the wrapped encoder (the remaining bytes).

        """
        generic_asdu, remainder = self._encoder.decode_asdu(asdu_bytes)

        # type is converted before cause, same as the evaluation order of
        # the individual conversions has always been
        decoded_type = _decode_asdu_type(generic_asdu.type)
        decoded_cause = decode_cause(generic_asdu.cause, self._cause_size)

        decoded_ios = [common.IO(address=generic_io.address,
                                 elements=generic_io.elements,
                                 time=generic_io.time)
                       for generic_io in generic_asdu.ios]

        decoded_asdu = common.ASDU(type=decoded_type,
                                   cause=decoded_cause,
                                   address=generic_asdu.address,
                                   ios=decoded_ios)
        return decoded_asdu, remainder

    def encode_asdu(self, asdu: common.ASDU) -> util.Bytes:
        """Encode `asdu` to bytes using the wrapped encoder."""
        type_value = asdu.type.value
        cause_value = encode_cause(asdu.cause, self._cause_size)

        generic_ios = [encoder.common.IO(address=io.address,
                                         elements=io.elements,
                                         time=io.time)
                       for io in asdu.ios]

        generic_asdu = encoder.common.ASDU(type=type_value,
                                           cause=cause_value,
                                           address=asdu.address,
                                           ios=generic_ios)
        return self._encoder.encode_asdu(generic_asdu)
Encoder( cause_size: CauseSize, asdu_address_size: AsduAddressSize, io_address_size: IoAddressSize, max_asdu_size: int = 252)
    def __init__(self,
                 cause_size: common.CauseSize,
                 asdu_address_size: common.AsduAddressSize,
                 io_address_size: common.IoAddressSize,
                 max_asdu_size: int = 252):
        """Create the encoder.

        Args:
            cause_size: size used when encoding/decoding the cause
            asdu_address_size: forwarded to the wrapped encoder
            io_address_size: forwarded to the wrapped encoder
            max_asdu_size: stored as-is; it is not passed to the wrapped
                encoder

        """
        self._cause_size = cause_size
        self._max_asdu_size = max_asdu_size
        # generic encoder configured with the module-level time-size table
        # and IO element encode/decode callbacks
        self._encoder = encoder.Encoder(
            cause_size=cause_size,
            asdu_address_size=asdu_address_size,
            io_address_size=io_address_size,
            asdu_type_time_sizes=asdu_type_time_sizes,
            inverted_sequence_bit=False,
            decode_io_element_cb=decode_io_element,
            encode_io_element_cb=encode_io_element)
max_asdu_size: int
    @property
    def max_asdu_size(self) -> int:
        """Maximum ASDU size stored on this instance"""
        return self._max_asdu_size
cause_size: CauseSize
    @property
    def cause_size(self) -> common.CauseSize:
        """Cause size as reported by the wrapped encoder"""
        return self._encoder.cause_size
asdu_address_size: AsduAddressSize
    @property
    def asdu_address_size(self) -> common.AsduAddressSize:
        """ASDU address size as reported by the wrapped encoder"""
        return self._encoder.asdu_address_size
io_address_size: IoAddressSize
    @property
    def io_address_size(self) -> common.IoAddressSize:
        """IO address size as reported by the wrapped encoder"""
        return self._encoder.io_address_size
def decode_asdu( self, asdu_bytes: bytes | bytearray | memoryview) -> tuple[ASDU, bytes | bytearray | memoryview]:
46    def decode_asdu(self,
47                    asdu_bytes: util.Bytes
48                    ) -> tuple[common.ASDU, util.Bytes]:
49        asdu, rest = self._encoder.decode_asdu(asdu_bytes)
50
51        asdu_type = _decode_asdu_type(asdu.type)
52
53        cause = decode_cause(asdu.cause, self._cause_size)
54        address = asdu.address
55        ios = [common.IO(address=io.address,
56                         elements=io.elements,
57                         time=io.time)
58               for io in asdu.ios]
59
60        asdu = common.ASDU(type=asdu_type,
61                           cause=cause,
62                           address=address,
63                           ios=ios)
64        return asdu, rest
def encode_asdu( self, asdu: ASDU) -> bytes | bytearray | memoryview:
66    def encode_asdu(self, asdu: common.ASDU) -> util.Bytes:
67        asdu_type = asdu.type.value
68        cause = encode_cause(asdu.cause, self._cause_size)
69        address = asdu.address
70        ios = [encoder.common.IO(address=io.address,
71                                 elements=io.elements,
72                                 time=io.time)
73               for io in asdu.ios]
74
75        asdu = encoder.common.ASDU(type=asdu_type,
76                                   cause=cause,
77                                   address=address,
78                                   ios=ios)
79
80        return self._encoder.encode_asdu(asdu)
asdu_type_time_sizes = {2: <TimeSize.THREE: 3>, 4: <TimeSize.THREE: 3>, 6: <TimeSize.THREE: 3>, 8: <TimeSize.THREE: 3>, 10: <TimeSize.THREE: 3>, 12: <TimeSize.THREE: 3>, 14: <TimeSize.THREE: 3>, 16: <TimeSize.THREE: 3>, 17: <TimeSize.THREE: 3>, 18: <TimeSize.THREE: 3>, 19: <TimeSize.THREE: 3>, 30: <TimeSize.SEVEN: 7>, 31: <TimeSize.SEVEN: 7>, 32: <TimeSize.SEVEN: 7>, 33: <TimeSize.SEVEN: 7>, 34: <TimeSize.SEVEN: 7>, 35: <TimeSize.SEVEN: 7>, 36: <TimeSize.SEVEN: 7>, 37: <TimeSize.SEVEN: 7>, 38: <TimeSize.SEVEN: 7>, 39: <TimeSize.SEVEN: 7>, 40: <TimeSize.SEVEN: 7>}
def decode_cause( cause: int, cause_size: CauseSize) -> Cause:
def decode_cause(cause: int,
                 cause_size: common.CauseSize
                 ) -> common.Cause:
    """Decode an integer cause into ``common.Cause``.

    The low six bits (``0x3F``) carry the cause type, bit ``0x40`` the
    negative-confirm flag and bit ``0x80`` the test flag. With
    ``CauseSize.TWO`` the bits above the low octet are the originator
    address; with ``CauseSize.ONE`` the originator address is ``0``.

    Raises:
        ValueError: if `cause_size` is neither ``ONE`` nor ``TWO``

    """
    decoded_type = decode_cause_type(cause & 0x3F)
    negative_confirm = bool(cause & 0x40)
    test = bool(cause & 0x80)

    if cause_size == common.CauseSize.TWO:
        originator = cause >> 8
    elif cause_size == common.CauseSize.ONE:
        originator = 0
    else:
        raise ValueError('unsupported cause size')

    return common.Cause(type=decoded_type,
                        is_negative_confirm=negative_confirm,
                        is_test=test,
                        originator_address=originator)
def encode_cause( cause: Cause, cause_size: CauseSize) -> int:
def encode_cause(cause: common.Cause,
                 cause_size: common.CauseSize
                 ) -> int:
    """Encode ``common.Cause`` into its integer representation.

    The test flag is bit ``0x80``, the negative-confirm flag is bit
    ``0x40`` and the encoded cause type is OR-ed into the result. With
    ``CauseSize.TWO`` the originator address is placed above the low octet.

    Raises:
        ValueError: if `cause_size` is neither ``ONE`` nor ``TWO``

    """
    flags = 0
    if cause.is_test:
        flags |= 0x80
    if cause.is_negative_confirm:
        flags |= 0x40
    low_octet = flags | encode_cause_type(cause.type)

    if cause_size == common.CauseSize.ONE:
        return low_octet

    if cause_size != common.CauseSize.TWO:
        raise ValueError('unsupported cause size')

    return low_octet | (cause.originator_address << 8)
def decode_cause_type( value: int) -> CauseType | int:
def decode_cause_type(value: int
                      ) -> common.CauseType | common.OtherCauseType:
    """Convert `value` to ``common.CauseType`` when it is a known member.

    Values for which ``common.CauseType(value)`` raises ``ValueError`` are
    returned unchanged.

    """
    try:
        return common.CauseType(value)
    except ValueError:
        return value
def encode_cause_type( cause_type: CauseType | int) -> int:
def encode_cause_type(cause_type: common.CauseType | common.OtherCauseType
                      ) -> int:
    """Return the numeric form of a cause type.

    `common.CauseType` instances are converted through their ``value``
    attribute; anything else is returned as given.

    """
    if not isinstance(cause_type, common.CauseType):
        return cause_type
    return cause_type.value
def decode_io_element(io_bytes: util.Bytes,
                      asdu_type: int
                      ) -> tuple[common.IoElement, util.Bytes]:
    """Decode one information element from the start of `io_bytes`.

    `asdu_type` is first passed through `_decode_asdu_type`; the result
    selects which element layout is read. For element kinds where a value
    shares its octet with quality flags (single/double point, protection
    event), the value bits are read from ``io_bytes[0]`` and
    `decode_quality` is then handed the same, not yet advanced, bytes.

    Args:
        io_bytes: encoded data beginning at the element to decode
        asdu_type: numeric ASDU type identifier

    Returns:
        Tuple of the decoded element and the bytes remaining after it.

    Raises:
        ValueError: if no branch handles the decoded ASDU type

    """
    asdu_type = _decode_asdu_type(asdu_type)

    if asdu_type == common.AsduType.M_SP_NA:
        value = common.SingleValue(io_bytes[0] & 1)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.INDICATION)

        element = common.IoElement_M_SP_NA(value=value, quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_SP_TA:
        value = common.SingleValue(io_bytes[0] & 1)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.INDICATION)

        element = common.IoElement_M_SP_TA(value=value, quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_DP_NA:
        value = common.DoubleValue(io_bytes[0] & 3)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.INDICATION)

        element = common.IoElement_M_DP_NA(value=value, quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_DP_TA:
        value = common.DoubleValue(io_bytes[0] & 3)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.INDICATION)

        element = common.IoElement_M_DP_TA(value=value, quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_ST_NA:
        value, io_bytes = decode_step_position_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.MEASUREMENT)

        element = common.IoElement_M_ST_NA(value=value, quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_ST_TA:
        value, io_bytes = decode_step_position_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.MEASUREMENT)

        element = common.IoElement_M_ST_TA(value=value, quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_BO_NA:
        value, io_bytes = decode_bitstring_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.MEASUREMENT)

        element = common.IoElement_M_BO_NA(value=value, quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_BO_TA:
        value, io_bytes = decode_bitstring_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.MEASUREMENT)

        # the element class must match the ASDU type (M_BO_TA, not M_BO_NA)
        element = common.IoElement_M_BO_TA(value=value, quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_ME_NA:
        value, io_bytes = decode_normalized_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.MEASUREMENT)

        element = common.IoElement_M_ME_NA(value=value, quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_ME_TA:
        value, io_bytes = decode_normalized_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.MEASUREMENT)

        element = common.IoElement_M_ME_TA(value=value, quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_ME_NB:
        value, io_bytes = decode_scaled_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.MEASUREMENT)

        element = common.IoElement_M_ME_NB(value=value, quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_ME_TB:
        value, io_bytes = decode_scaled_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.MEASUREMENT)

        element = common.IoElement_M_ME_TB(value=value, quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_ME_NC:
        value, io_bytes = decode_floating_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.MEASUREMENT)

        element = common.IoElement_M_ME_NC(value=value, quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_ME_TC:
        value, io_bytes = decode_floating_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.MEASUREMENT)

        element = common.IoElement_M_ME_TC(value=value, quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_IT_NA:
        value, io_bytes = decode_binary_counter_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.COUNTER)

        element = common.IoElement_M_IT_NA(value=value, quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_IT_TA:
        value, io_bytes = decode_binary_counter_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.COUNTER)

        element = common.IoElement_M_IT_TA(value=value, quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_EP_TA:
        value = common.ProtectionValue(io_bytes[0] & 0x03)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.PROTECTION)
        elapsed_time = int.from_bytes(io_bytes[:2], 'little')
        io_bytes = io_bytes[2:]

        element = common.IoElement_M_EP_TA(value=value,
                                           quality=quality,
                                           elapsed_time=elapsed_time)
        return element, io_bytes

    if asdu_type == common.AsduType.M_EP_TB:
        value, io_bytes = decode_protection_start_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.PROTECTION)
        duration_time = int.from_bytes(io_bytes[:2], 'little')
        io_bytes = io_bytes[2:]

        element = common.IoElement_M_EP_TB(value=value,
                                           quality=quality,
                                           duration_time=duration_time)
        return element, io_bytes

    if asdu_type == common.AsduType.M_EP_TC:
        value, io_bytes = decode_protection_command_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.PROTECTION)
        operating_time = int.from_bytes(io_bytes[:2], 'little')
        io_bytes = io_bytes[2:]

        element = common.IoElement_M_EP_TC(value=value,
                                           quality=quality,
                                           operating_time=operating_time)
        return element, io_bytes

    if asdu_type == common.AsduType.M_PS_NA:
        value, io_bytes = decode_status_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.MEASUREMENT)

        element = common.IoElement_M_PS_NA(value=value, quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_ME_ND:
        # normalized value without a quality descriptor
        value, io_bytes = decode_normalized_value(io_bytes)

        element = common.IoElement_M_ME_ND(value=value)
        return element, io_bytes

    if asdu_type == common.AsduType.M_SP_TB:
        value = common.SingleValue(io_bytes[0] & 1)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.INDICATION)

        element = common.IoElement_M_SP_TB(value=value, quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_DP_TB:
        value = common.DoubleValue(io_bytes[0] & 3)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.INDICATION)

        element = common.IoElement_M_DP_TB(value=value, quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_ST_TB:
        value, io_bytes = decode_step_position_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.MEASUREMENT)

        element = common.IoElement_M_ST_TB(value=value, quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_BO_TB:
        value, io_bytes = decode_bitstring_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.MEASUREMENT)

        element = common.IoElement_M_BO_TB(value=value, quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_ME_TD:
        value, io_bytes = decode_normalized_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.MEASUREMENT)

        element = common.IoElement_M_ME_TD(value=value, quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_ME_TE:
        value, io_bytes = decode_scaled_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.MEASUREMENT)

        element = common.IoElement_M_ME_TE(value=value, quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_ME_TF:
        value, io_bytes = decode_floating_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.MEASUREMENT)

        element = common.IoElement_M_ME_TF(value=value, quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_IT_TB:
        value, io_bytes = decode_binary_counter_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.COUNTER)

        element = common.IoElement_M_IT_TB(value=value, quality=quality)
        return element, io_bytes

    if asdu_type == common.AsduType.M_EP_TD:
        value = common.ProtectionValue(io_bytes[0] & 0x03)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.PROTECTION)
        elapsed_time = int.from_bytes(io_bytes[:2], 'little')
        io_bytes = io_bytes[2:]

        element = common.IoElement_M_EP_TD(value=value,
                                           quality=quality,
                                           elapsed_time=elapsed_time)
        return element, io_bytes

    if asdu_type == common.AsduType.M_EP_TE:
        value, io_bytes = decode_protection_start_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.PROTECTION)
        duration_time = int.from_bytes(io_bytes[:2], 'little')
        io_bytes = io_bytes[2:]

        element = common.IoElement_M_EP_TE(value=value,
                                           quality=quality,
                                           duration_time=duration_time)
        return element, io_bytes

    if asdu_type == common.AsduType.M_EP_TF:
        value, io_bytes = decode_protection_command_value(io_bytes)
        quality, io_bytes = decode_quality(io_bytes,
                                           common.QualityType.PROTECTION)
        operating_time = int.from_bytes(io_bytes[:2], 'little')
        io_bytes = io_bytes[2:]

        element = common.IoElement_M_EP_TF(value=value,
                                           quality=quality,
                                           operating_time=operating_time)
        return element, io_bytes

    if asdu_type == common.AsduType.C_SC_NA:
        # single command octet: value in bit 0, qualifier in bits 2-6,
        # select flag in bit 7
        value = common.SingleValue(io_bytes[0] & 1)
        select = bool(io_bytes[0] & 0x80)
        qualifier = (io_bytes[0] >> 2) & 0x1F
        io_bytes = io_bytes[1:]

        element = common.IoElement_C_SC_NA(value=value,
                                           select=select,
                                           qualifier=qualifier)
        return element, io_bytes

    if asdu_type == common.AsduType.C_DC_NA:
        # command octet: value in bits 0-1, qualifier in bits 2-6,
        # select flag in bit 7
        value = common.DoubleValue(io_bytes[0] & 3)
        select = bool(io_bytes[0] & 0x80)
        qualifier = (io_bytes[0] >> 2) & 0x1F
        io_bytes = io_bytes[1:]

        element = common.IoElement_C_DC_NA(value=value,
                                           select=select,
                                           qualifier=qualifier)
        return element, io_bytes

    if asdu_type == common.AsduType.C_RC_NA:
        value = common.RegulatingValue(io_bytes[0] & 3)
        select = bool(io_bytes[0] & 0x80)
        qualifier = (io_bytes[0] >> 2) & 0x1F
        io_bytes = io_bytes[1:]

        element = common.IoElement_C_RC_NA(value=value,
                                           select=select,
                                           qualifier=qualifier)
        return element, io_bytes

    if asdu_type == common.AsduType.C_SE_NA:
        value, io_bytes = decode_normalized_value(io_bytes)
        # only the select flag (bit 7) of the trailing octet is read
        select = bool(io_bytes[0] & 0x80)
        io_bytes = io_bytes[1:]

        element = common.IoElement_C_SE_NA(value=value, select=select)
        return element, io_bytes

    if asdu_type == common.AsduType.C_SE_NB:
        value, io_bytes = decode_scaled_value(io_bytes)
        select = bool(io_bytes[0] & 0x80)
        io_bytes = io_bytes[1:]

        element = common.IoElement_C_SE_NB(value=value, select=select)
        return element, io_bytes

    if asdu_type == common.AsduType.C_SE_NC:
        value, io_bytes = decode_floating_value(io_bytes)
        select = bool(io_bytes[0] & 0x80)
        io_bytes = io_bytes[1:]

        element = common.IoElement_C_SE_NC(value=value, select=select)
        return element, io_bytes

    if asdu_type == common.AsduType.C_BO_NA:
        value, io_bytes = decode_bitstring_value(io_bytes)

        element = common.IoElement_C_BO_NA(value=value)
        return element, io_bytes

    if asdu_type == common.AsduType.M_EI_NA:
        # bit 7 carries the parameter change flag, bits 0-6 the cause
        param_change = bool(io_bytes[0] & 0x80)
        cause = io_bytes[0] & 0x7F
        io_bytes = io_bytes[1:]

        element = common.IoElement_M_EI_NA(param_change=param_change,
                                           cause=cause)
        return element, io_bytes

    if asdu_type == common.AsduType.C_IC_NA:
        qualifier, io_bytes = io_bytes[0], io_bytes[1:]

        element = common.IoElement_C_IC_NA(qualifier=qualifier)
        return element, io_bytes

    if asdu_type == common.AsduType.C_CI_NA:
        # bits 0-5 hold the request, bits 6-7 the freeze code
        request = io_bytes[0] & 0x3F
        freeze = common.FreezeCode(io_bytes[0] >> 6)
        io_bytes = io_bytes[1:]

        element = common.IoElement_C_CI_NA(request=request, freeze=freeze)
        return element, io_bytes

    if asdu_type == common.AsduType.C_RD_NA:
        # element carries no data - nothing is consumed
        element = common.IoElement_C_RD_NA()
        return element, io_bytes

    if asdu_type == common.AsduType.C_CS_NA:
        time = encoder.decode_time(io_bytes[:7], common.TimeSize.SEVEN)
        io_bytes = io_bytes[7:]

        element = common.IoElement_C_CS_NA(time=time)
        return element, io_bytes

    if asdu_type == common.AsduType.C_TS_NA:
        # two octets are skipped without inspecting their content
        io_bytes = io_bytes[2:]

        element = common.IoElement_C_TS_NA()
        return element, io_bytes

    if asdu_type == common.AsduType.C_RP_NA:
        qualifier, io_bytes = io_bytes[0], io_bytes[1:]

        element = common.IoElement_C_RP_NA(qualifier=qualifier)
        return element, io_bytes

    if asdu_type == common.AsduType.C_CD_NA:
        time = int.from_bytes(io_bytes[:2], 'little')
        io_bytes = io_bytes[2:]

        element = common.IoElement_C_CD_NA(time=time)
        return element, io_bytes

    if asdu_type == common.AsduType.P_ME_NA:
        value, io_bytes = decode_normalized_value(io_bytes)
        qualifier, io_bytes = io_bytes[0], io_bytes[1:]

        element = common.IoElement_P_ME_NA(value=value, qualifier=qualifier)
        return element, io_bytes

    if asdu_type == common.AsduType.P_ME_NB:
        value, io_bytes = decode_scaled_value(io_bytes)
        qualifier, io_bytes = io_bytes[0], io_bytes[1:]

        element = common.IoElement_P_ME_NB(value=value, qualifier=qualifier)
        return element, io_bytes

    if asdu_type == common.AsduType.P_ME_NC:
        value, io_bytes = decode_floating_value(io_bytes)
        qualifier, io_bytes = io_bytes[0], io_bytes[1:]

        element = common.IoElement_P_ME_NC(value=value, qualifier=qualifier)
        return element, io_bytes

    if asdu_type == common.AsduType.P_AC_NA:
        qualifier, io_bytes = io_bytes[0], io_bytes[1:]

        element = common.IoElement_P_AC_NA(qualifier=qualifier)
        return element, io_bytes

    if asdu_type == common.AsduType.F_FR_NA:
        # 2 octets file name, 3 octets file length, 1 octet with ready
        # flag in bit 7
        file_name = int.from_bytes(io_bytes[:2], 'little')
        file_length = int.from_bytes(io_bytes[2:5], 'little')
        ready = bool(io_bytes[5] & 0x80)
        io_bytes = io_bytes[6:]

        element = common.IoElement_F_FR_NA(file_name=file_name,
                                           file_length=file_length,
                                           ready=ready)
        return element, io_bytes

    if asdu_type == common.AsduType.F_SR_NA:
        file_name = int.from_bytes(io_bytes[:2], 'little')
        section_name = io_bytes[2]
        section_length = int.from_bytes(io_bytes[3:6], 'little')
        ready = bool(io_bytes[6] & 0x80)
        io_bytes = io_bytes[7:]

        element = common.IoElement_F_SR_NA(file_name=file_name,
                                           section_name=section_name,
                                           section_length=section_length,
                                           ready=ready)
        return element, io_bytes

    if asdu_type == common.AsduType.F_SC_NA:
        file_name = int.from_bytes(io_bytes[:2], 'little')
        section_name = io_bytes[2]
        qualifier = io_bytes[3]
        io_bytes = io_bytes[4:]

        element = common.IoElement_F_SC_NA(file_name=file_name,
                                           section_name=section_name,
                                           qualifier=qualifier)
        return element, io_bytes

    if asdu_type == common.AsduType.F_LS_NA:
        file_name = int.from_bytes(io_bytes[:2], 'little')
        section_name = io_bytes[2]
        last_qualifier = io_bytes[3]
        checksum = io_bytes[4]
        io_bytes = io_bytes[5:]

        element = common.IoElement_F_LS_NA(file_name=file_name,
                                           section_name=section_name,
                                           last_qualifier=last_qualifier,
                                           checksum=checksum)
        return element, io_bytes

    if asdu_type == common.AsduType.F_AF_NA:
        file_name = int.from_bytes(io_bytes[:2], 'little')
        section_name = io_bytes[2]
        qualifier = io_bytes[3]
        io_bytes = io_bytes[4:]

        element = common.IoElement_F_AF_NA(file_name=file_name,
                                           section_name=section_name,
                                           qualifier=qualifier)
        return element, io_bytes

    if asdu_type == common.AsduType.F_SG_NA:
        file_name = int.from_bytes(io_bytes[:2], 'little')
        section_name = io_bytes[2]
        # fourth octet gives the number of segment octets that follow
        length = io_bytes[3]
        segment = io_bytes[4:4+length]
        io_bytes = io_bytes[4+length:]

        element = common.IoElement_F_SG_NA(file_name=file_name,
                                           section_name=section_name,
                                           segment=segment)
        return element, io_bytes

    if asdu_type == common.AsduType.F_DR_TA:
        file_name = int.from_bytes(io_bytes[:2], 'little')
        file_length = int.from_bytes(io_bytes[2:5], 'little')
        # status octet: bit 5 more follows, bit 6 directory,
        # bit 7 transfer active
        more_follows = bool(io_bytes[5] & 0x20)
        is_directory = bool(io_bytes[5] & 0x40)
        transfer_active = bool(io_bytes[5] & 0x80)
        creation_time = encoder.decode_time(io_bytes[6:13],
                                            common.TimeSize.SEVEN)
        io_bytes = io_bytes[13:]

        element = common.IoElement_F_DR_TA(file_name=file_name,
                                           file_length=file_length,
                                           more_follows=more_follows,
                                           is_directory=is_directory,
                                           transfer_active=transfer_active,
                                           creation_time=creation_time)
        return element, io_bytes

    raise ValueError('unsupported ASDU type')
def encode_io_element(element: common.IoElement,
                      asdu_type: int
                      ) -> typing.Iterable[int]:
    """Lazily produce the encoded octets of one information element.

    The layout is chosen from the class of `element`. Element classes
    that share an identical layout are handled by a single branch.

    This is a generator: nothing runs (including the
    `_decode_asdu_type` call) until the result is iterated.

    Args:
        element: information element to encode
        asdu_type: numeric ASDU type identifier; only passed to
            `_decode_asdu_type`, it does not influence the layout

    Raises:
        ValueError: on iteration, if the class of `element` is not handled

    """
    # the result is not needed for dispatch - the call is kept so that
    # whatever this helper does with the identifier still happens
    _decode_asdu_type(asdu_type)

    if isinstance(element, (common.IoElement_M_SP_NA,
                            common.IoElement_M_SP_TA,
                            common.IoElement_M_SP_TB,
                            common.IoElement_M_DP_NA,
                            common.IoElement_M_DP_TA,
                            common.IoElement_M_DP_TB)):
        # value and quality flags are merged into a single octet
        quality_octet = util.first(encode_quality(element.quality))
        yield element.value.value | quality_octet

    elif isinstance(element, (common.IoElement_M_ST_NA,
                              common.IoElement_M_ST_TA,
                              common.IoElement_M_ST_TB)):
        yield from encode_step_position_value(element.value)
        yield from encode_quality(element.quality)

    elif isinstance(element, (common.IoElement_M_BO_NA,
                              common.IoElement_M_BO_TA,
                              common.IoElement_M_BO_TB)):
        yield from encode_bitstring_value(element.value)
        yield from encode_quality(element.quality)

    elif isinstance(element, (common.IoElement_M_ME_NA,
                              common.IoElement_M_ME_TA,
                              common.IoElement_M_ME_TD)):
        yield from encode_normalized_value(element.value)
        yield from encode_quality(element.quality)

    elif isinstance(element, (common.IoElement_M_ME_NB,
                              common.IoElement_M_ME_TB,
                              common.IoElement_M_ME_TE)):
        yield from encode_scaled_value(element.value)
        yield from encode_quality(element.quality)

    elif isinstance(element, (common.IoElement_M_ME_NC,
                              common.IoElement_M_ME_TC,
                              common.IoElement_M_ME_TF)):
        yield from encode_floating_value(element.value)
        yield from encode_quality(element.quality)

    elif isinstance(element, (common.IoElement_M_IT_NA,
                              common.IoElement_M_IT_TA,
                              common.IoElement_M_IT_TB)):
        yield from encode_binary_counter_value(element.value)
        yield from encode_quality(element.quality)

    elif isinstance(element, (common.IoElement_M_EP_TA,
                              common.IoElement_M_EP_TD)):
        quality_octet = util.first(encode_quality(element.quality))
        yield element.value.value | quality_octet
        yield from element.elapsed_time.to_bytes(2, 'little')

    elif isinstance(element, (common.IoElement_M_EP_TB,
                              common.IoElement_M_EP_TE)):
        yield from encode_protection_start_value(element.value)
        yield from encode_quality(element.quality)
        yield from element.duration_time.to_bytes(2, 'little')

    elif isinstance(element, (common.IoElement_M_EP_TC,
                              common.IoElement_M_EP_TF)):
        yield from encode_protection_command_value(element.value)
        yield from encode_quality(element.quality)
        yield from element.operating_time.to_bytes(2, 'little')

    elif isinstance(element, common.IoElement_M_PS_NA):
        yield from encode_status_value(element.value)
        yield from encode_quality(element.quality)

    elif isinstance(element, common.IoElement_M_ME_ND):
        yield from encode_normalized_value(element.value)

    elif isinstance(element, (common.IoElement_C_SC_NA,
                              common.IoElement_C_DC_NA,
                              common.IoElement_C_RC_NA)):
        # command octet: value in the low bits, 5 bit qualifier shifted
        # left by 2, select flag in bit 7
        yield (element.value.value |
               (0x80 if element.select else 0) |
               ((element.qualifier & 0x1F) << 2))

    elif isinstance(element, common.IoElement_C_SE_NA):
        yield from encode_normalized_value(element.value)
        yield (0x80 if element.select else 0)

    elif isinstance(element, common.IoElement_C_SE_NB):
        yield from encode_scaled_value(element.value)
        yield (0x80 if element.select else 0)

    elif isinstance(element, common.IoElement_C_SE_NC):
        yield from encode_floating_value(element.value)
        yield (0x80 if element.select else 0)

    elif isinstance(element, common.IoElement_C_BO_NA):
        yield from encode_bitstring_value(element.value)

    elif isinstance(element, common.IoElement_M_EI_NA):
        yield ((0x80 if element.param_change else 0x00) |
               (element.cause & 0x7F))

    elif isinstance(element, (common.IoElement_C_IC_NA,
                              common.IoElement_C_RP_NA,
                              common.IoElement_P_AC_NA)):
        # single qualifier octet
        yield element.qualifier & 0xFF

    elif isinstance(element, common.IoElement_C_CI_NA):
        yield ((element.freeze.value << 6) |
               (element.request & 0x3F))

    elif isinstance(element, common.IoElement_C_RD_NA):
        # element has no encoded content
        pass

    elif isinstance(element, common.IoElement_C_CS_NA):
        yield from encoder.encode_time(element.time, common.TimeSize.SEVEN)

    elif isinstance(element, common.IoElement_C_TS_NA):
        # fixed two octet pattern
        yield 0xAA
        yield 0x55

    elif isinstance(element, common.IoElement_C_CD_NA):
        yield from element.time.to_bytes(2, 'little')

    elif isinstance(element, common.IoElement_P_ME_NA):
        yield from encode_normalized_value(element.value)
        yield element.qualifier & 0xFF

    elif isinstance(element, common.IoElement_P_ME_NB):
        yield from encode_scaled_value(element.value)
        yield element.qualifier & 0xFF

    elif isinstance(element, common.IoElement_P_ME_NC):
        yield from encode_floating_value(element.value)
        yield element.qualifier & 0xFF

    elif isinstance(element, common.IoElement_F_FR_NA):
        yield from element.file_name.to_bytes(2, 'little')
        yield from element.file_length.to_bytes(3, 'little')
        yield (0x80 if element.ready else 0x00)

    elif isinstance(element, common.IoElement_F_SR_NA):
        yield from element.file_name.to_bytes(2, 'little')
        yield element.section_name & 0xFF
        yield from element.section_length.to_bytes(3, 'little')
        yield (0x80 if element.ready else 0x00)

    elif isinstance(element, (common.IoElement_F_SC_NA,
                              common.IoElement_F_AF_NA)):
        yield from element.file_name.to_bytes(2, 'little')
        yield element.section_name & 0xFF
        yield element.qualifier & 0xFF

    elif isinstance(element, common.IoElement_F_LS_NA):
        yield from element.file_name.to_bytes(2, 'little')
        yield element.section_name & 0xFF
        yield element.last_qualifier & 0xFF
        yield element.checksum & 0xFF

    elif isinstance(element, common.IoElement_F_SG_NA):
        yield from element.file_name.to_bytes(2, 'little')
        yield element.section_name & 0xFF
        # segment is prefixed with its own length
        yield len(element.segment)
        yield from element.segment

    elif isinstance(element, common.IoElement_F_DR_TA):
        yield from element.file_name.to_bytes(2, 'little')
        yield from element.file_length.to_bytes(3, 'little')
        yield ((0x20 if element.more_follows else 0x00) |
               (0x40 if element.is_directory else 0x00) |
               (0x80 if element.transfer_active else 0x00))
        yield from encoder.encode_time(element.creation_time,
                                       common.TimeSize.SEVEN)

    else:
        raise ValueError('unsupported IO element')
def decode_quality(io_bytes: util.Bytes,
                   quality_type: common.QualityType
                   ) -> tuple[common.Quality, util.Bytes]:
    """Decode a one-byte quality descriptor of the requested kind.

    Only the first byte of `io_bytes` is interpreted; which bits are
    read depends on `quality_type`:

    * INDICATION - 0x80 invalid, 0x40 not topical, 0x20 substituted,
      0x10 blocked
    * MEASUREMENT - same as INDICATION plus 0x01 overflow
    * COUNTER - 0x80 invalid, 0x40 adjusted, 0x20 overflow,
      low 5 bits sequence number
    * PROTECTION - same as INDICATION plus 0x08 time invalid

    Args:
        io_bytes: buffer whose first byte holds the quality descriptor
        quality_type: selects which quality structure is produced

    Returns:
        Tuple of the decoded quality and `io_bytes` without its first byte.

    Raises:
        ValueError: `quality_type` is none of the four supported kinds

    """
    if quality_type == common.QualityType.INDICATION:
        octet = io_bytes[0]
        decoded = common.IndicationQuality(
            invalid=bool(octet & 0x80),
            not_topical=bool(octet & 0x40),
            substituted=bool(octet & 0x20),
            blocked=bool(octet & 0x10))

    elif quality_type == common.QualityType.MEASUREMENT:
        octet = io_bytes[0]
        decoded = common.MeasurementQuality(
            invalid=bool(octet & 0x80),
            not_topical=bool(octet & 0x40),
            substituted=bool(octet & 0x20),
            blocked=bool(octet & 0x10),
            overflow=bool(octet & 0x01))

    elif quality_type == common.QualityType.COUNTER:
        octet = io_bytes[0]
        decoded = common.CounterQuality(
            invalid=bool(octet & 0x80),
            adjusted=bool(octet & 0x40),
            overflow=bool(octet & 0x20),
            sequence=octet & 0x1F)

    elif quality_type == common.QualityType.PROTECTION:
        octet = io_bytes[0]
        decoded = common.ProtectionQuality(
            invalid=bool(octet & 0x80),
            not_topical=bool(octet & 0x40),
            substituted=bool(octet & 0x20),
            blocked=bool(octet & 0x10),
            time_invalid=bool(octet & 0x08))

    else:
        raise ValueError('unsupported quality type')

    return decoded, io_bytes[1:]
def encode_quality(quality: common.Quality
                   ) -> typing.Iterable[int]:
    """Encode a quality structure as a single byte.

    The bit layout depends on the concrete type of `quality`:

    * IndicationQuality - 0x80 invalid, 0x40 not topical,
      0x20 substituted, 0x10 blocked
    * MeasurementQuality - same as indication plus 0x01 overflow
    * CounterQuality - 0x80 invalid, 0x40 adjusted, 0x20 overflow,
      low 5 bits sequence number (masked with 0x1F)
    * ProtectionQuality - same as indication plus 0x08 time invalid

    Being a generator, nothing is evaluated until iteration starts.

    Args:
        quality: quality structure to encode

    Yields:
        The single encoded byte.

    Raises:
        ValueError: `quality` is not one of the supported quality types

    """
    # bits that are copied verbatim instead of being driven by a flag
    low_bits = 0

    if isinstance(quality, common.IndicationQuality):
        flags = ((0x80, quality.invalid),
                 (0x40, quality.not_topical),
                 (0x20, quality.substituted),
                 (0x10, quality.blocked))

    elif isinstance(quality, common.MeasurementQuality):
        flags = ((0x80, quality.invalid),
                 (0x40, quality.not_topical),
                 (0x20, quality.substituted),
                 (0x10, quality.blocked),
                 (0x01, quality.overflow))

    elif isinstance(quality, common.CounterQuality):
        flags = ((0x80, quality.invalid),
                 (0x40, quality.adjusted),
                 (0x20, quality.overflow))
        low_bits = quality.sequence & 0x1F

    elif isinstance(quality, common.ProtectionQuality):
        flags = ((0x80, quality.invalid),
                 (0x40, quality.not_topical),
                 (0x20, quality.substituted),
                 (0x10, quality.blocked),
                 (0x08, quality.time_invalid))

    else:
        raise ValueError('unsupported quality')

    # masks are distinct single bits, so adding them equals OR-ing them
    yield sum(mask for mask, is_set in flags if is_set) | low_bits
def decode_step_position_value(io_bytes: util.Bytes
                               ) -> tuple[common.StepPositionValue,
                                          util.Bytes]:
    """Decode a step position value from the first byte of `io_bytes`.

    The low 7 bits hold a two's-complement number (bit 0x40 acts as the
    sign bit) and bit 0x80 is the transient flag.

    Args:
        io_bytes: buffer whose first byte holds the step position

    Returns:
        Tuple of the decoded value and `io_bytes` without its first byte.

    """
    octet = io_bytes[0]
    position = octet & 0x7F
    if octet & 0x40:
        # sign bit set: extend the 7-bit field to a negative integer
        position -= 0x80
    decoded = common.StepPositionValue(value=position,
                                       transient=bool(octet & 0x80))
    return decoded, io_bytes[1:]
def encode_step_position_value(value: common.StepPositionValue
                               ) -> typing.Iterable[int]:
    """Encode a step position value as a single byte.

    Bit 0x80 carries the transient flag; the low 7 bits carry
    `value.value` masked with 0x7F.

    Args:
        value: step position to encode

    Yields:
        The single encoded byte.

    """
    transient_bit = 0x80 if value.transient else 0
    yield transient_bit | (value.value & 0x7F)
def decode_bitstring_value(io_bytes: util.Bytes
                           ) -> tuple[common.BitstringValue, util.Bytes]:
    """Decode a bitstring value from the first four bytes of `io_bytes`.

    The leading slice is passed to the value unchanged (no copy or
    conversion is applied here).

    Args:
        io_bytes: buffer starting with the bitstring

    Returns:
        Tuple of the decoded value and `io_bytes` past the first 4 bytes.

    """
    decoded = common.BitstringValue(io_bytes[:4])
    remaining = io_bytes[4:]
    return decoded, remaining
def encode_bitstring_value(value: common.BitstringValue
                           ) -> typing.Iterable[int]:
    """Encode a bitstring value as four bytes.

    Elements 0 through 3 of `value.value` are emitted in order; an
    element missing from a shorter sequence fails on index access.

    Args:
        value: bitstring to encode

    Yields:
        The four encoded bytes.

    """
    for index in range(4):
        yield value.value[index]
def decode_normalized_value(io_bytes: util.Bytes
                            ) -> tuple[common.NormalizedValue, util.Bytes]:
    """Decode a normalized value from the first two bytes of `io_bytes`.

    The bytes are read as a little-endian signed 16-bit integer which is
    then divided by 0x7fff.

    Args:
        io_bytes: buffer starting with the normalized value

    Returns:
        Tuple of the decoded value and `io_bytes` past the first 2 bytes.

    """
    (raw,) = struct.unpack('<h', io_bytes[:2])
    decoded = common.NormalizedValue(raw / 0x7fff)
    return decoded, io_bytes[2:]
def encode_normalized_value(value: common.NormalizedValue
                            ) -> typing.Iterable[int]:
    """Encode a normalized value as two bytes.

    `value.value` is multiplied by 0x7fff, rounded and packed as a
    little-endian signed 16-bit integer.

    Args:
        value: normalized value to encode

    Yields:
        The two encoded bytes.

    """
    raw = round(value.value * 0x7fff)
    yield from struct.pack('<h', raw)
def decode_scaled_value(io_bytes: util.Bytes
                        ) -> tuple[common.ScaledValue, util.Bytes]:
    """Decode a scaled value from the first two bytes of `io_bytes`.

    The bytes are read as a little-endian signed 16-bit integer.

    Args:
        io_bytes: buffer starting with the scaled value

    Returns:
        Tuple of the decoded value and `io_bytes` past the first 2 bytes.

    """
    (raw,) = struct.unpack('<h', io_bytes[:2])
    return common.ScaledValue(raw), io_bytes[2:]
def encode_scaled_value(value: common.ScaledValue
                        ) -> typing.Iterable[int]:
    """Encode a scaled value as two bytes.

    `value.value` is packed as a little-endian signed 16-bit integer.

    Args:
        value: scaled value to encode

    Yields:
        The two encoded bytes.

    """
    packed = struct.pack('<h', value.value)
    yield from packed
def decode_floating_value(io_bytes: util.Bytes
                          ) -> tuple[common.FloatingValue, util.Bytes]:
    """Decode a floating value from the first four bytes of `io_bytes`.

    The bytes are read as a little-endian 4-byte float.

    Args:
        io_bytes: buffer starting with the floating value

    Returns:
        Tuple of the decoded value and `io_bytes` past the first 4 bytes.

    """
    (number,) = struct.unpack('<f', io_bytes[:4])
    return common.FloatingValue(number), io_bytes[4:]
def encode_floating_value(value: common.FloatingValue
                          ) -> typing.Iterable[int]:
    """Encode a floating value as four bytes.

    `value.value` is packed as a little-endian 4-byte float.

    Args:
        value: floating value to encode

    Yields:
        The four encoded bytes.

    """
    packed = struct.pack('<f', value.value)
    yield from packed
def decode_binary_counter_value(io_bytes: util.Bytes
                                ) -> tuple[common.BinaryCounterValue,
                                           util.Bytes]:
    """Decode a binary counter from the first four bytes of `io_bytes`.

    The bytes are read as a little-endian signed 32-bit integer.

    Args:
        io_bytes: buffer starting with the counter reading

    Returns:
        Tuple of the decoded value and `io_bytes` past the first 4 bytes.

    """
    (count,) = struct.unpack('<i', io_bytes[:4])
    return common.BinaryCounterValue(count), io_bytes[4:]
def encode_binary_counter_value(value: common.BinaryCounterValue
                                ) -> typing.Iterable[int]:
    """Encode a binary counter value as four bytes.

    `value.value` is packed as a little-endian signed 32-bit integer.

    Args:
        value: counter reading to encode

    Yields:
        The four encoded bytes.

    """
    packed = struct.pack('<i', value.value)
    yield from packed
def decode_protection_start_value(io_bytes: util.Bytes
                                  ) -> tuple[common.ProtectionStartValue,
                                             util.Bytes]:
    """Decode protection start flags from the first byte of `io_bytes`.

    Bit assignment: 0x01 general, 0x02 l1, 0x04 l2, 0x08 l3, 0x10 ie,
    0x20 reverse. The two highest bits are ignored.

    Args:
        io_bytes: buffer whose first byte holds the start flags

    Returns:
        Tuple of the decoded value and `io_bytes` without its first byte.

    """
    octet = io_bytes[0]
    layout = (('general', 0x01),
              ('l1', 0x02),
              ('l2', 0x04),
              ('l3', 0x08),
              ('ie', 0x10),
              ('reverse', 0x20))
    flags = {name: bool(octet & mask) for name, mask in layout}
    return common.ProtectionStartValue(**flags), io_bytes[1:]
def encode_protection_start_value(value: common.ProtectionStartValue
                                  ) -> typing.Iterable[int]:
    """Encode protection start flags as a single byte.

    Bit assignment: 0x01 general, 0x02 l1, 0x04 l2, 0x08 l3, 0x10 ie,
    0x20 reverse.

    Args:
        value: protection start flags to encode

    Yields:
        The single encoded byte.

    """
    flags = ((0x01, value.general),
             (0x02, value.l1),
             (0x04, value.l2),
             (0x08, value.l3),
             (0x10, value.ie),
             (0x20, value.reverse))
    # masks are distinct single bits, so adding them equals OR-ing them
    yield sum(mask for mask, is_set in flags if is_set)
def decode_protection_command_value(io_bytes: util.Bytes
                                    ) -> tuple[common.ProtectionCommandValue,
                                               util.Bytes]:
    """Decode protection command flags from the first byte of `io_bytes`.

    Bit assignment: 0x01 general, 0x02 l1, 0x04 l2, 0x08 l3. The upper
    four bits are ignored.

    Args:
        io_bytes: buffer whose first byte holds the command flags

    Returns:
        Tuple of the decoded value and `io_bytes` without its first byte.

    """
    octet = io_bytes[0]
    layout = (('general', 0x01),
              ('l1', 0x02),
              ('l2', 0x04),
              ('l3', 0x08))
    flags = {name: bool(octet & mask) for name, mask in layout}
    return common.ProtectionCommandValue(**flags), io_bytes[1:]
def encode_protection_command_value(value: common.ProtectionCommandValue
                                    ) -> typing.Iterable[int]:
    """Encode protection command flags as a single byte.

    Bit assignment: 0x01 general, 0x02 l1, 0x04 l2, 0x08 l3.

    Args:
        value: protection command flags to encode

    Yields:
        The single encoded byte.

    """
    flags = ((0x01, value.general),
             (0x02, value.l1),
             (0x04, value.l2),
             (0x08, value.l3))
    # masks are distinct single bits, so adding them equals OR-ing them
    yield sum(mask for mask, is_set in flags if is_set)
def decode_status_value(io_bytes: util.Bytes
                        ) -> tuple[common.StatusValue, util.Bytes]:
    """Decode a status value from the first four bytes of `io_bytes`.

    Bytes 0-1 are expanded into the 16 `value` flags and bytes 2-3 into
    the 16 `change` flags. Within each pair the flags are ordered lower
    byte first and, inside a byte, least significant bit first.

    Args:
        io_bytes: buffer starting with the status value

    Returns:
        Tuple of the decoded value and `io_bytes` past the first 4 bytes.

    """
    def expand(offset):
        # 16 booleans taken from the two bytes starting at `offset`
        return [bool(io_bytes[offset + byte_index] & (1 << bit))
                for byte_index in range(2)
                for bit in range(8)]

    decoded = common.StatusValue(value=expand(0),
                                 change=expand(2))
    return decoded, io_bytes[4:]
def encode_status_value(value: common.StatusValue
                        ) -> typing.Iterable[int]:
    """Encode a status value as four bytes.

    The first 16 entries of `value.value` are packed into two bytes,
    followed by the first 16 entries of `value.change` packed the same
    way. Entry ``8 * n + k`` sets bit ``k`` of byte ``n`` when truthy.

    Args:
        value: status value to encode

    Yields:
        The four encoded bytes.

    """
    for flags in (value.value, value.change):
        for start in (0, 8):
            yield sum(1 << bit for bit in range(8) if flags[start + bit])