hat.drivers.iec60870.encodings.iec101
IEC 60870-5-101 messages
1"""IEC 60870-5-101 messages""" 2 3from hat.drivers.iec60870.encodings.iec101.common import ( 4 AsduTypeError, 5 CauseSize, 6 AsduAddressSize, 7 IoAddressSize, 8 TimeSize, 9 Time, 10 time_from_datetime, 11 time_to_datetime, 12 OriginatorAddress, 13 AsduAddress, 14 IoAddress, 15 OtherCauseType, 16 AsduType, 17 CauseType, 18 Cause, 19 QualityType, 20 IndicationQuality, 21 MeasurementQuality, 22 CounterQuality, 23 ProtectionQuality, 24 Quality, 25 FreezeCode, 26 SingleValue, 27 DoubleValue, 28 RegulatingValue, 29 StepPositionValue, 30 BitstringValue, 31 NormalizedValue, 32 ScaledValue, 33 FloatingValue, 34 BinaryCounterValue, 35 ProtectionValue, 36 ProtectionStartValue, 37 ProtectionCommandValue, 38 StatusValue, 39 IoElement_M_SP_NA, 40 IoElement_M_SP_TA, 41 IoElement_M_DP_NA, 42 IoElement_M_DP_TA, 43 IoElement_M_ST_NA, 44 IoElement_M_ST_TA, 45 IoElement_M_BO_NA, 46 IoElement_M_BO_TA, 47 IoElement_M_ME_NA, 48 IoElement_M_ME_TA, 49 IoElement_M_ME_NB, 50 IoElement_M_ME_TB, 51 IoElement_M_ME_NC, 52 IoElement_M_ME_TC, 53 IoElement_M_IT_NA, 54 IoElement_M_IT_TA, 55 IoElement_M_EP_TA, 56 IoElement_M_EP_TB, 57 IoElement_M_EP_TC, 58 IoElement_M_PS_NA, 59 IoElement_M_ME_ND, 60 IoElement_M_SP_TB, 61 IoElement_M_DP_TB, 62 IoElement_M_ST_TB, 63 IoElement_M_BO_TB, 64 IoElement_M_ME_TD, 65 IoElement_M_ME_TE, 66 IoElement_M_ME_TF, 67 IoElement_M_IT_TB, 68 IoElement_M_EP_TD, 69 IoElement_M_EP_TE, 70 IoElement_M_EP_TF, 71 IoElement_C_SC_NA, 72 IoElement_C_DC_NA, 73 IoElement_C_RC_NA, 74 IoElement_C_SE_NA, 75 IoElement_C_SE_NB, 76 IoElement_C_SE_NC, 77 IoElement_C_BO_NA, 78 IoElement_M_EI_NA, 79 IoElement_C_IC_NA, 80 IoElement_C_CI_NA, 81 IoElement_C_RD_NA, 82 IoElement_C_CS_NA, 83 IoElement_C_TS_NA, 84 IoElement_C_RP_NA, 85 IoElement_C_CD_NA, 86 IoElement_P_ME_NA, 87 IoElement_P_ME_NB, 88 IoElement_P_ME_NC, 89 IoElement_P_AC_NA, 90 IoElement_F_FR_NA, 91 IoElement_F_SR_NA, 92 IoElement_F_SC_NA, 93 IoElement_F_LS_NA, 94 IoElement_F_AF_NA, 95 IoElement_F_SG_NA, 96 IoElement_F_DR_TA, 97 IoElement, 98 IO, 99 ASDU) 100from hat.drivers.iec60870.encodings.iec101.encoder import ( 101 Encoder, 102 asdu_type_time_sizes, 103 decode_cause, 104 encode_cause, 105 decode_cause_type, 106 encode_cause_type, 107 decode_io_element, 108 encode_io_element, 109 decode_quality, 110 encode_quality, 111 decode_step_position_value, 112 encode_step_position_value, 113 decode_bitstring_value, 114 encode_bitstring_value, 115 decode_normalized_value, 116 encode_normalized_value, 117 decode_scaled_value, 118 encode_scaled_value, 119 decode_floating_value, 120 encode_floating_value, 121 decode_binary_counter_value, 122 encode_binary_counter_value, 123 decode_protection_start_value, 124 encode_protection_start_value, 125 decode_protection_command_value, 126 encode_protection_command_value, 127 decode_status_value, 128 encode_status_value) 129 130 131__all__ = ['AsduTypeError', 132 'CauseSize', 133 'AsduAddressSize', 134 'IoAddressSize', 135 'TimeSize', 136 'Time', 137 'time_from_datetime', 138 'time_to_datetime', 139 'OriginatorAddress', 140 'AsduAddress', 141 'IoAddress', 142 'OtherCauseType', 143 'AsduType', 144 'CauseType', 145 'Cause', 146 'QualityType', 147 'IndicationQuality', 148 'MeasurementQuality', 149 'CounterQuality', 150 'ProtectionQuality', 151 'Quality', 152 'FreezeCode', 153 'SingleValue', 154 'DoubleValue', 155 'RegulatingValue', 156 'StepPositionValue', 157 'BitstringValue', 158 'NormalizedValue', 159 'ScaledValue', 160 'FloatingValue', 161 'BinaryCounterValue', 162 'ProtectionValue', 163 'ProtectionStartValue', 164 'ProtectionCommandValue', 165 'StatusValue', 166 'IoElement_M_SP_NA', 167 'IoElement_M_SP_TA', 168 'IoElement_M_DP_NA', 169 'IoElement_M_DP_TA', 170 'IoElement_M_ST_NA', 171 'IoElement_M_ST_TA', 172 'IoElement_M_BO_NA', 173 'IoElement_M_BO_TA', 174 'IoElement_M_ME_NA', 175 'IoElement_M_ME_TA', 176 'IoElement_M_ME_NB', 177 'IoElement_M_ME_TB', 178 'IoElement_M_ME_NC', 179 'IoElement_M_ME_TC', 180 'IoElement_M_IT_NA', 181 'IoElement_M_IT_TA', 182 'IoElement_M_EP_TA', 183 'IoElement_M_EP_TB', 184 'IoElement_M_EP_TC', 185 'IoElement_M_PS_NA', 186 'IoElement_M_ME_ND', 187 'IoElement_M_SP_TB', 188 'IoElement_M_DP_TB', 189 'IoElement_M_ST_TB', 190 'IoElement_M_BO_TB', 191 'IoElement_M_ME_TD', 192 'IoElement_M_ME_TE', 193 'IoElement_M_ME_TF', 194 'IoElement_M_IT_TB', 195 'IoElement_M_EP_TD', 196 'IoElement_M_EP_TE', 197 'IoElement_M_EP_TF', 198 'IoElement_C_SC_NA', 199 'IoElement_C_DC_NA', 200 'IoElement_C_RC_NA', 201 'IoElement_C_SE_NA', 202 'IoElement_C_SE_NB', 203 'IoElement_C_SE_NC', 204 'IoElement_C_BO_NA', 205 'IoElement_M_EI_NA', 206 'IoElement_C_IC_NA', 207 'IoElement_C_CI_NA', 208 'IoElement_C_RD_NA', 209 'IoElement_C_CS_NA', 210 'IoElement_C_TS_NA', 211 'IoElement_C_RP_NA', 212 'IoElement_C_CD_NA', 213 'IoElement_P_ME_NA', 214 'IoElement_P_ME_NB', 215 'IoElement_P_ME_NC', 216 'IoElement_P_AC_NA', 217 'IoElement_F_FR_NA', 218 'IoElement_F_SR_NA', 219 'IoElement_F_SC_NA', 220 'IoElement_F_LS_NA', 221 'IoElement_F_AF_NA', 222 'IoElement_F_SG_NA', 223 'IoElement_F_DR_TA', 224 'IoElement', 225 'IO', 226 'ASDU', 227 'Encoder', 228 'asdu_type_time_sizes', 229 'decode_cause', 230 'encode_cause', 231 'decode_cause_type', 232 'encode_cause_type', 233 'decode_io_element', 234 'encode_io_element', 235 'decode_quality', 236 'encode_quality', 237 'decode_step_position_value', 238 'encode_step_position_value', 239 'decode_bitstring_value', 240 'encode_bitstring_value', 241 'decode_normalized_value', 242 'encode_normalized_value', 243 'decode_scaled_value', 244 'encode_scaled_value', 245 'decode_floating_value', 246 'encode_floating_value', 247 'decode_binary_counter_value', 248 'encode_binary_counter_value', 249 'decode_protection_start_value', 250 'encode_protection_start_value', 251 'decode_protection_command_value', 252 'encode_protection_command_value', 253 'decode_status_value', 254 'encode_status_value']
Common base class for all non-exit exceptions.
35class Time(typing.NamedTuple): 36 size: TimeSize 37 milliseconds: int 38 """milliseconds in range [0, 59999]""" 39 invalid: bool | None 40 """available for size THREE, FOUR, SEVEN""" 41 minutes: int | None 42 """available for size THREE, FOUR, SEVEN (minutes in range [0, 59])""" 43 summer_time: bool | None 44 """available for size FOUR, SEVEN""" 45 hours: int | None 46 """available for size FOUR, SEVEN (hours in range [0, 23])""" 47 day_of_week: int | None 48 """available for size SEVEN (day_of_week in range [1, 7])""" 49 day_of_month: int | None 50 """available for size SEVEN (day_of_month in range [1, 31])""" 51 months: int | None 52 """available for size SEVEN (months in range [1, 12])""" 53 years: int | None 54 """available for size SEVEN (years in range [0, 99])"""
Time(size, milliseconds, invalid, minutes, summer_time, hours, day_of_week, day_of_month, months, years)
Create new instance of Time(size, milliseconds, invalid, minutes, summer_time, hours, day_of_week, day_of_month, months, years)
70def time_from_datetime(dt: datetime.datetime, 71 invalid: bool = False 72 ) -> Time: 73 """Create Time from datetime.datetime""" 74 # TODO document edge cases (local time, os implementation, ...) 75 # rounding microseconds to the nearest millisecond 76 dt_rounded = ( 77 dt.replace(microsecond=0) + 78 datetime.timedelta(milliseconds=round(dt.microsecond / 1000))) 79 local_time = time.localtime(dt_rounded.timestamp()) 80 81 return Time( 82 size=TimeSize.SEVEN, 83 milliseconds=(local_time.tm_sec * 1000 + 84 dt_rounded.microsecond // 1000), 85 invalid=invalid, 86 minutes=local_time.tm_min, 87 summer_time=bool(local_time.tm_isdst), 88 hours=local_time.tm_hour, 89 day_of_week=local_time.tm_wday + 1, 90 day_of_month=local_time.tm_mday, 91 months=local_time.tm_mon, 92 years=local_time.tm_year % 100)
Create Time from datetime.datetime
95def time_to_datetime(t: Time 96 ) -> datetime.datetime: 97 """Convert Time to datetime.datetime""" 98 # TODO document edge cases (local time, os implementation, ...) 99 # TODO support TimeSize.FOUR 100 if t.size == TimeSize.TWO: 101 local_now = datetime.datetime.now() 102 local_dt = local_now.replace( 103 second=int(t.milliseconds / 1000), 104 microsecond=(t.milliseconds % 1000) * 1000) 105 106 local_seconds = local_now.second + local_now.microsecond / 1_000_000 107 t_seconds = t.milliseconds / 1_000 108 109 if abs(local_seconds - t_seconds) > 30: 110 if local_seconds < t_seconds: 111 local_dt = local_dt - datetime.timedelta(minutes=1) 112 113 else: 114 local_dt = local_dt + datetime.timedelta(minutes=1) 115 116 elif t.size == TimeSize.THREE: 117 local_now = datetime.datetime.now() 118 local_dt = local_now.replace( 119 minute=t.minutes, 120 second=int(t.milliseconds / 1000), 121 microsecond=(t.milliseconds % 1000) * 1000) 122 123 local_minutes = (local_now.minute + 124 local_now.second / 60 + 125 local_now.microsecond / 60_000_000) 126 t_minutes = t.minutes + t.milliseconds / 60_000 127 128 if abs(local_minutes - t_minutes) > 30: 129 if local_minutes < t_minutes: 130 local_dt = local_dt - datetime.timedelta(hours=1) 131 132 else: 133 local_dt = local_dt + datetime.timedelta(hours=1) 134 135 elif t.size == TimeSize.SEVEN: 136 local_dt = datetime.datetime( 137 year=2000 + t.years if t.years < 70 else 1900 + t.years, 138 month=t.months, 139 day=t.day_of_month, 140 hour=t.hours, 141 minute=t.minutes, 142 second=int(t.milliseconds / 1000), 143 microsecond=(t.milliseconds % 1000) * 1000, 144 fold=not t.summer_time) 145 146 else: 147 raise ValueError('unsupported time size') 148 149 return local_dt.astimezone(tz=datetime.timezone.utc)
Convert Time to datetime.datetime
25class AsduType(enum.Enum): 26 M_SP_NA = 1 27 M_SP_TA = 2 28 M_DP_NA = 3 29 M_DP_TA = 4 30 M_ST_NA = 5 31 M_ST_TA = 6 32 M_BO_NA = 7 33 M_BO_TA = 8 34 M_ME_NA = 9 35 M_ME_TA = 10 36 M_ME_NB = 11 37 M_ME_TB = 12 38 M_ME_NC = 13 39 M_ME_TC = 14 40 M_IT_NA = 15 41 M_IT_TA = 16 42 M_EP_TA = 17 43 M_EP_TB = 18 44 M_EP_TC = 19 45 M_PS_NA = 20 46 M_ME_ND = 21 47 M_SP_TB = 30 48 M_DP_TB = 31 49 M_ST_TB = 32 50 M_BO_TB = 33 51 M_ME_TD = 34 52 M_ME_TE = 35 53 M_ME_TF = 36 54 M_IT_TB = 37 55 M_EP_TD = 38 56 M_EP_TE = 39 57 M_EP_TF = 40 58 C_SC_NA = 45 59 C_DC_NA = 46 60 C_RC_NA = 47 61 C_SE_NA = 48 62 C_SE_NB = 49 63 C_SE_NC = 50 64 C_BO_NA = 51 65 M_EI_NA = 70 66 C_IC_NA = 100 67 C_CI_NA = 101 68 C_RD_NA = 102 69 C_CS_NA = 103 70 C_TS_NA = 104 71 C_RP_NA = 105 72 C_CD_NA = 106 73 P_ME_NA = 110 74 P_ME_NB = 111 75 P_ME_NC = 112 76 P_AC_NA = 113 77 F_FR_NA = 120 78 F_SR_NA = 121 79 F_SC_NA = 122 80 F_LS_NA = 123 81 F_AF_NA = 124 82 F_SG_NA = 125 83 F_DR_TA = 126
86class CauseType(enum.Enum): 87 UNDEFINED = 0 88 PERIODIC = 1 89 BACKGROUND_SCAN = 2 90 SPONTANEOUS = 3 91 INITIALIZED = 4 92 REQUEST = 5 93 ACTIVATION = 6 94 ACTIVATION_CONFIRMATION = 7 95 DEACTIVATION = 8 96 DEACTIVATION_CONFIRMATION = 9 97 ACTIVATION_TERMINATION = 10 98 REMOTE_COMMAND = 11 99 LOCAL_COMMAND = 12 100 FILE_TRANSFER = 13 101 INTERROGATED_STATION = 20 102 INTERROGATED_GROUP01 = 21 103 INTERROGATED_GROUP02 = 22 104 INTERROGATED_GROUP03 = 23 105 INTERROGATED_GROUP04 = 24 106 INTERROGATED_GROUP05 = 25 107 INTERROGATED_GROUP06 = 26 108 INTERROGATED_GROUP07 = 27 109 INTERROGATED_GROUP08 = 28 110 INTERROGATED_GROUP09 = 29 111 INTERROGATED_GROUP10 = 30 112 INTERROGATED_GROUP11 = 31 113 INTERROGATED_GROUP12 = 32 114 INTERROGATED_GROUP13 = 33 115 INTERROGATED_GROUP14 = 34 116 INTERROGATED_GROUP15 = 35 117 INTERROGATED_GROUP16 = 36 118 INTERROGATED_COUNTER = 37 119 INTERROGATED_COUNTER01 = 38 120 INTERROGATED_COUNTER02 = 39 121 INTERROGATED_COUNTER03 = 40 122 INTERROGATED_COUNTER04 = 41 123 UNKNOWN_TYPE = 44 124 UNKNOWN_CAUSE = 45 125 UNKNOWN_ASDU_ADDRESS = 46 126 UNKNOWN_IO_ADDRESS = 47
129class Cause(typing.NamedTuple): 130 type: CauseType | OtherCauseType 131 is_negative_confirm: bool 132 is_test: bool 133 originator_address: OriginatorAddress
Cause(type, is_negative_confirm, is_test, originator_address)
Create new instance of Cause(type, is_negative_confirm, is_test, originator_address)
136class QualityType(enum.Enum): 137 INDICATION = 0 138 MEASUREMENT = 1 139 COUNTER = 2 140 PROTECTION = 3
143class IndicationQuality(typing.NamedTuple): 144 invalid: bool 145 not_topical: bool 146 substituted: bool 147 blocked: bool
IndicationQuality(invalid, not_topical, substituted, blocked)
150class MeasurementQuality(typing.NamedTuple): 151 invalid: bool 152 not_topical: bool 153 substituted: bool 154 blocked: bool 155 overflow: bool
MeasurementQuality(invalid, not_topical, substituted, blocked, overflow)
158class CounterQuality(typing.NamedTuple): 159 invalid: bool 160 adjusted: bool 161 overflow: bool 162 sequence: int 163 """sequence in range [0, 31]"""
CounterQuality(invalid, adjusted, overflow, sequence)
166class ProtectionQuality(typing.NamedTuple): 167 invalid: bool 168 not_topical: bool 169 substituted: bool 170 blocked: bool 171 time_invalid: bool
ProtectionQuality(invalid, not_topical, substituted, blocked, time_invalid)
192class DoubleValue(enum.Enum): 193 """DoubleDataValue 194 195 `FAULT` stands for value 3, defined in the protocol as *INDETERMINATE*. 196 This is in order to make it more distinguishable from ``INTERMEDIATE``. 197 198 """ 199 INTERMEDIATE = 0 200 OFF = 1 201 ON = 2 202 FAULT = 3
DoubleDataValue
FAULT stands for value 3, defined in the protocol as INDETERMINATE.
This is in order to make it more distinguishable from INTERMEDIATE.
210class StepPositionValue(typing.NamedTuple): 211 value: int 212 """value in range [-64, 63]""" 213 transient: bool
StepPositionValue(value, transient)
216class BitstringValue(typing.NamedTuple): 217 value: util.Bytes 218 """bitstring encoded as 4 bytes"""
BitstringValue(value,)
NormalizedValue(value,)
ScaledValue(value,)
FloatingValue(value,)
235class BinaryCounterValue(typing.NamedTuple): 236 value: int 237 """value in range [-2^31, 2^31-1]"""
BinaryCounterValue(value,)
245class ProtectionStartValue(typing.NamedTuple): 246 general: bool 247 l1: bool 248 l2: bool 249 l3: bool 250 ie: bool 251 reverse: bool
ProtectionStartValue(general, l1, l2, l3, ie, reverse)
254class ProtectionCommandValue(typing.NamedTuple): 255 general: bool 256 l1: bool 257 l2: bool 258 l3: bool
ProtectionCommandValue(general, l1, l2, l3)
261class StatusValue(typing.NamedTuple): 262 value: list[bool] 263 """value length is 16""" 264 change: list[bool] 265 """change length is 16"""
StatusValue(value, change)
268class IoElement_M_SP_NA(typing.NamedTuple): 269 value: SingleValue 270 quality: IndicationQuality
IoElement_M_SP_NA(value, quality)
Create new instance of IoElement_M_SP_NA(value, quality)
273class IoElement_M_SP_TA(typing.NamedTuple): 274 value: SingleValue 275 quality: IndicationQuality
IoElement_M_SP_TA(value, quality)
Create new instance of IoElement_M_SP_TA(value, quality)
278class IoElement_M_DP_NA(typing.NamedTuple): 279 value: DoubleValue 280 quality: IndicationQuality
IoElement_M_DP_NA(value, quality)
Create new instance of IoElement_M_DP_NA(value, quality)
283class IoElement_M_DP_TA(typing.NamedTuple): 284 value: DoubleValue 285 quality: IndicationQuality
IoElement_M_DP_TA(value, quality)
Create new instance of IoElement_M_DP_TA(value, quality)
288class IoElement_M_ST_NA(typing.NamedTuple): 289 value: StepPositionValue 290 quality: MeasurementQuality
IoElement_M_ST_NA(value, quality)
Create new instance of IoElement_M_ST_NA(value, quality)
293class IoElement_M_ST_TA(typing.NamedTuple): 294 value: StepPositionValue 295 quality: MeasurementQuality
IoElement_M_ST_TA(value, quality)
Create new instance of IoElement_M_ST_TA(value, quality)
298class IoElement_M_BO_NA(typing.NamedTuple): 299 value: BitstringValue 300 quality: MeasurementQuality
IoElement_M_BO_NA(value, quality)
Create new instance of IoElement_M_BO_NA(value, quality)
303class IoElement_M_BO_TA(typing.NamedTuple): 304 value: BitstringValue 305 quality: MeasurementQuality
IoElement_M_BO_TA(value, quality)
Create new instance of IoElement_M_BO_TA(value, quality)
308class IoElement_M_ME_NA(typing.NamedTuple): 309 value: NormalizedValue 310 quality: MeasurementQuality
IoElement_M_ME_NA(value, quality)
Create new instance of IoElement_M_ME_NA(value, quality)
313class IoElement_M_ME_TA(typing.NamedTuple): 314 value: NormalizedValue 315 quality: MeasurementQuality
IoElement_M_ME_TA(value, quality)
Create new instance of IoElement_M_ME_TA(value, quality)
318class IoElement_M_ME_NB(typing.NamedTuple): 319 value: ScaledValue 320 quality: MeasurementQuality
IoElement_M_ME_NB(value, quality)
Create new instance of IoElement_M_ME_NB(value, quality)
323class IoElement_M_ME_TB(typing.NamedTuple): 324 value: ScaledValue 325 quality: MeasurementQuality
IoElement_M_ME_TB(value, quality)
Create new instance of IoElement_M_ME_TB(value, quality)
328class IoElement_M_ME_NC(typing.NamedTuple): 329 value: FloatingValue 330 quality: MeasurementQuality
IoElement_M_ME_NC(value, quality)
Create new instance of IoElement_M_ME_NC(value, quality)
333class IoElement_M_ME_TC(typing.NamedTuple): 334 value: FloatingValue 335 quality: MeasurementQuality
IoElement_M_ME_TC(value, quality)
Create new instance of IoElement_M_ME_TC(value, quality)
338class IoElement_M_IT_NA(typing.NamedTuple): 339 value: BinaryCounterValue 340 quality: CounterQuality
IoElement_M_IT_NA(value, quality)
Create new instance of IoElement_M_IT_NA(value, quality)
343class IoElement_M_IT_TA(typing.NamedTuple): 344 value: BinaryCounterValue 345 quality: CounterQuality
IoElement_M_IT_TA(value, quality)
Create new instance of IoElement_M_IT_TA(value, quality)
348class IoElement_M_EP_TA(typing.NamedTuple): 349 value: ProtectionValue 350 quality: ProtectionQuality 351 elapsed_time: int 352 """elapsed_time in range [0, 65535]"""
IoElement_M_EP_TA(value, quality, elapsed_time)
Create new instance of IoElement_M_EP_TA(value, quality, elapsed_time)
355class IoElement_M_EP_TB(typing.NamedTuple): 356 value: ProtectionStartValue 357 quality: ProtectionQuality 358 duration_time: int 359 """duration_time in range [0, 65535]"""
IoElement_M_EP_TB(value, quality, duration_time)
Create new instance of IoElement_M_EP_TB(value, quality, duration_time)
362class IoElement_M_EP_TC(typing.NamedTuple): 363 value: ProtectionCommandValue 364 quality: ProtectionQuality 365 operating_time: int 366 """operating_time in range [0, 65535]"""
IoElement_M_EP_TC(value, quality, operating_time)
Create new instance of IoElement_M_EP_TC(value, quality, operating_time)
369class IoElement_M_PS_NA(typing.NamedTuple): 370 value: StatusValue 371 quality: MeasurementQuality
IoElement_M_PS_NA(value, quality)
Create new instance of IoElement_M_PS_NA(value, quality)
IoElement_M_ME_ND(value,)
378class IoElement_M_SP_TB(typing.NamedTuple): 379 value: SingleValue 380 quality: IndicationQuality
IoElement_M_SP_TB(value, quality)
Create new instance of IoElement_M_SP_TB(value, quality)
383class IoElement_M_DP_TB(typing.NamedTuple): 384 value: DoubleValue 385 quality: IndicationQuality
IoElement_M_DP_TB(value, quality)
Create new instance of IoElement_M_DP_TB(value, quality)
388class IoElement_M_ST_TB(typing.NamedTuple): 389 value: StepPositionValue 390 quality: MeasurementQuality
IoElement_M_ST_TB(value, quality)
Create new instance of IoElement_M_ST_TB(value, quality)
393class IoElement_M_BO_TB(typing.NamedTuple): 394 value: BitstringValue 395 quality: MeasurementQuality
IoElement_M_BO_TB(value, quality)
Create new instance of IoElement_M_BO_TB(value, quality)
398class IoElement_M_ME_TD(typing.NamedTuple): 399 value: NormalizedValue 400 quality: MeasurementQuality
IoElement_M_ME_TD(value, quality)
Create new instance of IoElement_M_ME_TD(value, quality)
403class IoElement_M_ME_TE(typing.NamedTuple): 404 value: ScaledValue 405 quality: MeasurementQuality
IoElement_M_ME_TE(value, quality)
Create new instance of IoElement_M_ME_TE(value, quality)
408class IoElement_M_ME_TF(typing.NamedTuple): 409 value: FloatingValue 410 quality: MeasurementQuality
IoElement_M_ME_TF(value, quality)
Create new instance of IoElement_M_ME_TF(value, quality)
413class IoElement_M_IT_TB(typing.NamedTuple): 414 value: BinaryCounterValue 415 quality: CounterQuality
IoElement_M_IT_TB(value, quality)
Create new instance of IoElement_M_IT_TB(value, quality)
418class IoElement_M_EP_TD(typing.NamedTuple): 419 value: ProtectionValue 420 quality: ProtectionQuality 421 elapsed_time: int 422 """elapsed_time in range [0, 65535]"""
IoElement_M_EP_TD(value, quality, elapsed_time)
Create new instance of IoElement_M_EP_TD(value, quality, elapsed_time)
425class IoElement_M_EP_TE(typing.NamedTuple): 426 value: ProtectionStartValue 427 quality: ProtectionQuality 428 duration_time: int 429 """duration_time in range [0, 65535]"""
IoElement_M_EP_TE(value, quality, duration_time)
Create new instance of IoElement_M_EP_TE(value, quality, duration_time)
432class IoElement_M_EP_TF(typing.NamedTuple): 433 value: ProtectionCommandValue 434 quality: ProtectionQuality 435 operating_time: int 436 """operating_time in range [0, 65535]"""
IoElement_M_EP_TF(value, quality, operating_time)
Create new instance of IoElement_M_EP_TF(value, quality, operating_time)
439class IoElement_C_SC_NA(typing.NamedTuple): 440 value: SingleValue 441 select: bool 442 qualifier: int 443 """qualifier in range [0, 31]"""
IoElement_C_SC_NA(value, select, qualifier)
Create new instance of IoElement_C_SC_NA(value, select, qualifier)
446class IoElement_C_DC_NA(typing.NamedTuple): 447 value: DoubleValue 448 select: bool 449 qualifier: int 450 """qualifier in range [0, 31]"""
IoElement_C_DC_NA(value, select, qualifier)
Create new instance of IoElement_C_DC_NA(value, select, qualifier)
453class IoElement_C_RC_NA(typing.NamedTuple): 454 value: RegulatingValue 455 select: bool 456 qualifier: int 457 """qualifier in range [0, 31]"""
IoElement_C_RC_NA(value, select, qualifier)
Create new instance of IoElement_C_RC_NA(value, select, qualifier)
IoElement_C_SE_NA(value, select)
Create new instance of IoElement_C_SE_NA(value, select)
IoElement_C_SE_NB(value, select)
Create new instance of IoElement_C_SE_NB(value, select)
IoElement_C_SE_NC(value, select)
Create new instance of IoElement_C_SE_NC(value, select)
IoElement_C_BO_NA(value,)
479class IoElement_M_EI_NA(typing.NamedTuple): 480 param_change: bool 481 cause: int 482 """cause in range [0, 127]"""
IoElement_M_EI_NA(param_change, cause)
485class IoElement_C_IC_NA(typing.NamedTuple): 486 qualifier: int 487 """qualifier in range [0, 255]"""
IoElement_C_IC_NA(qualifier,)
490class IoElement_C_CI_NA(typing.NamedTuple): 491 request: int 492 """request in range [0, 63]""" 493 freeze: FreezeCode
IoElement_C_CI_NA(request, freeze)
Create new instance of IoElement_C_CI_NA(request, freeze)
IoElement_C_RD_NA()
IoElement_C_CS_NA(time,)
IoElement_C_TS_NA()
509class IoElement_C_RP_NA(typing.NamedTuple): 510 qualifier: int 511 """qualifier in range [0, 255]"""
IoElement_C_RP_NA(qualifier,)
IoElement_C_CD_NA(time,)
519class IoElement_P_ME_NA(typing.NamedTuple): 520 value: NormalizedValue 521 qualifier: int 522 """qualifier in range [0, 255]"""
IoElement_P_ME_NA(value, qualifier)
Create new instance of IoElement_P_ME_NA(value, qualifier)
525class IoElement_P_ME_NB(typing.NamedTuple): 526 value: ScaledValue 527 qualifier: int 528 """qualifier in range [0, 255]"""
IoElement_P_ME_NB(value, qualifier)
Create new instance of IoElement_P_ME_NB(value, qualifier)
531class IoElement_P_ME_NC(typing.NamedTuple): 532 value: FloatingValue 533 qualifier: int 534 """qualifier in range [0, 255]"""
IoElement_P_ME_NC(value, qualifier)
Create new instance of IoElement_P_ME_NC(value, qualifier)
537class IoElement_P_AC_NA(typing.NamedTuple): 538 qualifier: int 539 """qualifier in range [0, 255]"""
IoElement_P_AC_NA(qualifier,)
542class IoElement_F_FR_NA(typing.NamedTuple): 543 file_name: int 544 """file_name in range [0, 65535]""" 545 file_length: int 546 """file_length in range [0, 16777215]""" 547 ready: bool
IoElement_F_FR_NA(file_name, file_length, ready)
550class IoElement_F_SR_NA(typing.NamedTuple): 551 file_name: int 552 """file_name in range [0, 65535]""" 553 section_name: int 554 """section_name in range [0, 255]""" 555 section_length: int 556 """section_length in range [0, 16777215]""" 557 ready: bool
IoElement_F_SR_NA(file_name, section_name, section_length, ready)
560class IoElement_F_SC_NA(typing.NamedTuple): 561 file_name: int 562 """file_name in range [0, 65535]""" 563 section_name: int 564 """section_name in range [0, 255]""" 565 qualifier: int 566 """qualifier in range [0, 255]"""
IoElement_F_SC_NA(file_name, section_name, qualifier)
569class IoElement_F_LS_NA(typing.NamedTuple): 570 file_name: int 571 """file_name in range [0, 65535]""" 572 section_name: int 573 """section_name in range [0, 255]""" 574 last_qualifier: int 575 """last_qualifier in range [0, 255]""" 576 checksum: int 577 """checksum in range [0, 255]"""
IoElement_F_LS_NA(file_name, section_name, last_qualifier, checksum)
580class IoElement_F_AF_NA(typing.NamedTuple): 581 file_name: int 582 """file_name in range [0, 65535]""" 583 section_name: int 584 """section_name in range [0, 255]""" 585 qualifier: int 586 """qualifier in range [0, 255]"""
IoElement_F_AF_NA(file_name, section_name, qualifier)
589class IoElement_F_SG_NA(typing.NamedTuple): 590 file_name: int 591 """file_name in range [0, 65535]""" 592 section_name: int 593 """section_name in range [0, 255]""" 594 segment: util.Bytes
IoElement_F_SG_NA(file_name, section_name, segment)
597class IoElement_F_DR_TA(typing.NamedTuple): 598 file_name: int 599 """file_name in range [0, 65535]""" 600 file_length: int 601 """file_length in range [0, 16777215]""" 602 more_follows: bool 603 is_directory: bool 604 transfer_active: bool 605 creation_time: Time
IoElement_F_DR_TA(file_name, file_length, more_follows, is_directory, transfer_active, creation_time)
Create new instance of IoElement_F_DR_TA(file_name, file_length, more_follows, is_directory, transfer_active, creation_time)
668class IO(typing.NamedTuple): 669 address: IoAddress 670 elements: list[IoElement] 671 time: Time | None
IO(address, elements, time)
Create new instance of IO(address, elements, time)
Alias for field number 1
674class ASDU(typing.NamedTuple): 675 type: AsduType 676 cause: Cause 677 address: AsduAddress 678 ios: list[IO]
ASDU(type, cause, address, ios)
12class Encoder: 13 14 def __init__(self, 15 cause_size: common.CauseSize, 16 asdu_address_size: common.AsduAddressSize, 17 io_address_size: common.IoAddressSize, 18 max_asdu_size: int = 252): 19 self._cause_size = cause_size 20 self._max_asdu_size = max_asdu_size 21 self._encoder = encoder.Encoder( 22 cause_size=cause_size, 23 asdu_address_size=asdu_address_size, 24 io_address_size=io_address_size, 25 asdu_type_time_sizes=asdu_type_time_sizes, 26 inverted_sequence_bit=False, 27 decode_io_element_cb=decode_io_element, 28 encode_io_element_cb=encode_io_element) 29 30 @property 31 def max_asdu_size(self) -> int: 32 return self._max_asdu_size 33 34 @property 35 def cause_size(self) -> common.CauseSize: 36 return self._encoder.cause_size 37 38 @property 39 def asdu_address_size(self) -> common.AsduAddressSize: 40 return self._encoder.asdu_address_size 41 42 @property 43 def io_address_size(self) -> common.IoAddressSize: 44 return self._encoder.io_address_size 45 46 def decode_asdu(self, 47 asdu_bytes: util.Bytes 48 ) -> tuple[common.ASDU, util.Bytes]: 49 asdu, rest = self._encoder.decode_asdu(asdu_bytes) 50 51 asdu_type = _decode_asdu_type(asdu.type) 52 53 cause = decode_cause(asdu.cause, self._cause_size) 54 address = asdu.address 55 ios = [common.IO(address=io.address, 56 elements=io.elements, 57 time=io.time) 58 for io in asdu.ios] 59 60 asdu = common.ASDU(type=asdu_type, 61 cause=cause, 62 address=address, 63 ios=ios) 64 return asdu, rest 65 66 def encode_asdu(self, asdu: common.ASDU) -> util.Bytes: 67 asdu_type = asdu.type.value 68 cause = encode_cause(asdu.cause, self._cause_size) 69 address = asdu.address 70 ios = [encoder.common.IO(address=io.address, 71 elements=io.elements, 72 time=io.time) 73 for io in asdu.ios] 74 75 asdu = encoder.common.ASDU(type=asdu_type, 76 cause=cause, 77 address=address, 78 ios=ios) 79 80 return self._encoder.encode_asdu(asdu)
14 def __init__(self, 15 cause_size: common.CauseSize, 16 asdu_address_size: common.AsduAddressSize, 17 io_address_size: common.IoAddressSize, 18 max_asdu_size: int = 252): 19 self._cause_size = cause_size 20 self._max_asdu_size = max_asdu_size 21 self._encoder = encoder.Encoder( 22 cause_size=cause_size, 23 asdu_address_size=asdu_address_size, 24 io_address_size=io_address_size, 25 asdu_type_time_sizes=asdu_type_time_sizes, 26 inverted_sequence_bit=False, 27 decode_io_element_cb=decode_io_element, 28 encode_io_element_cb=encode_io_element)
46 def decode_asdu(self, 47 asdu_bytes: util.Bytes 48 ) -> tuple[common.ASDU, util.Bytes]: 49 asdu, rest = self._encoder.decode_asdu(asdu_bytes) 50 51 asdu_type = _decode_asdu_type(asdu.type) 52 53 cause = decode_cause(asdu.cause, self._cause_size) 54 address = asdu.address 55 ios = [common.IO(address=io.address, 56 elements=io.elements, 57 time=io.time) 58 for io in asdu.ios] 59 60 asdu = common.ASDU(type=asdu_type, 61 cause=cause, 62 address=address, 63 ios=ios) 64 return asdu, rest
66 def encode_asdu(self, asdu: common.ASDU) -> util.Bytes: 67 asdu_type = asdu.type.value 68 cause = encode_cause(asdu.cause, self._cause_size) 69 address = asdu.address 70 ios = [encoder.common.IO(address=io.address, 71 elements=io.elements, 72 time=io.time) 73 for io in asdu.ios] 74 75 asdu = encoder.common.ASDU(type=asdu_type, 76 cause=cause, 77 address=address, 78 ios=ios) 79 80 return self._encoder.encode_asdu(asdu)
107def decode_cause(cause: int, 108 cause_size: common.CauseSize 109 ) -> common.Cause: 110 cause_type = decode_cause_type(cause & 0x3F) 111 is_negative_confirm = bool(cause & 0x40) 112 is_test = bool(cause & 0x80) 113 114 if cause_size == common.CauseSize.ONE: 115 originator_address = 0 116 117 elif cause_size == common.CauseSize.TWO: 118 originator_address = cause >> 8 119 120 else: 121 raise ValueError('unsupported cause size') 122 123 return common.Cause(type=cause_type, 124 is_negative_confirm=is_negative_confirm, 125 is_test=is_test, 126 originator_address=originator_address)
129def encode_cause(cause: common.Cause, 130 cause_size: common.CauseSize 131 ) -> int: 132 result = ((0x80 if cause.is_test else 0) | 133 (0x40 if cause.is_negative_confirm else 0) | 134 encode_cause_type(cause.type)) 135 136 if cause_size == common.CauseSize.ONE: 137 return result 138 139 if cause_size == common.CauseSize.TWO: 140 return result | (cause.originator_address << 8) 141 142 raise ValueError('unsupported cause size')
158def decode_io_element(io_bytes: util.Bytes, 159 asdu_type: int 160 ) -> tuple[common.IoElement, util.Bytes]: 161 asdu_type = _decode_asdu_type(asdu_type) 162 163 if asdu_type == common.AsduType.M_SP_NA: 164 value = common.SingleValue(io_bytes[0] & 1) 165 quality, io_bytes = decode_quality(io_bytes, 166 common.QualityType.INDICATION) 167 168 element = common.IoElement_M_SP_NA(value=value, 169 quality=quality) 170 return element, io_bytes 171 172 if asdu_type == common.AsduType.M_SP_TA: 173 value = common.SingleValue(io_bytes[0] & 1) 174 quality, io_bytes = decode_quality(io_bytes, 175 common.QualityType.INDICATION) 176 177 element = common.IoElement_M_SP_TA(value=value, 178 quality=quality) 179 return element, io_bytes 180 181 if asdu_type == common.AsduType.M_DP_NA: 182 value = common.DoubleValue(io_bytes[0] & 3) 183 quality, io_bytes = decode_quality(io_bytes, 184 common.QualityType.INDICATION) 185 186 element = common.IoElement_M_DP_NA(value=value, 187 quality=quality) 188 return element, io_bytes 189 190 if asdu_type == common.AsduType.M_DP_TA: 191 value = common.DoubleValue(io_bytes[0] & 3) 192 quality, io_bytes = decode_quality(io_bytes, 193 common.QualityType.INDICATION) 194 195 element = common.IoElement_M_DP_TA(value=value, 196 quality=quality) 197 return element, io_bytes 198 199 if asdu_type == common.AsduType.M_ST_NA: 200 value, io_bytes = decode_step_position_value(io_bytes) 201 quality, io_bytes = decode_quality(io_bytes, 202 common.QualityType.MEASUREMENT) 203 204 element = common.IoElement_M_ST_NA(value=value, 205 quality=quality) 206 return element, io_bytes 207 208 if asdu_type == common.AsduType.M_ST_TA: 209 value, io_bytes = decode_step_position_value(io_bytes) 210 quality, io_bytes = decode_quality(io_bytes, 211 common.QualityType.MEASUREMENT) 212 213 element = common.IoElement_M_ST_TA(value=value, 214 quality=quality) 215 return element, io_bytes 216 217 if asdu_type == common.AsduType.M_BO_NA: 218 value, io_bytes = decode_bitstring_value(io_bytes) 219 quality, io_bytes = decode_quality(io_bytes, 220 common.QualityType.MEASUREMENT) 221 222 element = common.IoElement_M_BO_NA(value=value, 223 quality=quality) 224 return element, io_bytes 225 226 if asdu_type == common.AsduType.M_BO_TA: 227 value, io_bytes = decode_bitstring_value(io_bytes) 228 quality, io_bytes = decode_quality(io_bytes, 229 common.QualityType.MEASUREMENT) 230 231 element = common.IoElement_M_BO_NA(value=value, 232 quality=quality) 233 return element, io_bytes 234 235 if asdu_type == common.AsduType.M_ME_NA: 236 value, io_bytes = decode_normalized_value(io_bytes) 237 quality, io_bytes = decode_quality(io_bytes, 238 common.QualityType.MEASUREMENT) 239 240 element = common.IoElement_M_ME_NA(value=value, 241 quality=quality) 242 return element, io_bytes 243 244 if asdu_type == common.AsduType.M_ME_TA: 245 value, io_bytes = decode_normalized_value(io_bytes) 246 quality, io_bytes = decode_quality(io_bytes, 247 common.QualityType.MEASUREMENT) 248 249 element = common.IoElement_M_ME_TA(value=value, 250 quality=quality) 251 return element, io_bytes 252 253 if asdu_type == common.AsduType.M_ME_NB: 254 value, io_bytes = decode_scaled_value(io_bytes) 255 quality, io_bytes = decode_quality(io_bytes, 256 common.QualityType.MEASUREMENT) 257 258 element = common.IoElement_M_ME_NB(value=value, 259 quality=quality) 260 return element, io_bytes 261 262 if asdu_type == common.AsduType.M_ME_TB: 263 value, io_bytes = decode_scaled_value(io_bytes) 264 quality, io_bytes = decode_quality(io_bytes, 265 common.QualityType.MEASUREMENT) 266 267 element = common.IoElement_M_ME_TB(value=value, 268 quality=quality) 269 return element, io_bytes 270 271 if asdu_type == common.AsduType.M_ME_NC: 272 value, io_bytes = decode_floating_value(io_bytes) 273 quality, io_bytes = decode_quality(io_bytes, 274 common.QualityType.MEASUREMENT) 275 276 element = common.IoElement_M_ME_NC(value=value, 277 quality=quality) 278 return element, io_bytes 279 280 if asdu_type == common.AsduType.M_ME_TC: 281 value, io_bytes = decode_floating_value(io_bytes) 282 quality, io_bytes = decode_quality(io_bytes, 283 common.QualityType.MEASUREMENT) 284 285 element = common.IoElement_M_ME_TC(value=value, 286 quality=quality) 287 return element, io_bytes 288 289 if asdu_type == common.AsduType.M_IT_NA: 290 value, io_bytes = decode_binary_counter_value(io_bytes) 291 quality, io_bytes = decode_quality(io_bytes, 292 common.QualityType.COUNTER) 293 294 element = common.IoElement_M_IT_NA(value=value, 295 quality=quality) 296 return element, io_bytes 297 298 if asdu_type == common.AsduType.M_IT_TA: 299 value, io_bytes = decode_binary_counter_value(io_bytes) 300 quality, io_bytes = decode_quality(io_bytes, 301 common.QualityType.COUNTER) 302 303 element = common.IoElement_M_IT_TA(value=value, 304 quality=quality) 305 return element, io_bytes 306 307 if asdu_type == common.AsduType.M_EP_TA: 308 value = common.ProtectionValue(io_bytes[0] & 0x03) 309 quality, io_bytes = decode_quality(io_bytes, 310 common.QualityType.PROTECTION) 311 elapsed_time = int.from_bytes(io_bytes[:2], 'little') 312 io_bytes = io_bytes[2:] 313 314 element = common.IoElement_M_EP_TA(value=value, 315 quality=quality, 316 elapsed_time=elapsed_time) 317 return element, io_bytes 318 319 if asdu_type == common.AsduType.M_EP_TB: 320 value, io_bytes = decode_protection_start_value(io_bytes) 321 quality, io_bytes = decode_quality(io_bytes, 322 common.QualityType.PROTECTION) 323 duration_time = int.from_bytes(io_bytes[:2], 'little') 324 io_bytes = io_bytes[2:] 325 326 element = common.IoElement_M_EP_TB(value=value, 327 quality=quality, 328 duration_time=duration_time) 329 return element, io_bytes 330 331 if asdu_type == common.AsduType.M_EP_TC: 332 value, io_bytes = decode_protection_command_value(io_bytes) 333 quality, io_bytes = decode_quality(io_bytes, 334 common.QualityType.PROTECTION) 335 operating_time = int.from_bytes(io_bytes[:2], 'little') 336 io_bytes = io_bytes[2:] 337 338 element = common.IoElement_M_EP_TC(value=value, 339 quality=quality, 340 operating_time=operating_time) 341 return element, io_bytes 342 343 if asdu_type == common.AsduType.M_PS_NA: 344 value, io_bytes = decode_status_value(io_bytes) 345 quality, io_bytes = decode_quality(io_bytes, 346 common.QualityType.MEASUREMENT) 347 348 element = common.IoElement_M_PS_NA(value=value, 349 quality=quality) 350 return element, io_bytes 351 352 if asdu_type == common.AsduType.M_ME_ND: 353 value, io_bytes = decode_normalized_value(io_bytes) 354 355 element = common.IoElement_M_ME_ND(value=value) 356 return element, io_bytes 357 358 if asdu_type == common.AsduType.M_SP_TB: 359 value = common.SingleValue(io_bytes[0] & 1) 360 quality, io_bytes = decode_quality(io_bytes, 361 common.QualityType.INDICATION) 362 363 element = common.IoElement_M_SP_TB(value=value, 364 quality=quality) 365 return element, io_bytes 366 367 if asdu_type == common.AsduType.M_DP_TB: 368 value = common.DoubleValue(io_bytes[0] & 3) 369 quality, io_bytes = decode_quality(io_bytes, 370 common.QualityType.INDICATION) 371 372 element = common.IoElement_M_DP_TB(value=value, 373 quality=quality) 374 return element, io_bytes 375 376 if asdu_type == common.AsduType.M_ST_TB: 377 value, io_bytes = decode_step_position_value(io_bytes) 378 quality, io_bytes = decode_quality(io_bytes, 379 common.QualityType.MEASUREMENT) 380 381 element = common.IoElement_M_ST_TB(value=value, 382 quality=quality) 383 return element, io_bytes 384 385 if asdu_type == common.AsduType.M_BO_TB: 386 value, io_bytes = decode_bitstring_value(io_bytes) 387 quality, io_bytes = decode_quality(io_bytes, 388 common.QualityType.MEASUREMENT) 389 390 element = common.IoElement_M_BO_TB(value=value, 391 quality=quality) 392 return element, io_bytes 393 394 if asdu_type == common.AsduType.M_ME_TD: 395 value, io_bytes = decode_normalized_value(io_bytes) 396 quality, io_bytes = decode_quality(io_bytes, 397 common.QualityType.MEASUREMENT) 398 399 element = common.IoElement_M_ME_TD(value=value, 400 quality=quality) 401 return element, io_bytes 402 403 if asdu_type == common.AsduType.M_ME_TE: 404 value, io_bytes = decode_scaled_value(io_bytes) 405 quality, io_bytes = decode_quality(io_bytes, 406 common.QualityType.MEASUREMENT) 407 408 element = common.IoElement_M_ME_TE(value=value, 409 quality=quality) 410 return element, io_bytes 411 412 if asdu_type == common.AsduType.M_ME_TF: 413 value, io_bytes = decode_floating_value(io_bytes) 414 quality, io_bytes = decode_quality(io_bytes, 415 common.QualityType.MEASUREMENT) 416 417 element = common.IoElement_M_ME_TF(value=value, 418 quality=quality) 419 return element, io_bytes 420 421 if asdu_type == common.AsduType.M_IT_TB: 422 value, io_bytes = decode_binary_counter_value(io_bytes) 423 quality, io_bytes = decode_quality(io_bytes, 424 common.QualityType.COUNTER) 425 426 element = common.IoElement_M_IT_TB(value=value, 427 quality=quality) 428 return element, io_bytes 429 430 if asdu_type == common.AsduType.M_EP_TD: 431 value = common.ProtectionValue(io_bytes[0] & 0x03) 432 quality, io_bytes = decode_quality(io_bytes, 433 common.QualityType.PROTECTION) 434 elapsed_time = int.from_bytes(io_bytes[:2], 'little') 435 io_bytes = io_bytes[2:] 436 437 element = common.IoElement_M_EP_TD(value=value, 438 quality=quality, 439 elapsed_time=elapsed_time) 440 return element, io_bytes 441 442 if asdu_type == common.AsduType.M_EP_TE: 443 value, io_bytes = decode_protection_start_value(io_bytes) 444 quality, io_bytes = decode_quality(io_bytes, 445 common.QualityType.PROTECTION) 446 duration_time = int.from_bytes(io_bytes[:2], 'little') 447 io_bytes = io_bytes[2:] 448 449 element = common.IoElement_M_EP_TE(value=value, 450 quality=quality, 451 duration_time=duration_time) 452 return element, io_bytes 453 454 if asdu_type == common.AsduType.M_EP_TF: 455 value, io_bytes = decode_protection_command_value(io_bytes) 456 quality, io_bytes = decode_quality(io_bytes, 457 common.QualityType.PROTECTION) 458 operating_time = int.from_bytes(io_bytes[:2], 'little') 459 io_bytes = io_bytes[2:] 460 461 element = common.IoElement_M_EP_TF(value=value, 462 quality=quality, 463 operating_time=operating_time) 464 return element, io_bytes 465 466 if asdu_type == common.AsduType.C_SC_NA: 467 value = common.SingleValue(io_bytes[0] & 1) 468 select = bool(io_bytes[0] & 0x80) 469 qualifier = (io_bytes[0] >> 2) & 0x1F 470 io_bytes = io_bytes[1:] 471 472 element = common.IoElement_C_SC_NA(value=value, 473 select=select, 474 qualifier=qualifier) 475 return element, io_bytes 476 477 if asdu_type == common.AsduType.C_DC_NA: 478 value = common.DoubleValue(io_bytes[0] & 3) 479 select = bool(io_bytes[0] & 0x80) 480 qualifier = (io_bytes[0] >> 2) & 0x1F 481 io_bytes = io_bytes[1:] 482 483 element = common.IoElement_C_DC_NA(value=value, 484 select=select, 485 qualifier=qualifier) 486 return element, io_bytes 487 488 if asdu_type == common.AsduType.C_RC_NA: 489 value = common.RegulatingValue(io_bytes[0] & 3) 490 select = bool(io_bytes[0] & 0x80) 491 qualifier = (io_bytes[0] >> 2) & 0x1F 492 io_bytes = io_bytes[1:] 493 494 element = common.IoElement_C_RC_NA(value=value, 495 select=select, 496 qualifier=qualifier) 497 return element, io_bytes 498 499 if asdu_type == common.AsduType.C_SE_NA: 500 value, io_bytes = decode_normalized_value(io_bytes) 501 select = bool(io_bytes[0] & 0x80) 502 io_bytes = io_bytes[1:] 503 504 element = common.IoElement_C_SE_NA(value=value, 505 select=select) 506 return element, io_bytes 507 508 if asdu_type == common.AsduType.C_SE_NB: 509 value, io_bytes = decode_scaled_value(io_bytes) 510 select = bool(io_bytes[0] & 0x80) 511 io_bytes = io_bytes[1:] 512 513 element = common.IoElement_C_SE_NB(value=value, 514 select=select) 515 return element, io_bytes 516 517 if asdu_type == common.AsduType.C_SE_NC: 518 value, io_bytes = decode_floating_value(io_bytes) 519 select = bool(io_bytes[0] & 0x80) 520 io_bytes = io_bytes[1:] 521 522 element = common.IoElement_C_SE_NC(value=value, 523 select=select) 524 return element, io_bytes 525 526 if asdu_type == common.AsduType.C_BO_NA: 527 value, io_bytes = decode_bitstring_value(io_bytes) 528 529 element = common.IoElement_C_BO_NA(value=value) 530 return element, io_bytes 531 532 if asdu_type == common.AsduType.M_EI_NA: 533 param_change = bool(io_bytes[0] & 0x80) 534 cause = io_bytes[0] & 0x7F 535 io_bytes = io_bytes[1:] 536 537 element = common.IoElement_M_EI_NA(param_change=param_change, 538 cause=cause) 539 return element, io_bytes 540 541 if asdu_type == common.AsduType.C_IC_NA: 542 qualifier, io_bytes = io_bytes[0], io_bytes[1:] 543 544 element = common.IoElement_C_IC_NA(qualifier=qualifier) 545 return element, io_bytes 546 547 if asdu_type == common.AsduType.C_CI_NA: 548 request = io_bytes[0] & 0x3F 549 freeze = common.FreezeCode(io_bytes[0] >> 6) 550 io_bytes = io_bytes[1:] 551 552 element = common.IoElement_C_CI_NA(request=request, 553 freeze=freeze) 554 return element, io_bytes 555 556 if asdu_type == common.AsduType.C_RD_NA: 557 element = common.IoElement_C_RD_NA() 558 return element, io_bytes 559 560 if asdu_type == common.AsduType.C_CS_NA: 561 time = encoder.decode_time(io_bytes[:7], common.TimeSize.SEVEN) 562 io_bytes = io_bytes[7:] 563 564 element = common.IoElement_C_CS_NA(time=time) 565 return element, io_bytes 566 567 if asdu_type == common.AsduType.C_TS_NA: 568 io_bytes = io_bytes[2:] 569 570 element = common.IoElement_C_TS_NA() 571 return element, io_bytes 572 573 if asdu_type == common.AsduType.C_RP_NA: 574 qualifier, io_bytes = io_bytes[0], io_bytes[1:] 575 576 element = common.IoElement_C_RP_NA(qualifier=qualifier) 577 return element, io_bytes 578 579 if asdu_type == common.AsduType.C_CD_NA: 580 time = int.from_bytes(io_bytes[:2], 'little') 581 io_bytes = io_bytes[2:] 582 583 element = common.IoElement_C_CD_NA(time=time) 584 return element, io_bytes 585 586 if asdu_type == common.AsduType.P_ME_NA: 587 value, io_bytes = decode_normalized_value(io_bytes) 588 qualifier, io_bytes = io_bytes[0], io_bytes[1:] 589 590 element = common.IoElement_P_ME_NA(value=value, 591 qualifier=qualifier) 592 return element, io_bytes 593 594 if asdu_type == common.AsduType.P_ME_NB: 595 value, io_bytes = decode_scaled_value(io_bytes) 596 qualifier, io_bytes = io_bytes[0], io_bytes[1:] 597 598 element = common.IoElement_P_ME_NB(value=value, 599 qualifier=qualifier) 600 return element, io_bytes 601 602 if asdu_type == common.AsduType.P_ME_NC: 603 value, io_bytes = decode_floating_value(io_bytes) 604 qualifier, io_bytes = io_bytes[0], io_bytes[1:] 605 606 element = common.IoElement_P_ME_NC(value=value, 607 qualifier=qualifier) 608 return element, io_bytes 609 610 if asdu_type == common.AsduType.P_AC_NA: 611 qualifier, io_bytes = io_bytes[0], io_bytes[1:] 612 613 element = common.IoElement_P_AC_NA(qualifier=qualifier) 614 return element, io_bytes 615 616 if asdu_type == common.AsduType.F_FR_NA: 617 file_name = int.from_bytes(io_bytes[:2], 'little') 618 file_length = int.from_bytes(io_bytes[2:5], 'little') 619 ready = bool(io_bytes[5] & 0x80) 620 io_bytes = io_bytes[6:] 621 622 element = common.IoElement_F_FR_NA(file_name=file_name, 623 file_length=file_length, 624 ready=ready) 625 return element, io_bytes 626 627 if asdu_type == common.AsduType.F_SR_NA: 628 file_name = int.from_bytes(io_bytes[:2], 'little') 629 section_name = io_bytes[2] 630 section_length = int.from_bytes(io_bytes[3:6], 'little') 631 ready = bool(io_bytes[6] & 0x80) 632 io_bytes = io_bytes[7:] 633 634 element = common.IoElement_F_SR_NA(file_name=file_name, 635 section_name=section_name, 636 section_length=section_length, 637 ready=ready) 638 return element, io_bytes 639 640 if asdu_type == common.AsduType.F_SC_NA: 641 file_name = int.from_bytes(io_bytes[:2], 'little') 642 section_name = io_bytes[2] 643 qualifier = io_bytes[3] 644 io_bytes = io_bytes[4:] 645 646 element = common.IoElement_F_SC_NA(file_name=file_name, 647 section_name=section_name, 648 qualifier=qualifier) 649 return element, io_bytes 650 651 if asdu_type == common.AsduType.F_LS_NA: 652 file_name = int.from_bytes(io_bytes[:2], 'little') 653 section_name = io_bytes[2] 654 last_qualifier = io_bytes[3] 655 checksum = io_bytes[4] 656 io_bytes = io_bytes[5:] 657 658 element = common.IoElement_F_LS_NA(file_name=file_name, 659 section_name=section_name, 660 last_qualifier=last_qualifier, 661 checksum=checksum) 662 return element, io_bytes 663 664 if asdu_type == common.AsduType.F_AF_NA: 665 file_name = int.from_bytes(io_bytes[:2], 'little') 666 section_name = io_bytes[2] 667 qualifier = io_bytes[3] 668 io_bytes = io_bytes[4:] 669 670 element = common.IoElement_F_AF_NA(file_name=file_name, 671 section_name=section_name, 672 qualifier=qualifier) 673 return element, io_bytes 674 675 if asdu_type == common.AsduType.F_SG_NA: 676 file_name = int.from_bytes(io_bytes[:2], 'little') 677 section_name = io_bytes[2] 678 length = io_bytes[3] 679 segment = io_bytes[4:4+length] 680 io_bytes = io_bytes[4+length:] 681 682 element = common.IoElement_F_SG_NA(file_name=file_name, 683 section_name=section_name, 684 segment=segment) 685 return element, io_bytes 686 687 if asdu_type == common.AsduType.F_DR_TA: 688 file_name = int.from_bytes(io_bytes[:2], 'little') 689 file_length = int.from_bytes(io_bytes[2:5], 'little') 690 more_follows = bool(io_bytes[5] & 0x20) 691 is_directory = bool(io_bytes[5] & 0x40) 692 transfer_active = bool(io_bytes[5] & 0x80) 693 creation_time = encoder.decode_time(io_bytes[6:13], 694 common.TimeSize.SEVEN) 695 io_bytes = io_bytes[13:] 696 697 element = common.IoElement_F_DR_TA(file_name=file_name, 698 file_length=file_length, 699 more_follows=more_follows, 700 is_directory=is_directory, 701 transfer_active=transfer_active, 702 creation_time=creation_time) 703 return element, io_bytes 704 705 raise ValueError('unsupported ASDU type')
708def encode_io_element(element: common.IoElement, 709 asdu_type: int 710 ) -> typing.Iterable[int]: 711 asdu_type = _decode_asdu_type(asdu_type) 712 713 if isinstance(element, common.IoElement_M_SP_NA): 714 quality = util.first(encode_quality(element.quality)) 715 yield element.value.value | quality 716 717 elif isinstance(element, common.IoElement_M_SP_TA): 718 quality = util.first(encode_quality(element.quality)) 719 yield element.value.value | quality 720 721 elif isinstance(element, common.IoElement_M_DP_NA): 722 quality = util.first(encode_quality(element.quality)) 723 yield element.value.value | quality 724 725 elif isinstance(element, common.IoElement_M_DP_TA): 726 quality = util.first(encode_quality(element.quality)) 727 yield element.value.value | quality 728 729 elif isinstance(element, common.IoElement_M_ST_NA): 730 yield from encode_step_position_value(element.value) 731 yield from encode_quality(element.quality) 732 733 elif isinstance(element, common.IoElement_M_ST_TA): 734 yield from encode_step_position_value(element.value) 735 yield from encode_quality(element.quality) 736 737 elif isinstance(element, common.IoElement_M_BO_NA): 738 yield from encode_bitstring_value(element.value) 739 yield from encode_quality(element.quality) 740 741 elif isinstance(element, common.IoElement_M_BO_TA): 742 yield from encode_bitstring_value(element.value) 743 yield from encode_quality(element.quality) 744 745 elif isinstance(element, common.IoElement_M_ME_NA): 746 yield from encode_normalized_value(element.value) 747 yield from encode_quality(element.quality) 748 749 elif isinstance(element, common.IoElement_M_ME_TA): 750 yield from encode_normalized_value(element.value) 751 yield from encode_quality(element.quality) 752 753 elif isinstance(element, common.IoElement_M_ME_NB): 754 yield from encode_scaled_value(element.value) 755 yield from encode_quality(element.quality) 756 757 elif isinstance(element, common.IoElement_M_ME_TB): 758 yield from encode_scaled_value(element.value) 759 yield from encode_quality(element.quality) 760 761 elif isinstance(element, common.IoElement_M_ME_NC): 762 yield from encode_floating_value(element.value) 763 yield from encode_quality(element.quality) 764 765 elif isinstance(element, common.IoElement_M_ME_TC): 766 yield from encode_floating_value(element.value) 767 yield from encode_quality(element.quality) 768 769 elif isinstance(element, common.IoElement_M_IT_NA): 770 yield from encode_binary_counter_value(element.value) 771 yield from encode_quality(element.quality) 772 773 elif isinstance(element, common.IoElement_M_IT_TA): 774 yield from encode_binary_counter_value(element.value) 775 yield from encode_quality(element.quality) 776 777 elif isinstance(element, common.IoElement_M_EP_TA): 778 quality = util.first(encode_quality(element.quality)) 779 yield element.value.value | quality 780 yield from element.elapsed_time.to_bytes(2, 'little') 781 782 elif isinstance(element, common.IoElement_M_EP_TB): 783 yield from encode_protection_start_value(element.value) 784 yield from encode_quality(element.quality) 785 yield from element.duration_time.to_bytes(2, 'little') 786 787 elif isinstance(element, common.IoElement_M_EP_TC): 788 yield from encode_protection_command_value(element.value) 789 yield from encode_quality(element.quality) 790 yield from element.operating_time.to_bytes(2, 'little') 791 792 elif isinstance(element, common.IoElement_M_PS_NA): 793 yield from encode_status_value(element.value) 794 yield from encode_quality(element.quality) 795 796 elif isinstance(element, common.IoElement_M_ME_ND): 797 yield from encode_normalized_value(element.value) 798 799 elif isinstance(element, common.IoElement_M_SP_TB): 800 quality = util.first(encode_quality(element.quality)) 801 yield element.value.value | quality 802 803 elif isinstance(element, common.IoElement_M_DP_TB): 804 quality = util.first(encode_quality(element.quality)) 805 yield element.value.value | quality 806 807 elif isinstance(element, common.IoElement_M_ST_TB): 808 yield from encode_step_position_value(element.value) 809 yield from encode_quality(element.quality) 810 811 elif isinstance(element, common.IoElement_M_BO_TB): 812 yield from encode_bitstring_value(element.value) 813 yield from encode_quality(element.quality) 814 815 elif isinstance(element, common.IoElement_M_ME_TD): 816 yield from encode_normalized_value(element.value) 817 yield from encode_quality(element.quality) 818 819 elif isinstance(element, common.IoElement_M_ME_TE): 820 yield from encode_scaled_value(element.value) 821 yield from encode_quality(element.quality) 822 823 elif isinstance(element, common.IoElement_M_ME_TF): 824 yield from encode_floating_value(element.value) 825 yield from encode_quality(element.quality) 826 827 elif isinstance(element, common.IoElement_M_IT_TB): 828 yield from encode_binary_counter_value(element.value) 829 yield from encode_quality(element.quality) 830 831 elif isinstance(element, common.IoElement_M_EP_TD): 832 quality = util.first(encode_quality(element.quality)) 833 yield element.value.value | quality 834 yield from element.elapsed_time.to_bytes(2, 'little') 835 836 elif isinstance(element, common.IoElement_M_EP_TE): 837 yield from encode_protection_start_value(element.value) 838 yield from encode_quality(element.quality) 839 yield from element.duration_time.to_bytes(2, 'little') 840 841 elif isinstance(element, common.IoElement_M_EP_TF): 842 yield from encode_protection_command_value(element.value) 843 yield from encode_quality(element.quality) 844 yield from element.operating_time.to_bytes(2, 'little') 845 846 elif isinstance(element, common.IoElement_C_SC_NA): 847 yield (element.value.value | 848 (0x80 if element.select else 0) | 849 ((element.qualifier & 0x1F) << 2)) 850 851 elif isinstance(element, common.IoElement_C_DC_NA): 852 yield (element.value.value | 853 (0x80 if element.select else 0) | 854 ((element.qualifier & 0x1F) << 2)) 855 856 elif isinstance(element, common.IoElement_C_RC_NA): 857 yield (element.value.value | 858 (0x80 if element.select else 0) | 859 ((element.qualifier & 0x1F) << 2)) 860 861 elif isinstance(element, common.IoElement_C_SE_NA): 862 yield from encode_normalized_value(element.value) 863 yield (0x80 if element.select else 0) 864 865 elif isinstance(element, common.IoElement_C_SE_NB): 866 yield from encode_scaled_value(element.value) 867 yield (0x80 if element.select else 0) 868 869 elif isinstance(element, common.IoElement_C_SE_NC): 870 yield from encode_floating_value(element.value) 871 yield (0x80 if element.select else 0) 872 873 elif isinstance(element, common.IoElement_C_BO_NA): 874 yield from encode_bitstring_value(element.value) 875 876 elif isinstance(element, common.IoElement_M_EI_NA): 877 yield ((0x80 if element.param_change else 0x00) | 878 (element.cause & 0x7F)) 879 880 elif isinstance(element, common.IoElement_C_IC_NA): 881 yield element.qualifier & 0xFF 882 883 elif isinstance(element, common.IoElement_C_CI_NA): 884 yield ((element.freeze.value << 6) | 885 (element.request & 0x3F)) 886 887 elif isinstance(element, common.IoElement_C_RD_NA): 888 pass 889 890 elif isinstance(element, common.IoElement_C_CS_NA): 891 yield from encoder.encode_time(element.time, common.TimeSize.SEVEN) 892 893 elif isinstance(element, common.IoElement_C_TS_NA): 894 yield 0xAA 895 yield 0x55 896 897 elif isinstance(element, common.IoElement_C_RP_NA): 898 yield element.qualifier & 0xFF 899 900 elif isinstance(element, common.IoElement_C_CD_NA): 901 yield from element.time.to_bytes(2, 'little') 902 903 elif isinstance(element, common.IoElement_P_ME_NA): 904 yield from encode_normalized_value(element.value) 905 yield element.qualifier & 0xFF 906 907 elif isinstance(element, common.IoElement_P_ME_NB): 908 yield from encode_scaled_value(element.value) 909 yield element.qualifier & 0xFF 910 911 elif isinstance(element, common.IoElement_P_ME_NC): 912 yield from encode_floating_value(element.value) 913 yield element.qualifier & 0xFF 914 915 elif isinstance(element, common.IoElement_P_AC_NA): 916 yield element.qualifier & 0xFF 917 918 elif isinstance(element, common.IoElement_F_FR_NA): 919 yield from element.file_name.to_bytes(2, 'little') 920 yield from element.file_length.to_bytes(3, 'little') 921 yield (0x80 if element.ready else 0x00) 922 923 elif isinstance(element, common.IoElement_F_SR_NA): 924 yield from element.file_name.to_bytes(2, 'little') 925 yield element.section_name & 0xFF 926 yield from element.section_length.to_bytes(3, 'little') 927 yield (0x80 if element.ready else 0x00) 928 929 elif isinstance(element, common.IoElement_F_SC_NA): 930 yield from element.file_name.to_bytes(2, 'little') 931 yield element.section_name & 0xFF 932 yield element.qualifier & 0xFF 933 934 elif isinstance(element, common.IoElement_F_LS_NA): 935 yield from element.file_name.to_bytes(2, 'little') 936 yield element.section_name & 0xFF 937 yield element.last_qualifier & 0xFF 938 yield element.checksum & 0xFF 939 940 elif isinstance(element, common.IoElement_F_AF_NA): 941 yield from element.file_name.to_bytes(2, 'little') 942 yield element.section_name & 0xFF 943 yield element.qualifier & 0xFF 944 945 elif isinstance(element, common.IoElement_F_SG_NA): 946 yield from element.file_name.to_bytes(2, 'little') 947 yield element.section_name & 0xFF 948 yield len(element.segment) 949 yield from element.segment 950 951 elif isinstance(element, common.IoElement_F_DR_TA): 952 yield from element.file_name.to_bytes(2, 'little') 953 yield from element.file_length.to_bytes(3, 'little') 954 yield ((0x20 if element.more_follows else 0x00) | 955 (0x40 if element.is_directory else 0x00) | 956 (0x80 if element.transfer_active else 0x00)) 957 yield from encoder.encode_time(element.creation_time, 958 common.TimeSize.SEVEN) 959 960 else: 961 raise ValueError('unsupported IO element')
964def decode_quality(io_bytes: util.Bytes, 965 quality_type: common.QualityType 966 ) -> tuple[common.Quality, util.Bytes]: 967 if quality_type == common.QualityType.INDICATION: 968 invalid = bool(io_bytes[0] & 0x80) 969 not_topical = bool(io_bytes[0] & 0x40) 970 substituted = bool(io_bytes[0] & 0x20) 971 blocked = bool(io_bytes[0] & 0x10) 972 quality = common.IndicationQuality(invalid=invalid, 973 not_topical=not_topical, 974 substituted=substituted, 975 blocked=blocked) 976 977 elif quality_type == common.QualityType.MEASUREMENT: 978 invalid = bool(io_bytes[0] & 0x80) 979 not_topical = bool(io_bytes[0] & 0x40) 980 substituted = bool(io_bytes[0] & 0x20) 981 blocked = bool(io_bytes[0] & 0x10) 982 overflow = bool(io_bytes[0] & 0x01) 983 quality = common.MeasurementQuality(invalid=invalid, 984 not_topical=not_topical, 985 substituted=substituted, 986 blocked=blocked, 987 overflow=overflow) 988 989 elif quality_type == common.QualityType.COUNTER: 990 invalid = bool(io_bytes[0] & 0x80) 991 adjusted = bool(io_bytes[0] & 0x40) 992 overflow = bool(io_bytes[0] & 0x20) 993 sequence = io_bytes[0] & 0x1F 994 quality = common.CounterQuality(invalid=invalid, 995 adjusted=adjusted, 996 overflow=overflow, 997 sequence=sequence) 998 999 elif quality_type == common.QualityType.PROTECTION: 1000 invalid = bool(io_bytes[0] & 0x80) 1001 not_topical = bool(io_bytes[0] & 0x40) 1002 substituted = bool(io_bytes[0] & 0x20) 1003 blocked = bool(io_bytes[0] & 0x10) 1004 time_invalid = bool(io_bytes[0] & 0x08) 1005 quality = common.ProtectionQuality(invalid=invalid, 1006 not_topical=not_topical, 1007 substituted=substituted, 1008 blocked=blocked, 1009 time_invalid=time_invalid) 1010 1011 else: 1012 raise ValueError('unsupported quality type') 1013 1014 return quality, io_bytes[1:]
1017def encode_quality(quality: common.Quality 1018 ) -> typing.Iterable[int]: 1019 if isinstance(quality, common.IndicationQuality): 1020 yield ((0x80 if quality.invalid else 0) | 1021 (0x40 if quality.not_topical else 0) | 1022 (0x20 if quality.substituted else 0) | 1023 (0x10 if quality.blocked else 0)) 1024 1025 elif isinstance(quality, common.MeasurementQuality): 1026 yield ((0x80 if quality.invalid else 0) | 1027 (0x40 if quality.not_topical else 0) | 1028 (0x20 if quality.substituted else 0) | 1029 (0x10 if quality.blocked else 0) | 1030 (0x01 if quality.overflow else 0)) 1031 1032 elif isinstance(quality, common.CounterQuality): 1033 yield ((0x80 if quality.invalid else 0) | 1034 (0x40 if quality.adjusted else 0) | 1035 (0x20 if quality.overflow else 0) | 1036 (quality.sequence & 0x1F)) 1037 1038 elif isinstance(quality, common.ProtectionQuality): 1039 yield ((0x80 if quality.invalid else 0) | 1040 (0x40 if quality.not_topical else 0) | 1041 (0x20 if quality.substituted else 0) | 1042 (0x10 if quality.blocked else 0) | 1043 (0x08 if quality.time_invalid else 0)) 1044 1045 else: 1046 raise ValueError('unsupported quality')
1049def decode_step_position_value(io_bytes: util.Bytes 1050 ) -> tuple[common.StepPositionValue, 1051 util.Bytes]: 1052 value = (((-1 << 7) if io_bytes[0] & 0x40 else 0) | 1053 (io_bytes[0] & 0x7F)) 1054 transient = bool(io_bytes[0] & 0x80) 1055 step_position_value = common.StepPositionValue(value=value, 1056 transient=transient) 1057 return step_position_value, io_bytes[1:]
1130def decode_protection_start_value(io_bytes: util.Bytes 1131 ) -> tuple[common.ProtectionStartValue, 1132 util.Bytes]: 1133 general = bool(io_bytes[0] & 0x01) 1134 l1 = bool(io_bytes[0] & 0x02) 1135 l2 = bool(io_bytes[0] & 0x04) 1136 l3 = bool(io_bytes[0] & 0x08) 1137 ie = bool(io_bytes[0] & 0x10) 1138 reverse = bool(io_bytes[0] & 0x20) 1139 protection_start_value = common.ProtectionStartValue(general=general, 1140 l1=l1, 1141 l2=l2, 1142 l3=l3, 1143 ie=ie, 1144 reverse=reverse) 1145 return protection_start_value, io_bytes[1:]
1148def encode_protection_start_value(value: common.ProtectionStartValue 1149 ) -> typing.Iterable[int]: 1150 yield ((0x01 if value.general else 0x00) | 1151 (0x02 if value.l1 else 0x00) | 1152 (0x04 if value.l2 else 0x00) | 1153 (0x08 if value.l3 else 0x00) | 1154 (0x10 if value.ie else 0x00) | 1155 (0x20 if value.reverse else 0x00))
1158def decode_protection_command_value(io_bytes: util.Bytes 1159 ) -> tuple[common.ProtectionCommandValue, 1160 util.Bytes]: 1161 general = bool(io_bytes[0] & 0x01) 1162 l1 = bool(io_bytes[0] & 0x02) 1163 l2 = bool(io_bytes[0] & 0x04) 1164 l3 = bool(io_bytes[0] & 0x08) 1165 protection_command_value = common.ProtectionCommandValue(general=general, 1166 l1=l1, 1167 l2=l2, 1168 l3=l3) 1169 return protection_command_value, io_bytes[1:]
1180def decode_status_value(io_bytes: util.Bytes 1181 ) -> tuple[common.StatusValue, util.Bytes]: 1182 value = [bool(io_bytes[i // 8] & (1 << (i % 8))) 1183 for i in range(16)] 1184 change = [bool(io_bytes[2 + i // 8] & (1 << (i % 8))) 1185 for i in range(16)] 1186 status_value = common.StatusValue(value=value, 1187 change=change) 1188 return status_value, io_bytes[4:]