hat.drivers.iec103
IEC 60870-5-103 communication protocol
"""IEC 60870-5-103 communication protocol"""

# Data types, enumerations and time helpers re-exported from the common module
from hat.drivers.iec103.common import (Description,
                                       IoAddress,
                                       Identification,
                                       TimeSize,
                                       Time,
                                       ValueType,
                                       NoneValue,
                                       TextValue,
                                       BitstringValue,
                                       UIntValue,
                                       IntValue,
                                       UFixedValue,
                                       FixedValue,
                                       Real32Value,
                                       Real64Value,
                                       DoubleValue,
                                       SingleValue,
                                       ExtendedDoubleValue,
                                       MeasurandValue,
                                       TimeValue,
                                       IdentificationValue,
                                       RelativeTimeValue,
                                       IoAddressValue,
                                       DoubleWithTimeValue,
                                       DoubleWithRelativeTimeValue,
                                       MeasurandWithRelativeTimeValue,
                                       TextNumberValue,
                                       ReplyValue,
                                       ArrayValue,
                                       IndexValue,
                                       Value,
                                       AsduAddress,
                                       DataCause,
                                       GenericDataCause,
                                       MeasurandType,
                                       MeasurandValues,
                                       Data,
                                       GenericData,
                                       time_from_datetime,
                                       time_to_datetime)
# Callback types and the connection class re-exported from the master module
from hat.drivers.iec103.master import (DataCb,
                                       GenericDataCb,
                                       MasterConnection)


# Public API of the package: exactly the names imported above
__all__ = ['Description',
           'IoAddress',
           'Identification',
           'TimeSize',
           'Time',
           'ValueType',
           'NoneValue',
           'TextValue',
           'BitstringValue',
           'UIntValue',
           'IntValue',
           'UFixedValue',
           'FixedValue',
           'Real32Value',
           'Real64Value',
           'DoubleValue',
           'SingleValue',
           'ExtendedDoubleValue',
           'MeasurandValue',
           'TimeValue',
           'IdentificationValue',
           'RelativeTimeValue',
           'IoAddressValue',
           'DoubleWithTimeValue',
           'DoubleWithRelativeTimeValue',
           'MeasurandWithRelativeTimeValue',
           'TextNumberValue',
           'ReplyValue',
           'ArrayValue',
           'IndexValue',
           'Value',
           'AsduAddress',
           'DataCause',
           'GenericDataCause',
           'MeasurandType',
           'MeasurandValues',
           'Data',
           'GenericData',
           'time_from_datetime',
           'time_to_datetime',
           'DataCb',
           'GenericDataCb',
           'MasterConnection']
class Description(enum.Enum):
    """Kind of description carried by a generic data entry.

    The numeric values are not contiguous (4, 11 and 15-18 have no member
    here).
    """

    NOT_SPECIFIED = 0
    ACTUAL_VALUE = 1
    DEFAULT_VALUE = 2
    RANGE = 3
    # 4 has no member
    PRECISION = 5
    FACTOR = 6
    REFERENCE = 7
    ENUMERATION = 8
    DIMENSION = 9
    DESCRIPTION = 10
    # 11 has no member
    PASSWORD = 12
    READ_ONLY = 13
    WRITE_ONLY = 14
    # 15-18 have no member
    IO_ADDRESS = 19
    EVENT = 20
    TEXT_ARRAY = 21
    VALUE_ARRAY = 22
    RELATED = 23
class IoAddress(typing.NamedTuple):
    """Information object address: function type plus information number."""

    # function type, in range [0, 255]
    function_type: int
    # information number, in range [0, 255]
    information_number: int
IoAddress(function_type, information_number)
class Identification(typing.NamedTuple):
    """Generic data identification: group and entry identifier."""

    # group identifier, in range [0, 255]
    group_id: int
    # entry identifier, in range [0, 255]
    entry_id: int
Identification(group_id, entry_id)
class Time(typing.NamedTuple):
    """Protocol time stamp.

    Which fields are populated depends on ``size``; fields that are not
    available for a given size are ``None``.
    """

    # encoded size of the time stamp; decides which fields below are set
    size: TimeSize
    # milliseconds within the minute, in range [0, 59999]
    milliseconds: int
    # available for size THREE, FOUR, SEVEN
    invalid: bool | None
    # available for size THREE, FOUR, SEVEN (in range [0, 59])
    minutes: int | None
    # available for size FOUR, SEVEN
    summer_time: bool | None
    # available for size FOUR, SEVEN (in range [0, 23])
    hours: int | None
    # available for size SEVEN (in range [1, 7])
    day_of_week: int | None
    # available for size SEVEN (in range [1, 31])
    day_of_month: int | None
    # available for size SEVEN (in range [1, 12])
    months: int | None
    # available for size SEVEN (in range [0, 99])
    years: int | None
Time(size, milliseconds, invalid, minutes, summer_time, hours, day_of_week, day_of_month, months, years)
Create new instance of Time(size, milliseconds, invalid, minutes, summer_time, hours, day_of_week, day_of_month, months, years)
class ValueType(enum.Enum):
    """Data type tag of a generic value.

    Value 13 has no member here; all other values from 0 to 24 are
    assigned.
    """

    NONE = 0
    TEXT = 1
    BITSTRING = 2
    UINT = 3
    INT = 4
    UFIXED = 5
    FIXED = 6
    REAL32 = 7
    REAL64 = 8
    DOUBLE = 9
    SINGLE = 10
    EXTENDED_DOUBLE = 11
    MEASURAND = 12
    # 13 has no member
    TIME = 14
    IDENTIFICATION = 15
    RELATIVE_TIME = 16
    IO_ADDRESS = 17
    DOUBLE_WITH_TIME = 18
    DOUBLE_WITH_RELATIVE_TIME = 19
    MEASURAND_WITH_RELATIVE_TIME = 20
    TEXT_NUMBER = 21
    REPLY = 22
    ARRAY = 23
    INDEX = 24
NoneValue()
TextValue(value,)
BitstringValue(value,)
UIntValue(value,)
IntValue(value,)
UFixedValue(value,)
FixedValue(value,)
Real32Value(value,)
Real64Value(value,)
class MeasurandValue(typing.NamedTuple):
    """Measurand with its overflow and invalid flags."""

    overflow: bool
    invalid: bool
    # measured value, in range [-1.0, 1.0)
    value: float
MeasurandValue(overflow, invalid, value)
TimeValue(value,)
IdentificationValue(value,)
RelativeTimeValue(value,)
IoAddressValue(value,)
class DoubleWithTimeValue(typing.NamedTuple):
    """Double value accompanied by a time stamp and supplementary byte."""

    value: DoubleValue
    # time stamp; its size is FOUR
    time: Time
    # supplementary information, in range [0, 255]
    supplementary: int
DoubleWithTimeValue(value, time, supplementary)
Create new instance of DoubleWithTimeValue(value, time, supplementary)
class DoubleWithRelativeTimeValue(typing.NamedTuple):
    """Double value with relative time, fault number and time stamp."""

    value: DoubleValue
    # relative time, in range [0, 65535]
    relative_time: int
    # fault number, in range [0, 65535]
    fault_number: int
    # time stamp; its size is FOUR
    time: Time
    # supplementary information, in range [0, 255]
    supplementary: int
DoubleWithRelativeTimeValue(value, relative_time, fault_number, time, supplementary)
Create new instance of DoubleWithRelativeTimeValue(value, relative_time, fault_number, time, supplementary)
class MeasurandWithRelativeTimeValue(typing.NamedTuple):
    """Measurand with relative time, fault number and time stamp."""

    value: float
    # relative time, in range [0, 65535]
    relative_time: int
    # fault number, in range [0, 65535]
    fault_number: int
    # time stamp; its size is FOUR
    time: Time
MeasurandWithRelativeTimeValue(value, relative_time, fault_number, time)
Create new instance of MeasurandWithRelativeTimeValue(value, relative_time, fault_number, time)
TextNumberValue(value,)
class ReplyValue(enum.Enum):
    """Reply code of a generic reply value.

    ``ENTRY_TO_LARGE`` is a historical misspelling that is kept as the
    canonical member so existing callers and ``ReplyValue(6).name`` are
    unaffected. ``ENTRY_TOO_LARGE`` is an alias for the same member
    (``ReplyValue.ENTRY_TOO_LARGE is ReplyValue.ENTRY_TO_LARGE``); being an
    alias, it does not appear when iterating over the enumeration.
    """

    ACK = 0
    INVALID_IDENTIFICATION = 1
    DATA_NOT_EXISTS = 2
    DATA_NOT_AVAILABLE = 3
    VERIFY_ERROR = 4
    OUT_OF_RANGE = 5
    ENTRY_TO_LARGE = 6
    # correctly spelled alias; must come after the canonical member
    ENTRY_TOO_LARGE = 6
    TOO_MANY_COMMANDS = 7
    ENTRY_READ_ONLY = 8
    PASSWORD_PROTECTED = 9
    IN_PROGRESS = 10
    FOLLOWING_DESCRIPTION = 11
class ArrayValue(typing.NamedTuple):
    """Sequence of generic values sharing a single value type."""

    # type tag stored alongside ``values``
    value_type: ValueType
    more_follows: bool
    # forward reference: ``Value`` is declared later in the module
    values: typing.List['Value']
ArrayValue(value_type, more_follows, values)
Create new instance of ArrayValue(value_type, more_follows, values)
Alias for field number 2
IndexValue(value,)
class DataCause(enum.Enum):
    """Causes reported with `Data`.

    Every member reuses the numeric value of the equally named
    ``iec103.Cause`` member.
    """

    SPONTANEOUS = iec103.Cause.SPONTANEOUS.value
    CYCLIC = iec103.Cause.CYCLIC.value
    TEST_MODE = iec103.Cause.TEST_MODE.value
    GENERAL_INTERROGATION = iec103.Cause.GENERAL_INTERROGATION.value
    LOCAL_OPERATION = iec103.Cause.LOCAL_OPERATION.value
    REMOTE_OPERATION = iec103.Cause.REMOTE_OPERATION.value
class GenericDataCause(enum.Enum):
    """Causes reported with `GenericData`.

    The first six members mirror the equally named ``iec103.Cause``
    members; the remaining ones map shorter names onto the generic
    read/write causes.
    """

    # same names as in iec103.Cause
    SPONTANEOUS = iec103.Cause.SPONTANEOUS.value
    CYCLIC = iec103.Cause.CYCLIC.value
    TEST_MODE = iec103.Cause.TEST_MODE.value
    GENERAL_INTERROGATION = iec103.Cause.GENERAL_INTERROGATION.value
    LOCAL_OPERATION = iec103.Cause.LOCAL_OPERATION.value
    REMOTE_OPERATION = iec103.Cause.REMOTE_OPERATION.value

    # generic write/read outcomes under shortened names
    WRITE_ACK = iec103.Cause.GENERIC_WRITE_COMMAND.value
    WRITE_NACK = iec103.Cause.GENERIC_WRITE_COMMAND_NACK.value
    READ_ACK = iec103.Cause.GENERIC_READ_COMMAND.value
    READ_NACK = iec103.Cause.GENERIC_READ_COMMAND_NACK.value
    WRITE_CONFIRMATION = iec103.Cause.GENERIC_WRITE_CONFIRMATION.value
class MeasurandType(enum.Enum):
    """Identifies a single measurand inside a measurands ASDU.

    Each value is a pair ``(ASDU type value, element index)``, so a member
    can be looked up from the ASDU type and the position of the element.
    """

    # elements of MEASURANDS_1, by position
    M1_I_L2 = (iec103.AsduType.MEASURANDS_1.value, 0)
    M1_U_L12 = (iec103.AsduType.MEASURANDS_1.value, 1)
    M1_P = (iec103.AsduType.MEASURANDS_1.value, 2)
    M1_Q = (iec103.AsduType.MEASURANDS_1.value, 3)

    # elements of MEASURANDS_2, by position
    M2_I_L1 = (iec103.AsduType.MEASURANDS_2.value, 0)
    M2_I_L2 = (iec103.AsduType.MEASURANDS_2.value, 1)
    M2_I_L3 = (iec103.AsduType.MEASURANDS_2.value, 2)
    M2_U_L1E = (iec103.AsduType.MEASURANDS_2.value, 3)
    M2_U_L2E = (iec103.AsduType.MEASURANDS_2.value, 4)
    M2_U_L3E = (iec103.AsduType.MEASURANDS_2.value, 5)
    M2_P = (iec103.AsduType.MEASURANDS_2.value, 6)
    M2_Q = (iec103.AsduType.MEASURANDS_2.value, 7)
    M2_F = (iec103.AsduType.MEASURANDS_2.value, 8)
MeasurandValues(values,)
Create new instance of MeasurandValues(values,)
class Data(typing.NamedTuple):
    """Non-generic data item, addressed by ASDU and IO address."""

    asdu_address: AsduAddress
    io_address: IoAddress
    # a known cause, or OtherCause when it is not a DataCause member
    cause: DataCause | OtherCause
    # one of the supported payload shapes
    value: (DoubleWithTimeValue |
            DoubleWithRelativeTimeValue |
            MeasurandValues |
            MeasurandWithRelativeTimeValue)
Data(asdu_address, io_address, cause, value)
Create new instance of Data(asdu_address, io_address, cause, value)
Alias for field number 3
class GenericData(typing.NamedTuple):
    """Generic data item: one identified entry with its description kind."""

    asdu_address: AsduAddress
    io_address: IoAddress
    # a known cause, or OtherCause when it is not a GenericDataCause member
    cause: GenericDataCause | OtherCause
    identification: Identification
    description: Description
    value: ArrayValue
GenericData(asdu_address, io_address, cause, identification, description, value)
Create new instance of GenericData(asdu_address, io_address, cause, identification, description, value)
def time_from_datetime(dt: datetime.datetime,
                       invalid: bool = False
                       ) -> Time:
    """Build a seven-octet `Time` from a ``datetime.datetime``.

    The instant is rounded to the nearest millisecond and then broken down
    into calendar fields with ``time.localtime``, i.e. in the local time
    zone of the host.

    Args:
        dt: instant to convert
        invalid: value stored in the resulting ``invalid`` field

    Returns:
        `Time` with ``size`` set to ``TimeSize.SEVEN``

    """
    # TODO document edge cases (local time, os implementation, ...)
    # Round through a timedelta so that a value rounding up to 1000 ms
    # carries over into the next second instead of overflowing the field.
    whole_ms = round(dt.microsecond / 1000)
    rounded = (dt.replace(microsecond=0) +
               datetime.timedelta(milliseconds=whole_ms))
    fields = time.localtime(rounded.timestamp())

    # seconds are folded into the milliseconds field
    ms_in_minute = fields.tm_sec * 1000 + rounded.microsecond // 1000

    return Time(size=TimeSize.SEVEN,
                milliseconds=ms_in_minute,
                invalid=invalid,
                minutes=fields.tm_min,
                summer_time=bool(fields.tm_isdst),
                hours=fields.tm_hour,
                # tm_wday counts from 0 (Monday); Time counts from 1
                day_of_week=fields.tm_wday + 1,
                day_of_month=fields.tm_mday,
                months=fields.tm_mon,
                # only the two-digit year is kept
                years=fields.tm_year % 100)
Create Time from datetime.datetime
def time_to_datetime(t: Time
                     ) -> datetime.datetime:
    """Convert `Time` to a timezone-aware UTC ``datetime.datetime``.

    Sizes TWO and THREE carry only part of the time stamp; the missing
    fields are taken from the current local time, shifted by one minute
    (TWO) or one hour (THREE) when the received value is more than half a
    period away from now. Size SEVEN is converted directly; two-digit years
    below 70 map to 20xx, others to 19xx.

    Raises:
        ValueError: for any other size (``TimeSize.FOUR`` is not supported)

    """
    # TODO document edge cases (local time, os implementation, ...)
    # TODO support TimeSize.FOUR
    if t.size == TimeSize.TWO:
        now = datetime.datetime.now()
        result = now.replace(second=int(t.milliseconds / 1000),
                             microsecond=(t.milliseconds % 1000) * 1000)

        # position within the minute, in seconds
        now_pos = now.second + now.microsecond / 1_000_000
        rx_pos = t.milliseconds / 1_000

        # more than half a minute apart: value belongs to adjacent minute
        if abs(now_pos - rx_pos) > 30:
            step = datetime.timedelta(minutes=1)
            result = result - step if now_pos < rx_pos else result + step

    elif t.size == TimeSize.THREE:
        now = datetime.datetime.now()
        result = now.replace(minute=t.minutes,
                             second=int(t.milliseconds / 1000),
                             microsecond=(t.milliseconds % 1000) * 1000)

        # position within the hour, in minutes
        now_pos = (now.minute +
                   now.second / 60 +
                   now.microsecond / 60_000_000)
        rx_pos = t.minutes + t.milliseconds / 60_000

        # more than half an hour apart: value belongs to adjacent hour
        if abs(now_pos - rx_pos) > 30:
            step = datetime.timedelta(hours=1)
            result = result - step if now_pos < rx_pos else result + step

    elif t.size == TimeSize.SEVEN:
        full_year = 2000 + t.years if t.years < 70 else 1900 + t.years
        result = datetime.datetime(
            year=full_year,
            month=t.months,
            day=t.day_of_month,
            hour=t.hours,
            minute=t.minutes,
            second=int(t.milliseconds / 1000),
            microsecond=(t.milliseconds % 1000) * 1000,
            # for an ambiguous wall-clock time, pick the later occurrence
            # when the stamp is not flagged as summer time
            fold=not t.summer_time)

    else:
        raise ValueError('unsupported time size')

    # naive value is interpreted as local time by astimezone
    return result.astimezone(tz=datetime.timezone.utc)
Convert Time to datetime.datetime
class MasterConnection(aio.Resource):
    """Master side of a connection, layered on a link connection.

    The instance shares the async group of the wrapped link connection and
    spawns a receive loop that decodes incoming ASDUs and dispatches them to
    the ``_process_*`` handlers. Received data is passed to `data_cb` /
    `generic_data_cb` through ``_try_aio_call``.

    Interrogation, command and generic interrogation requests are each
    serialized by their own lock; the pending request is tracked by a
    request id and a future that the matching handler resolves.
    """

    def __init__(self,
                 conn: link.Connection,
                 *,
                 data_cb: DataCb | None = None,
                 generic_data_cb: GenericDataCb | None = None):
        """Wrap `conn` and start the receive loop.

        Args:
            conn: underlying link connection
            data_cb: optional callback for received `common.Data`
            generic_data_cb: optional callback for received
                `common.GenericData`

        """
        self._conn = conn
        self._data_cb = data_cb
        self._generic_data_cb = generic_data_cb

        self._log = logger.create_logger(mlog, conn.info)
        self._comm_log = logger.CommunicationLogger(mlog, conn.info)

        self._encoder = iec103.Encoder()

        # per-request-kind state: lock, id of pending request, its future
        self._interrogate_lock = asyncio.Lock()
        self._interrogate_req_id = None
        self._interrogate_future = None

        self._send_command_lock = asyncio.Lock()
        self._send_command_req_id = None
        self._send_command_future = None

        self._interrogate_generic_lock = asyncio.Lock()
        self._interrogate_generic_req_id = None
        self._interrogate_generic_future = None

        # endless 0..255 sequence shared by all request kinds
        self._next_req_ids = (i % 0x100 for i in itertools.count(0))

        # handlers called once per element of an information object
        self._process_single_element_fns = {
            iec103.AsduType.TIME_TAGGED_MESSAGE: self._process_TIME_TAGGED_MESSAGE,  # NOQA
            iec103.AsduType.TIME_TAGGED_MESSAGE_WITH_RELATIVE_TIME: self._process_TIME_TAGGED_MESSAGE_WITH_RELATIVE_TIME,  # NOQA
            iec103.AsduType.TIME_TAGGED_MEASURANDS_WITH_RELATIVE_TIME: self._process_TIME_TAGGED_MEASURANDS_WITH_RELATIVE_TIME,  # NOQA
            iec103.AsduType.IDENTIFICATION: self._process_IDENTIFICATION,
            iec103.AsduType.TIME_SYNCHRONIZATION: self._process_TIME_SYNCHRONIZATION,  # NOQA
            iec103.AsduType.GENERAL_INTERROGATION_TERMINATION: self._process_GENERAL_INTERROGATION_TERMINATION,  # NOQA
            iec103.AsduType.GENERIC_DATA: self._process_GENERIC_DATA,
            iec103.AsduType.GENERIC_IDENTIFICATION: self._process_GENERIC_IDENTIFICATION,  # NOQA
            iec103.AsduType.READY_FOR_TRANSMISSION_OF_DISTURBANCE_DATA: self._process_READY_FOR_TRANSMISSION_OF_DISTURBANCE_DATA,  # NOQA
            iec103.AsduType.READY_FOR_TRANSMISSION_OF_A_CHANNEL: self._process_READY_FOR_TRANSMISSION_OF_A_CHANNEL,  # NOQA
            iec103.AsduType.READY_FOR_TRANSMISSION_OF_TAGS: self._process_READY_FOR_TRANSMISSION_OF_TAGS,  # NOQA
            iec103.AsduType.TRANSMISSION_OF_TAGS: self._process_TRANSMISSION_OF_TAGS,  # NOQA
            iec103.AsduType.TRANSMISSION_OF_DISTURBANCE_VALUES: self._process_TRANSMISSION_OF_DISTURBANCE_VALUES,  # NOQA
            iec103.AsduType.END_OF_TRANSMISSION: self._process_END_OF_TRANSMISSION}  # NOQA
        # handlers called once per information object with all its elements
        self._process_multiple_elements_fns = {
            iec103.AsduType.MEASURANDS_1: self._process_MEASURANDS_1,
            iec103.AsduType.MEASURANDS_2: self._process_MEASURANDS_2,
            iec103.AsduType.LIST_OF_RECORDED_DISTURBANCES: self._process_LIST_OF_RECORDED_DISTURBANCES}  # NOQA

        self.async_group.spawn(self._receive_loop)

        # log CLOSE when the group is cancelled; OPEN is logged right away
        self.async_group.spawn(aio.call_on_cancel, self._comm_log.log,
                               common.CommLogAction.CLOSE)
        self._comm_log.log(common.CommLogAction.OPEN)

    @property
    def async_group(self):
        """Async group of the underlying link connection"""
        return self._conn.async_group

    async def time_sync(self,
                        time: common.Time | None = None,
                        asdu_address: common.AsduAddress = 0xFF):
        """Send a time synchronization ASDU.

        When `time` is not given, the current local time is used. The call
        returns once the ASDU is sent; it does not wait for a response.

        Raises:
            ConnectionError: if the connection is not open

        """
        if not self.is_open:
            raise ConnectionError()

        time = time or common.time_from_datetime(datetime.datetime.now())
        io_address = common.IoAddress(
            _FunctionType.GLOBAL_FUNCTION_TYPE.value,
            _InformationNumber.GENERAL_INTERROGATION_OR_TIME_SYNCHRONIZATION.value)  # NOQA
        asdu = iec103.ASDU(
            type=iec103.AsduType.TIME_SYNCHRONIZATION,
            cause=iec103.Cause.TIME_SYNCHRONIZATION,
            address=asdu_address,
            ios=[iec103.IO(
                address=io_address,
                elements=[iec103.IoElement_TIME_SYNCHRONIZATION(
                    time=time)])])

        data = self._encoder.encode_asdu(asdu)

        self._comm_log.log(common.CommLogAction.SEND, asdu)

        await self._conn.send(data)

    async def interrogate(self, asdu_address: common.AsduAddress):
        """Run a general interrogation and wait for its termination.

        Only one interrogation is in flight at a time. The call completes
        when a termination carrying the same scan number is received.

        Raises:
            ConnectionError: if the connection is not open or the receive
                loop stops while waiting

        """
        async with self._interrogate_lock:
            if not self.is_open:
                raise ConnectionError()

            scan_number = next(self._next_req_ids)
            asdu = iec103.ASDU(
                type=iec103.AsduType.GENERAL_INTERROGATION,
                cause=iec103.Cause.GENERAL_INTERROGATION,
                address=asdu_address,
                ios=[iec103.IO(
                    address=iec103.IoAddress(
                        function_type=_FunctionType.GLOBAL_FUNCTION_TYPE.value,  # NOQA
                        information_number=_InformationNumber.GENERAL_INTERROGATION_OR_TIME_SYNCHRONIZATION.value),  # NOQA
                    elements=[iec103.IoElement_GENERAL_INTERROGATION(  # NOQA
                        scan_number=scan_number)])])

            data = self._encoder.encode_asdu(asdu)

            try:
                self._comm_log.log(common.CommLogAction.SEND, asdu)

                # register the pending request before sending so that the
                # receive loop can match the response
                self._interrogate_req_id = scan_number
                self._interrogate_future = asyncio.Future()
                await self._conn.send(data)
                await self._interrogate_future

            finally:
                self._interrogate_req_id = None
                self._interrogate_future = None

    async def send_command(self,
                           asdu_address: common.AsduAddress,
                           io_address: common.IoAddress,
                           value: common.DoubleValue
                           ) -> bool:
        """Send a general command and wait for its acknowledgement.

        Only one command is in flight at a time.

        Returns:
            ``True`` on positive and ``False`` on negative acknowledgement

        Raises:
            ConnectionError: if the connection is not open or the receive
                loop stops while waiting

        """
        async with self._send_command_lock:
            if not self.is_open:
                raise ConnectionError()

            return_identifier = next(self._next_req_ids)
            asdu = iec103.ASDU(
                type=iec103.AsduType.GENERAL_COMMAND,
                cause=iec103.Cause.GENERAL_COMMAND,
                address=asdu_address,
                ios=[iec103.IO(
                    address=io_address,
                    elements=[iec103.IoElement_GENERAL_COMMAND(
                        value=value,
                        return_identifier=return_identifier)])])

            data = self._encoder.encode_asdu(asdu)

            try:
                self._comm_log.log(common.CommLogAction.SEND, asdu)

                # register the pending request before sending
                self._send_command_req_id = return_identifier
                self._send_command_future = asyncio.Future()
                await self._conn.send(data)
                return await self._send_command_future

            finally:
                self._send_command_req_id = None
                self._send_command_future = None

    async def interrogate_generic(self, asdu_address: common.AsduAddress):
        """Run a general interrogation of generic data and wait for its end.

        Only one generic interrogation is in flight at a time. The call
        completes when generic data with the termination cause and the same
        return identifier is received.

        Raises:
            ConnectionError: if the connection is not open or the receive
                loop stops while waiting

        """
        async with self._interrogate_generic_lock:
            if not self.is_open:
                raise ConnectionError()

            return_identifier = next(self._next_req_ids)
            asdu = iec103.ASDU(
                type=iec103.AsduType.GENERIC_COMMAND,
                cause=iec103.Cause.GENERAL_INTERROGATION,
                address=asdu_address,
                ios=[iec103.IO(
                    address=iec103.IoAddress(
                        function_type=_FunctionType.GENERIC_FUNCTION_TYPE.value,  # NOQA
                        information_number=_InformationNumber.GENERAL_INTERROGATION_OF_GENERIC_DATA.value),  # NOQA
                    elements=[iec103.IoElement_GENERIC_COMMAND(
                        return_identifier=return_identifier,
                        data=[])])])

            data = self._encoder.encode_asdu(asdu)

            try:
                self._comm_log.log(common.CommLogAction.SEND, asdu)

                # register the pending request before sending
                self._interrogate_generic_req_id = return_identifier
                self._interrogate_generic_future = asyncio.Future()
                await self._conn.send(data)
                await self._interrogate_generic_future

            finally:
                self._interrogate_generic_req_id = None
                self._interrogate_generic_future = None

    async def _receive_loop(self):
        """Receive, decode and dispatch ASDUs until an error occurs.

        Any exception (including an unsupported ASDU type) is logged and
        ends the loop; the connection is then closed and pending request
        futures are failed with ``ConnectionError``.
        """
        try:
            while True:
                data = await self._conn.receive()
                asdu, _ = self._encoder.decode_asdu(data)

                self._comm_log.log(common.CommLogAction.RECEIVE, asdu)

                for io in asdu.ios:
                    if asdu.type in self._process_single_element_fns:
                        fn = self._process_single_element_fns[asdu.type]
                        for element in io.elements:
                            await fn(asdu.cause, asdu.address, io.address,
                                     element)

                    elif asdu.type in self._process_multiple_elements_fns:
                        fn = self._process_multiple_elements_fns[asdu.type]
                        await fn(asdu.cause, asdu.address, io.address,
                                 io.elements)

                    else:
                        raise ValueError('unsupported asdu type')

        except Exception as e:
            self._log.error("receive loop error: %s", e, exc_info=e)

        finally:
            self.close()
            # wake up callers still waiting for a response
            # NOTE(review): helpers presumably ignore None/done futures -
            # confirm against their definition
            _try_set_exception(self._interrogate_future, ConnectionError())
            _try_set_exception(self._send_command_future, ConnectionError())
            _try_set_exception(self._interrogate_generic_future,
                               ConnectionError())

    async def _process_TIME_TAGGED_MESSAGE(self, cause, asdu_address, io_address, element):  # NOQA
        """Resolve a pending command or forward the message as data.

        Command (N)ACK causes are matched against the pending command by
        the supplementary field; all other causes are passed to `data_cb`.
        """
        if cause == iec103.Cause.GENERAL_COMMAND:
            if element.value.supplementary == self._send_command_req_id:
                _try_set_result(self._send_command_future, True)

        elif cause == iec103.Cause.GENERAL_COMMAND_NACK:
            if element.value.supplementary == self._send_command_req_id:
                _try_set_result(self._send_command_future, False)

        else:
            await _try_aio_call(self._data_cb, common.Data(
                asdu_address=asdu_address,
                io_address=io_address,
                cause=_try_decode_cause(cause, common.DataCause),
                value=element.value))

    async def _process_TIME_TAGGED_MESSAGE_WITH_RELATIVE_TIME(self, cause, asdu_address, io_address, element):  # NOQA
        """Forward the element value to `data_cb`"""
        await _try_aio_call(self._data_cb, common.Data(
            asdu_address=asdu_address,
            io_address=io_address,
            cause=_try_decode_cause(cause, common.DataCause),
            value=element.value))

    async def _process_MEASURANDS_1(self, cause, asdu_address, io_address, elements):  # NOQA
        """Collect all elements into one MeasurandValues for `data_cb`.

        The measurand type of each element is derived from its position.
        """
        value = common.MeasurandValues(values={})
        for i, element in enumerate(elements):
            # raises ValueError when no MeasurandType exists for index i
            measurand_type = common.MeasurandType((
                iec103.AsduType.MEASURANDS_1.value, i))
            value.values[measurand_type] = element.value

        await _try_aio_call(self._data_cb, common.Data(
            asdu_address=asdu_address,
            io_address=io_address,
            cause=_try_decode_cause(cause, common.DataCause),
            value=value))

    async def _process_TIME_TAGGED_MEASURANDS_WITH_RELATIVE_TIME(self, cause, asdu_address, io_address, element):  # NOQA
        """Forward the element value to `data_cb`"""
        await _try_aio_call(self._data_cb, common.Data(
            asdu_address=asdu_address,
            io_address=io_address,
            cause=_try_decode_cause(cause, common.DataCause),
            value=element.value))

    async def _process_IDENTIFICATION(self, cause, asdu_address, io_address, element):  # NOQA
        """Log the received device identification at debug level"""
        # guard avoids building the bytes objects when debug is disabled
        if self._log.isEnabledFor(logging.DEBUG):
            self._log.debug("received device identification "
                            "(compatibility: %s; value: %s; software: %s)",
                            element.compatibility,
                            bytes(element.value),
                            bytes(element.software))

    async def _process_TIME_SYNCHRONIZATION(self, cause, asdu_address, io_address, element):  # NOQA
        """Log the time synchronization response"""
        self._log.info("received time sync response")

    async def _process_GENERAL_INTERROGATION_TERMINATION(self, cause, asdu_address, io_address, element):  # NOQA
        """Complete the pending interrogation when scan numbers match"""
        if element.scan_number != self._interrogate_req_id:
            return
        _try_set_result(self._interrogate_future, None)

    async def _process_MEASURANDS_2(self, cause, asdu_address, io_address, elements):  # NOQA
        """Collect all elements into one MeasurandValues for `data_cb`.

        The measurand type of each element is derived from its position.
        """
        value = common.MeasurandValues(values={})
        for i, element in enumerate(elements):
            # raises ValueError when no MeasurandType exists for index i
            measurand_type = common.MeasurandType((
                iec103.AsduType.MEASURANDS_2.value, i))
            value.values[measurand_type] = element.value

        await _try_aio_call(self._data_cb, common.Data(
            asdu_address=asdu_address,
            io_address=io_address,
            cause=_try_decode_cause(cause, common.DataCause),
            value=value))

    async def _process_GENERIC_DATA(self, cause, asdu_address, io_address, element):  # NOQA
        """Complete a generic interrogation or forward generic data.

        The termination cause resolves the pending generic interrogation
        when the return identifier matches; otherwise each entry of the
        element is passed to `generic_data_cb` separately.
        """
        if cause == iec103.Cause.TERMINATION_OF_GENERAL_INTERROGATION:  # NOQA
            if element.return_identifier == self._interrogate_generic_req_id:
                _try_set_result(self._interrogate_generic_future, None)

        else:
            data_cause = _try_decode_cause(cause, common.GenericDataCause)
            for identification, data in element.data:
                await _try_aio_call(self._generic_data_cb, common.GenericData(
                    asdu_address=asdu_address,
                    io_address=io_address,
                    cause=data_cause,
                    identification=identification,
                    description=data.description,
                    value=data.value))

    # The remaining ASDU types are accepted but intentionally ignored.

    async def _process_GENERIC_IDENTIFICATION(self, cause, asdu_address, io_address, element):  # NOQA
        pass

    async def _process_LIST_OF_RECORDED_DISTURBANCES(self, cause, asdu_address, io_address, elements):  # NOQA
        pass

    async def _process_READY_FOR_TRANSMISSION_OF_DISTURBANCE_DATA(self, cause, asdu_address, io_address, element):  # NOQA
        pass

    async def _process_READY_FOR_TRANSMISSION_OF_A_CHANNEL(self, cause, asdu_address, io_address, element):  # NOQA
        pass

    async def _process_READY_FOR_TRANSMISSION_OF_TAGS(self, cause, asdu_address, io_address, element):  # NOQA
        pass

    async def _process_TRANSMISSION_OF_TAGS(self, cause, asdu_address, io_address, element):  # NOQA
        pass

    async def _process_TRANSMISSION_OF_DISTURBANCE_VALUES(self, cause, asdu_address, io_address, element):  # NOQA
        pass

    async def _process_END_OF_TRANSMISSION(self, cause, asdu_address, io_address, element):  # NOQA
        pass
Resource with lifetime control based on Group.
def __init__(self,
             conn: link.Connection,
             *,
             data_cb: DataCb | None = None,
             generic_data_cb: GenericDataCb | None = None):
    """Wrap `conn`, set up request bookkeeping and start receiving.

    Args:
        conn: underlying link connection
        data_cb: optional callback for received `common.Data`
        generic_data_cb: optional callback for received
            `common.GenericData`

    """
    self._conn = conn
    self._data_cb = data_cb
    self._generic_data_cb = generic_data_cb

    self._log = logger.create_logger(mlog, conn.info)
    self._comm_log = logger.CommunicationLogger(mlog, conn.info)

    self._encoder = iec103.Encoder()

    # general interrogation: lock, pending scan number, pending future
    self._interrogate_lock = asyncio.Lock()
    self._interrogate_req_id = None
    self._interrogate_future = None

    # general command: lock, pending return identifier, pending future
    self._send_command_lock = asyncio.Lock()
    self._send_command_req_id = None
    self._send_command_future = None

    # generic interrogation: lock, pending return identifier, future
    self._interrogate_generic_lock = asyncio.Lock()
    self._interrogate_generic_req_id = None
    self._interrogate_generic_future = None

    # endless 0..255 sequence shared by all request kinds
    self._next_req_ids = (counter % 0x100
                          for counter in itertools.count(0))

    asdu_type = iec103.AsduType

    # handlers invoked once per element
    self._process_single_element_fns = {
        asdu_type.TIME_TAGGED_MESSAGE:
            self._process_TIME_TAGGED_MESSAGE,
        asdu_type.TIME_TAGGED_MESSAGE_WITH_RELATIVE_TIME:
            self._process_TIME_TAGGED_MESSAGE_WITH_RELATIVE_TIME,
        asdu_type.TIME_TAGGED_MEASURANDS_WITH_RELATIVE_TIME:
            self._process_TIME_TAGGED_MEASURANDS_WITH_RELATIVE_TIME,
        asdu_type.IDENTIFICATION:
            self._process_IDENTIFICATION,
        asdu_type.TIME_SYNCHRONIZATION:
            self._process_TIME_SYNCHRONIZATION,
        asdu_type.GENERAL_INTERROGATION_TERMINATION:
            self._process_GENERAL_INTERROGATION_TERMINATION,
        asdu_type.GENERIC_DATA:
            self._process_GENERIC_DATA,
        asdu_type.GENERIC_IDENTIFICATION:
            self._process_GENERIC_IDENTIFICATION,
        asdu_type.READY_FOR_TRANSMISSION_OF_DISTURBANCE_DATA:
            self._process_READY_FOR_TRANSMISSION_OF_DISTURBANCE_DATA,
        asdu_type.READY_FOR_TRANSMISSION_OF_A_CHANNEL:
            self._process_READY_FOR_TRANSMISSION_OF_A_CHANNEL,
        asdu_type.READY_FOR_TRANSMISSION_OF_TAGS:
            self._process_READY_FOR_TRANSMISSION_OF_TAGS,
        asdu_type.TRANSMISSION_OF_TAGS:
            self._process_TRANSMISSION_OF_TAGS,
        asdu_type.TRANSMISSION_OF_DISTURBANCE_VALUES:
            self._process_TRANSMISSION_OF_DISTURBANCE_VALUES,
        asdu_type.END_OF_TRANSMISSION:
            self._process_END_OF_TRANSMISSION}

    # handlers invoked once per information object with all elements
    self._process_multiple_elements_fns = {
        asdu_type.MEASURANDS_1:
            self._process_MEASURANDS_1,
        asdu_type.MEASURANDS_2:
            self._process_MEASURANDS_2,
        asdu_type.LIST_OF_RECORDED_DISTURBANCES:
            self._process_LIST_OF_RECORDED_DISTURBANCES}

    group = self.async_group
    group.spawn(self._receive_loop)

    # CLOSE is logged on cancellation of the group, OPEN immediately
    group.spawn(aio.call_on_cancel, self._comm_log.log,
                common.CommLogAction.CLOSE)
    self._comm_log.log(common.CommLogAction.OPEN)
async def time_sync(self,
                    time: common.Time | None = None,
                    asdu_address: common.AsduAddress = 0xFF):
    """Send a time synchronization ASDU.

    The call returns as soon as the ASDU has been handed to the link
    connection; it does not wait for a response.

    Args:
        time: time to send; when ``None``, the current local time is used
        asdu_address: destination ASDU address (defaults to ``0xFF``)

    Raises:
        ConnectionError: if the connection is not open

    """
    if not self.is_open:
        raise ConnectionError()

    # explicit None test: only a missing argument selects the default,
    # never a falsy-but-provided value
    if time is None:
        time = common.time_from_datetime(datetime.datetime.now())

    io_address = common.IoAddress(
        _FunctionType.GLOBAL_FUNCTION_TYPE.value,
        _InformationNumber.GENERAL_INTERROGATION_OR_TIME_SYNCHRONIZATION.value)  # NOQA
    asdu = iec103.ASDU(
        type=iec103.AsduType.TIME_SYNCHRONIZATION,
        cause=iec103.Cause.TIME_SYNCHRONIZATION,
        address=asdu_address,
        ios=[iec103.IO(
            address=io_address,
            elements=[iec103.IoElement_TIME_SYNCHRONIZATION(
                time=time)])])

    data = self._encoder.encode_asdu(asdu)

    self._comm_log.log(common.CommLogAction.SEND, asdu)

    await self._conn.send(data)
async def interrogate(self, asdu_address: common.AsduAddress):
    """Run a general interrogation and wait until it terminates.

    Interrogations are serialized by a lock. The call completes when the
    pending future is resolved for the scan number sent here.

    Raises:
        ConnectionError: if the connection is not open

    """
    async with self._interrogate_lock:
        if not self.is_open:
            raise ConnectionError()

        scan_number = next(self._next_req_ids)

        gi_address = iec103.IoAddress(
            function_type=_FunctionType.GLOBAL_FUNCTION_TYPE.value,
            information_number=_InformationNumber.GENERAL_INTERROGATION_OR_TIME_SYNCHRONIZATION.value)  # NOQA
        gi_element = iec103.IoElement_GENERAL_INTERROGATION(
            scan_number=scan_number)
        request = iec103.ASDU(
            type=iec103.AsduType.GENERAL_INTERROGATION,
            cause=iec103.Cause.GENERAL_INTERROGATION,
            address=asdu_address,
            ios=[iec103.IO(address=gi_address,
                           elements=[gi_element])])

        payload = self._encoder.encode_asdu(request)

        try:
            self._comm_log.log(common.CommLogAction.SEND, request)

            # publish the pending request before anything is sent
            self._interrogate_req_id = scan_number
            self._interrogate_future = asyncio.Future()
            await self._conn.send(payload)
            await self._interrogate_future

        finally:
            # always drop the pending request, also on error/cancel
            self._interrogate_req_id = None
            self._interrogate_future = None
async def send_command(self,
                       asdu_address: common.AsduAddress,
                       io_address: common.IoAddress,
                       value: common.DoubleValue
                       ) -> bool:
    """Send a general command and wait for its outcome.

    Commands are serialized by a lock. The result is whatever the pending
    future is resolved with for the return identifier sent here.

    Returns:
        result of the pending command future

    Raises:
        ConnectionError: if the connection is not open

    """
    async with self._send_command_lock:
        if not self.is_open:
            raise ConnectionError()

        return_identifier = next(self._next_req_ids)

        command = iec103.IoElement_GENERAL_COMMAND(
            value=value,
            return_identifier=return_identifier)
        request = iec103.ASDU(
            type=iec103.AsduType.GENERAL_COMMAND,
            cause=iec103.Cause.GENERAL_COMMAND,
            address=asdu_address,
            ios=[iec103.IO(address=io_address,
                           elements=[command])])

        payload = self._encoder.encode_asdu(request)

        try:
            self._comm_log.log(common.CommLogAction.SEND, request)

            # publish the pending request before anything is sent
            self._send_command_req_id = return_identifier
            self._send_command_future = asyncio.Future()
            await self._conn.send(payload)
            outcome = await self._send_command_future
            return outcome

        finally:
            # always drop the pending request, also on error/cancel
            self._send_command_req_id = None
            self._send_command_future = None
async def interrogate_generic(self, asdu_address: common.AsduAddress):
    """Run a general interrogation of generic data and wait for its end.

    Generic interrogations are serialized by a lock. The call completes
    when the pending future is resolved for the return identifier sent
    here.

    Raises:
        ConnectionError: if the connection is not open

    """
    async with self._interrogate_generic_lock:
        if not self.is_open:
            raise ConnectionError()

        return_identifier = next(self._next_req_ids)

        generic_address = iec103.IoAddress(
            function_type=_FunctionType.GENERIC_FUNCTION_TYPE.value,
            information_number=_InformationNumber.GENERAL_INTERROGATION_OF_GENERIC_DATA.value)  # NOQA
        # the command carries no data entries
        generic_command = iec103.IoElement_GENERIC_COMMAND(
            return_identifier=return_identifier,
            data=[])
        request = iec103.ASDU(
            type=iec103.AsduType.GENERIC_COMMAND,
            cause=iec103.Cause.GENERAL_INTERROGATION,
            address=asdu_address,
            ios=[iec103.IO(address=generic_address,
                           elements=[generic_command])])

        payload = self._encoder.encode_asdu(request)

        try:
            self._comm_log.log(common.CommLogAction.SEND, request)

            # publish the pending request before anything is sent
            self._interrogate_generic_req_id = return_identifier
            self._interrogate_generic_future = asyncio.Future()
            await self._conn.send(payload)
            await self._interrogate_generic_future

        finally:
            # always drop the pending request, also on error/cancel
            self._interrogate_generic_req_id = None
            self._interrogate_generic_future = None