hat.drivers.iec60870.encodings.encoder
1import collections 2import typing 3 4from hat import util 5 6from hat.drivers.iec60870.encodings import common 7 8 9AsduType: typing.TypeAlias = int 10AsduTypeTimeSizes: typing.TypeAlias = dict[AsduType, common.TimeSize] 11DecodeIoElementCb: typing.TypeAlias = typing.Callable[[util.Bytes, AsduType], 12 tuple[typing.Any, 13 util.Bytes]] 14EncodeIoElementCb: typing.TypeAlias = typing.Callable[[typing.Any, AsduType], 15 typing.Iterable[int]] 16 17 18def decode_time(time_bytes: util.Bytes, 19 time_size: common.TimeSize 20 ) -> common.Time: 21 milliseconds = (time_bytes[1] << 8) | time_bytes[0] 22 invalid = (bool(time_bytes[2] & 0x80) if time_size.value > 2 else None) 23 minutes = (time_bytes[2] & 0x3F if time_size.value > 2 else None) 24 summer_time = (bool(time_bytes[3] & 0x80) if time_size.value > 3 else None) 25 hours = (time_bytes[3] & 0x1F if time_size.value > 3 else None) 26 day_of_week = (time_bytes[4] >> 5 if time_size.value > 4 else None) 27 day_of_month = (time_bytes[4] & 0x1F if time_size.value > 4 else None) 28 months = (time_bytes[5] & 0x0F if time_size.value > 4 else None) 29 years = (time_bytes[6] & 0x7F if time_size.value > 4 else None) 30 31 return common.Time(size=time_size, 32 milliseconds=milliseconds, 33 invalid=invalid, 34 minutes=minutes, 35 summer_time=summer_time, 36 hours=hours, 37 day_of_week=day_of_week, 38 day_of_month=day_of_month, 39 months=months, 40 years=years) 41 42 43def encode_time(time: common.Time, 44 time_size: common.TimeSize 45 ) -> typing.Iterable[int]: 46 if time_size.value > time.size.value: 47 raise ValueError('unsupported time size') 48 49 yield time.milliseconds & 0xFF 50 yield (time.milliseconds >> 8) & 0xFF 51 52 if time_size.value > 2: 53 yield ((0x80 if time.invalid else 0) | 54 (time.minutes & 0x3F)) 55 56 if time_size.value > 3: 57 yield ((0x80 if time.summer_time else 0) | 58 (time.hours & 0x1F)) 59 60 if time_size.value > 4: 61 yield (((time.day_of_week & 0x07) << 5) | 62 (time.day_of_month & 0x1F)) 63 yield time.months & 0x0F 64 yield time.years & 0x7F 65 66 67class Encoder: 68 69 def __init__(self, 70 cause_size: common.CauseSize, 71 asdu_address_size: common.AsduAddressSize, 72 io_address_size: common.IoAddressSize, 73 asdu_type_time_sizes: AsduTypeTimeSizes, 74 inverted_sequence_bit: bool, 75 decode_io_element_cb: DecodeIoElementCb, 76 encode_io_element_cb: EncodeIoElementCb): 77 self._cause_size = cause_size 78 self._asdu_address_size = asdu_address_size 79 self._io_address_size = io_address_size 80 self._asdu_type_time_sizes = asdu_type_time_sizes 81 self._inverted_sequence_bit = inverted_sequence_bit 82 self._decode_io_element_cb = decode_io_element_cb 83 self._encode_io_element_cb = encode_io_element_cb 84 85 @property 86 def cause_size(self) -> common.CauseSize: 87 return self._cause_size 88 89 @property 90 def asdu_address_size(self) -> common.AsduAddressSize: 91 return self._asdu_address_size 92 93 @property 94 def io_address_size(self) -> common.IoAddressSize: 95 return self._io_address_size 96 97 def decode_asdu(self, 98 asdu_bytes: util.Bytes 99 ) -> tuple[common.ASDU, util.Bytes]: 100 asdu_type = asdu_bytes[0] 101 io_number = asdu_bytes[1] & 0x7F 102 is_sequence = bool(asdu_bytes[1] & 0x80) 103 if self._inverted_sequence_bit: 104 is_sequence = not is_sequence 105 io_count = 1 if is_sequence else io_number 106 ioe_element_count = io_number if is_sequence else 1 107 108 rest = asdu_bytes[2:] 109 cause, rest = _decode_int(rest, self._cause_size.value) 110 address, rest = _decode_int(rest, self._asdu_address_size.value) 111 112 ios = collections.deque() 113 for _ in range(io_count): 114 io, rest = self._decode_io(asdu_type, ioe_element_count, rest) 115 ios.append(io) 116 117 asdu = common.ASDU(type=asdu_type, 118 cause=cause, 119 address=address, 120 ios=list(ios)) 121 return asdu, rest 122 123 def encode_asdu(self, asdu: common.ASDU) -> util.Bytes: 124 data = collections.deque() 125 data.append(asdu.type) 126 127 is_sequence = len(asdu.ios) == 1 and len(asdu.ios[0].elements) > 1 128 if is_sequence: 129 data.append( 130 (0x00 if self._inverted_sequence_bit else 0x80) | 131 len(asdu.ios[0].elements)) 132 else: 133 data.append( 134 (0x80 if self._inverted_sequence_bit else 0x00) | 135 len(asdu.ios)) 136 137 data.extend(_encode_int(asdu.cause, self._cause_size.value)) 138 data.extend(_encode_int(asdu.address, self._asdu_address_size.value)) 139 140 for io in asdu.ios: 141 if not is_sequence and len(io.elements) != 1: 142 raise ValueError('invalid number of IO elements') 143 data.extend(self._encode_io(asdu.type, io)) 144 145 return bytes(data) 146 147 def _decode_io(self, asdu_type, ioe_element_count, io_bytes): 148 address, rest = _decode_int(io_bytes, self._io_address_size.value) 149 150 elements = collections.deque() 151 for _ in range(ioe_element_count): 152 element, rest = self._decode_io_element_cb(rest, asdu_type) 153 elements.append(element) 154 155 time_size = self._asdu_type_time_sizes.get(asdu_type) 156 if time_size: 157 time, rest = decode_time(rest, time_size), rest[time_size.value:] 158 else: 159 time = None 160 161 io = common.IO(address=address, 162 elements=list(elements), 163 time=time) 164 return io, rest 165 166 def _encode_io(self, asdu_type, io): 167 yield from _encode_int(io.address, self._io_address_size.value) 168 169 for element in io.elements: 170 yield from self._encode_io_element_cb(element, asdu_type) 171 172 time_size = self._asdu_type_time_sizes.get(asdu_type) 173 if time_size: 174 yield from encode_time(io.time, time_size) 175 176 177def _decode_int(data, size): 178 return int.from_bytes(data[:size], 'little'), data[size:] 179 180 181def _encode_int(x, size): 182 return x.to_bytes(size, 'little')
AsduType: TypeAlias =
int
DecodeIoElementCb: TypeAlias =
Callable[[bytes | bytearray | memoryview, int], tuple[Any, bytes | bytearray | memoryview]]
EncodeIoElementCb: TypeAlias =
Callable[[Any, int], Iterable[int]]
def
decode_time( time_bytes: bytes | bytearray | memoryview, time_size: hat.drivers.iec60870.encodings.common.TimeSize) -> hat.drivers.iec60870.encodings.common.Time:
19def decode_time(time_bytes: util.Bytes, 20 time_size: common.TimeSize 21 ) -> common.Time: 22 milliseconds = (time_bytes[1] << 8) | time_bytes[0] 23 invalid = (bool(time_bytes[2] & 0x80) if time_size.value > 2 else None) 24 minutes = (time_bytes[2] & 0x3F if time_size.value > 2 else None) 25 summer_time = (bool(time_bytes[3] & 0x80) if time_size.value > 3 else None) 26 hours = (time_bytes[3] & 0x1F if time_size.value > 3 else None) 27 day_of_week = (time_bytes[4] >> 5 if time_size.value > 4 else None) 28 day_of_month = (time_bytes[4] & 0x1F if time_size.value > 4 else None) 29 months = (time_bytes[5] & 0x0F if time_size.value > 4 else None) 30 years = (time_bytes[6] & 0x7F if time_size.value > 4 else None) 31 32 return common.Time(size=time_size, 33 milliseconds=milliseconds, 34 invalid=invalid, 35 minutes=minutes, 36 summer_time=summer_time, 37 hours=hours, 38 day_of_week=day_of_week, 39 day_of_month=day_of_month, 40 months=months, 41 years=years)
def
encode_time( time: hat.drivers.iec60870.encodings.common.Time, time_size: hat.drivers.iec60870.encodings.common.TimeSize) -> Iterable[int]:
44def encode_time(time: common.Time, 45 time_size: common.TimeSize 46 ) -> typing.Iterable[int]: 47 if time_size.value > time.size.value: 48 raise ValueError('unsupported time size') 49 50 yield time.milliseconds & 0xFF 51 yield (time.milliseconds >> 8) & 0xFF 52 53 if time_size.value > 2: 54 yield ((0x80 if time.invalid else 0) | 55 (time.minutes & 0x3F)) 56 57 if time_size.value > 3: 58 yield ((0x80 if time.summer_time else 0) | 59 (time.hours & 0x1F)) 60 61 if time_size.value > 4: 62 yield (((time.day_of_week & 0x07) << 5) | 63 (time.day_of_month & 0x1F)) 64 yield time.months & 0x0F 65 yield time.years & 0x7F
class
Encoder:
68class Encoder: 69 70 def __init__(self, 71 cause_size: common.CauseSize, 72 asdu_address_size: common.AsduAddressSize, 73 io_address_size: common.IoAddressSize, 74 asdu_type_time_sizes: AsduTypeTimeSizes, 75 inverted_sequence_bit: bool, 76 decode_io_element_cb: DecodeIoElementCb, 77 encode_io_element_cb: EncodeIoElementCb): 78 self._cause_size = cause_size 79 self._asdu_address_size = asdu_address_size 80 self._io_address_size = io_address_size 81 self._asdu_type_time_sizes = asdu_type_time_sizes 82 self._inverted_sequence_bit = inverted_sequence_bit 83 self._decode_io_element_cb = decode_io_element_cb 84 self._encode_io_element_cb = encode_io_element_cb 85 86 @property 87 def cause_size(self) -> common.CauseSize: 88 return self._cause_size 89 90 @property 91 def asdu_address_size(self) -> common.AsduAddressSize: 92 return self._asdu_address_size 93 94 @property 95 def io_address_size(self) -> common.IoAddressSize: 96 return self._io_address_size 97 98 def decode_asdu(self, 99 asdu_bytes: util.Bytes 100 ) -> tuple[common.ASDU, util.Bytes]: 101 asdu_type = asdu_bytes[0] 102 io_number = asdu_bytes[1] & 0x7F 103 is_sequence = bool(asdu_bytes[1] & 0x80) 104 if self._inverted_sequence_bit: 105 is_sequence = not is_sequence 106 io_count = 1 if is_sequence else io_number 107 ioe_element_count = io_number if is_sequence else 1 108 109 rest = asdu_bytes[2:] 110 cause, rest = _decode_int(rest, self._cause_size.value) 111 address, rest = _decode_int(rest, self._asdu_address_size.value) 112 113 ios = collections.deque() 114 for _ in range(io_count): 115 io, rest = self._decode_io(asdu_type, ioe_element_count, rest) 116 ios.append(io) 117 118 asdu = common.ASDU(type=asdu_type, 119 cause=cause, 120 address=address, 121 ios=list(ios)) 122 return asdu, rest 123 124 def encode_asdu(self, asdu: common.ASDU) -> util.Bytes: 125 data = collections.deque() 126 data.append(asdu.type) 127 128 is_sequence = len(asdu.ios) == 1 and len(asdu.ios[0].elements) > 1 129 if is_sequence: 130 data.append( 131 (0x00 if self._inverted_sequence_bit else 0x80) | 132 len(asdu.ios[0].elements)) 133 else: 134 data.append( 135 (0x80 if self._inverted_sequence_bit else 0x00) | 136 len(asdu.ios)) 137 138 data.extend(_encode_int(asdu.cause, self._cause_size.value)) 139 data.extend(_encode_int(asdu.address, self._asdu_address_size.value)) 140 141 for io in asdu.ios: 142 if not is_sequence and len(io.elements) != 1: 143 raise ValueError('invalid number of IO elements') 144 data.extend(self._encode_io(asdu.type, io)) 145 146 return bytes(data) 147 148 def _decode_io(self, asdu_type, ioe_element_count, io_bytes): 149 address, rest = _decode_int(io_bytes, self._io_address_size.value) 150 151 elements = collections.deque() 152 for _ in range(ioe_element_count): 153 element, rest = self._decode_io_element_cb(rest, asdu_type) 154 elements.append(element) 155 156 time_size = self._asdu_type_time_sizes.get(asdu_type) 157 if time_size: 158 time, rest = decode_time(rest, time_size), rest[time_size.value:] 159 else: 160 time = None 161 162 io = common.IO(address=address, 163 elements=list(elements), 164 time=time) 165 return io, rest 166 167 def _encode_io(self, asdu_type, io): 168 yield from _encode_int(io.address, self._io_address_size.value) 169 170 for element in io.elements: 171 yield from self._encode_io_element_cb(element, asdu_type) 172 173 time_size = self._asdu_type_time_sizes.get(asdu_type) 174 if time_size: 175 yield from encode_time(io.time, time_size)
Encoder( cause_size: hat.drivers.iec60870.encodings.common.CauseSize, asdu_address_size: hat.drivers.iec60870.encodings.common.AsduAddressSize, io_address_size: hat.drivers.iec60870.encodings.common.IoAddressSize, asdu_type_time_sizes: dict[int, hat.drivers.iec60870.encodings.common.TimeSize], inverted_sequence_bit: bool, decode_io_element_cb: Callable[[bytes | bytearray | memoryview, int], tuple[Any, bytes | bytearray | memoryview]], encode_io_element_cb: Callable[[Any, int], Iterable[int]])
70 def __init__(self, 71 cause_size: common.CauseSize, 72 asdu_address_size: common.AsduAddressSize, 73 io_address_size: common.IoAddressSize, 74 asdu_type_time_sizes: AsduTypeTimeSizes, 75 inverted_sequence_bit: bool, 76 decode_io_element_cb: DecodeIoElementCb, 77 encode_io_element_cb: EncodeIoElementCb): 78 self._cause_size = cause_size 79 self._asdu_address_size = asdu_address_size 80 self._io_address_size = io_address_size 81 self._asdu_type_time_sizes = asdu_type_time_sizes 82 self._inverted_sequence_bit = inverted_sequence_bit 83 self._decode_io_element_cb = decode_io_element_cb 84 self._encode_io_element_cb = encode_io_element_cb
cause_size: hat.drivers.iec60870.encodings.common.CauseSize
asdu_address_size: hat.drivers.iec60870.encodings.common.AsduAddressSize
io_address_size: hat.drivers.iec60870.encodings.common.IoAddressSize
def
decode_asdu( self, asdu_bytes: bytes | bytearray | memoryview) -> tuple[hat.drivers.iec60870.encodings.common.ASDU, bytes | bytearray | memoryview]:
98 def decode_asdu(self, 99 asdu_bytes: util.Bytes 100 ) -> tuple[common.ASDU, util.Bytes]: 101 asdu_type = asdu_bytes[0] 102 io_number = asdu_bytes[1] & 0x7F 103 is_sequence = bool(asdu_bytes[1] & 0x80) 104 if self._inverted_sequence_bit: 105 is_sequence = not is_sequence 106 io_count = 1 if is_sequence else io_number 107 ioe_element_count = io_number if is_sequence else 1 108 109 rest = asdu_bytes[2:] 110 cause, rest = _decode_int(rest, self._cause_size.value) 111 address, rest = _decode_int(rest, self._asdu_address_size.value) 112 113 ios = collections.deque() 114 for _ in range(io_count): 115 io, rest = self._decode_io(asdu_type, ioe_element_count, rest) 116 ios.append(io) 117 118 asdu = common.ASDU(type=asdu_type, 119 cause=cause, 120 address=address, 121 ios=list(ios)) 122 return asdu, rest
def
encode_asdu( self, asdu: hat.drivers.iec60870.encodings.common.ASDU) -> bytes | bytearray | memoryview:
124 def encode_asdu(self, asdu: common.ASDU) -> util.Bytes: 125 data = collections.deque() 126 data.append(asdu.type) 127 128 is_sequence = len(asdu.ios) == 1 and len(asdu.ios[0].elements) > 1 129 if is_sequence: 130 data.append( 131 (0x00 if self._inverted_sequence_bit else 0x80) | 132 len(asdu.ios[0].elements)) 133 else: 134 data.append( 135 (0x80 if self._inverted_sequence_bit else 0x00) | 136 len(asdu.ios)) 137 138 data.extend(_encode_int(asdu.cause, self._cause_size.value)) 139 data.extend(_encode_int(asdu.address, self._asdu_address_size.value)) 140 141 for io in asdu.ios: 142 if not is_sequence and len(io.elements) != 1: 143 raise ValueError('invalid number of IO elements') 144 data.extend(self._encode_io(asdu.type, io)) 145 146 return bytes(data)