hat.drivers.iec60870.encodings.iec101
IEC 60870-5-101 messages
1"""IEC 60870-5-101 messages""" 2 3from hat.drivers.iec60870.encodings.iec101.common import ( 4 AsduTypeError, 5 CauseSize, 6 AsduAddressSize, 7 IoAddressSize, 8 TimeSize, 9 Time, 10 time_from_datetime, 11 time_to_datetime, 12 OriginatorAddress, 13 AsduAddress, 14 IoAddress, 15 OtherCauseType, 16 AsduType, 17 CauseType, 18 Cause, 19 QualityType, 20 IndicationQuality, 21 MeasurementQuality, 22 CounterQuality, 23 ProtectionQuality, 24 Quality, 25 FreezeCode, 26 SingleValue, 27 DoubleValue, 28 RegulatingValue, 29 StepPositionValue, 30 BitstringValue, 31 NormalizedValue, 32 ScaledValue, 33 FloatingValue, 34 BinaryCounterValue, 35 ProtectionValue, 36 ProtectionStartValue, 37 ProtectionCommandValue, 38 StatusValue, 39 IoElement_M_SP_NA, 40 IoElement_M_SP_TA, 41 IoElement_M_DP_NA, 42 IoElement_M_DP_TA, 43 IoElement_M_ST_NA, 44 IoElement_M_ST_TA, 45 IoElement_M_BO_NA, 46 IoElement_M_BO_TA, 47 IoElement_M_ME_NA, 48 IoElement_M_ME_TA, 49 IoElement_M_ME_NB, 50 IoElement_M_ME_TB, 51 IoElement_M_ME_NC, 52 IoElement_M_ME_TC, 53 IoElement_M_IT_NA, 54 IoElement_M_IT_TA, 55 IoElement_M_EP_TA, 56 IoElement_M_EP_TB, 57 IoElement_M_EP_TC, 58 IoElement_M_PS_NA, 59 IoElement_M_ME_ND, 60 IoElement_M_SP_TB, 61 IoElement_M_DP_TB, 62 IoElement_M_ST_TB, 63 IoElement_M_BO_TB, 64 IoElement_M_ME_TD, 65 IoElement_M_ME_TE, 66 IoElement_M_ME_TF, 67 IoElement_M_IT_TB, 68 IoElement_M_EP_TD, 69 IoElement_M_EP_TE, 70 IoElement_M_EP_TF, 71 IoElement_C_SC_NA, 72 IoElement_C_DC_NA, 73 IoElement_C_RC_NA, 74 IoElement_C_SE_NA, 75 IoElement_C_SE_NB, 76 IoElement_C_SE_NC, 77 IoElement_C_BO_NA, 78 IoElement_M_EI_NA, 79 IoElement_C_IC_NA, 80 IoElement_C_CI_NA, 81 IoElement_C_RD_NA, 82 IoElement_C_CS_NA, 83 IoElement_C_TS_NA, 84 IoElement_C_RP_NA, 85 IoElement_C_CD_NA, 86 IoElement_P_ME_NA, 87 IoElement_P_ME_NB, 88 IoElement_P_ME_NC, 89 IoElement_P_AC_NA, 90 IoElement_F_FR_NA, 91 IoElement_F_SR_NA, 92 IoElement_F_SC_NA, 93 IoElement_F_LS_NA, 94 IoElement_F_AF_NA, 95 IoElement_F_SG_NA, 96 IoElement_F_DR_TA, 97 IoElement, 98 IO, 99 ASDU) 100from hat.drivers.iec60870.encodings.iec101.encoder import ( 101 Encoder, 102 asdu_type_time_sizes, 103 decode_cause, 104 encode_cause, 105 decode_cause_type, 106 encode_cause_type, 107 decode_io_element, 108 encode_io_element, 109 decode_quality, 110 encode_quality, 111 decode_step_position_value, 112 encode_step_position_value, 113 decode_bitstring_value, 114 encode_bitstring_value, 115 decode_normalized_value, 116 encode_normalized_value, 117 decode_scaled_value, 118 encode_scaled_value, 119 decode_floating_value, 120 encode_floating_value, 121 decode_binary_counter_value, 122 encode_binary_counter_value, 123 decode_protection_start_value, 124 encode_protection_start_value, 125 decode_protection_command_value, 126 encode_protection_command_value, 127 decode_status_value, 128 encode_status_value) 129 130 131__all__ = ['AsduTypeError', 132 'CauseSize', 133 'AsduAddressSize', 134 'IoAddressSize', 135 'TimeSize', 136 'Time', 137 'time_from_datetime', 138 'time_to_datetime', 139 'OriginatorAddress', 140 'AsduAddress', 141 'IoAddress', 142 'OtherCauseType', 143 'AsduType', 144 'CauseType', 145 'Cause', 146 'QualityType', 147 'IndicationQuality', 148 'MeasurementQuality', 149 'CounterQuality', 150 'ProtectionQuality', 151 'Quality', 152 'FreezeCode', 153 'SingleValue', 154 'DoubleValue', 155 'RegulatingValue', 156 'StepPositionValue', 157 'BitstringValue', 158 'NormalizedValue', 159 'ScaledValue', 160 'FloatingValue', 161 'BinaryCounterValue', 162 'ProtectionValue', 163 'ProtectionStartValue', 164 'ProtectionCommandValue', 165 'StatusValue', 166 'IoElement_M_SP_NA', 167 'IoElement_M_SP_TA', 168 'IoElement_M_DP_NA', 169 'IoElement_M_DP_TA', 170 'IoElement_M_ST_NA', 171 'IoElement_M_ST_TA', 172 'IoElement_M_BO_NA', 173 'IoElement_M_BO_TA', 174 'IoElement_M_ME_NA', 175 'IoElement_M_ME_TA', 176 'IoElement_M_ME_NB', 177 'IoElement_M_ME_TB', 178 'IoElement_M_ME_NC', 179 'IoElement_M_ME_TC', 180 'IoElement_M_IT_NA', 181 'IoElement_M_IT_TA', 182 'IoElement_M_EP_TA', 183 'IoElement_M_EP_TB', 184 'IoElement_M_EP_TC', 185 'IoElement_M_PS_NA', 186 'IoElement_M_ME_ND', 187 'IoElement_M_SP_TB', 188 'IoElement_M_DP_TB', 189 'IoElement_M_ST_TB', 190 'IoElement_M_BO_TB', 191 'IoElement_M_ME_TD', 192 'IoElement_M_ME_TE', 193 'IoElement_M_ME_TF', 194 'IoElement_M_IT_TB', 195 'IoElement_M_EP_TD', 196 'IoElement_M_EP_TE', 197 'IoElement_M_EP_TF', 198 'IoElement_C_SC_NA', 199 'IoElement_C_DC_NA', 200 'IoElement_C_RC_NA', 201 'IoElement_C_SE_NA', 202 'IoElement_C_SE_NB', 203 'IoElement_C_SE_NC', 204 'IoElement_C_BO_NA', 205 'IoElement_M_EI_NA', 206 'IoElement_C_IC_NA', 207 'IoElement_C_CI_NA', 208 'IoElement_C_RD_NA', 209 'IoElement_C_CS_NA', 210 'IoElement_C_TS_NA', 211 'IoElement_C_RP_NA', 212 'IoElement_C_CD_NA', 213 'IoElement_P_ME_NA', 214 'IoElement_P_ME_NB', 215 'IoElement_P_ME_NC', 216 'IoElement_P_AC_NA', 217 'IoElement_F_FR_NA', 218 'IoElement_F_SR_NA', 219 'IoElement_F_SC_NA', 220 'IoElement_F_LS_NA', 221 'IoElement_F_AF_NA', 222 'IoElement_F_SG_NA', 223 'IoElement_F_DR_TA', 224 'IoElement', 225 'IO', 226 'ASDU', 227 'Encoder', 228 'asdu_type_time_sizes', 229 'decode_cause', 230 'encode_cause', 231 'decode_cause_type', 232 'encode_cause_type', 233 'decode_io_element', 234 'encode_io_element', 235 'decode_quality', 236 'encode_quality', 237 'decode_step_position_value', 238 'encode_step_position_value', 239 'decode_bitstring_value', 240 'encode_bitstring_value', 241 'decode_normalized_value', 242 'encode_normalized_value', 243 'decode_scaled_value', 244 'encode_scaled_value', 245 'decode_floating_value', 246 'encode_floating_value', 247 'decode_binary_counter_value', 248 'encode_binary_counter_value', 249 'decode_protection_start_value', 250 'encode_protection_start_value', 251 'decode_protection_command_value', 252 'encode_protection_command_value', 253 'decode_status_value', 254 'encode_status_value']
Common base class for all non-exit exceptions.
An enumeration.
An enumeration.
An enumeration.
An enumeration.
31class Time(typing.NamedTuple): 32 size: TimeSize 33 milliseconds: int 34 """milliseconds in range [0, 59999]""" 35 invalid: bool | None 36 """available for size THREE, FOUR, SEVEN""" 37 minutes: int | None 38 """available for size THREE, FOUR, SEVEN (minutes in range [0, 59])""" 39 summer_time: bool | None 40 """available for size FOUR, SEVEN""" 41 hours: int | None 42 """available for size FOUR, SEVEN (hours in range [0, 23])""" 43 day_of_week: int | None 44 """available for size SEVEN (day_of_week in range [1, 7])""" 45 day_of_month: int | None 46 """available for size SEVEN (day_of_month in range [1, 31])""" 47 months: int | None 48 """available for size SEVEN (months in range [1, 12])""" 49 years: int | None 50 """available for size SEVEN (years in range [0, 99])"""
Time(size, milliseconds, invalid, minutes, summer_time, hours, day_of_week, day_of_month, months, years)
Create new instance of Time(size, milliseconds, invalid, minutes, summer_time, hours, day_of_week, day_of_month, months, years)
66def time_from_datetime(dt: datetime.datetime, 67 invalid: bool = False 68 ) -> Time: 69 """Create Time from datetime.datetime""" 70 # TODO document edge cases (local time, os implementation, ...) 71 # rounding microseconds to the nearest millisecond 72 dt_rounded = ( 73 dt.replace(microsecond=0) + 74 datetime.timedelta(milliseconds=round(dt.microsecond / 1000))) 75 local_time = time.localtime(dt_rounded.timestamp()) 76 77 return Time( 78 size=TimeSize.SEVEN, 79 milliseconds=(local_time.tm_sec * 1000 + 80 dt_rounded.microsecond // 1000), 81 invalid=invalid, 82 minutes=local_time.tm_min, 83 summer_time=bool(local_time.tm_isdst), 84 hours=local_time.tm_hour, 85 day_of_week=local_time.tm_wday + 1, 86 day_of_month=local_time.tm_mday, 87 months=local_time.tm_mon, 88 years=local_time.tm_year % 100)
Create Time from datetime.datetime
91def time_to_datetime(t: Time 92 ) -> datetime.datetime: 93 """Convert Time to datetime.datetime""" 94 # TODO document edge cases (local time, os implementation, ...) 95 # TODO maybe allow diferent time size (use now for time) 96 if t.size != TimeSize.SEVEN: 97 raise ValueError('unsupported time size') 98 99 local_dt = datetime.datetime( 100 year=2000 + t.years if t.years < 70 else 1900 + t.years, 101 month=t.months, 102 day=t.day_of_month, 103 hour=t.hours, 104 minute=t.minutes, 105 second=int(t.milliseconds / 1000), 106 microsecond=(t.milliseconds % 1000) * 1000, 107 fold=not t.summer_time) 108 109 return local_dt.astimezone(tz=datetime.timezone.utc)
Convert Time to datetime.datetime
29class AsduType(enum.Enum): 30 M_SP_NA = 1 31 M_SP_TA = 2 32 M_DP_NA = 3 33 M_DP_TA = 4 34 M_ST_NA = 5 35 M_ST_TA = 6 36 M_BO_NA = 7 37 M_BO_TA = 8 38 M_ME_NA = 9 39 M_ME_TA = 10 40 M_ME_NB = 11 41 M_ME_TB = 12 42 M_ME_NC = 13 43 M_ME_TC = 14 44 M_IT_NA = 15 45 M_IT_TA = 16 46 M_EP_TA = 17 47 M_EP_TB = 18 48 M_EP_TC = 19 49 M_PS_NA = 20 50 M_ME_ND = 21 51 M_SP_TB = 30 52 M_DP_TB = 31 53 M_ST_TB = 32 54 M_BO_TB = 33 55 M_ME_TD = 34 56 M_ME_TE = 35 57 M_ME_TF = 36 58 M_IT_TB = 37 59 M_EP_TD = 38 60 M_EP_TE = 39 61 M_EP_TF = 40 62 C_SC_NA = 45 63 C_DC_NA = 46 64 C_RC_NA = 47 65 C_SE_NA = 48 66 C_SE_NB = 49 67 C_SE_NC = 50 68 C_BO_NA = 51 69 M_EI_NA = 70 70 C_IC_NA = 100 71 C_CI_NA = 101 72 C_RD_NA = 102 73 C_CS_NA = 103 74 C_TS_NA = 104 75 C_RP_NA = 105 76 C_CD_NA = 106 77 P_ME_NA = 110 78 P_ME_NB = 111 79 P_ME_NC = 112 80 P_AC_NA = 113 81 F_FR_NA = 120 82 F_SR_NA = 121 83 F_SC_NA = 122 84 F_LS_NA = 123 85 F_AF_NA = 124 86 F_SG_NA = 125 87 F_DR_TA = 126
An enumeration.
90class CauseType(enum.Enum): 91 UNDEFINED = 0 92 PERIODIC = 1 93 BACKGROUND_SCAN = 2 94 SPONTANEOUS = 3 95 INITIALIZED = 4 96 REQUEST = 5 97 ACTIVATION = 6 98 ACTIVATION_CONFIRMATION = 7 99 DEACTIVATION = 8 100 DEACTIVATION_CONFIRMATION = 9 101 ACTIVATION_TERMINATION = 10 102 REMOTE_COMMAND = 11 103 LOCAL_COMMAND = 12 104 FILE_TRANSFER = 13 105 INTERROGATED_STATION = 20 106 INTERROGATED_GROUP01 = 21 107 INTERROGATED_GROUP02 = 22 108 INTERROGATED_GROUP03 = 23 109 INTERROGATED_GROUP04 = 24 110 INTERROGATED_GROUP05 = 25 111 INTERROGATED_GROUP06 = 26 112 INTERROGATED_GROUP07 = 27 113 INTERROGATED_GROUP08 = 28 114 INTERROGATED_GROUP09 = 29 115 INTERROGATED_GROUP10 = 30 116 INTERROGATED_GROUP11 = 31 117 INTERROGATED_GROUP12 = 32 118 INTERROGATED_GROUP13 = 33 119 INTERROGATED_GROUP14 = 34 120 INTERROGATED_GROUP15 = 35 121 INTERROGATED_GROUP16 = 36 122 INTERROGATED_COUNTER = 37 123 INTERROGATED_COUNTER01 = 38 124 INTERROGATED_COUNTER02 = 39 125 INTERROGATED_COUNTER03 = 40 126 INTERROGATED_COUNTER04 = 41 127 UNKNOWN_TYPE = 44 128 UNKNOWN_CAUSE = 45 129 UNKNOWN_ASDU_ADDRESS = 46 130 UNKNOWN_IO_ADDRESS = 47
An enumeration.
133class Cause(typing.NamedTuple): 134 type: CauseType | OtherCauseType 135 is_negative_confirm: bool 136 is_test: bool 137 originator_address: OriginatorAddress
Cause(type, is_negative_confirm, is_test, originator_address)
Create new instance of Cause(type, is_negative_confirm, is_test, originator_address)
140class QualityType(enum.Enum): 141 INDICATION = 0 142 MEASUREMENT = 1 143 COUNTER = 2 144 PROTECTION = 3
An enumeration.
147class IndicationQuality(typing.NamedTuple): 148 invalid: bool 149 not_topical: bool 150 substituted: bool 151 blocked: bool
IndicationQuality(invalid, not_topical, substituted, blocked)
154class MeasurementQuality(typing.NamedTuple): 155 invalid: bool 156 not_topical: bool 157 substituted: bool 158 blocked: bool 159 overflow: bool
MeasurementQuality(invalid, not_topical, substituted, blocked, overflow)
162class CounterQuality(typing.NamedTuple): 163 invalid: bool 164 adjusted: bool 165 overflow: bool 166 sequence: int 167 """sequence in range [0, 31]"""
CounterQuality(invalid, adjusted, overflow, sequence)
170class ProtectionQuality(typing.NamedTuple): 171 invalid: bool 172 not_topical: bool 173 substituted: bool 174 blocked: bool 175 time_invalid: bool
ProtectionQuality(invalid, not_topical, substituted, blocked, time_invalid)
An enumeration.
An enumeration.
196class DoubleValue(enum.Enum): 197 """DoubleDataValue 198 199 `FAULT` stands for value 3, defined in the protocol as *INDETERMINATE*. 200 This is in order to make it more distinguishable from ``INTERMEDIATE``. 201 202 """ 203 INTERMEDIATE = 0 204 OFF = 1 205 ON = 2 206 FAULT = 3
DoubleDataValue
FAULT
stands for value 3, defined in the protocol as INDETERMINATE.
This is in order to make it more distinguishable from INTERMEDIATE
.
An enumeration.
214class StepPositionValue(typing.NamedTuple): 215 value: int 216 """value in range [-64, 63]""" 217 transient: bool
StepPositionValue(value, transient)
220class BitstringValue(typing.NamedTuple): 221 value: util.Bytes 222 """bitstring encoded as 4 bytes"""
BitstringValue(value,)
NormalizedValue(value,)
ScaledValue(value,)
FloatingValue(value,)
239class BinaryCounterValue(typing.NamedTuple): 240 value: int 241 """value in range [-2^31, 2^31-1]"""
BinaryCounterValue(value,)
An enumeration.
249class ProtectionStartValue(typing.NamedTuple): 250 general: bool 251 l1: bool 252 l2: bool 253 l3: bool 254 ie: bool 255 reverse: bool
ProtectionStartValue(general, l1, l2, l3, ie, reverse)
258class ProtectionCommandValue(typing.NamedTuple): 259 general: bool 260 l1: bool 261 l2: bool 262 l3: bool
ProtectionCommandValue(general, l1, l2, l3)
265class StatusValue(typing.NamedTuple): 266 value: list[bool] 267 """value length is 16""" 268 change: list[bool] 269 """change length is 16"""
StatusValue(value, change)
272class IoElement_M_SP_NA(typing.NamedTuple): 273 value: SingleValue 274 quality: IndicationQuality
IoElement_M_SP_NA(value, quality)
Create new instance of IoElement_M_SP_NA(value, quality)
277class IoElement_M_SP_TA(typing.NamedTuple): 278 value: SingleValue 279 quality: IndicationQuality
IoElement_M_SP_TA(value, quality)
Create new instance of IoElement_M_SP_TA(value, quality)
282class IoElement_M_DP_NA(typing.NamedTuple): 283 value: DoubleValue 284 quality: IndicationQuality
IoElement_M_DP_NA(value, quality)
Create new instance of IoElement_M_DP_NA(value, quality)
287class IoElement_M_DP_TA(typing.NamedTuple): 288 value: DoubleValue 289 quality: IndicationQuality
IoElement_M_DP_TA(value, quality)
Create new instance of IoElement_M_DP_TA(value, quality)
292class IoElement_M_ST_NA(typing.NamedTuple): 293 value: StepPositionValue 294 quality: MeasurementQuality
IoElement_M_ST_NA(value, quality)
Create new instance of IoElement_M_ST_NA(value, quality)
297class IoElement_M_ST_TA(typing.NamedTuple): 298 value: StepPositionValue 299 quality: MeasurementQuality
IoElement_M_ST_TA(value, quality)
Create new instance of IoElement_M_ST_TA(value, quality)
302class IoElement_M_BO_NA(typing.NamedTuple): 303 value: BitstringValue 304 quality: MeasurementQuality
IoElement_M_BO_NA(value, quality)
Create new instance of IoElement_M_BO_NA(value, quality)
307class IoElement_M_BO_TA(typing.NamedTuple): 308 value: BitstringValue 309 quality: MeasurementQuality
IoElement_M_BO_TA(value, quality)
Create new instance of IoElement_M_BO_TA(value, quality)
312class IoElement_M_ME_NA(typing.NamedTuple): 313 value: NormalizedValue 314 quality: MeasurementQuality
IoElement_M_ME_NA(value, quality)
Create new instance of IoElement_M_ME_NA(value, quality)
317class IoElement_M_ME_TA(typing.NamedTuple): 318 value: NormalizedValue 319 quality: MeasurementQuality
IoElement_M_ME_TA(value, quality)
Create new instance of IoElement_M_ME_TA(value, quality)
322class IoElement_M_ME_NB(typing.NamedTuple): 323 value: ScaledValue 324 quality: MeasurementQuality
IoElement_M_ME_NB(value, quality)
Create new instance of IoElement_M_ME_NB(value, quality)
327class IoElement_M_ME_TB(typing.NamedTuple): 328 value: ScaledValue 329 quality: MeasurementQuality
IoElement_M_ME_TB(value, quality)
Create new instance of IoElement_M_ME_TB(value, quality)
332class IoElement_M_ME_NC(typing.NamedTuple): 333 value: FloatingValue 334 quality: MeasurementQuality
IoElement_M_ME_NC(value, quality)
Create new instance of IoElement_M_ME_NC(value, quality)
337class IoElement_M_ME_TC(typing.NamedTuple): 338 value: FloatingValue 339 quality: MeasurementQuality
IoElement_M_ME_TC(value, quality)
Create new instance of IoElement_M_ME_TC(value, quality)
342class IoElement_M_IT_NA(typing.NamedTuple): 343 value: BinaryCounterValue 344 quality: CounterQuality
IoElement_M_IT_NA(value, quality)
Create new instance of IoElement_M_IT_NA(value, quality)
347class IoElement_M_IT_TA(typing.NamedTuple): 348 value: BinaryCounterValue 349 quality: CounterQuality
IoElement_M_IT_TA(value, quality)
Create new instance of IoElement_M_IT_TA(value, quality)
352class IoElement_M_EP_TA(typing.NamedTuple): 353 value: ProtectionValue 354 quality: ProtectionQuality 355 elapsed_time: int 356 """elapsed_time in range [0, 65535]"""
IoElement_M_EP_TA(value, quality, elapsed_time)
Create new instance of IoElement_M_EP_TA(value, quality, elapsed_time)
359class IoElement_M_EP_TB(typing.NamedTuple): 360 value: ProtectionStartValue 361 quality: ProtectionQuality 362 duration_time: int 363 """duration_time in range [0, 65535]"""
IoElement_M_EP_TB(value, quality, duration_time)
Create new instance of IoElement_M_EP_TB(value, quality, duration_time)
366class IoElement_M_EP_TC(typing.NamedTuple): 367 value: ProtectionCommandValue 368 quality: ProtectionQuality 369 operating_time: int 370 """operating_time in range [0, 65535]"""
IoElement_M_EP_TC(value, quality, operating_time)
Create new instance of IoElement_M_EP_TC(value, quality, operating_time)
373class IoElement_M_PS_NA(typing.NamedTuple): 374 value: StatusValue 375 quality: MeasurementQuality
IoElement_M_PS_NA(value, quality)
Create new instance of IoElement_M_PS_NA(value, quality)
IoElement_M_ME_ND(value,)
382class IoElement_M_SP_TB(typing.NamedTuple): 383 value: SingleValue 384 quality: IndicationQuality
IoElement_M_SP_TB(value, quality)
Create new instance of IoElement_M_SP_TB(value, quality)
387class IoElement_M_DP_TB(typing.NamedTuple): 388 value: DoubleValue 389 quality: IndicationQuality
IoElement_M_DP_TB(value, quality)
Create new instance of IoElement_M_DP_TB(value, quality)
392class IoElement_M_ST_TB(typing.NamedTuple): 393 value: StepPositionValue 394 quality: MeasurementQuality
IoElement_M_ST_TB(value, quality)
Create new instance of IoElement_M_ST_TB(value, quality)
397class IoElement_M_BO_TB(typing.NamedTuple): 398 value: BitstringValue 399 quality: MeasurementQuality
IoElement_M_BO_TB(value, quality)
Create new instance of IoElement_M_BO_TB(value, quality)
402class IoElement_M_ME_TD(typing.NamedTuple): 403 value: NormalizedValue 404 quality: MeasurementQuality
IoElement_M_ME_TD(value, quality)
Create new instance of IoElement_M_ME_TD(value, quality)
407class IoElement_M_ME_TE(typing.NamedTuple): 408 value: ScaledValue 409 quality: MeasurementQuality
IoElement_M_ME_TE(value, quality)
Create new instance of IoElement_M_ME_TE(value, quality)
412class IoElement_M_ME_TF(typing.NamedTuple): 413 value: FloatingValue 414 quality: MeasurementQuality
IoElement_M_ME_TF(value, quality)
Create new instance of IoElement_M_ME_TF(value, quality)
417class IoElement_M_IT_TB(typing.NamedTuple): 418 value: BinaryCounterValue 419 quality: CounterQuality
IoElement_M_IT_TB(value, quality)
Create new instance of IoElement_M_IT_TB(value, quality)
422class IoElement_M_EP_TD(typing.NamedTuple): 423 value: ProtectionValue 424 quality: ProtectionQuality 425 elapsed_time: int 426 """elapsed_time in range [0, 65535]"""
IoElement_M_EP_TD(value, quality, elapsed_time)
Create new instance of IoElement_M_EP_TD(value, quality, elapsed_time)
429class IoElement_M_EP_TE(typing.NamedTuple): 430 value: ProtectionStartValue 431 quality: ProtectionQuality 432 duration_time: int 433 """duration_time in range [0, 65535]"""
IoElement_M_EP_TE(value, quality, duration_time)
Create new instance of IoElement_M_EP_TE(value, quality, duration_time)
436class IoElement_M_EP_TF(typing.NamedTuple): 437 value: ProtectionCommandValue 438 quality: ProtectionQuality 439 operating_time: int 440 """operating_time in range [0, 65535]"""
IoElement_M_EP_TF(value, quality, operating_time)
Create new instance of IoElement_M_EP_TF(value, quality, operating_time)
443class IoElement_C_SC_NA(typing.NamedTuple): 444 value: SingleValue 445 select: bool 446 qualifier: int 447 """qualifier in range [0, 31]"""
IoElement_C_SC_NA(value, select, qualifier)
Create new instance of IoElement_C_SC_NA(value, select, qualifier)
450class IoElement_C_DC_NA(typing.NamedTuple): 451 value: DoubleValue 452 select: bool 453 qualifier: int 454 """qualifier in range [0, 31]"""
IoElement_C_DC_NA(value, select, qualifier)
Create new instance of IoElement_C_DC_NA(value, select, qualifier)
457class IoElement_C_RC_NA(typing.NamedTuple): 458 value: RegulatingValue 459 select: bool 460 qualifier: int 461 """qualifier in range [0, 31]"""
IoElement_C_RC_NA(value, select, qualifier)
Create new instance of IoElement_C_RC_NA(value, select, qualifier)
IoElement_C_SE_NA(value, select)
Create new instance of IoElement_C_SE_NA(value, select)
IoElement_C_SE_NB(value, select)
Create new instance of IoElement_C_SE_NB(value, select)
IoElement_C_SE_NC(value, select)
Create new instance of IoElement_C_SE_NC(value, select)
IoElement_C_BO_NA(value,)
483class IoElement_M_EI_NA(typing.NamedTuple): 484 param_change: bool 485 cause: int 486 """cause in range [0, 127]"""
IoElement_M_EI_NA(param_change, cause)
489class IoElement_C_IC_NA(typing.NamedTuple): 490 qualifier: int 491 """qualifier in range [0, 255]"""
IoElement_C_IC_NA(qualifier,)
494class IoElement_C_CI_NA(typing.NamedTuple): 495 request: int 496 """request in range [0, 63]""" 497 freeze: FreezeCode
IoElement_C_CI_NA(request, freeze)
Create new instance of IoElement_C_CI_NA(request, freeze)
IoElement_C_RD_NA()
IoElement_C_CS_NA(time,)
IoElement_C_TS_NA()
513class IoElement_C_RP_NA(typing.NamedTuple): 514 qualifier: int 515 """qualifier in range [0, 255]"""
IoElement_C_RP_NA(qualifier,)
IoElement_C_CD_NA(time,)
523class IoElement_P_ME_NA(typing.NamedTuple): 524 value: NormalizedValue 525 qualifier: int 526 """qualifier in range [0, 255]"""
IoElement_P_ME_NA(value, qualifier)
Create new instance of IoElement_P_ME_NA(value, qualifier)
529class IoElement_P_ME_NB(typing.NamedTuple): 530 value: ScaledValue 531 qualifier: int 532 """qualifier in range [0, 255]"""
IoElement_P_ME_NB(value, qualifier)
Create new instance of IoElement_P_ME_NB(value, qualifier)
535class IoElement_P_ME_NC(typing.NamedTuple): 536 value: FloatingValue 537 qualifier: int 538 """qualifier in range [0, 255]"""
IoElement_P_ME_NC(value, qualifier)
Create new instance of IoElement_P_ME_NC(value, qualifier)
541class IoElement_P_AC_NA(typing.NamedTuple): 542 qualifier: int 543 """qualifier in range [0, 255]"""
IoElement_P_AC_NA(qualifier,)
546class IoElement_F_FR_NA(typing.NamedTuple): 547 file_name: int 548 """file_name in range [0, 65535]""" 549 file_length: int 550 """file_length in range [0, 16777215]""" 551 ready: bool
IoElement_F_FR_NA(file_name, file_length, ready)
554class IoElement_F_SR_NA(typing.NamedTuple): 555 file_name: int 556 """file_name in range [0, 65535]""" 557 section_name: int 558 """section_name in range [0, 255]""" 559 section_length: int 560 """section_length in range [0, 16777215]""" 561 ready: bool
IoElement_F_SR_NA(file_name, section_name, section_length, ready)
564class IoElement_F_SC_NA(typing.NamedTuple): 565 file_name: int 566 """file_name in range [0, 65535]""" 567 section_name: int 568 """section_name in range [0, 255]""" 569 qualifier: int 570 """qualifier in range [0, 255]"""
IoElement_F_SC_NA(file_name, section_name, qualifier)
573class IoElement_F_LS_NA(typing.NamedTuple): 574 file_name: int 575 """file_name in range [0, 65535]""" 576 section_name: int 577 """section_name in range [0, 255]""" 578 last_qualifier: int 579 """last_qualifier in range [0, 255]""" 580 checksum: int 581 """checksum in range [0, 255]"""
IoElement_F_LS_NA(file_name, section_name, last_qualifier, checksum)
584class IoElement_F_AF_NA(typing.NamedTuple): 585 file_name: int 586 """file_name in range [0, 65535]""" 587 section_name: int 588 """section_name in range [0, 255]""" 589 qualifier: int 590 """qualifier in range [0, 255]"""
IoElement_F_AF_NA(file_name, section_name, qualifier)
593class IoElement_F_SG_NA(typing.NamedTuple): 594 file_name: int 595 """file_name in range [0, 65535]""" 596 section_name: int 597 """section_name in range [0, 255]""" 598 segment: util.Bytes
IoElement_F_SG_NA(file_name, section_name, segment)
601class IoElement_F_DR_TA(typing.NamedTuple): 602 file_name: int 603 """file_name in range [0, 65535]""" 604 file_length: int 605 """file_length in range [0, 16777215]""" 606 more_follows: bool 607 is_directory: bool 608 transfer_active: bool 609 creation_time: Time
IoElement_F_DR_TA(file_name, file_length, more_follows, is_directory, transfer_active, creation_time)
Create new instance of IoElement_F_DR_TA(file_name, file_length, more_follows, is_directory, transfer_active, creation_time)
672class IO(typing.NamedTuple): 673 address: IoAddress 674 elements: list[IoElement] 675 time: Time | None
IO(address, elements, time)
Create new instance of IO(address, elements, time)
Alias for field number 1
678class ASDU(typing.NamedTuple): 679 type: AsduType 680 cause: Cause 681 address: AsduAddress 682 ios: list[IO]
ASDU(type, cause, address, ios)
12class Encoder: 13 14 def __init__(self, 15 cause_size: common.CauseSize, 16 asdu_address_size: common.AsduAddressSize, 17 io_address_size: common.IoAddressSize, 18 max_asdu_size: int = 252): 19 self._cause_size = cause_size 20 self._max_asdu_size = max_asdu_size 21 self._encoder = encoder.Encoder( 22 cause_size=cause_size, 23 asdu_address_size=asdu_address_size, 24 io_address_size=io_address_size, 25 asdu_type_time_sizes=asdu_type_time_sizes, 26 inverted_sequence_bit=False, 27 decode_io_element_cb=decode_io_element, 28 encode_io_element_cb=encode_io_element) 29 30 @property 31 def max_asdu_size(self) -> int: 32 return self._max_asdu_size 33 34 @property 35 def cause_size(self) -> common.CauseSize: 36 return self._encoder.cause_size 37 38 @property 39 def asdu_address_size(self) -> common.AsduAddressSize: 40 return self._encoder.asdu_address_size 41 42 @property 43 def io_address_size(self) -> common.IoAddressSize: 44 return self._encoder.io_address_size 45 46 def decode_asdu(self, 47 asdu_bytes: util.Bytes 48 ) -> tuple[common.ASDU, util.Bytes]: 49 asdu, rest = self._encoder.decode_asdu(asdu_bytes) 50 51 asdu_type = _decode_asdu_type(asdu.type) 52 53 cause = decode_cause(asdu.cause, self._cause_size) 54 address = asdu.address 55 ios = [common.IO(address=io.address, 56 elements=io.elements, 57 time=io.time) 58 for io in asdu.ios] 59 60 asdu = common.ASDU(type=asdu_type, 61 cause=cause, 62 address=address, 63 ios=ios) 64 return asdu, rest 65 66 def encode_asdu(self, asdu: common.ASDU) -> util.Bytes: 67 asdu_type = asdu.type.value 68 cause = encode_cause(asdu.cause, self._cause_size) 69 address = asdu.address 70 ios = [encoder.common.IO(address=io.address, 71 elements=io.elements, 72 time=io.time) 73 for io in asdu.ios] 74 75 asdu = encoder.common.ASDU(type=asdu_type, 76 cause=cause, 77 address=address, 78 ios=ios) 79 80 return self._encoder.encode_asdu(asdu)
14 def __init__(self, 15 cause_size: common.CauseSize, 16 asdu_address_size: common.AsduAddressSize, 17 io_address_size: common.IoAddressSize, 18 max_asdu_size: int = 252): 19 self._cause_size = cause_size 20 self._max_asdu_size = max_asdu_size 21 self._encoder = encoder.Encoder( 22 cause_size=cause_size, 23 asdu_address_size=asdu_address_size, 24 io_address_size=io_address_size, 25 asdu_type_time_sizes=asdu_type_time_sizes, 26 inverted_sequence_bit=False, 27 decode_io_element_cb=decode_io_element, 28 encode_io_element_cb=encode_io_element)
46 def decode_asdu(self, 47 asdu_bytes: util.Bytes 48 ) -> tuple[common.ASDU, util.Bytes]: 49 asdu, rest = self._encoder.decode_asdu(asdu_bytes) 50 51 asdu_type = _decode_asdu_type(asdu.type) 52 53 cause = decode_cause(asdu.cause, self._cause_size) 54 address = asdu.address 55 ios = [common.IO(address=io.address, 56 elements=io.elements, 57 time=io.time) 58 for io in asdu.ios] 59 60 asdu = common.ASDU(type=asdu_type, 61 cause=cause, 62 address=address, 63 ios=ios) 64 return asdu, rest
66 def encode_asdu(self, asdu: common.ASDU) -> util.Bytes: 67 asdu_type = asdu.type.value 68 cause = encode_cause(asdu.cause, self._cause_size) 69 address = asdu.address 70 ios = [encoder.common.IO(address=io.address, 71 elements=io.elements, 72 time=io.time) 73 for io in asdu.ios] 74 75 asdu = encoder.common.ASDU(type=asdu_type, 76 cause=cause, 77 address=address, 78 ios=ios) 79 80 return self._encoder.encode_asdu(asdu)
107def decode_cause(cause: int, 108 cause_size: common.CauseSize 109 ) -> common.Cause: 110 cause_type = decode_cause_type(cause & 0x3F) 111 is_negative_confirm = bool(cause & 0x40) 112 is_test = bool(cause & 0x80) 113 114 if cause_size == common.CauseSize.ONE: 115 originator_address = 0 116 117 elif cause_size == common.CauseSize.TWO: 118 originator_address = cause >> 8 119 120 else: 121 raise ValueError('unsupported cause size') 122 123 return common.Cause(type=cause_type, 124 is_negative_confirm=is_negative_confirm, 125 is_test=is_test, 126 originator_address=originator_address)
129def encode_cause(cause: common.Cause, 130 cause_size: common.CauseSize 131 ) -> int: 132 result = ((0x80 if cause.is_test else 0) | 133 (0x40 if cause.is_negative_confirm else 0) | 134 encode_cause_type(cause.type)) 135 136 if cause_size == common.CauseSize.ONE: 137 return result 138 139 if cause_size == common.CauseSize.TWO: 140 return result | (cause.originator_address << 8) 141 142 raise ValueError('unsupported cause size')
158def decode_io_element(io_bytes: util.Bytes, 159 asdu_type: int 160 ) -> tuple[common.IoElement, util.Bytes]: 161 asdu_type = _decode_asdu_type(asdu_type) 162 163 if asdu_type == common.AsduType.M_SP_NA: 164 value = common.SingleValue(io_bytes[0] & 1) 165 quality, io_bytes = decode_quality(io_bytes, 166 common.QualityType.INDICATION) 167 168 element = common.IoElement_M_SP_NA(value=value, 169 quality=quality) 170 return element, io_bytes 171 172 if asdu_type == common.AsduType.M_SP_TA: 173 value = common.SingleValue(io_bytes[0] & 1) 174 quality, io_bytes = decode_quality(io_bytes, 175 common.QualityType.INDICATION) 176 177 element = common.IoElement_M_SP_TA(value=value, 178 quality=quality) 179 return element, io_bytes 180 181 if asdu_type == common.AsduType.M_DP_NA: 182 value = common.DoubleValue(io_bytes[0] & 3) 183 quality, io_bytes = decode_quality(io_bytes, 184 common.QualityType.INDICATION) 185 186 element = common.IoElement_M_DP_NA(value=value, 187 quality=quality) 188 return element, io_bytes 189 190 if asdu_type == common.AsduType.M_DP_TA: 191 value = common.DoubleValue(io_bytes[0] & 3) 192 quality, io_bytes = decode_quality(io_bytes, 193 common.QualityType.INDICATION) 194 195 element = common.IoElement_M_DP_TA(value=value, 196 quality=quality) 197 return element, io_bytes 198 199 if asdu_type == common.AsduType.M_ST_NA: 200 value, io_bytes = decode_step_position_value(io_bytes) 201 quality, io_bytes = decode_quality(io_bytes, 202 common.QualityType.MEASUREMENT) 203 204 element = common.IoElement_M_ST_NA(value=value, 205 quality=quality) 206 return element, io_bytes 207 208 if asdu_type == common.AsduType.M_ST_TA: 209 value, io_bytes = decode_step_position_value(io_bytes) 210 quality, io_bytes = decode_quality(io_bytes, 211 common.QualityType.MEASUREMENT) 212 213 element = common.IoElement_M_ST_TA(value=value, 214 quality=quality) 215 return element, io_bytes 216 217 if asdu_type == common.AsduType.M_BO_NA: 218 value, io_bytes = decode_bitstring_value(io_bytes) 219 quality, io_bytes = decode_quality(io_bytes, 220 common.QualityType.MEASUREMENT) 221 222 element = common.IoElement_M_BO_NA(value=value, 223 quality=quality) 224 return element, io_bytes 225 226 if asdu_type == common.AsduType.M_BO_TA: 227 value, io_bytes = decode_bitstring_value(io_bytes) 228 quality, io_bytes = decode_quality(io_bytes, 229 common.QualityType.MEASUREMENT) 230 231 element = common.IoElement_M_BO_NA(value=value, 232 quality=quality) 233 return element, io_bytes 234 235 if asdu_type == common.AsduType.M_ME_NA: 236 value, io_bytes = decode_normalized_value(io_bytes) 237 quality, io_bytes = decode_quality(io_bytes, 238 common.QualityType.MEASUREMENT) 239 240 element = common.IoElement_M_ME_NA(value=value, 241 quality=quality) 242 return element, io_bytes 243 244 if asdu_type == common.AsduType.M_ME_TA: 245 value, io_bytes = decode_normalized_value(io_bytes) 246 quality, io_bytes = decode_quality(io_bytes, 247 common.QualityType.MEASUREMENT) 248 249 element = common.IoElement_M_ME_TA(value=value, 250 quality=quality) 251 return element, io_bytes 252 253 if asdu_type == common.AsduType.M_ME_NB: 254 value, io_bytes = decode_scaled_value(io_bytes) 255 quality, io_bytes = decode_quality(io_bytes, 256 common.QualityType.MEASUREMENT) 257 258 element = common.IoElement_M_ME_NB(value=value, 259 quality=quality) 260 return element, io_bytes 261 262 if asdu_type == common.AsduType.M_ME_TB: 263 value, io_bytes = decode_scaled_value(io_bytes) 264 quality, io_bytes = decode_quality(io_bytes, 265 common.QualityType.MEASUREMENT) 266 267 element = common.IoElement_M_ME_TB(value=value, 268 quality=quality) 269 return element, io_bytes 270 271 if asdu_type == common.AsduType.M_ME_NC: 272 value, io_bytes = decode_floating_value(io_bytes) 273 quality, io_bytes = decode_quality(io_bytes, 274 common.QualityType.MEASUREMENT) 275 276 element = common.IoElement_M_ME_NC(value=value, 277 quality=quality) 278 return element, io_bytes 279 280 if asdu_type == common.AsduType.M_ME_TC: 281 value, io_bytes = decode_floating_value(io_bytes) 282 quality, io_bytes = decode_quality(io_bytes, 283 common.QualityType.MEASUREMENT) 284 285 element = common.IoElement_M_ME_TC(value=value, 286 quality=quality) 287 return element, io_bytes 288 289 if asdu_type == common.AsduType.M_IT_NA: 290 value, io_bytes = decode_binary_counter_value(io_bytes) 291 quality, io_bytes = decode_quality(io_bytes, 292 common.QualityType.COUNTER) 293 294 element = common.IoElement_M_IT_NA(value=value, 295 quality=quality) 296 return element, io_bytes 297 298 if asdu_type == common.AsduType.M_IT_TA: 299 value, io_bytes = decode_binary_counter_value(io_bytes) 300 quality, io_bytes = decode_quality(io_bytes, 301 common.QualityType.COUNTER) 302 303 element = common.IoElement_M_IT_TA(value=value, 304 quality=quality) 305 return element, io_bytes 306 307 if asdu_type == common.AsduType.M_EP_TA: 308 value = common.ProtectionValue(io_bytes[0] & 0x03) 309 quality, io_bytes = decode_quality(io_bytes, 310 common.QualityType.PROTECTION) 311 elapsed_time = int.from_bytes(io_bytes[:2], 'little') 312 io_bytes = io_bytes[2:] 313 314 element = common.IoElement_M_EP_TA(value=value, 315 quality=quality, 316 elapsed_time=elapsed_time) 317 return element, io_bytes 318 319 if asdu_type == common.AsduType.M_EP_TB: 320 value, io_bytes = decode_protection_start_value(io_bytes) 321 quality, io_bytes = decode_quality(io_bytes, 322 common.QualityType.PROTECTION) 323 duration_time = int.from_bytes(io_bytes[:2], 'little') 324 io_bytes = io_bytes[2:] 325 326 element = common.IoElement_M_EP_TB(value=value, 327 quality=quality, 328 duration_time=duration_time) 329 return element, io_bytes 330 331 if asdu_type == common.AsduType.M_EP_TC: 332 value, io_bytes = decode_protection_command_value(io_bytes) 333 quality, io_bytes = decode_quality(io_bytes, 334 common.QualityType.PROTECTION) 335 operating_time = int.from_bytes(io_bytes[:2], 'little') 336 io_bytes = io_bytes[2:] 337 338 element = common.IoElement_M_EP_TC(value=value, 339 quality=quality, 340 operating_time=operating_time) 341 return element, io_bytes 342 343 if asdu_type == common.AsduType.M_PS_NA: 344 value, io_bytes = decode_status_value(io_bytes) 345 quality, io_bytes = decode_quality(io_bytes, 346 common.QualityType.MEASUREMENT) 347 348 element = common.IoElement_M_PS_NA(value=value, 349 quality=quality) 350 return element, io_bytes 351 352 if asdu_type == common.AsduType.M_ME_ND: 353 value, io_bytes = decode_normalized_value(io_bytes) 354 355 element = common.IoElement_M_ME_ND(value=value) 356 return element, io_bytes 357 358 if asdu_type == common.AsduType.M_SP_TB: 359 value = common.SingleValue(io_bytes[0] & 1) 360 quality, io_bytes = decode_quality(io_bytes, 361 common.QualityType.INDICATION) 362 363 element = common.IoElement_M_SP_TB(value=value, 364 quality=quality) 365 return element, io_bytes 366 367 if asdu_type == common.AsduType.M_DP_TB: 368 value = common.DoubleValue(io_bytes[0] & 3) 369 quality, io_bytes = decode_quality(io_bytes, 370 common.QualityType.INDICATION) 371 372 element = common.IoElement_M_DP_TB(value=value, 373 quality=quality) 374 return element, io_bytes 375 376 if asdu_type == common.AsduType.M_ST_TB: 377 value, io_bytes = decode_step_position_value(io_bytes) 378 quality, io_bytes = decode_quality(io_bytes, 379 common.QualityType.MEASUREMENT) 380 381 element = common.IoElement_M_ST_TB(value=value, 382 quality=quality) 383 return element, io_bytes 384 385 if asdu_type == common.AsduType.M_BO_TB: 386 value, io_bytes = decode_bitstring_value(io_bytes) 387 quality, io_bytes = decode_quality(io_bytes, 388 common.QualityType.MEASUREMENT) 389 390 element = common.IoElement_M_BO_TB(value=value, 391 quality=quality) 392 return element, io_bytes 393 394 if asdu_type == common.AsduType.M_ME_TD: 395 value, io_bytes = decode_normalized_value(io_bytes) 396 quality, io_bytes = decode_quality(io_bytes, 397 common.QualityType.MEASUREMENT) 398 399 element = common.IoElement_M_ME_TD(value=value, 400 quality=quality) 401 return element, io_bytes 402 403 if asdu_type == common.AsduType.M_ME_TE: 404 value, io_bytes = decode_scaled_value(io_bytes) 405 quality, io_bytes = decode_quality(io_bytes, 406 common.QualityType.MEASUREMENT) 407 408 element = common.IoElement_M_ME_TE(value=value, 409 quality=quality) 410 return element, io_bytes 411 412 if asdu_type == common.AsduType.M_ME_TF: 413 value, io_bytes = decode_floating_value(io_bytes) 414 quality, io_bytes = decode_quality(io_bytes, 415 common.QualityType.MEASUREMENT) 416 417 element = common.IoElement_M_ME_TF(value=value, 418 quality=quality) 419 return element, io_bytes 420 421 if asdu_type == common.AsduType.M_IT_TB: 422 value, io_bytes = decode_binary_counter_value(io_bytes) 423 quality, io_bytes = decode_quality(io_bytes, 424 common.QualityType.COUNTER) 425 426 element = common.IoElement_M_IT_TB(value=value, 427 quality=quality) 428 return element, io_bytes 429 430 if asdu_type == common.AsduType.M_EP_TD: 431 value = common.ProtectionValue(io_bytes[0] & 0x03) 432 quality, io_bytes = decode_quality(io_bytes, 433 common.QualityType.PROTECTION) 434 elapsed_time = int.from_bytes(io_bytes[:2], 'little') 435 io_bytes = io_bytes[2:] 436 437 element = common.IoElement_M_EP_TD(value=value, 438 quality=quality, 439 elapsed_time=elapsed_time) 440 return element, io_bytes 441 442 if asdu_type == common.AsduType.M_EP_TE: 443 value, io_bytes = decode_protection_start_value(io_bytes) 444 quality, io_bytes = decode_quality(io_bytes, 445 common.QualityType.PROTECTION) 446 duration_time = int.from_bytes(io_bytes[:2], 'little') 447 io_bytes = io_bytes[2:] 448 449 element = common.IoElement_M_EP_TE(value=value, 450 quality=quality, 451 duration_time=duration_time) 452 return element, io_bytes 453 454 if asdu_type == common.AsduType.M_EP_TF: 455 value, io_bytes = decode_protection_command_value(io_bytes) 456 quality, io_bytes = decode_quality(io_bytes, 457 common.QualityType.PROTECTION) 458 operating_time = int.from_bytes(io_bytes[:2], 'little') 459 io_bytes = io_bytes[2:] 460 461 element = common.IoElement_M_EP_TF(value=value, 462 quality=quality, 463 operating_time=operating_time) 464 return element, io_bytes 465 466 if asdu_type == common.AsduType.C_SC_NA: 467 value = common.SingleValue(io_bytes[0] & 1) 468 select = bool(io_bytes[0] & 0x80) 469 qualifier = (io_bytes[0] >> 2) & 0x1F 470 io_bytes = io_bytes[1:] 471 472 element = common.IoElement_C_SC_NA(value=value, 473 select=select, 474 qualifier=qualifier) 475 return element, io_bytes 476 477 if asdu_type == common.AsduType.C_DC_NA: 478 value = common.DoubleValue(io_bytes[0] & 3) 479 select = bool(io_bytes[0] & 0x80) 480 qualifier = (io_bytes[0] >> 2) & 0x1F 481 io_bytes = io_bytes[1:] 482 483 element = common.IoElement_C_DC_NA(value=value, 484 select=select, 485 qualifier=qualifier) 486 return element, io_bytes 487 488 if asdu_type == common.AsduType.C_RC_NA: 489 value = common.RegulatingValue(io_bytes[0] & 3) 490 select = bool(io_bytes[0] & 0x80) 491 qualifier = (io_bytes[0] >> 2) & 0x1F 492 io_bytes = io_bytes[1:] 493 494 element = common.IoElement_C_RC_NA(value=value, 495 select=select, 496 qualifier=qualifier) 497 return element, io_bytes 498 499 if asdu_type == common.AsduType.C_SE_NA: 500 value, io_bytes = decode_normalized_value(io_bytes) 501 select = bool(io_bytes[0] & 0x80) 502 io_bytes = io_bytes[1:] 503 504 element = common.IoElement_C_SE_NA(value=value, 505 select=select) 506 return element, io_bytes 507 508 if asdu_type == common.AsduType.C_SE_NB: 509 value, io_bytes = decode_scaled_value(io_bytes) 510 select = bool(io_bytes[0] & 0x80) 511 io_bytes = io_bytes[1:] 512 513 element = common.IoElement_C_SE_NB(value=value, 514 select=select) 515 return element, io_bytes 516 517 if asdu_type == common.AsduType.C_SE_NC: 518 value, io_bytes = decode_floating_value(io_bytes) 519 select = bool(io_bytes[0] & 0x80) 520 io_bytes = io_bytes[1:] 521 522 element = common.IoElement_C_SE_NC(value=value, 523 select=select) 524 return element, io_bytes 525 526 if asdu_type == common.AsduType.C_BO_NA: 527 value, io_bytes = decode_bitstring_value(io_bytes) 528 529 element = common.IoElement_C_BO_NA(value=value) 530 return element, io_bytes 531 532 if asdu_type == common.AsduType.M_EI_NA: 533 param_change = bool(io_bytes[0] & 0x80) 534 cause = io_bytes[0] & 0x7F 535 io_bytes = io_bytes[1:] 536 537 element = common.IoElement_M_EI_NA(param_change=param_change, 538 cause=cause) 539 return element, io_bytes 540 541 if asdu_type == common.AsduType.C_IC_NA: 542 qualifier, io_bytes = io_bytes[0], io_bytes[1:] 543 544 element = common.IoElement_C_IC_NA(qualifier=qualifier) 545 return element, io_bytes 546 547 if asdu_type == common.AsduType.C_CI_NA: 548 request = io_bytes[0] & 0x3F 549 freeze = common.FreezeCode(io_bytes[0] >> 6) 550 io_bytes = io_bytes[1:] 551 552 element = common.IoElement_C_CI_NA(request=request, 553 freeze=freeze) 554 return element, io_bytes 555 556 if asdu_type == common.AsduType.C_RD_NA: 557 element = common.IoElement_C_RD_NA() 558 return element, io_bytes 559 560 if asdu_type == common.AsduType.C_CS_NA: 561 time = encoder.decode_time(io_bytes[:7], common.TimeSize.SEVEN) 562 io_bytes = io_bytes[7:] 563 564 element = common.IoElement_C_CS_NA(time=time) 565 return element, io_bytes 566 567 if asdu_type == common.AsduType.C_TS_NA: 568 io_bytes = io_bytes[2:] 569 570 element = common.IoElement_C_TS_NA() 571 return element, io_bytes 572 573 if asdu_type == common.AsduType.C_RP_NA: 574 qualifier, io_bytes = io_bytes[0], io_bytes[1:] 575 576 element = common.IoElement_C_RP_NA(qualifier=qualifier) 577 return element, io_bytes 578 579 if asdu_type == common.AsduType.C_CD_NA: 580 time = int.from_bytes(io_bytes[:2], 'little') 581 io_bytes = io_bytes[2:] 582 583 element = common.IoElement_C_CD_NA(time=time) 584 return element, io_bytes 585 586 if asdu_type == common.AsduType.P_ME_NA: 587 value, io_bytes = decode_normalized_value(io_bytes) 588 qualifier, io_bytes = io_bytes[0], io_bytes[1:] 589 590 element = common.IoElement_P_ME_NA(value=value, 591 qualifier=qualifier) 592 return element, io_bytes 593 594 if asdu_type == common.AsduType.P_ME_NB: 595 value, io_bytes = decode_scaled_value(io_bytes) 596 qualifier, io_bytes = io_bytes[0], io_bytes[1:] 597 598 element = common.IoElement_P_ME_NB(value=value, 599 qualifier=qualifier) 600 return element, io_bytes 601 602 if asdu_type == common.AsduType.P_ME_NC: 603 value, io_bytes = decode_floating_value(io_bytes) 604 qualifier, io_bytes = io_bytes[0], io_bytes[1:] 605 606 element = common.IoElement_P_ME_NC(value=value, 607 qualifier=qualifier) 608 return element, io_bytes 609 610 if asdu_type == common.AsduType.P_AC_NA: 611 qualifier, io_bytes = io_bytes[0], io_bytes[1:] 612 613 element = common.IoElement_P_AC_NA(qualifier=qualifier) 614 return element, io_bytes 615 616 if asdu_type == common.AsduType.F_FR_NA: 617 file_name = int.from_bytes(io_bytes[:2], 'little') 618 file_length = int.from_bytes(io_bytes[2:5], 'little') 619 ready = bool(io_bytes[5] & 0x80) 620 io_bytes = io_bytes[6:] 621 622 element = common.IoElement_F_FR_NA(file_name=file_name, 623 file_length=file_length, 624 ready=ready) 625 return element, io_bytes 626 627 if asdu_type == common.AsduType.F_SR_NA: 628 file_name = int.from_bytes(io_bytes[:2], 'little') 629 section_name = io_bytes[2] 630 section_length = int.from_bytes(io_bytes[3:6], 'little') 631 ready = bool(io_bytes[6] & 0x80) 632 io_bytes = io_bytes[7:] 633 634 element = common.IoElement_F_SR_NA(file_name=file_name, 635 section_name=section_name, 636 section_length=section_length, 637 ready=ready) 638 return element, io_bytes 639 640 if asdu_type == common.AsduType.F_SC_NA: 641 file_name = int.from_bytes(io_bytes[:2], 'little') 642 section_name = io_bytes[2] 643 qualifier = io_bytes[3] 644 io_bytes = io_bytes[4:] 645 646 element = common.IoElement_F_SC_NA(file_name=file_name, 647 section_name=section_name, 648 qualifier=qualifier) 649 return element, io_bytes 650 651 if asdu_type == common.AsduType.F_LS_NA: 652 file_name = int.from_bytes(io_bytes[:2], 'little') 653 section_name = io_bytes[2] 654 last_qualifier = io_bytes[3] 655 checksum = io_bytes[4] 656 io_bytes = io_bytes[5:] 657 658 element = common.IoElement_F_LS_NA(file_name=file_name, 659 section_name=section_name, 660 last_qualifier=last_qualifier, 661 checksum=checksum) 662 return element, io_bytes 663 664 if asdu_type == common.AsduType.F_AF_NA: 665 file_name = int.from_bytes(io_bytes[:2], 'little') 666 section_name = io_bytes[2] 667 qualifier = io_bytes[3] 668 io_bytes = io_bytes[4:] 669 670 element = common.IoElement_F_AF_NA(file_name=file_name, 671 section_name=section_name, 672 qualifier=qualifier) 673 return element, io_bytes 674 675 if asdu_type == common.AsduType.F_SG_NA: 676 file_name = int.from_bytes(io_bytes[:2], 'little') 677 section_name = io_bytes[2] 678 length = io_bytes[3] 679 segment = io_bytes[4:4+length] 680 io_bytes = io_bytes[4+length:] 681 682 element = common.IoElement_F_SG_NA(file_name=file_name, 683 section_name=section_name, 684 segment=segment) 685 return element, io_bytes 686 687 if asdu_type == common.AsduType.F_DR_TA: 688 file_name = int.from_bytes(io_bytes[:2], 'little') 689 file_length = int.from_bytes(io_bytes[2:5], 'little') 690 more_follows = bool(io_bytes[5] & 0x20) 691 is_directory = bool(io_bytes[5] & 0x40) 692 transfer_active = bool(io_bytes[5] & 0x80) 693 creation_time = encoder.decode_time(io_bytes[6:13], 694 common.TimeSize.SEVEN) 695 io_bytes = io_bytes[13:] 696 697 element = common.IoElement_F_DR_TA(file_name=file_name, 698 file_length=file_length, 699 more_follows=more_follows, 700 is_directory=is_directory, 701 transfer_active=transfer_active, 702 creation_time=creation_time) 703 return element, io_bytes 704 705 raise ValueError('unsupported ASDU type')
708def encode_io_element(element: common.IoElement, 709 asdu_type: int 710 ) -> typing.Iterable[int]: 711 asdu_type = _decode_asdu_type(asdu_type) 712 713 if isinstance(element, common.IoElement_M_SP_NA): 714 quality = util.first(encode_quality(element.quality)) 715 yield element.value.value | quality 716 717 elif isinstance(element, common.IoElement_M_SP_TA): 718 quality = util.first(encode_quality(element.quality)) 719 yield element.value.value | quality 720 721 elif isinstance(element, common.IoElement_M_DP_NA): 722 quality = util.first(encode_quality(element.quality)) 723 yield element.value.value | quality 724 725 elif isinstance(element, common.IoElement_M_DP_TA): 726 quality = util.first(encode_quality(element.quality)) 727 yield element.value.value | quality 728 729 elif isinstance(element, common.IoElement_M_ST_NA): 730 yield from encode_step_position_value(element.value) 731 yield from encode_quality(element.quality) 732 733 elif isinstance(element, common.IoElement_M_ST_TA): 734 yield from encode_step_position_value(element.value) 735 yield from encode_quality(element.quality) 736 737 elif isinstance(element, common.IoElement_M_BO_NA): 738 yield from encode_bitstring_value(element.value) 739 yield from encode_quality(element.quality) 740 741 elif isinstance(element, common.IoElement_M_BO_TA): 742 yield from encode_bitstring_value(element.value) 743 yield from encode_quality(element.quality) 744 745 elif isinstance(element, common.IoElement_M_ME_NA): 746 yield from encode_normalized_value(element.value) 747 yield from encode_quality(element.quality) 748 749 elif isinstance(element, common.IoElement_M_ME_TA): 750 yield from encode_normalized_value(element.value) 751 yield from encode_quality(element.quality) 752 753 elif isinstance(element, common.IoElement_M_ME_NB): 754 yield from encode_scaled_value(element.value) 755 yield from encode_quality(element.quality) 756 757 elif isinstance(element, common.IoElement_M_ME_TB): 758 yield from encode_scaled_value(element.value) 759 yield from encode_quality(element.quality) 760 761 elif isinstance(element, common.IoElement_M_ME_NC): 762 yield from encode_floating_value(element.value) 763 yield from encode_quality(element.quality) 764 765 elif isinstance(element, common.IoElement_M_ME_TC): 766 yield from encode_floating_value(element.value) 767 yield from encode_quality(element.quality) 768 769 elif isinstance(element, common.IoElement_M_IT_NA): 770 yield from encode_binary_counter_value(element.value) 771 yield from encode_quality(element.quality) 772 773 elif isinstance(element, common.IoElement_M_IT_TA): 774 yield from encode_binary_counter_value(element.value) 775 yield from encode_quality(element.quality) 776 777 elif isinstance(element, common.IoElement_M_EP_TA): 778 quality = util.first(encode_quality(element.quality)) 779 yield element.value.value | quality 780 yield from element.elapsed_time.to_bytes(2, 'little') 781 782 elif isinstance(element, common.IoElement_M_EP_TB): 783 yield from encode_protection_start_value(element.value) 784 yield from encode_quality(element.quality) 785 yield from element.duration_time.to_bytes(2, 'little') 786 787 elif isinstance(element, common.IoElement_M_EP_TC): 788 yield from encode_protection_command_value(element.value) 789 yield from encode_quality(element.quality) 790 yield from element.operating_time.to_bytes(2, 'little') 791 792 elif isinstance(element, common.IoElement_M_PS_NA): 793 yield from encode_status_value(element.value) 794 yield from encode_quality(element.quality) 795 796 elif isinstance(element, common.IoElement_M_ME_ND): 797 yield from encode_normalized_value(element.value) 798 799 elif isinstance(element, common.IoElement_M_SP_TB): 800 quality = util.first(encode_quality(element.quality)) 801 yield element.value.value | quality 802 803 elif isinstance(element, common.IoElement_M_DP_TB): 804 quality = util.first(encode_quality(element.quality)) 805 yield element.value.value | quality 806 807 elif isinstance(element, common.IoElement_M_ST_TB): 808 yield from encode_step_position_value(element.value) 809 yield from encode_quality(element.quality) 810 811 elif isinstance(element, common.IoElement_M_BO_TB): 812 yield from encode_bitstring_value(element.value) 813 yield from encode_quality(element.quality) 814 815 elif isinstance(element, common.IoElement_M_ME_TD): 816 yield from encode_normalized_value(element.value) 817 yield from encode_quality(element.quality) 818 819 elif isinstance(element, common.IoElement_M_ME_TE): 820 yield from encode_scaled_value(element.value) 821 yield from encode_quality(element.quality) 822 823 elif isinstance(element, common.IoElement_M_ME_TF): 824 yield from encode_floating_value(element.value) 825 yield from encode_quality(element.quality) 826 827 elif isinstance(element, common.IoElement_M_IT_TB): 828 yield from encode_binary_counter_value(element.value) 829 yield from encode_quality(element.quality) 830 831 elif isinstance(element, common.IoElement_M_EP_TD): 832 quality = util.first(encode_quality(element.quality)) 833 yield element.value.value | quality 834 yield from element.elapsed_time.to_bytes(2, 'little') 835 836 elif isinstance(element, common.IoElement_M_EP_TE): 837 yield from encode_protection_start_value(element.value) 838 yield from encode_quality(element.quality) 839 yield from element.duration_time.to_bytes(2, 'little') 840 841 elif isinstance(element, common.IoElement_M_EP_TF): 842 yield from encode_protection_command_value(element.value) 843 yield from encode_quality(element.quality) 844 yield from element.operating_time.to_bytes(2, 'little') 845 846 elif isinstance(element, common.IoElement_C_SC_NA): 847 yield (element.value.value | 848 (0x80 if element.select else 0) | 849 ((element.qualifier & 0x1F) << 2)) 850 851 elif isinstance(element, common.IoElement_C_DC_NA): 852 yield (element.value.value | 853 (0x80 if element.select else 0) | 854 ((element.qualifier & 0x1F) << 2)) 855 856 elif isinstance(element, common.IoElement_C_RC_NA): 857 yield (element.value.value | 858 (0x80 if element.select else 0) | 859 ((element.qualifier & 0x1F) << 2)) 860 861 elif isinstance(element, common.IoElement_C_SE_NA): 862 yield from encode_normalized_value(element.value) 863 yield (0x80 if element.select else 0) 864 865 elif isinstance(element, common.IoElement_C_SE_NB): 866 yield from encode_scaled_value(element.value) 867 yield (0x80 if element.select else 0) 868 869 elif isinstance(element, common.IoElement_C_SE_NC): 870 yield from encode_floating_value(element.value) 871 yield (0x80 if element.select else 0) 872 873 elif isinstance(element, common.IoElement_C_BO_NA): 874 yield from encode_bitstring_value(element.value) 875 876 elif isinstance(element, common.IoElement_M_EI_NA): 877 yield ((0x80 if element.param_change else 0x00) | 878 (element.cause & 0x7F)) 879 880 elif isinstance(element, common.IoElement_C_IC_NA): 881 yield element.qualifier & 0xFF 882 883 elif isinstance(element, common.IoElement_C_CI_NA): 884 yield ((element.freeze.value << 6) | 885 (element.request & 0x3F)) 886 887 elif isinstance(element, common.IoElement_C_RD_NA): 888 pass 889 890 elif isinstance(element, common.IoElement_C_CS_NA): 891 yield from encoder.encode_time(element.time, common.TimeSize.SEVEN) 892 893 elif isinstance(element, common.IoElement_C_TS_NA): 894 yield 0xAA 895 yield 0x55 896 897 elif isinstance(element, common.IoElement_C_RP_NA): 898 yield element.qualifier & 0xFF 899 900 elif isinstance(element, common.IoElement_C_CD_NA): 901 yield from element.time.to_bytes(2, 'little') 902 903 elif isinstance(element, common.IoElement_P_ME_NA): 904 yield from encode_normalized_value(element.value) 905 yield element.qualifier & 0xFF 906 907 elif isinstance(element, common.IoElement_P_ME_NB): 908 yield from encode_scaled_value(element.value) 909 yield element.qualifier & 0xFF 910 911 elif isinstance(element, common.IoElement_P_ME_NC): 912 yield from encode_floating_value(element.value) 913 yield element.qualifier & 0xFF 914 915 elif isinstance(element, common.IoElement_P_AC_NA): 916 yield element.qualifier & 0xFF 917 918 elif isinstance(element, common.IoElement_F_FR_NA): 919 yield from element.file_name.to_bytes(2, 'little') 920 yield from element.file_length.to_bytes(3, 'little') 921 yield (0x80 if element.ready else 0x00) 922 923 elif isinstance(element, common.IoElement_F_SR_NA): 924 yield from element.file_name.to_bytes(2, 'little') 925 yield element.section_name & 0xFF 926 yield from element.section_length.to_bytes(3, 'little') 927 yield (0x80 if element.ready else 0x00) 928 929 elif isinstance(element, common.IoElement_F_SC_NA): 930 yield from element.file_name.to_bytes(2, 'little') 931 yield element.section_name & 0xFF 932 yield element.qualifier & 0xFF 933 934 elif isinstance(element, common.IoElement_F_LS_NA): 935 yield from element.file_name.to_bytes(2, 'little') 936 yield element.section_name & 0xFF 937 yield element.last_qualifier & 0xFF 938 yield element.checksum & 0xFF 939 940 elif isinstance(element, common.IoElement_F_AF_NA): 941 yield from element.file_name.to_bytes(2, 'little') 942 yield element.section_name & 0xFF 943 yield element.qualifier & 0xFF 944 945 elif isinstance(element, common.IoElement_F_SG_NA): 946 yield from element.file_name.to_bytes(2, 'little') 947 yield element.section_name & 0xFF 948 yield len(element.segment) 949 yield from element.segment 950 951 elif isinstance(element, common.IoElement_F_DR_TA): 952 yield from element.file_name.to_bytes(2, 'little') 953 yield from element.file_length.to_bytes(3, 'little') 954 yield ((0x20 if element.more_follows else 0x00) | 955 (0x40 if element.is_directory else 0x00) | 956 (0x80 if element.transfer_active else 0x00)) 957 yield from encoder.encode_time(element.creation_time, 958 common.TimeSize.SEVEN) 959 960 else: 961 raise ValueError('unsupported IO element')
964def decode_quality(io_bytes: util.Bytes, 965 quality_type: common.QualityType 966 ) -> tuple[common.Quality, util.Bytes]: 967 if quality_type == common.QualityType.INDICATION: 968 invalid = bool(io_bytes[0] & 0x80) 969 not_topical = bool(io_bytes[0] & 0x40) 970 substituted = bool(io_bytes[0] & 0x20) 971 blocked = bool(io_bytes[0] & 0x10) 972 quality = common.IndicationQuality(invalid=invalid, 973 not_topical=not_topical, 974 substituted=substituted, 975 blocked=blocked) 976 977 elif quality_type == common.QualityType.MEASUREMENT: 978 invalid = bool(io_bytes[0] & 0x80) 979 not_topical = bool(io_bytes[0] & 0x40) 980 substituted = bool(io_bytes[0] & 0x20) 981 blocked = bool(io_bytes[0] & 0x10) 982 overflow = bool(io_bytes[0] & 0x01) 983 quality = common.MeasurementQuality(invalid=invalid, 984 not_topical=not_topical, 985 substituted=substituted, 986 blocked=blocked, 987 overflow=overflow) 988 989 elif quality_type == common.QualityType.COUNTER: 990 invalid = bool(io_bytes[0] & 0x80) 991 adjusted = bool(io_bytes[0] & 0x40) 992 overflow = bool(io_bytes[0] & 0x20) 993 sequence = io_bytes[0] & 0x1F 994 quality = common.CounterQuality(invalid=invalid, 995 adjusted=adjusted, 996 overflow=overflow, 997 sequence=sequence) 998 999 elif quality_type == common.QualityType.PROTECTION: 1000 invalid = bool(io_bytes[0] & 0x80) 1001 not_topical = bool(io_bytes[0] & 0x40) 1002 substituted = bool(io_bytes[0] & 0x20) 1003 blocked = bool(io_bytes[0] & 0x10) 1004 time_invalid = bool(io_bytes[0] & 0x08) 1005 quality = common.ProtectionQuality(invalid=invalid, 1006 not_topical=not_topical, 1007 substituted=substituted, 1008 blocked=blocked, 1009 time_invalid=time_invalid) 1010 1011 else: 1012 raise ValueError('unsupported quality type') 1013 1014 return quality, io_bytes[1:]
1017def encode_quality(quality: common.Quality 1018 ) -> typing.Iterable[int]: 1019 if isinstance(quality, common.IndicationQuality): 1020 yield ((0x80 if quality.invalid else 0) | 1021 (0x40 if quality.not_topical else 0) | 1022 (0x20 if quality.substituted else 0) | 1023 (0x10 if quality.blocked else 0)) 1024 1025 elif isinstance(quality, common.MeasurementQuality): 1026 yield ((0x80 if quality.invalid else 0) | 1027 (0x40 if quality.not_topical else 0) | 1028 (0x20 if quality.substituted else 0) | 1029 (0x10 if quality.blocked else 0) | 1030 (0x01 if quality.overflow else 0)) 1031 1032 elif isinstance(quality, common.CounterQuality): 1033 yield ((0x80 if quality.invalid else 0) | 1034 (0x40 if quality.adjusted else 0) | 1035 (0x20 if quality.overflow else 0) | 1036 (quality.sequence & 0x1F)) 1037 1038 elif isinstance(quality, common.ProtectionQuality): 1039 yield ((0x80 if quality.invalid else 0) | 1040 (0x40 if quality.not_topical else 0) | 1041 (0x20 if quality.substituted else 0) | 1042 (0x10 if quality.blocked else 0) | 1043 (0x08 if quality.time_invalid else 0)) 1044 1045 else: 1046 raise ValueError('unsupported quality')
1049def decode_step_position_value(io_bytes: util.Bytes 1050 ) -> tuple[common.StepPositionValue, 1051 util.Bytes]: 1052 value = (((-1 << 7) if io_bytes[0] & 0x40 else 0) | 1053 (io_bytes[0] & 0x7F)) 1054 transient = bool(io_bytes[0] & 0x80) 1055 step_position_value = common.StepPositionValue(value=value, 1056 transient=transient) 1057 return step_position_value, io_bytes[1:]
1130def decode_protection_start_value(io_bytes: util.Bytes 1131 ) -> tuple[common.ProtectionStartValue, 1132 util.Bytes]: 1133 general = bool(io_bytes[0] & 0x01) 1134 l1 = bool(io_bytes[0] & 0x02) 1135 l2 = bool(io_bytes[0] & 0x04) 1136 l3 = bool(io_bytes[0] & 0x08) 1137 ie = bool(io_bytes[0] & 0x10) 1138 reverse = bool(io_bytes[0] & 0x20) 1139 protection_start_value = common.ProtectionStartValue(general=general, 1140 l1=l1, 1141 l2=l2, 1142 l3=l3, 1143 ie=ie, 1144 reverse=reverse) 1145 return protection_start_value, io_bytes[1:]
1148def encode_protection_start_value(value: common.ProtectionStartValue 1149 ) -> typing.Iterable[int]: 1150 yield ((0x01 if value.general else 0x00) | 1151 (0x02 if value.l1 else 0x00) | 1152 (0x04 if value.l2 else 0x00) | 1153 (0x08 if value.l3 else 0x00) | 1154 (0x10 if value.ie else 0x00) | 1155 (0x20 if value.reverse else 0x00))
1158def decode_protection_command_value(io_bytes: util.Bytes 1159 ) -> tuple[common.ProtectionCommandValue, 1160 util.Bytes]: 1161 general = bool(io_bytes[0] & 0x01) 1162 l1 = bool(io_bytes[0] & 0x02) 1163 l2 = bool(io_bytes[0] & 0x04) 1164 l3 = bool(io_bytes[0] & 0x08) 1165 protection_command_value = common.ProtectionCommandValue(general=general, 1166 l1=l1, 1167 l2=l2, 1168 l3=l3) 1169 return protection_command_value, io_bytes[1:]
1180def decode_status_value(io_bytes: util.Bytes 1181 ) -> tuple[common.StatusValue, util.Bytes]: 1182 value = [bool(io_bytes[i // 8] & (1 << (i % 8))) 1183 for i in range(16)] 1184 change = [bool(io_bytes[2 + i // 8] & (1 << (i % 8))) 1185 for i in range(16)] 1186 status_value = common.StatusValue(value=value, 1187 change=change) 1188 return status_value, io_bytes[4:]