hat.drivers.iec103
IEC 60870-5-103 communication protocol
1"""IEC 60870-5-103 communication protocol""" 2 3from hat.drivers.iec103.common import (Description, 4 IoAddress, 5 Identification, 6 TimeSize, 7 Time, 8 ValueType, 9 NoneValue, 10 TextValue, 11 BitstringValue, 12 UIntValue, 13 IntValue, 14 UFixedValue, 15 FixedValue, 16 Real32Value, 17 Real64Value, 18 DoubleValue, 19 SingleValue, 20 ExtendedDoubleValue, 21 MeasurandValue, 22 TimeValue, 23 IdentificationValue, 24 RelativeTimeValue, 25 IoAddressValue, 26 DoubleWithTimeValue, 27 DoubleWithRelativeTimeValue, 28 MeasurandWithRelativeTimeValue, 29 TextNumberValue, 30 ReplyValue, 31 ArrayValue, 32 IndexValue, 33 Value, 34 AsduAddress, 35 DataCause, 36 GenericDataCause, 37 MeasurandType, 38 MeasurandValues, 39 Data, 40 GenericData, 41 time_from_datetime, 42 time_to_datetime) 43from hat.drivers.iec103.master import (DataCb, 44 GenericDataCb, 45 MasterConnection) 46 47 48__all__ = ['Description', 49 'IoAddress', 50 'Identification', 51 'TimeSize', 52 'Time', 53 'ValueType', 54 'NoneValue', 55 'TextValue', 56 'BitstringValue', 57 'UIntValue', 58 'IntValue', 59 'UFixedValue', 60 'FixedValue', 61 'Real32Value', 62 'Real64Value', 63 'DoubleValue', 64 'SingleValue', 65 'ExtendedDoubleValue', 66 'MeasurandValue', 67 'TimeValue', 68 'IdentificationValue', 69 'RelativeTimeValue', 70 'IoAddressValue', 71 'DoubleWithTimeValue', 72 'DoubleWithRelativeTimeValue', 73 'MeasurandWithRelativeTimeValue', 74 'TextNumberValue', 75 'ReplyValue', 76 'ArrayValue', 77 'IndexValue', 78 'Value', 79 'AsduAddress', 80 'DataCause', 81 'GenericDataCause', 82 'MeasurandType', 83 'MeasurandValues', 84 'Data', 85 'GenericData', 86 'time_from_datetime', 87 'time_to_datetime', 88 'DataCb', 89 'GenericDataCb', 90 'MasterConnection']
class Description(enum.Enum):
    """Kind of description carried by a generic data entry.

    Used as the type of `GenericData.description`.
    """
    # NOTE: member values are not contiguous - 4, 11 and 15-18 have no
    # member defined here
    NOT_SPECIFIED = 0
    ACTUAL_VALUE = 1
    DEFAULT_VALUE = 2
    RANGE = 3
    PRECISION = 5
    FACTOR = 6
    REFERENCE = 7
    ENUMERATION = 8
    DIMENSION = 9
    DESCRIPTION = 10
    PASSWORD = 12
    READ_ONLY = 13
    WRITE_ONLY = 14
    IO_ADDRESS = 19
    EVENT = 20
    TEXT_ARRAY = 21
    VALUE_ARRAY = 22
    RELATED = 23
class IoAddress(typing.NamedTuple):
    """Information object address (function type + information number)."""
    # both components are documented as single-octet values ([0, 255]);
    # the range is not enforced by this type
    function_type: int
    """function_type is in range [0, 255]"""
    information_number: int
    """information_number is in range [0, 255]"""
IoAddress(function_type, information_number)
class Identification(typing.NamedTuple):
    """Generic data identification (group id + entry id)."""
    # both components are documented as [0, 255]; the range is not enforced
    # by this type
    group_id: int
    """group_id in range [0, 255]"""
    entry_id: int
    """entry_id in range [0, 255]"""
Identification(group_id, entry_id)
class Time(typing.NamedTuple):
    """Time stamp whose populated fields depend on `size`.

    Only `size` and `milliseconds` are always set. Every other field is
    ``None`` unless `size` is one of the sizes listed in that field's
    documentation.
    """
    size: TimeSize
    # seconds and milliseconds are combined in a single field
    # (seconds * 1000 + milliseconds)
    milliseconds: int
    """milliseconds in range [0, 59999]"""
    invalid: bool | None
    """available for size THREE, FOUR, SEVEN"""
    minutes: int | None
    """available for size THREE, FOUR, SEVEN (minutes in range [0, 59])"""
    summer_time: bool | None
    """available for size FOUR, SEVEN"""
    hours: int | None
    """available for size FOUR, SEVEN (hours in range [0, 23])"""
    day_of_week: int | None
    """available for size SEVEN (day_of_week in range [1, 7])"""
    day_of_month: int | None
    """available for size SEVEN (day_of_month in range [1, 31])"""
    months: int | None
    """available for size SEVEN (months in range [1, 12])"""
    # two-digit year; `time_to_datetime` maps values below 70 to 20xx and
    # the rest to 19xx
    years: int | None
    """available for size SEVEN (years in range [0, 99])"""
Time(size, milliseconds, invalid, minutes, summer_time, hours, day_of_week, day_of_month, months, years)
Create new instance of Time(size, milliseconds, invalid, minutes, summer_time, hours, day_of_week, day_of_month, months, years)
class ValueType(enum.Enum):
    """Type tag of a value; used as the type of `ArrayValue.value_type`."""
    # NOTE: member values are contiguous except for 13, which has no member
    # defined here
    NONE = 0
    TEXT = 1
    BITSTRING = 2
    UINT = 3
    INT = 4
    UFIXED = 5
    FIXED = 6
    REAL32 = 7
    REAL64 = 8
    DOUBLE = 9
    SINGLE = 10
    EXTENDED_DOUBLE = 11
    MEASURAND = 12
    TIME = 14
    IDENTIFICATION = 15
    RELATIVE_TIME = 16
    IO_ADDRESS = 17
    DOUBLE_WITH_TIME = 18
    DOUBLE_WITH_RELATIVE_TIME = 19
    MEASURAND_WITH_RELATIVE_TIME = 20
    TEXT_NUMBER = 21
    REPLY = 22
    ARRAY = 23
    INDEX = 24
NoneValue()
TextValue(value,)
BitstringValue(value,)
UIntValue(value,)
IntValue(value,)
UFixedValue(value,)
FixedValue(value,)
Real32Value(value,)
Real64Value(value,)
class MeasurandValue(typing.NamedTuple):
    """Normalized measurand together with its overflow and invalid flags."""
    overflow: bool
    invalid: bool
    # half-open interval: -1.0 is included, 1.0 is not
    value: float
    """value in range [-1.0, 1.0)"""
MeasurandValue(overflow, invalid, value)
TimeValue(value,)
IdentificationValue(value,)
RelativeTimeValue(value,)
IoAddressValue(value,)
class DoubleWithTimeValue(typing.NamedTuple):
    """Double value with a four-octet time stamp and supplementary info."""
    value: DoubleValue
    time: Time
    """time size is FOUR"""
    # `MasterConnection` compares this field with the return identifier of a
    # pending `send_command` request when processing command responses
    supplementary: int
    """supplementary in range [0, 255]"""
DoubleWithTimeValue(value, time, supplementary)
Create new instance of DoubleWithTimeValue(value, time, supplementary)
class DoubleWithRelativeTimeValue(typing.NamedTuple):
    """Double value with relative time, fault number and time stamp."""
    value: DoubleValue
    relative_time: int
    """relative_time in range [0, 65535]"""
    fault_number: int
    """fault_number in range [0, 65535]"""
    time: Time
    """time size is FOUR"""
    supplementary: int
    """supplementary in range [0, 255]"""
DoubleWithRelativeTimeValue(value, relative_time, fault_number, time, supplementary)
Create new instance of DoubleWithRelativeTimeValue(value, relative_time, fault_number, time, supplementary)
class MeasurandWithRelativeTimeValue(typing.NamedTuple):
    """Measurand with relative time, fault number and time stamp."""
    # plain float - unlike `MeasurandValue` there are no overflow/invalid
    # flags and no documented range
    value: float
    relative_time: int
    """relative_time in range [0, 65535]"""
    fault_number: int
    """fault_number in range [0, 65535]"""
    time: Time
    """time size is FOUR"""
MeasurandWithRelativeTimeValue(value, relative_time, fault_number, time)
Create new instance of MeasurandWithRelativeTimeValue(value, relative_time, fault_number, time)
TextNumberValue(value,)
class ReplyValue(enum.Enum):
    """Reply code of a generic service (value type `ValueType.REPLY`)."""
    ACK = 0
    INVALID_IDENTIFICATION = 1
    DATA_NOT_EXISTS = 2
    DATA_NOT_AVAILABLE = 3
    VERIFY_ERROR = 4
    OUT_OF_RANGE = 5
    # NOTE(review): "TO" is presumably a typo for "TOO"; the member name is
    # public API, so renaming it would break callers
    ENTRY_TO_LARGE = 6
    TOO_MANY_COMMANDS = 7
    ENTRY_READ_ONLY = 8
    PASSWORD_PROTECTED = 9
    IN_PROGRESS = 10
    FOLLOWING_DESCRIPTION = 11
class ArrayValue(typing.NamedTuple):
    """Sequence of values sharing a single value type."""
    # type tag for the entries of `values`
    value_type: ValueType
    # NOTE(review): presumably signals that the array continues in a
    # following message - confirm against the encoder
    more_follows: bool
    # 'Value' is a string forward reference: the `Value` alias is defined
    # after this class
    values: typing.List['Value']
ArrayValue(value_type, more_follows, values)
Create new instance of ArrayValue(value_type, more_follows, values)
Alias for field number 2
IndexValue(value,)
class DataCause(enum.Enum):
    """Causes of transmission reported with `Data`.

    Every member reuses the numeric value of the `iec103.Cause` member of
    the same name, so a low-level cause can be converted by value.
    """
    SPONTANEOUS = iec103.Cause.SPONTANEOUS.value
    CYCLIC = iec103.Cause.CYCLIC.value
    TEST_MODE = iec103.Cause.TEST_MODE.value
    GENERAL_INTERROGATION = iec103.Cause.GENERAL_INTERROGATION.value
    LOCAL_OPERATION = iec103.Cause.LOCAL_OPERATION.value
    REMOTE_OPERATION = iec103.Cause.REMOTE_OPERATION.value
class GenericDataCause(enum.Enum):
    """Causes of transmission reported with `GenericData`.

    Contains the same members as `DataCause` plus the generic read/write
    results. Every member takes its numeric value from an `iec103.Cause`
    member.
    """
    SPONTANEOUS = iec103.Cause.SPONTANEOUS.value
    CYCLIC = iec103.Cause.CYCLIC.value
    TEST_MODE = iec103.Cause.TEST_MODE.value
    GENERAL_INTERROGATION = iec103.Cause.GENERAL_INTERROGATION.value
    LOCAL_OPERATION = iec103.Cause.LOCAL_OPERATION.value
    REMOTE_OPERATION = iec103.Cause.REMOTE_OPERATION.value
    # the members below are named differently from the `iec103.Cause`
    # members they take their values from
    WRITE_ACK = iec103.Cause.GENERIC_WRITE_COMMAND.value
    WRITE_NACK = iec103.Cause.GENERIC_WRITE_COMMAND_NACK.value
    READ_ACK = iec103.Cause.GENERIC_READ_COMMAND.value
    READ_NACK = iec103.Cause.GENERIC_READ_COMMAND_NACK.value
    WRITE_CONFIRMATION = iec103.Cause.GENERIC_WRITE_CONFIRMATION.value
class MeasurandType(enum.Enum):
    """Identifies a single measurand inside a measurands ASDU.

    Each value is a ``(asdu_type_value, index)`` tuple, where `index` is the
    zero-based position of the measurand among the elements of the
    information object. `MasterConnection` looks members up by this tuple
    when it builds `MeasurandValues`.
    """
    # measurands carried by AsduType.MEASURANDS_1 (positions 0-3)
    M1_I_L2 = (iec103.AsduType.MEASURANDS_1.value, 0)
    M1_U_L12 = (iec103.AsduType.MEASURANDS_1.value, 1)
    M1_P = (iec103.AsduType.MEASURANDS_1.value, 2)
    M1_Q = (iec103.AsduType.MEASURANDS_1.value, 3)
    # measurands carried by AsduType.MEASURANDS_2 (positions 0-8)
    M2_I_L1 = (iec103.AsduType.MEASURANDS_2.value, 0)
    M2_I_L2 = (iec103.AsduType.MEASURANDS_2.value, 1)
    M2_I_L3 = (iec103.AsduType.MEASURANDS_2.value, 2)
    M2_U_L1E = (iec103.AsduType.MEASURANDS_2.value, 3)
    M2_U_L2E = (iec103.AsduType.MEASURANDS_2.value, 4)
    M2_U_L3E = (iec103.AsduType.MEASURANDS_2.value, 5)
    M2_P = (iec103.AsduType.MEASURANDS_2.value, 6)
    M2_Q = (iec103.AsduType.MEASURANDS_2.value, 7)
    M2_F = (iec103.AsduType.MEASURANDS_2.value, 8)
MeasurandValues(values,)
Create new instance of MeasurandValues(values,)
class Data(typing.NamedTuple):
    """Non-generic process data passed to `DataCb`."""
    asdu_address: AsduAddress
    io_address: IoAddress
    # NOTE(review): `OtherCause` presumably carries causes without a
    # `DataCause` member - confirm against its definition
    cause: DataCause | OtherCause
    # the concrete value type depends on the ASDU type the data was
    # received in
    value: (DoubleWithTimeValue |
            DoubleWithRelativeTimeValue |
            MeasurandValues |
            MeasurandWithRelativeTimeValue)
Data(asdu_address, io_address, cause, value)
Create new instance of Data(asdu_address, io_address, cause, value)
Alias for field number 3
class GenericData(typing.NamedTuple):
    """Single generic data entry passed to `GenericDataCb`."""
    asdu_address: AsduAddress
    io_address: IoAddress
    # NOTE(review): `OtherCause` presumably carries causes without a
    # `GenericDataCause` member - confirm against its definition
    cause: GenericDataCause | OtherCause
    # group/entry identifier of the entry described by this data
    identification: Identification
    # tells what `value` represents (actual value, range, dimension, ...)
    description: Description
    value: ArrayValue
GenericData(asdu_address, io_address, cause, identification, description, value)
Create new instance of GenericData(asdu_address, io_address, cause, identification, description, value)
def time_from_datetime(dt: datetime.datetime,
                       invalid: bool = False
                       ) -> Time:
    """Create Time from datetime.datetime

    The resulting `Time` always has size ``TimeSize.SEVEN`` and is expressed
    in the local time zone of the running process. A naive `dt` is
    interpreted as local time (see `datetime.datetime.timestamp`); an aware
    `dt` is converted to local time.

    Args:
        dt: point in time to convert; microseconds are rounded to the
            nearest millisecond
        invalid: value stored in the resulting `Time.invalid`

    Returns:
        `Time` with all fields populated

    """
    # TODO document edge cases (local time, os implementation, ...)
    # rounding microseconds to the nearest millisecond - adding the rounded
    # part as timedelta lets a round-up to 1000 ms carry into the seconds
    dt_rounded = (
        dt.replace(microsecond=0) +
        datetime.timedelta(milliseconds=round(dt.microsecond / 1000)))
    local_time = time.localtime(dt_rounded.timestamp())

    return Time(
        size=TimeSize.SEVEN,
        milliseconds=(local_time.tm_sec * 1000 +
                      dt_rounded.microsecond // 1000),
        invalid=invalid,
        minutes=local_time.tm_min,
        # tm_isdst is -1 when DST information is unknown; only a positive
        # value means summer time is in effect
        summer_time=local_time.tm_isdst > 0,
        hours=local_time.tm_hour,
        # tm_wday is 0-based (Monday == 0), day_of_week is in range [1, 7]
        day_of_week=local_time.tm_wday + 1,
        day_of_month=local_time.tm_mday,
        months=local_time.tm_mon,
        # only the last two digits of the year are kept
        years=local_time.tm_year % 100)
Create Time from datetime.datetime
def _shift_towards_now(local_dt: datetime.datetime,
                       now_offset: float,
                       t_offset: float,
                       threshold: float,
                       period: datetime.timedelta
                       ) -> datetime.datetime:
    """Move `local_dt` by one `period` if it is further than `threshold`
    from the current time (offsets are positions inside the period)"""
    if abs(now_offset - t_offset) <= threshold:
        return local_dt

    if now_offset < t_offset:
        # time stamp belongs to the previous period
        return local_dt - period

    # time stamp belongs to the next period
    return local_dt + period


def time_to_datetime(t: Time
                     ) -> datetime.datetime:
    """Convert Time to datetime.datetime

    `t` is interpreted as local time of the running process. Fields that are
    not available for ``t.size`` are taken from the current local time, and
    the result is placed in the minute (size TWO), hour (size THREE) or day
    (size FOUR) that brings it closest to the current time.

    Args:
        t: time to convert

    Returns:
        timezone-aware datetime in UTC

    Raises:
        ValueError: if ``t.size`` is not TWO, THREE, FOUR or SEVEN

    """
    # TODO document edge cases (local time, os implementation, ...)
    second = int(t.milliseconds / 1000)
    microsecond = (t.milliseconds % 1000) * 1000

    if t.size == TimeSize.TWO:
        local_now = datetime.datetime.now()
        local_dt = _shift_towards_now(
            local_now.replace(second=second,
                              microsecond=microsecond),
            now_offset=local_now.second + local_now.microsecond / 1_000_000,
            t_offset=t.milliseconds / 1_000,
            threshold=30,
            period=datetime.timedelta(minutes=1))

    elif t.size == TimeSize.THREE:
        local_now = datetime.datetime.now()
        local_dt = _shift_towards_now(
            local_now.replace(minute=t.minutes,
                              second=second,
                              microsecond=microsecond),
            now_offset=(local_now.minute +
                        local_now.second / 60 +
                        local_now.microsecond / 60_000_000),
            t_offset=t.minutes + t.milliseconds / 60_000,
            threshold=30,
            period=datetime.timedelta(hours=1))

    elif t.size == TimeSize.FOUR:
        local_now = datetime.datetime.now()
        local_dt = _shift_towards_now(
            local_now.replace(hour=t.hours,
                              minute=t.minutes,
                              second=second,
                              microsecond=microsecond),
            now_offset=(local_now.hour +
                        local_now.minute / 60 +
                        local_now.second / 3_600 +
                        local_now.microsecond / 3_600_000_000),
            t_offset=(t.hours +
                      t.minutes / 60 +
                      t.milliseconds / 3_600_000),
            threshold=12,
            period=datetime.timedelta(days=1))
        # fold is applied after the shift because datetime arithmetic does
        # not preserve it; as for size SEVEN, fold=1 (second occurrence of
        # an ambiguous wall time) is used when summer time is not in effect
        local_dt = local_dt.replace(fold=int(not t.summer_time))

    elif t.size == TimeSize.SEVEN:
        local_dt = datetime.datetime(
            # two-digit year pivot: [0, 69] -> 20xx, [70, 99] -> 19xx
            year=2000 + t.years if t.years < 70 else 1900 + t.years,
            month=t.months,
            day=t.day_of_month,
            hour=t.hours,
            minute=t.minutes,
            second=second,
            microsecond=microsecond,
            # fold only matters for ambiguous wall times (DST end)
            fold=int(not t.summer_time))

    else:
        raise ValueError('unsupported time size')

    # naive datetime is treated as local time by astimezone
    return local_dt.astimezone(tz=datetime.timezone.utc)
Convert Time to datetime.datetime
class MasterConnection(aio.Resource):
    """IEC 60870-5-103 master built on top of a link connection.

    Outgoing requests are encoded as ASDUs and sent through `conn`. Received
    ASDUs are decoded by a background receive loop which resolves pending
    requests and passes process data to `data_cb` / `generic_data_cb`.

    Lifetime of this resource is bound to `conn` (`async_group` is the
    connection's async group). When the receive loop stops, the connection
    is closed.

    Args:
        conn: underlying link connection
        data_cb: optional callback called with each received `common.Data`
        generic_data_cb: optional callback called with each received
            `common.GenericData`

    """

    def __init__(self,
                 conn: link.Connection,
                 *,
                 data_cb: DataCb | None = None,
                 generic_data_cb: GenericDataCb | None = None):
        self._conn = conn
        self._data_cb = data_cb
        self._generic_data_cb = generic_data_cb

        self._log = logger.create_logger(mlog, conn.info)
        self._comm_log = logger.CommunicationLogger(mlog, conn.info)

        self._encoder = iec103.Encoder()

        # each request kind has its own lock (one pending request per kind),
        # identifier of the pending request and future resolved by the
        # receive loop
        self._interrogate_lock = asyncio.Lock()
        self._interrogate_req_id = None
        self._interrogate_future = None

        self._send_command_lock = asyncio.Lock()
        self._send_command_req_id = None
        self._send_command_future = None

        self._interrogate_generic_lock = asyncio.Lock()
        self._interrogate_generic_req_id = None
        self._interrogate_generic_future = None

        # request identifiers shared by all request kinds - endless sequence
        # 0, 1, ..., 255, 0, 1, ...
        self._next_req_ids = (i % 0x100 for i in itertools.count(0))

        # handlers called once per element of each information object
        self._process_single_element_fns = {
            iec103.AsduType.TIME_TAGGED_MESSAGE: self._process_TIME_TAGGED_MESSAGE,  # NOQA
            iec103.AsduType.TIME_TAGGED_MESSAGE_WITH_RELATIVE_TIME: self._process_TIME_TAGGED_MESSAGE_WITH_RELATIVE_TIME,  # NOQA
            iec103.AsduType.TIME_TAGGED_MEASURANDS_WITH_RELATIVE_TIME: self._process_TIME_TAGGED_MEASURANDS_WITH_RELATIVE_TIME,  # NOQA
            iec103.AsduType.IDENTIFICATION: self._process_IDENTIFICATION,
            iec103.AsduType.TIME_SYNCHRONIZATION: self._process_TIME_SYNCHRONIZATION,  # NOQA
            iec103.AsduType.GENERAL_INTERROGATION_TERMINATION: self._process_GENERAL_INTERROGATION_TERMINATION,  # NOQA
            iec103.AsduType.GENERIC_DATA: self._process_GENERIC_DATA,
            iec103.AsduType.GENERIC_IDENTIFICATION: self._process_GENERIC_IDENTIFICATION,  # NOQA
            iec103.AsduType.READY_FOR_TRANSMISSION_OF_DISTURBANCE_DATA: self._process_READY_FOR_TRANSMISSION_OF_DISTURBANCE_DATA,  # NOQA
            iec103.AsduType.READY_FOR_TRANSMISSION_OF_A_CHANNEL: self._process_READY_FOR_TRANSMISSION_OF_A_CHANNEL,  # NOQA
            iec103.AsduType.READY_FOR_TRANSMISSION_OF_TAGS: self._process_READY_FOR_TRANSMISSION_OF_TAGS,  # NOQA
            iec103.AsduType.TRANSMISSION_OF_TAGS: self._process_TRANSMISSION_OF_TAGS,  # NOQA
            iec103.AsduType.TRANSMISSION_OF_DISTURBANCE_VALUES: self._process_TRANSMISSION_OF_DISTURBANCE_VALUES,  # NOQA
            iec103.AsduType.END_OF_TRANSMISSION: self._process_END_OF_TRANSMISSION}  # NOQA
        # handlers called once per information object with all its elements
        self._process_multiple_elements_fns = {
            iec103.AsduType.MEASURANDS_1: self._process_MEASURANDS_1,
            iec103.AsduType.MEASURANDS_2: self._process_MEASURANDS_2,
            iec103.AsduType.LIST_OF_RECORDED_DISTURBANCES: self._process_LIST_OF_RECORDED_DISTURBANCES}  # NOQA

        self.async_group.spawn(self._receive_loop)

        # NOTE(review): presumably logs CLOSE once the async group is
        # closed - confirm `aio.call_on_cancel` semantics
        self.async_group.spawn(aio.call_on_cancel, self._comm_log.log,
                               common.CommLogAction.CLOSE)
        self._comm_log.log(common.CommLogAction.OPEN)

    @property
    def async_group(self):
        """Async group of the underlying connection"""
        return self._conn.async_group

    async def time_sync(self,
                        time: common.Time | None = None,
                        asdu_address: common.AsduAddress = 0xFF):
        """Send time synchronization

        If `time` is not provided, current local time is used. Response is
        not awaited.

        Raises:
            ConnectionError: if connection is not open

        """
        if not self.is_open:
            raise ConnectionError()

        time = time or common.time_from_datetime(datetime.datetime.now())
        io_address = common.IoAddress(
            _FunctionType.GLOBAL_FUNCTION_TYPE.value,
            _InformationNumber.GENERAL_INTERROGATION_OR_TIME_SYNCHRONIZATION.value)  # NOQA
        asdu = iec103.ASDU(
            type=iec103.AsduType.TIME_SYNCHRONIZATION,
            cause=iec103.Cause.TIME_SYNCHRONIZATION,
            address=asdu_address,
            ios=[iec103.IO(
                address=io_address,
                elements=[iec103.IoElement_TIME_SYNCHRONIZATION(
                    time=time)])])

        data = self._encoder.encode_asdu(asdu)

        self._comm_log.log(common.CommLogAction.SEND, asdu)

        await self._conn.send(data)

    async def interrogate(self, asdu_address: common.AsduAddress):
        """Send general interrogation and wait for its termination

        Raises:
            ConnectionError: if connection is not open

        """
        async with self._interrogate_lock:
            if not self.is_open:
                raise ConnectionError()

            scan_number = next(self._next_req_ids)
            asdu = iec103.ASDU(
                type=iec103.AsduType.GENERAL_INTERROGATION,
                cause=iec103.Cause.GENERAL_INTERROGATION,
                address=asdu_address,
                ios=[iec103.IO(
                    address=iec103.IoAddress(
                        function_type=_FunctionType.GLOBAL_FUNCTION_TYPE.value,  # NOQA
                        information_number=_InformationNumber.GENERAL_INTERROGATION_OR_TIME_SYNCHRONIZATION.value),  # NOQA
                    elements=[iec103.IoElement_GENERAL_INTERROGATION(  # NOQA
                        scan_number=scan_number)])])

            data = self._encoder.encode_asdu(asdu)

            try:
                self._comm_log.log(common.CommLogAction.SEND, asdu)

                # request id and future are set before sending so that the
                # receive loop can match the termination
                self._interrogate_req_id = scan_number
                self._interrogate_future = asyncio.Future()
                await self._conn.send(data)
                await self._interrogate_future

            finally:
                self._interrogate_req_id = None
                self._interrogate_future = None

    async def send_command(self,
                           asdu_address: common.AsduAddress,
                           io_address: common.IoAddress,
                           value: common.DoubleValue
                           ) -> bool:
        """Send general command and wait for its response

        Returns:
            ``True`` if response cause is GENERAL_COMMAND, ``False`` if it
            is GENERAL_COMMAND_NACK

        Raises:
            ConnectionError: if connection is not open

        """
        async with self._send_command_lock:
            if not self.is_open:
                raise ConnectionError()

            return_identifier = next(self._next_req_ids)
            asdu = iec103.ASDU(
                type=iec103.AsduType.GENERAL_COMMAND,
                cause=iec103.Cause.GENERAL_COMMAND,
                address=asdu_address,
                ios=[iec103.IO(
                    address=io_address,
                    elements=[iec103.IoElement_GENERAL_COMMAND(
                        value=value,
                        return_identifier=return_identifier)])])

            data = self._encoder.encode_asdu(asdu)

            try:
                self._comm_log.log(common.CommLogAction.SEND, asdu)

                self._send_command_req_id = return_identifier
                self._send_command_future = asyncio.Future()
                await self._conn.send(data)
                return await self._send_command_future

            finally:
                self._send_command_req_id = None
                self._send_command_future = None

    async def interrogate_generic(self, asdu_address: common.AsduAddress):
        """Send general interrogation of generic data and wait for its
        termination

        Raises:
            ConnectionError: if connection is not open

        """
        async with self._interrogate_generic_lock:
            if not self.is_open:
                raise ConnectionError()

            return_identifier = next(self._next_req_ids)
            asdu = iec103.ASDU(
                type=iec103.AsduType.GENERIC_COMMAND,
                cause=iec103.Cause.GENERAL_INTERROGATION,
                address=asdu_address,
                ios=[iec103.IO(
                    address=iec103.IoAddress(
                        function_type=_FunctionType.GENERIC_FUNCTION_TYPE.value,  # NOQA
                        information_number=_InformationNumber.GENERAL_INTERROGATION_OF_GENERIC_DATA.value),  # NOQA
                    elements=[iec103.IoElement_GENERIC_COMMAND(
                        return_identifier=return_identifier,
                        data=[])])])

            data = self._encoder.encode_asdu(asdu)

            try:
                self._comm_log.log(common.CommLogAction.SEND, asdu)

                self._interrogate_generic_req_id = return_identifier
                self._interrogate_generic_future = asyncio.Future()
                await self._conn.send(data)
                await self._interrogate_generic_future

            finally:
                self._interrogate_generic_req_id = None
                self._interrogate_generic_future = None

    async def _receive_loop(self):
        # receives, decodes and dispatches ASDUs until an error occurs; on
        # exit the connection is closed and pending requests are notified
        try:
            while True:
                data = await self._conn.receive()
                try:
                    asdu, _ = self._encoder.decode_asdu(data)

                except common.AsduTypeError as e:
                    # ASDU type errors are logged and the message is skipped
                    self._log.warning("asdu type error: %s", e)
                    continue

                self._comm_log.log(common.CommLogAction.RECEIVE, asdu)

                for io in asdu.ios:
                    if asdu.type in self._process_single_element_fns:
                        fn = self._process_single_element_fns[asdu.type]
                        for element in io.elements:
                            await fn(asdu.cause, asdu.address, io.address,
                                     element)

                    elif asdu.type in self._process_multiple_elements_fns:
                        fn = self._process_multiple_elements_fns[asdu.type]
                        await fn(asdu.cause, asdu.address, io.address,
                                 io.elements)

                    else:
                        # terminates the receive loop (handled below)
                        raise ValueError('unsupported asdu type')

        except Exception as e:
            self._log.error("receive loop error: %s", e, exc_info=e)

        finally:
            self.close()
            # NOTE(review): presumably fails still pending request futures
            # (and ignores ``None``) - confirm `_try_set_exception`
            _try_set_exception(self._interrogate_future, ConnectionError())
            _try_set_exception(self._send_command_future, ConnectionError())
            _try_set_exception(self._interrogate_generic_future,
                               ConnectionError())

    async def _process_TIME_TAGGED_MESSAGE(self, cause, asdu_address, io_address, element):  # NOQA
        # command responses are matched to the pending `send_command` by the
        # supplementary information; unmatched responses are dropped
        if cause == iec103.Cause.GENERAL_COMMAND:
            if element.value.supplementary == self._send_command_req_id:
                _try_set_result(self._send_command_future, True)

        elif cause == iec103.Cause.GENERAL_COMMAND_NACK:
            if element.value.supplementary == self._send_command_req_id:
                _try_set_result(self._send_command_future, False)

        else:
            # any other cause is reported as process data
            await _try_aio_call(self._data_cb, common.Data(
                asdu_address=asdu_address,
                io_address=io_address,
                cause=_try_decode_cause(cause, common.DataCause),
                value=element.value))

    async def _process_TIME_TAGGED_MESSAGE_WITH_RELATIVE_TIME(self, cause, asdu_address, io_address, element):  # NOQA
        await _try_aio_call(self._data_cb, common.Data(
            asdu_address=asdu_address,
            io_address=io_address,
            cause=_try_decode_cause(cause, common.DataCause),
            value=element.value))

    async def _process_MEASURANDS_1(self, cause, asdu_address, io_address, elements):  # NOQA
        # position of the element determines the measurand type; lookup
        # raises ValueError for positions without a MeasurandType member
        value = common.MeasurandValues(values={})
        for i, element in enumerate(elements):
            measurand_type = common.MeasurandType((
                iec103.AsduType.MEASURANDS_1.value, i))
            value.values[measurand_type] = element.value

        await _try_aio_call(self._data_cb, common.Data(
            asdu_address=asdu_address,
            io_address=io_address,
            cause=_try_decode_cause(cause, common.DataCause),
            value=value))

    async def _process_TIME_TAGGED_MEASURANDS_WITH_RELATIVE_TIME(self, cause, asdu_address, io_address, element):  # NOQA
        await _try_aio_call(self._data_cb, common.Data(
            asdu_address=asdu_address,
            io_address=io_address,
            cause=_try_decode_cause(cause, common.DataCause),
            value=element.value))

    async def _process_IDENTIFICATION(self, cause, asdu_address, io_address, element):  # NOQA
        # identification is only logged; the level check avoids the bytes
        # conversions when debug logging is disabled
        if self._log.isEnabledFor(logging.DEBUG):
            self._log.debug("received device identification "
                            "(compatibility: %s; value: %s; software: %s)",
                            element.compatibility,
                            bytes(element.value),
                            bytes(element.software))

    async def _process_TIME_SYNCHRONIZATION(self, cause, asdu_address, io_address, element):  # NOQA
        self._log.info("received time sync response")

    async def _process_GENERAL_INTERROGATION_TERMINATION(self, cause, asdu_address, io_address, element):  # NOQA
        # terminations with other scan numbers (including the case when no
        # interrogation is pending) are ignored
        if element.scan_number != self._interrogate_req_id:
            return
        _try_set_result(self._interrogate_future, None)

    async def _process_MEASURANDS_2(self, cause, asdu_address, io_address, elements):  # NOQA
        # position of the element determines the measurand type; lookup
        # raises ValueError for positions without a MeasurandType member
        value = common.MeasurandValues(values={})
        for i, element in enumerate(elements):
            measurand_type = common.MeasurandType((
                iec103.AsduType.MEASURANDS_2.value, i))
            value.values[measurand_type] = element.value

        await _try_aio_call(self._data_cb, common.Data(
            asdu_address=asdu_address,
            io_address=io_address,
            cause=_try_decode_cause(cause, common.DataCause),
            value=value))

    async def _process_GENERIC_DATA(self, cause, asdu_address, io_address, element):  # NOQA
        if cause == iec103.Cause.TERMINATION_OF_GENERAL_INTERROGATION:  # NOQA
            # termination resolves the pending `interrogate_generic` with
            # the matching return identifier
            if element.return_identifier == self._interrogate_generic_req_id:
                _try_set_result(self._interrogate_generic_future, None)

        else:
            # each (identification, data) pair is reported separately
            data_cause = _try_decode_cause(cause, common.GenericDataCause)
            for identification, data in element.data:
                await _try_aio_call(self._generic_data_cb, common.GenericData(
                    asdu_address=asdu_address,
                    io_address=io_address,
                    cause=data_cause,
                    identification=identification,
                    description=data.description,
                    value=data.value))

    # the following ASDU types are accepted but intentionally ignored

    async def _process_GENERIC_IDENTIFICATION(self, cause, asdu_address, io_address, element):  # NOQA
        pass

    async def _process_LIST_OF_RECORDED_DISTURBANCES(self, cause, asdu_address, io_address, elements):  # NOQA
        pass

    async def _process_READY_FOR_TRANSMISSION_OF_DISTURBANCE_DATA(self, cause, asdu_address, io_address, element):  # NOQA
        pass

    async def _process_READY_FOR_TRANSMISSION_OF_A_CHANNEL(self, cause, asdu_address, io_address, element):  # NOQA
        pass

    async def _process_READY_FOR_TRANSMISSION_OF_TAGS(self, cause, asdu_address, io_address, element):  # NOQA
        pass

    async def _process_TRANSMISSION_OF_TAGS(self, cause, asdu_address, io_address, element):  # NOQA
        pass

    async def _process_TRANSMISSION_OF_DISTURBANCE_VALUES(self, cause, asdu_address, io_address, element):  # NOQA
        pass

    async def _process_END_OF_TRANSMISSION(self, cause, asdu_address, io_address, element):  # NOQA
        pass
Resource with lifetime control based on Group.
def __init__(self,
             conn: link.Connection,
             *,
             data_cb: DataCb | None = None,
             generic_data_cb: GenericDataCb | None = None):
    """Create master on top of an established link connection

    Args:
        conn: underlying link connection
        data_cb: optional callback for received `common.Data`
        generic_data_cb: optional callback for received
            `common.GenericData`

    """
    self._conn = conn
    self._data_cb = data_cb
    self._generic_data_cb = generic_data_cb

    self._log = logger.create_logger(mlog, conn.info)
    self._comm_log = logger.CommunicationLogger(mlog, conn.info)

    self._encoder = iec103.Encoder()

    # per request kind: lock, identifier of pending request, its future
    self._interrogate_lock = asyncio.Lock()
    self._interrogate_req_id = None
    self._interrogate_future = None

    self._send_command_lock = asyncio.Lock()
    self._send_command_req_id = None
    self._send_command_future = None

    self._interrogate_generic_lock = asyncio.Lock()
    self._interrogate_generic_req_id = None
    self._interrogate_generic_future = None

    # endless 0, 1, ..., 255, 0, 1, ... sequence of request identifiers
    self._next_req_ids = itertools.cycle(range(0x100))

    asdu_types = iec103.AsduType

    # handlers called for each element of an information object
    single = {}
    single[asdu_types.TIME_TAGGED_MESSAGE] = (
        self._process_TIME_TAGGED_MESSAGE)
    single[asdu_types.TIME_TAGGED_MESSAGE_WITH_RELATIVE_TIME] = (
        self._process_TIME_TAGGED_MESSAGE_WITH_RELATIVE_TIME)
    single[asdu_types.TIME_TAGGED_MEASURANDS_WITH_RELATIVE_TIME] = (
        self._process_TIME_TAGGED_MEASURANDS_WITH_RELATIVE_TIME)
    single[asdu_types.IDENTIFICATION] = (
        self._process_IDENTIFICATION)
    single[asdu_types.TIME_SYNCHRONIZATION] = (
        self._process_TIME_SYNCHRONIZATION)
    single[asdu_types.GENERAL_INTERROGATION_TERMINATION] = (
        self._process_GENERAL_INTERROGATION_TERMINATION)
    single[asdu_types.GENERIC_DATA] = (
        self._process_GENERIC_DATA)
    single[asdu_types.GENERIC_IDENTIFICATION] = (
        self._process_GENERIC_IDENTIFICATION)
    single[asdu_types.READY_FOR_TRANSMISSION_OF_DISTURBANCE_DATA] = (
        self._process_READY_FOR_TRANSMISSION_OF_DISTURBANCE_DATA)
    single[asdu_types.READY_FOR_TRANSMISSION_OF_A_CHANNEL] = (
        self._process_READY_FOR_TRANSMISSION_OF_A_CHANNEL)
    single[asdu_types.READY_FOR_TRANSMISSION_OF_TAGS] = (
        self._process_READY_FOR_TRANSMISSION_OF_TAGS)
    single[asdu_types.TRANSMISSION_OF_TAGS] = (
        self._process_TRANSMISSION_OF_TAGS)
    single[asdu_types.TRANSMISSION_OF_DISTURBANCE_VALUES] = (
        self._process_TRANSMISSION_OF_DISTURBANCE_VALUES)
    single[asdu_types.END_OF_TRANSMISSION] = (
        self._process_END_OF_TRANSMISSION)
    self._process_single_element_fns = single

    # handlers called with all elements of an information object at once
    multiple = {}
    multiple[asdu_types.MEASURANDS_1] = (
        self._process_MEASURANDS_1)
    multiple[asdu_types.MEASURANDS_2] = (
        self._process_MEASURANDS_2)
    multiple[asdu_types.LIST_OF_RECORDED_DISTURBANCES] = (
        self._process_LIST_OF_RECORDED_DISTURBANCES)
    self._process_multiple_elements_fns = multiple

    group = self.async_group
    group.spawn(self._receive_loop)

    group.spawn(aio.call_on_cancel, self._comm_log.log,
                common.CommLogAction.CLOSE)
    self._comm_log.log(common.CommLogAction.OPEN)
async def time_sync(self,
                    time: common.Time | None = None,
                    asdu_address: common.AsduAddress = 0xFF):
    """Send time synchronization

    Current local time is used when `time` is not provided. This method
    returns as soon as the request is sent - response is not awaited.

    Raises:
        ConnectionError: if connection is not open

    """
    if not self.is_open:
        raise ConnectionError()

    if not time:
        time = common.time_from_datetime(datetime.datetime.now())

    sync_address = common.IoAddress(
        _FunctionType.GLOBAL_FUNCTION_TYPE.value,
        _InformationNumber.GENERAL_INTERROGATION_OR_TIME_SYNCHRONIZATION.value)  # NOQA
    sync_element = iec103.IoElement_TIME_SYNCHRONIZATION(time=time)
    sync_io = iec103.IO(address=sync_address,
                        elements=[sync_element])
    asdu = iec103.ASDU(type=iec103.AsduType.TIME_SYNCHRONIZATION,
                       cause=iec103.Cause.TIME_SYNCHRONIZATION,
                       address=asdu_address,
                       ios=[sync_io])

    encoded = self._encoder.encode_asdu(asdu)
    self._comm_log.log(common.CommLogAction.SEND, asdu)
    await self._conn.send(encoded)
async def interrogate(self, asdu_address: common.AsduAddress):
    """Send general interrogation and wait until it is resolved

    Only one general interrogation is pending at a time - concurrent calls
    wait for each other.

    Raises:
        ConnectionError: if connection is not open

    """
    async with self._interrogate_lock:
        if not self.is_open:
            raise ConnectionError()

        req_id = next(self._next_req_ids)
        gi_address = iec103.IoAddress(
            function_type=_FunctionType.GLOBAL_FUNCTION_TYPE.value,
            information_number=_InformationNumber.GENERAL_INTERROGATION_OR_TIME_SYNCHRONIZATION.value)  # NOQA
        gi_element = iec103.IoElement_GENERAL_INTERROGATION(
            scan_number=req_id)
        asdu = iec103.ASDU(type=iec103.AsduType.GENERAL_INTERROGATION,
                           cause=iec103.Cause.GENERAL_INTERROGATION,
                           address=asdu_address,
                           ios=[iec103.IO(address=gi_address,
                                          elements=[gi_element])])

        encoded = self._encoder.encode_asdu(asdu)

        try:
            self._comm_log.log(common.CommLogAction.SEND, asdu)

            # pending request is registered before sending
            self._interrogate_req_id = req_id
            self._interrogate_future = asyncio.Future()
            await self._conn.send(encoded)
            await self._interrogate_future

        finally:
            self._interrogate_req_id = None
            self._interrogate_future = None
async def send_command(self,
                       asdu_address: common.AsduAddress,
                       io_address: common.IoAddress,
                       value: common.DoubleValue
                       ) -> bool:
    """Send general command and return result of the pending request

    Only one command is pending at a time - concurrent calls wait for each
    other.

    Raises:
        ConnectionError: if connection is not open

    """
    async with self._send_command_lock:
        if not self.is_open:
            raise ConnectionError()

        req_id = next(self._next_req_ids)
        cmd_element = iec103.IoElement_GENERAL_COMMAND(
            value=value,
            return_identifier=req_id)
        asdu = iec103.ASDU(type=iec103.AsduType.GENERAL_COMMAND,
                           cause=iec103.Cause.GENERAL_COMMAND,
                           address=asdu_address,
                           ios=[iec103.IO(address=io_address,
                                          elements=[cmd_element])])

        encoded = self._encoder.encode_asdu(asdu)

        try:
            self._comm_log.log(common.CommLogAction.SEND, asdu)

            # pending request is registered before sending
            self._send_command_req_id = req_id
            self._send_command_future = asyncio.Future()
            await self._conn.send(encoded)
            success = await self._send_command_future
            return success

        finally:
            self._send_command_req_id = None
            self._send_command_future = None
async def interrogate_generic(self, asdu_address: common.AsduAddress):
    """Send general interrogation of generic data and wait until it is
    resolved

    Only one generic interrogation is pending at a time - concurrent calls
    wait for each other.

    Raises:
        ConnectionError: if connection is not open

    """
    async with self._interrogate_generic_lock:
        if not self.is_open:
            raise ConnectionError()

        req_id = next(self._next_req_ids)
        generic_address = iec103.IoAddress(
            function_type=_FunctionType.GENERIC_FUNCTION_TYPE.value,
            information_number=_InformationNumber.GENERAL_INTERROGATION_OF_GENERIC_DATA.value)  # NOQA
        generic_element = iec103.IoElement_GENERIC_COMMAND(
            return_identifier=req_id,
            data=[])
        asdu = iec103.ASDU(type=iec103.AsduType.GENERIC_COMMAND,
                           cause=iec103.Cause.GENERAL_INTERROGATION,
                           address=asdu_address,
                           ios=[iec103.IO(address=generic_address,
                                          elements=[generic_element])])

        encoded = self._encoder.encode_asdu(asdu)

        try:
            self._comm_log.log(common.CommLogAction.SEND, asdu)

            # pending request is registered before sending
            self._interrogate_generic_req_id = req_id
            self._interrogate_generic_future = asyncio.Future()
            await self._conn.send(encoded)
            await self._interrogate_generic_future

        finally:
            self._interrogate_generic_req_id = None
            self._interrogate_generic_future = None