hat.drivers.iec103
IEC 60870-5-103 communication protocol
1"""IEC 60870-5-103 communication protocol""" 2 3from hat.drivers.iec103.common import (Description, 4 IoAddress, 5 Identification, 6 TimeSize, 7 Time, 8 ValueType, 9 NoneValue, 10 TextValue, 11 BitstringValue, 12 UIntValue, 13 IntValue, 14 UFixedValue, 15 FixedValue, 16 Real32Value, 17 Real64Value, 18 DoubleValue, 19 SingleValue, 20 ExtendedDoubleValue, 21 MeasurandValue, 22 TimeValue, 23 IdentificationValue, 24 RelativeTimeValue, 25 IoAddressValue, 26 DoubleWithTimeValue, 27 DoubleWithRelativeTimeValue, 28 MeasurandWithRelativeTimeValue, 29 TextNumberValue, 30 ReplyValue, 31 ArrayValue, 32 IndexValue, 33 Value, 34 AsduAddress, 35 DataCause, 36 GenericDataCause, 37 MeasurandType, 38 MeasurandValues, 39 Data, 40 GenericData, 41 time_from_datetime, 42 time_to_datetime) 43from hat.drivers.iec103.master import (DataCb, 44 GenericDataCb, 45 MasterConnection) 46 47 48__all__ = ['Description', 49 'IoAddress', 50 'Identification', 51 'TimeSize', 52 'Time', 53 'ValueType', 54 'NoneValue', 55 'TextValue', 56 'BitstringValue', 57 'UIntValue', 58 'IntValue', 59 'UFixedValue', 60 'FixedValue', 61 'Real32Value', 62 'Real64Value', 63 'DoubleValue', 64 'SingleValue', 65 'ExtendedDoubleValue', 66 'MeasurandValue', 67 'TimeValue', 68 'IdentificationValue', 69 'RelativeTimeValue', 70 'IoAddressValue', 71 'DoubleWithTimeValue', 72 'DoubleWithRelativeTimeValue', 73 'MeasurandWithRelativeTimeValue', 74 'TextNumberValue', 75 'ReplyValue', 76 'ArrayValue', 77 'IndexValue', 78 'Value', 79 'AsduAddress', 80 'DataCause', 81 'GenericDataCause', 82 'MeasurandType', 83 'MeasurandValues', 84 'Data', 85 'GenericData', 86 'time_from_datetime', 87 'time_to_datetime', 88 'DataCb', 89 'GenericDataCb', 90 'MasterConnection']
67class Description(enum.Enum): 68 NOT_SPECIFIED = 0 69 ACTUAL_VALUE = 1 70 DEFAULT_VALUE = 2 71 RANGE = 3 72 PRECISION = 5 73 FACTOR = 6 74 REFERENCE = 7 75 ENUMERATION = 8 76 DIMENSION = 9 77 DESCRIPTION = 10 78 PASSWORD = 12 79 READ_ONLY = 13 80 WRITE_ONLY = 14 81 IO_ADDRESS = 19 82 EVENT = 20 83 TEXT_ARRAY = 21 84 VALUE_ARRAY = 22 85 RELATED = 23
An enumeration.
126class IoAddress(typing.NamedTuple): 127 function_type: int 128 """function_type is in range [0, 255]""" 129 information_number: int 130 """information_number is in range [0, 255]"""
IoAddress(function_type, information_number)
133class Identification(typing.NamedTuple): 134 group_id: int 135 """group_id in range [0, 255]""" 136 entry_id: int 137 """entry_id in range [0, 255]"""
Identification(group_id, entry_id)
An enumeration.
31class Time(typing.NamedTuple): 32 size: TimeSize 33 milliseconds: int 34 """milliseconds in range [0, 59999]""" 35 invalid: bool | None 36 """available for size THREE, FOUR, SEVEN""" 37 minutes: int | None 38 """available for size THREE, FOUR, SEVEN (minutes in range [0, 59])""" 39 summer_time: bool | None 40 """available for size FOUR, SEVEN""" 41 hours: int | None 42 """available for size FOUR, SEVEN (hours in range [0, 23])""" 43 day_of_week: int | None 44 """available for size SEVEN (day_of_week in range [1, 7])""" 45 day_of_month: int | None 46 """available for size SEVEN (day_of_month in range [1, 31])""" 47 months: int | None 48 """available for size SEVEN (months in range [1, 12])""" 49 years: int | None 50 """available for size SEVEN (years in range [0, 99])"""
Time(size, milliseconds, invalid, minutes, summer_time, hours, day_of_week, day_of_month, months, years)
Create new instance of Time(size, milliseconds, invalid, minutes, summer_time, hours, day_of_week, day_of_month, months, years)
140class ValueType(enum.Enum): 141 NONE = 0 142 TEXT = 1 143 BITSTRING = 2 144 UINT = 3 145 INT = 4 146 UFIXED = 5 147 FIXED = 6 148 REAL32 = 7 149 REAL64 = 8 150 DOUBLE = 9 151 SINGLE = 10 152 EXTENDED_DOUBLE = 11 153 MEASURAND = 12 154 TIME = 14 155 IDENTIFICATION = 15 156 RELATIVE_TIME = 16 157 IO_ADDRESS = 17 158 DOUBLE_WITH_TIME = 18 159 DOUBLE_WITH_RELATIVE_TIME = 19 160 MEASURAND_WITH_RELATIVE_TIME = 20 161 TEXT_NUMBER = 21 162 REPLY = 22 163 ARRAY = 23 164 INDEX = 24
An enumeration.
NoneValue()
TextValue(value,)
BitstringValue(value,)
UIntValue(value,)
IntValue(value,)
UFixedValue(value,)
FixedValue(value,)
Real32Value(value,)
Real64Value(value,)
An enumeration.
An enumeration.
An enumeration.
224class MeasurandValue(typing.NamedTuple): 225 overflow: bool 226 invalid: bool 227 value: float 228 """value in range [-1.0, 1.0)"""
MeasurandValue(overflow, invalid, value)
TimeValue(value,)
IdentificationValue(value,)
RelativeTimeValue(value,)
IoAddressValue(value,)
249class DoubleWithTimeValue(typing.NamedTuple): 250 value: DoubleValue 251 time: Time 252 """time size is FOUR""" 253 supplementary: int 254 """supplementary in range [0, 255]"""
DoubleWithTimeValue(value, time, supplementary)
Create new instance of DoubleWithTimeValue(value, time, supplementary)
257class DoubleWithRelativeTimeValue(typing.NamedTuple): 258 value: DoubleValue 259 relative_time: int 260 """relative_time in range [0, 65535]""" 261 fault_number: int 262 """fault_number in range [0, 65535]""" 263 time: Time 264 """time size is FOUR""" 265 supplementary: int 266 """supplementary in range [0, 255]"""
DoubleWithRelativeTimeValue(value, relative_time, fault_number, time, supplementary)
Create new instance of DoubleWithRelativeTimeValue(value, relative_time, fault_number, time, supplementary)
269class MeasurandWithRelativeTimeValue(typing.NamedTuple): 270 value: float 271 relative_time: int 272 """relative_time in range [0, 65535]""" 273 fault_number: int 274 """fault_number in range [0, 65535]""" 275 time: Time 276 """time size is FOUR"""
MeasurandWithRelativeTimeValue(value, relative_time, fault_number, time)
Create new instance of MeasurandWithRelativeTimeValue(value, relative_time, fault_number, time)
TextNumberValue(value,)
283class ReplyValue(enum.Enum): 284 ACK = 0 285 INVALID_IDENTIFICATION = 1 286 DATA_NOT_EXISTS = 2 287 DATA_NOT_AVAILABLE = 3 288 VERIFY_ERROR = 4 289 OUT_OF_RANGE = 5 290 ENTRY_TO_LARGE = 6 291 TOO_MANY_COMMANDS = 7 292 ENTRY_READ_ONLY = 8 293 PASSWORD_PROTECTED = 9 294 IN_PROGRESS = 10 295 FOLLOWING_DESCRIPTION = 11
An enumeration.
298class ArrayValue(typing.NamedTuple): 299 value_type: ValueType 300 more_follows: bool 301 values: typing.List['Value']
ArrayValue(value_type, more_follows, values)
Create new instance of ArrayValue(value_type, more_follows, values)
Alias for field number 2
IndexValue(value,)
44class DataCause(enum.Enum): 45 SPONTANEOUS = iec103.Cause.SPONTANEOUS.value 46 CYCLIC = iec103.Cause.CYCLIC.value 47 TEST_MODE = iec103.Cause.TEST_MODE.value 48 GENERAL_INTERROGATION = iec103.Cause.GENERAL_INTERROGATION.value 49 LOCAL_OPERATION = iec103.Cause.LOCAL_OPERATION.value 50 REMOTE_OPERATION = iec103.Cause.REMOTE_OPERATION.value
An enumeration.
53class GenericDataCause(enum.Enum): 54 SPONTANEOUS = iec103.Cause.SPONTANEOUS.value 55 CYCLIC = iec103.Cause.CYCLIC.value 56 TEST_MODE = iec103.Cause.TEST_MODE.value 57 GENERAL_INTERROGATION = iec103.Cause.GENERAL_INTERROGATION.value 58 LOCAL_OPERATION = iec103.Cause.LOCAL_OPERATION.value 59 REMOTE_OPERATION = iec103.Cause.REMOTE_OPERATION.value 60 WRITE_ACK = iec103.Cause.GENERIC_WRITE_COMMAND.value 61 WRITE_NACK = iec103.Cause.GENERIC_WRITE_COMMAND_NACK.value 62 READ_ACK = iec103.Cause.GENERIC_READ_COMMAND.value 63 READ_NACK = iec103.Cause.GENERIC_READ_COMMAND_NACK.value 64 WRITE_CONFIRMATION = iec103.Cause.GENERIC_WRITE_CONFIRMATION.value
An enumeration.
67class MeasurandType(enum.Enum): 68 M1_I_L2 = (iec103.AsduType.MEASURANDS_1.value, 0) 69 M1_U_L12 = (iec103.AsduType.MEASURANDS_1.value, 1) 70 M1_P = (iec103.AsduType.MEASURANDS_1.value, 2) 71 M1_Q = (iec103.AsduType.MEASURANDS_1.value, 3) 72 M2_I_L1 = (iec103.AsduType.MEASURANDS_2.value, 0) 73 M2_I_L2 = (iec103.AsduType.MEASURANDS_2.value, 1) 74 M2_I_L3 = (iec103.AsduType.MEASURANDS_2.value, 2) 75 M2_U_L1E = (iec103.AsduType.MEASURANDS_2.value, 3) 76 M2_U_L2E = (iec103.AsduType.MEASURANDS_2.value, 4) 77 M2_U_L3E = (iec103.AsduType.MEASURANDS_2.value, 5) 78 M2_P = (iec103.AsduType.MEASURANDS_2.value, 6) 79 M2_Q = (iec103.AsduType.MEASURANDS_2.value, 7) 80 M2_F = (iec103.AsduType.MEASURANDS_2.value, 8)
An enumeration.
MeasurandValues(values,)
Create new instance of MeasurandValues(values,)
87class Data(typing.NamedTuple): 88 asdu_address: AsduAddress 89 io_address: IoAddress 90 cause: DataCause | OtherCause 91 value: (DoubleWithTimeValue | 92 DoubleWithRelativeTimeValue | 93 MeasurandValues | 94 MeasurandWithRelativeTimeValue)
Data(asdu_address, io_address, cause, value)
Create new instance of Data(asdu_address, io_address, cause, value)
Alias for field number 3
97class GenericData(typing.NamedTuple): 98 asdu_address: AsduAddress 99 io_address: IoAddress 100 cause: GenericDataCause | OtherCause 101 identification: Identification 102 description: Description 103 value: ArrayValue
GenericData(asdu_address, io_address, cause, identification, description, value)
Create new instance of GenericData(asdu_address, io_address, cause, identification, description, value)
66def time_from_datetime(dt: datetime.datetime, 67 invalid: bool = False 68 ) -> Time: 69 """Create Time from datetime.datetime""" 70 # TODO document edge cases (local time, os implementation, ...) 71 # rounding microseconds to the nearest millisecond 72 dt_rounded = ( 73 dt.replace(microsecond=0) + 74 datetime.timedelta(milliseconds=round(dt.microsecond / 1000))) 75 local_time = time.localtime(dt_rounded.timestamp()) 76 77 return Time( 78 size=TimeSize.SEVEN, 79 milliseconds=(local_time.tm_sec * 1000 + 80 dt_rounded.microsecond // 1000), 81 invalid=invalid, 82 minutes=local_time.tm_min, 83 summer_time=bool(local_time.tm_isdst), 84 hours=local_time.tm_hour, 85 day_of_week=local_time.tm_wday + 1, 86 day_of_month=local_time.tm_mday, 87 months=local_time.tm_mon, 88 years=local_time.tm_year % 100)
Create Time from datetime.datetime
91def time_to_datetime(t: Time 92 ) -> datetime.datetime: 93 """Convert Time to datetime.datetime""" 94 # TODO document edge cases (local time, os implementation, ...) 95 # TODO maybe allow diferent time size (use now for time) 96 if t.size != TimeSize.SEVEN: 97 raise ValueError('unsupported time size') 98 99 local_dt = datetime.datetime( 100 year=2000 + t.years if t.years < 70 else 1900 + t.years, 101 month=t.months, 102 day=t.day_of_month, 103 hour=t.hours, 104 minute=t.minutes, 105 second=int(t.milliseconds / 1000), 106 microsecond=(t.milliseconds % 1000) * 1000, 107 fold=not t.summer_time) 108 109 return local_dt.astimezone(tz=datetime.timezone.utc)
Convert Time to datetime.datetime
24class MasterConnection(aio.Resource): 25 26 def __init__(self, 27 conn: unbalanced.MasterConnection, 28 *, 29 data_cb: DataCb | None = None, 30 generic_data_cb: GenericDataCb | None = None): 31 self._conn = conn 32 self._data_cb = data_cb 33 self._generic_data_cb = generic_data_cb 34 35 self._encoder = iec103.Encoder() 36 37 self._interrogate_lock = asyncio.Lock() 38 self._interrogate_req_id = None 39 self._interrogate_future = None 40 41 self._send_command_lock = asyncio.Lock() 42 self._send_command_req_id = None 43 self._send_command_future = None 44 45 self._interrogate_generic_lock = asyncio.Lock() 46 self._interrogate_generic_req_id = None 47 self._interrogate_generic_future = None 48 49 self._next_req_ids = (i % 0x100 for i in itertools.count(0)) 50 51 self._process_single_element_fns = { 52 iec103.AsduType.TIME_TAGGED_MESSAGE: self._process_TIME_TAGGED_MESSAGE, # NOQA 53 iec103.AsduType.TIME_TAGGED_MESSAGE_WITH_RELATIVE_TIME: self._process_TIME_TAGGED_MESSAGE_WITH_RELATIVE_TIME, # NOQA 54 iec103.AsduType.TIME_TAGGED_MEASURANDS_WITH_RELATIVE_TIME: self._process_TIME_TAGGED_MEASURANDS_WITH_RELATIVE_TIME, # NOQA 55 iec103.AsduType.IDENTIFICATION: self._process_IDENTIFICATION, 56 iec103.AsduType.TIME_SYNCHRONIZATION: self._process_TIME_SYNCHRONIZATION, # NOQA 57 iec103.AsduType.GENERAL_INTERROGATION_TERMINATION: self._process_GENERAL_INTERROGATION_TERMINATION, # NOQA 58 iec103.AsduType.GENERIC_DATA: self._process_GENERIC_DATA, 59 iec103.AsduType.GENERIC_IDENTIFICATION: self._process_GENERIC_IDENTIFICATION, # NOQA 60 iec103.AsduType.READY_FOR_TRANSMISSION_OF_DISTURBANCE_DATA: self._process_READY_FOR_TRANSMISSION_OF_DISTURBANCE_DATA, # NOQA 61 iec103.AsduType.READY_FOR_TRANSMISSION_OF_A_CHANNEL: self._process_READY_FOR_TRANSMISSION_OF_A_CHANNEL, # NOQA 62 iec103.AsduType.READY_FOR_TRANSMISSION_OF_TAGS: self._process_READY_FOR_TRANSMISSION_OF_TAGS, # NOQA 63 iec103.AsduType.TRANSMISSION_OF_TAGS: self._process_TRANSMISSION_OF_TAGS, # NOQA 64 iec103.AsduType.TRANSMISSION_OF_DISTURBANCE_VALUES: self._process_TRANSMISSION_OF_DISTURBANCE_VALUES, # NOQA 65 iec103.AsduType.END_OF_TRANSMISSION: self._process_END_OF_TRANSMISSION} # NOQA 66 self._process_multiple_elements_fns = { 67 iec103.AsduType.MEASURANDS_1: self._process_MEASURANDS_1, 68 iec103.AsduType.MEASURANDS_2: self._process_MEASURANDS_2, 69 iec103.AsduType.LIST_OF_RECORDED_DISTURBANCES: self._process_LIST_OF_RECORDED_DISTURBANCES} # NOQA 70 71 self.async_group.spawn(self._receive_loop) 72 73 @property 74 def async_group(self): 75 return self._conn.async_group 76 77 async def time_sync(self, 78 time: common.Time | None = None, 79 asdu_address: common.AsduAddress = 0xFF): 80 if not self.is_open: 81 raise ConnectionError() 82 83 time = time or common.time_from_datetime(datetime.datetime.now()) 84 io_address = common.IoAddress( 85 _FunctionType.GLOBAL_FUNCTION_TYPE.value, 86 _InformationNumber.GENERAL_INTERROGATION_OR_TIME_SYNCHRONIZATION.value) # NOQA 87 asdu = iec103.ASDU( 88 type=iec103.AsduType.TIME_SYNCHRONIZATION, 89 cause=iec103.Cause.TIME_SYNCHRONIZATION, 90 address=asdu_address, 91 ios=[iec103.IO( 92 address=io_address, 93 elements=[iec103.IoElement_TIME_SYNCHRONIZATION( 94 time=time)])]) 95 data = self._encoder.encode_asdu(asdu) 96 97 await self._conn.send(data) 98 99 async def interrogate(self, asdu_address: common.AsduAddress): 100 async with self._interrogate_lock: 101 if not self.is_open: 102 raise ConnectionError() 103 104 scan_number = next(self._next_req_ids) 105 asdu = iec103.ASDU( 106 type=iec103.AsduType.GENERAL_INTERROGATION, 107 cause=iec103.Cause.GENERAL_INTERROGATION, 108 address=asdu_address, 109 ios=[iec103.IO( 110 address=iec103.IoAddress( 111 function_type=_FunctionType.GLOBAL_FUNCTION_TYPE.value, # NOQA 112 information_number=_InformationNumber.GENERAL_INTERROGATION_OR_TIME_SYNCHRONIZATION.value), # NOQA 113 elements=[iec103.IoElement_GENERAL_INTERROGATION( # NOQA 114 scan_number=scan_number)])]) 115 data = self._encoder.encode_asdu(asdu) 116 117 try: 118 self._interrogate_req_id = scan_number 119 self._interrogate_future = asyncio.Future() 120 await self._conn.send(data) 121 await self._interrogate_future 122 123 finally: 124 self._interrogate_req_id = None 125 self._interrogate_future = None 126 127 async def send_command(self, 128 asdu_address: common.AsduAddress, 129 io_address: common.IoAddress, 130 value: common.DoubleValue 131 ) -> bool: 132 async with self._send_command_lock: 133 if not self.is_open: 134 raise ConnectionError() 135 136 return_identifier = next(self._next_req_ids) 137 asdu = iec103.ASDU( 138 type=iec103.AsduType.GENERAL_COMMAND, 139 cause=iec103.Cause.GENERAL_COMMAND, 140 address=asdu_address, 141 ios=[iec103.IO( 142 address=io_address, 143 elements=[iec103.IoElement_GENERAL_COMMAND( 144 value=value, 145 return_identifier=return_identifier)])]) 146 data = self._encoder.encode_asdu(asdu) 147 148 try: 149 self._send_command_req_id = return_identifier 150 self._send_command_future = asyncio.Future() 151 await self._conn.send(data) 152 return await self._send_command_future 153 154 finally: 155 self._send_command_req_id = None 156 self._send_command_future = None 157 158 async def interrogate_generic(self, asdu_address: common.AsduAddress): 159 async with self._interrogate_generic_lock: 160 if not self.is_open: 161 raise ConnectionError() 162 163 return_identifier = next(self._next_req_ids) 164 asdu = iec103.ASDU( 165 type=iec103.AsduType.GENERIC_COMMAND, 166 cause=iec103.Cause.GENERAL_INTERROGATION, 167 address=asdu_address, 168 ios=[iec103.IO( 169 address=iec103.IoAddress( 170 function_type=_FunctionType.GENERIC_FUNCTION_TYPE.value, # NOQA 171 information_number=_InformationNumber.GENERAL_INTERROGATION_OF_GENERIC_DATA.value), # NOQA 172 elements=[iec103.IoElement_GENERIC_COMMAND( 173 return_identifier=return_identifier, 174 data=[])])]) 175 data = self._encoder.encode_asdu(asdu) 176 177 try: 178 self._interrogate_generic_req_id = return_identifier 179 self._interrogate_generic_future = asyncio.Future() 180 await self._conn.send(data) 181 await self._interrogate_generic_future 182 183 finally: 184 self._interrogate_generic_req_id = None 185 self._interrogate_generic_future = None 186 187 async def _receive_loop(self): 188 try: 189 while True: 190 data = await self._conn.receive() 191 asdu, _ = self._encoder.decode_asdu(data) 192 193 for io in asdu.ios: 194 if asdu.type in self._process_single_element_fns: 195 fn = self._process_single_element_fns[asdu.type] 196 for element in io.elements: 197 await fn(asdu.cause, asdu.address, io.address, 198 element) 199 200 elif asdu.type in self._process_multiple_elements_fns: 201 fn = self._process_multiple_elements_fns[asdu.type] 202 await fn(asdu.cause, asdu.address, io.address, 203 io.elements) 204 205 else: 206 raise ValueError('unsupported asdu type') 207 208 except Exception as e: 209 mlog.error("receive loop error: %s", e, exc_info=e) 210 211 finally: 212 self.close() 213 _try_set_exception(self._interrogate_future, ConnectionError()) 214 _try_set_exception(self._send_command_future, ConnectionError()) 215 _try_set_exception(self._interrogate_generic_future, 216 ConnectionError()) 217 218 async def _process_TIME_TAGGED_MESSAGE(self, cause, asdu_address, io_address, element): # NOQA 219 if cause == iec103.Cause.GENERAL_COMMAND: 220 if element.value.supplementary == self._send_command_req_id: 221 _try_set_result(self._send_command_future, True) 222 223 elif cause == iec103.Cause.GENERAL_COMMAND_NACK: 224 if element.value.supplementary == self._send_command_req_id: 225 _try_set_result(self._send_command_future, False) 226 227 else: 228 await _try_aio_call(self._data_cb, common.Data( 229 asdu_address=asdu_address, 230 io_address=io_address, 231 cause=_try_decode_cause(cause, common.DataCause), 232 value=element.value)) 233 234 async def _process_TIME_TAGGED_MESSAGE_WITH_RELATIVE_TIME(self, cause, asdu_address, io_address, element): # NOQA 235 await _try_aio_call(self._data_cb, common.Data( 236 asdu_address=asdu_address, 237 io_address=io_address, 238 cause=_try_decode_cause(cause, common.DataCause), 239 value=element.value)) 240 241 async def _process_MEASURANDS_1(self, cause, asdu_address, io_address, elements): # NOQA 242 value = common.MeasurandValues(values={}) 243 for i, element in enumerate(elements): 244 measurand_type = common.MeasurandType(( 245 iec103.AsduType.MEASURANDS_1.value, i)) 246 value.values[measurand_type] = element.value 247 248 await _try_aio_call(self._data_cb, common.Data( 249 asdu_address=asdu_address, 250 io_address=io_address, 251 cause=_try_decode_cause(cause, common.DataCause), 252 value=value)) 253 254 async def _process_TIME_TAGGED_MEASURANDS_WITH_RELATIVE_TIME(self, cause, asdu_address, io_address, element): # NOQA 255 await _try_aio_call(self._data_cb, common.Data( 256 asdu_address=asdu_address, 257 io_address=io_address, 258 cause=_try_decode_cause(cause, common.DataCause), 259 value=element.value)) 260 261 async def _process_IDENTIFICATION(self, cause, asdu_address, io_address, element): # NOQA 262 mlog.debug("received device identification " 263 "(compatibility: %s; value: %s; software: %s)", 264 element.compatibility, 265 bytes(element.value), 266 bytes(element.software)) 267 268 async def _process_TIME_SYNCHRONIZATION(self, cause, asdu_address, io_address, element): # NOQA 269 mlog.info("received time sync response") 270 271 async def _process_GENERAL_INTERROGATION_TERMINATION(self, cause, asdu_address, io_address, element): # NOQA 272 if element.scan_number != self._interrogate_req_id: 273 return 274 _try_set_result(self._interrogate_future, None) 275 276 async def _process_MEASURANDS_2(self, cause, asdu_address, io_address, elements): # NOQA 277 value = common.MeasurandValues(values={}) 278 for i, element in enumerate(elements): 279 measurand_type = common.MeasurandType(( 280 iec103.AsduType.MEASURANDS_2.value, i)) 281 value.values[measurand_type] = element.value 282 283 await _try_aio_call(self._data_cb, common.Data( 284 asdu_address=asdu_address, 285 io_address=io_address, 286 cause=_try_decode_cause(cause, common.DataCause), 287 value=value)) 288 289 async def _process_GENERIC_DATA(self, cause, asdu_address, io_address, element): # NOQA 290 if cause == iec103.Cause.TERMINATION_OF_GENERAL_INTERROGATION: # NOQA 291 if element.return_identifier == self._interrogate_generic_req_id: 292 _try_set_result(self._interrogate_generic_future, None) 293 294 else: 295 data_cause = _try_decode_cause(cause, common.GenericDataCause) 296 for identification, data in element.data: 297 await _try_aio_call(self._generic_data_cb, common.GenericData( 298 asdu_address=asdu_address, 299 io_address=io_address, 300 cause=data_cause, 301 identification=identification, 302 description=data.description, 303 value=data.value)) 304 305 async def _process_GENERIC_IDENTIFICATION(self, cause, asdu_address, io_address, element): # NOQA 306 pass 307 308 async def _process_LIST_OF_RECORDED_DISTURBANCES(self, cause, asdu_address, io_address, elements): # NOQA 309 pass 310 311 async def _process_READY_FOR_TRANSMISSION_OF_DISTURBANCE_DATA(self, cause, asdu_address, io_address, element): # NOQA 312 pass 313 314 async def _process_READY_FOR_TRANSMISSION_OF_A_CHANNEL(self, cause, asdu_address, io_address, element): # NOQA 315 pass 316 317 async def _process_READY_FOR_TRANSMISSION_OF_TAGS(self, cause, asdu_address, io_address, element): # NOQA 318 pass 319 320 async def _process_TRANSMISSION_OF_TAGS(self, cause, asdu_address, io_address, element): # NOQA 321 pass 322 323 async def _process_TRANSMISSION_OF_DISTURBANCE_VALUES(self, cause, asdu_address, io_address, element): # NOQA 324 pass 325 326 async def _process_END_OF_TRANSMISSION(self, cause, asdu_address, io_address, element): # NOQA 327 pass
Resource with lifetime control based on Group
.
26 def __init__(self, 27 conn: unbalanced.MasterConnection, 28 *, 29 data_cb: DataCb | None = None, 30 generic_data_cb: GenericDataCb | None = None): 31 self._conn = conn 32 self._data_cb = data_cb 33 self._generic_data_cb = generic_data_cb 34 35 self._encoder = iec103.Encoder() 36 37 self._interrogate_lock = asyncio.Lock() 38 self._interrogate_req_id = None 39 self._interrogate_future = None 40 41 self._send_command_lock = asyncio.Lock() 42 self._send_command_req_id = None 43 self._send_command_future = None 44 45 self._interrogate_generic_lock = asyncio.Lock() 46 self._interrogate_generic_req_id = None 47 self._interrogate_generic_future = None 48 49 self._next_req_ids = (i % 0x100 for i in itertools.count(0)) 50 51 self._process_single_element_fns = { 52 iec103.AsduType.TIME_TAGGED_MESSAGE: self._process_TIME_TAGGED_MESSAGE, # NOQA 53 iec103.AsduType.TIME_TAGGED_MESSAGE_WITH_RELATIVE_TIME: self._process_TIME_TAGGED_MESSAGE_WITH_RELATIVE_TIME, # NOQA 54 iec103.AsduType.TIME_TAGGED_MEASURANDS_WITH_RELATIVE_TIME: self._process_TIME_TAGGED_MEASURANDS_WITH_RELATIVE_TIME, # NOQA 55 iec103.AsduType.IDENTIFICATION: self._process_IDENTIFICATION, 56 iec103.AsduType.TIME_SYNCHRONIZATION: self._process_TIME_SYNCHRONIZATION, # NOQA 57 iec103.AsduType.GENERAL_INTERROGATION_TERMINATION: self._process_GENERAL_INTERROGATION_TERMINATION, # NOQA 58 iec103.AsduType.GENERIC_DATA: self._process_GENERIC_DATA, 59 iec103.AsduType.GENERIC_IDENTIFICATION: self._process_GENERIC_IDENTIFICATION, # NOQA 60 iec103.AsduType.READY_FOR_TRANSMISSION_OF_DISTURBANCE_DATA: self._process_READY_FOR_TRANSMISSION_OF_DISTURBANCE_DATA, # NOQA 61 iec103.AsduType.READY_FOR_TRANSMISSION_OF_A_CHANNEL: self._process_READY_FOR_TRANSMISSION_OF_A_CHANNEL, # NOQA 62 iec103.AsduType.READY_FOR_TRANSMISSION_OF_TAGS: self._process_READY_FOR_TRANSMISSION_OF_TAGS, # NOQA 63 iec103.AsduType.TRANSMISSION_OF_TAGS: self._process_TRANSMISSION_OF_TAGS, # NOQA 64 iec103.AsduType.TRANSMISSION_OF_DISTURBANCE_VALUES: self._process_TRANSMISSION_OF_DISTURBANCE_VALUES, # NOQA 65 iec103.AsduType.END_OF_TRANSMISSION: self._process_END_OF_TRANSMISSION} # NOQA 66 self._process_multiple_elements_fns = { 67 iec103.AsduType.MEASURANDS_1: self._process_MEASURANDS_1, 68 iec103.AsduType.MEASURANDS_2: self._process_MEASURANDS_2, 69 iec103.AsduType.LIST_OF_RECORDED_DISTURBANCES: self._process_LIST_OF_RECORDED_DISTURBANCES} # NOQA 70 71 self.async_group.spawn(self._receive_loop)
77 async def time_sync(self, 78 time: common.Time | None = None, 79 asdu_address: common.AsduAddress = 0xFF): 80 if not self.is_open: 81 raise ConnectionError() 82 83 time = time or common.time_from_datetime(datetime.datetime.now()) 84 io_address = common.IoAddress( 85 _FunctionType.GLOBAL_FUNCTION_TYPE.value, 86 _InformationNumber.GENERAL_INTERROGATION_OR_TIME_SYNCHRONIZATION.value) # NOQA 87 asdu = iec103.ASDU( 88 type=iec103.AsduType.TIME_SYNCHRONIZATION, 89 cause=iec103.Cause.TIME_SYNCHRONIZATION, 90 address=asdu_address, 91 ios=[iec103.IO( 92 address=io_address, 93 elements=[iec103.IoElement_TIME_SYNCHRONIZATION( 94 time=time)])]) 95 data = self._encoder.encode_asdu(asdu) 96 97 await self._conn.send(data)
99 async def interrogate(self, asdu_address: common.AsduAddress): 100 async with self._interrogate_lock: 101 if not self.is_open: 102 raise ConnectionError() 103 104 scan_number = next(self._next_req_ids) 105 asdu = iec103.ASDU( 106 type=iec103.AsduType.GENERAL_INTERROGATION, 107 cause=iec103.Cause.GENERAL_INTERROGATION, 108 address=asdu_address, 109 ios=[iec103.IO( 110 address=iec103.IoAddress( 111 function_type=_FunctionType.GLOBAL_FUNCTION_TYPE.value, # NOQA 112 information_number=_InformationNumber.GENERAL_INTERROGATION_OR_TIME_SYNCHRONIZATION.value), # NOQA 113 elements=[iec103.IoElement_GENERAL_INTERROGATION( # NOQA 114 scan_number=scan_number)])]) 115 data = self._encoder.encode_asdu(asdu) 116 117 try: 118 self._interrogate_req_id = scan_number 119 self._interrogate_future = asyncio.Future() 120 await self._conn.send(data) 121 await self._interrogate_future 122 123 finally: 124 self._interrogate_req_id = None 125 self._interrogate_future = None
127 async def send_command(self, 128 asdu_address: common.AsduAddress, 129 io_address: common.IoAddress, 130 value: common.DoubleValue 131 ) -> bool: 132 async with self._send_command_lock: 133 if not self.is_open: 134 raise ConnectionError() 135 136 return_identifier = next(self._next_req_ids) 137 asdu = iec103.ASDU( 138 type=iec103.AsduType.GENERAL_COMMAND, 139 cause=iec103.Cause.GENERAL_COMMAND, 140 address=asdu_address, 141 ios=[iec103.IO( 142 address=io_address, 143 elements=[iec103.IoElement_GENERAL_COMMAND( 144 value=value, 145 return_identifier=return_identifier)])]) 146 data = self._encoder.encode_asdu(asdu) 147 148 try: 149 self._send_command_req_id = return_identifier 150 self._send_command_future = asyncio.Future() 151 await self._conn.send(data) 152 return await self._send_command_future 153 154 finally: 155 self._send_command_req_id = None 156 self._send_command_future = None
158 async def interrogate_generic(self, asdu_address: common.AsduAddress): 159 async with self._interrogate_generic_lock: 160 if not self.is_open: 161 raise ConnectionError() 162 163 return_identifier = next(self._next_req_ids) 164 asdu = iec103.ASDU( 165 type=iec103.AsduType.GENERIC_COMMAND, 166 cause=iec103.Cause.GENERAL_INTERROGATION, 167 address=asdu_address, 168 ios=[iec103.IO( 169 address=iec103.IoAddress( 170 function_type=_FunctionType.GENERIC_FUNCTION_TYPE.value, # NOQA 171 information_number=_InformationNumber.GENERAL_INTERROGATION_OF_GENERIC_DATA.value), # NOQA 172 elements=[iec103.IoElement_GENERIC_COMMAND( 173 return_identifier=return_identifier, 174 data=[])])]) 175 data = self._encoder.encode_asdu(asdu) 176 177 try: 178 self._interrogate_generic_req_id = return_identifier 179 self._interrogate_generic_future = asyncio.Future() 180 await self._conn.send(data) 181 await self._interrogate_generic_future 182 183 finally: 184 self._interrogate_generic_req_id = None 185 self._interrogate_generic_future = None