hat.drivers.iec60870.encodings.encoder
"""Encoding and decoding of ASDUs, IOs and time values."""

import collections
import typing

from hat import util

from hat.drivers.iec60870.encodings import common


AsduType: typing.TypeAlias = int
AsduTypeTimeSizes: typing.TypeAlias = dict[AsduType, common.TimeSize]
DecodeIoElementCb: typing.TypeAlias = typing.Callable[[util.Bytes, AsduType],
                                                      tuple[typing.Any,
                                                            util.Bytes]]
EncodeIoElementCb: typing.TypeAlias = typing.Callable[[typing.Any, AsduType],
                                                      typing.Iterable[int]]


def decode_time(time_bytes: util.Bytes,
                time_size: common.TimeSize
                ) -> common.Time:
    """Decode a time value from the leading bytes of `time_bytes`.

    The first two bytes always hold little-endian milliseconds. Further
    bytes are read depending on ``time_size.value``: above 2 the third
    byte (invalid flag, substituted flag, minutes), above 3 the fourth
    byte (summer time flag, hours) and above 4 the fifth to seventh byte
    (day of week, day of month, months, years). Fields belonging to bytes
    that are not read are ``None``.

    Args:
        time_bytes: buffer starting with the encoded time
        time_size: selects which fields are decoded

    Returns:
        decoded time with `size` set to `time_size`

    """
    size = time_size.value
    milliseconds = (time_bytes[1] << 8) | time_bytes[0]

    invalid = substituted = minutes = None
    summer_time = hours = None
    day_of_week = day_of_month = months = years = None

    if size > 2:
        invalid = bool(time_bytes[2] & 0x80)
        substituted = bool(time_bytes[2] & 0x40)
        minutes = time_bytes[2] & 0x3F

    if size > 3:
        summer_time = bool(time_bytes[3] & 0x80)
        hours = time_bytes[3] & 0x1F

    if size > 4:
        day_of_week = time_bytes[4] >> 5
        day_of_month = time_bytes[4] & 0x1F
        months = time_bytes[5] & 0x0F
        years = time_bytes[6] & 0x7F

    return common.Time(size=time_size,
                       milliseconds=milliseconds,
                       invalid=invalid,
                       substituted=substituted,
                       minutes=minutes,
                       summer_time=summer_time,
                       hours=hours,
                       day_of_week=day_of_week,
                       day_of_month=day_of_month,
                       months=months,
                       years=years)


def encode_time(time: common.Time,
                time_size: common.TimeSize
                ) -> typing.Iterable[int]:
    """Lazily encode `time` as byte values.

    Two milliseconds bytes (little-endian) are always produced. When
    ``time_size.value`` is above 2, 3 and 4, the minutes byte, the hours
    byte and the three date bytes are added respectively.

    Args:
        time: time to encode
        time_size: selects which fields are encoded

    Returns:
        iterable of byte values

    Raises:
        ValueError: on iteration, if ``time_size.value`` is greater than
            ``time.size.value``

    """
    size = time_size.value
    if size > time.size.value:
        raise ValueError('unsupported time size')

    milliseconds = time.milliseconds
    yield milliseconds & 0xFF
    yield (milliseconds >> 8) & 0xFF

    if size <= 2:
        return

    minutes_byte = time.minutes & 0x3F
    if time.invalid:
        minutes_byte |= 0x80
    if time.substituted:
        minutes_byte |= 0x40
    yield minutes_byte

    if size <= 3:
        return

    hours_byte = time.hours & 0x1F
    if time.summer_time:
        hours_byte |= 0x80
    yield hours_byte

    if size <= 4:
        return

    # day of week occupies the upper three bits of the day byte
    yield ((time.day_of_week & 0x07) << 5) | (time.day_of_month & 0x1F)
    yield time.months & 0x0F
    yield time.years & 0x7F


class Encoder:
    """ASDU encoder/decoder with configurable field sizes.

    Encoding and decoding of individual IO elements is delegated to the
    provided callbacks.

    Args:
        cause_size: its ``value`` is the number of bytes of the cause field
        asdu_address_size: its ``value`` is the number of bytes of the ASDU
            address field
        io_address_size: its ``value`` is the number of bytes of the IO
            address field
        asdu_type_time_sizes: time size for each ASDU type whose IOs carry
            a time; IOs of other types have no time
        inverted_sequence_bit: if true, the meaning of the sequence flag
            (0x80 of the second ASDU byte) is inverted
        decode_io_element_cb: called with remaining bytes and ASDU type,
            returns decoded element and remaining bytes
        encode_io_element_cb: called with element and ASDU type, returns
            byte values

    """

    def __init__(self,
                 cause_size: common.CauseSize,
                 asdu_address_size: common.AsduAddressSize,
                 io_address_size: common.IoAddressSize,
                 asdu_type_time_sizes: AsduTypeTimeSizes,
                 inverted_sequence_bit: bool,
                 decode_io_element_cb: DecodeIoElementCb,
                 encode_io_element_cb: EncodeIoElementCb):
        self._cause_size = cause_size
        self._asdu_address_size = asdu_address_size
        self._io_address_size = io_address_size
        self._asdu_type_time_sizes = asdu_type_time_sizes
        self._inverted_sequence_bit = inverted_sequence_bit
        self._decode_io_element_cb = decode_io_element_cb
        self._encode_io_element_cb = encode_io_element_cb

    @property
    def cause_size(self) -> common.CauseSize:
        """Cause size passed to the constructor"""
        return self._cause_size

    @property
    def asdu_address_size(self) -> common.AsduAddressSize:
        """ASDU address size passed to the constructor"""
        return self._asdu_address_size

    @property
    def io_address_size(self) -> common.IoAddressSize:
        """IO address size passed to the constructor"""
        return self._io_address_size

    def decode_asdu(self,
                    asdu_bytes: util.Bytes
                    ) -> tuple[common.ASDU, util.Bytes]:
        """Decode a single ASDU from the start of `asdu_bytes`.

        Args:
            asdu_bytes: buffer starting with an encoded ASDU

        Returns:
            decoded ASDU and the bytes remaining after it

        """
        asdu_type = asdu_bytes[0]
        qualifier = asdu_bytes[1]
        number = qualifier & 0x7F

        is_sequence = bool(qualifier & 0x80)
        if self._inverted_sequence_bit:
            is_sequence = not is_sequence

        # a sequence is one IO with `number` elements, otherwise there
        # are `number` IOs with a single element each
        if is_sequence:
            io_count, element_count = 1, number
        else:
            io_count, element_count = number, 1

        rest = asdu_bytes[2:]
        cause, rest = _decode_int(rest, self._cause_size.value)
        address, rest = _decode_int(rest, self._asdu_address_size.value)

        ios = []
        for _ in range(io_count):
            io, rest = self._decode_io(asdu_type, element_count, rest)
            ios.append(io)

        asdu = common.ASDU(type=asdu_type,
                           cause=cause,
                           address=address,
                           ios=ios)
        return asdu, rest

    def encode_asdu(self, asdu: common.ASDU) -> util.Bytes:
        """Encode an ASDU.

        An ASDU with exactly one IO holding more than one element is
        encoded as a sequence and the count field holds the number of
        elements. Otherwise the count field holds the number of IOs and
        each IO has to contain exactly one element.

        Args:
            asdu: ASDU to encode

        Returns:
            encoded ASDU

        Raises:
            ValueError: if the number of IOs or elements does not fit the
                7 bit count field, or if an IO of a non-sequence ASDU
                does not contain exactly one element

        """
        data = collections.deque()
        data.append(asdu.type)

        is_sequence = len(asdu.ios) == 1 and len(asdu.ios[0].elements) > 1
        count = len(asdu.ios[0].elements) if is_sequence else len(asdu.ios)

        # the count shares its byte with the sequence flag (0x80); a larger
        # count would silently flip the flag instead of failing
        if count > 0x7F:
            raise ValueError('unsupported number of IOs or IO elements')

        # the flag is set for sequences unless its meaning is inverted
        flag_set = is_sequence != bool(self._inverted_sequence_bit)
        data.append((0x80 if flag_set else 0x00) | count)

        data.extend(_encode_int(asdu.cause, self._cause_size.value))
        data.extend(_encode_int(asdu.address, self._asdu_address_size.value))

        for io in asdu.ios:
            if not is_sequence and len(io.elements) != 1:
                raise ValueError('invalid number of IO elements')
            data.extend(self._encode_io(asdu.type, io))

        return bytes(data)

    def _decode_io(self, asdu_type, ioe_element_count, io_bytes):
        # decode IO address, `ioe_element_count` elements and, when the
        # ASDU type has a configured time size, the trailing time
        address, rest = _decode_int(io_bytes, self._io_address_size.value)

        elements = []
        for _ in range(ioe_element_count):
            element, rest = self._decode_io_element_cb(rest, asdu_type)
            elements.append(element)

        time = None
        time_size = self._asdu_type_time_sizes.get(asdu_type)
        if time_size:
            time = decode_time(rest, time_size)
            rest = rest[time_size.value:]

        io = common.IO(address=address,
                       elements=elements,
                       time=time)
        return io, rest

    def _encode_io(self, asdu_type, io):
        # lazily produce bytes of IO address, all elements and, when the
        # ASDU type has a configured time size, the time
        yield from _encode_int(io.address, self._io_address_size.value)

        for element in io.elements:
            yield from self._encode_io_element_cb(element, asdu_type)

        time_size = self._asdu_type_time_sizes.get(asdu_type)
        if time_size:
            yield from encode_time(io.time, time_size)


def _decode_int(data, size):
    # little-endian integer from the first `size` bytes and the remainder
    return int.from_bytes(data[:size], 'little'), data[size:]


def _encode_int(x, size):
    # `x` as `size` little-endian bytes
    return x.to_bytes(size, 'little')
AsduType: TypeAlias =
int
DecodeIoElementCb: TypeAlias =
Callable[[bytes | bytearray | memoryview, int], tuple[Any, bytes | bytearray | memoryview]]
EncodeIoElementCb: TypeAlias =
Callable[[Any, int], Iterable[int]]
def
decode_time( time_bytes: bytes | bytearray | memoryview, time_size: hat.drivers.iec60870.encodings.common.TimeSize) -> hat.drivers.iec60870.encodings.common.Time:
def decode_time(time_bytes: util.Bytes,
                time_size: common.TimeSize
                ) -> common.Time:
    """Decode a time value from the leading bytes of `time_bytes`.

    The first two bytes always hold little-endian milliseconds. Further
    bytes are read depending on ``time_size.value``: above 2 the third
    byte (invalid flag, substituted flag, minutes), above 3 the fourth
    byte (summer time flag, hours) and above 4 the fifth to seventh byte
    (day of week, day of month, months, years). Fields belonging to bytes
    that are not read are ``None``.

    Args:
        time_bytes: buffer starting with the encoded time
        time_size: selects which fields are decoded

    Returns:
        decoded time with `size` set to `time_size`

    """
    size = time_size.value
    milliseconds = (time_bytes[1] << 8) | time_bytes[0]

    # everything beyond milliseconds is optional
    invalid = substituted = minutes = None
    summer_time = hours = None
    day_of_week = day_of_month = months = years = None

    if size > 2:
        invalid = bool(time_bytes[2] & 0x80)
        substituted = bool(time_bytes[2] & 0x40)
        minutes = time_bytes[2] & 0x3F

    if size > 3:
        summer_time = bool(time_bytes[3] & 0x80)
        hours = time_bytes[3] & 0x1F

    if size > 4:
        # upper three bits hold day of week, lower five day of month
        day_of_week = time_bytes[4] >> 5
        day_of_month = time_bytes[4] & 0x1F
        months = time_bytes[5] & 0x0F
        years = time_bytes[6] & 0x7F

    return common.Time(size=time_size,
                       milliseconds=milliseconds,
                       invalid=invalid,
                       substituted=substituted,
                       minutes=minutes,
                       summer_time=summer_time,
                       hours=hours,
                       day_of_week=day_of_week,
                       day_of_month=day_of_month,
                       months=months,
                       years=years)
def
encode_time( time: hat.drivers.iec60870.encodings.common.Time, time_size: hat.drivers.iec60870.encodings.common.TimeSize) -> Iterable[int]:
def encode_time(time: common.Time,
                time_size: common.TimeSize
                ) -> typing.Iterable[int]:
    """Lazily encode `time` as byte values.

    Two milliseconds bytes (little-endian) are always produced. When
    ``time_size.value`` is above 2, 3 and 4, the minutes byte, the hours
    byte and the three date bytes are added respectively.

    Args:
        time: time to encode
        time_size: selects which fields are encoded

    Returns:
        iterable of byte values

    Raises:
        ValueError: on iteration, if ``time_size.value`` is greater than
            ``time.size.value``

    """
    size = time_size.value
    if size > time.size.value:
        raise ValueError('unsupported time size')

    milliseconds = time.milliseconds
    yield milliseconds & 0xFF
    yield (milliseconds >> 8) & 0xFF

    if size <= 2:
        return

    minutes_byte = time.minutes & 0x3F
    if time.invalid:
        minutes_byte |= 0x80
    if time.substituted:
        minutes_byte |= 0x40
    yield minutes_byte

    if size <= 3:
        return

    hours_byte = time.hours & 0x1F
    if time.summer_time:
        hours_byte |= 0x80
    yield hours_byte

    if size <= 4:
        return

    # day of week occupies the upper three bits of the day byte
    yield ((time.day_of_week & 0x07) << 5) | (time.day_of_month & 0x1F)
    yield time.months & 0x0F
    yield time.years & 0x7F
class
Encoder:
class Encoder:
    """ASDU encoder/decoder with configurable field sizes.

    Encoding and decoding of individual IO elements is delegated to the
    provided callbacks.

    Args:
        cause_size: its ``value`` is the number of bytes of the cause field
        asdu_address_size: its ``value`` is the number of bytes of the ASDU
            address field
        io_address_size: its ``value`` is the number of bytes of the IO
            address field
        asdu_type_time_sizes: time size for each ASDU type whose IOs carry
            a time; IOs of other types have no time
        inverted_sequence_bit: if true, the meaning of the sequence flag
            (0x80 of the second ASDU byte) is inverted
        decode_io_element_cb: called with remaining bytes and ASDU type,
            returns decoded element and remaining bytes
        encode_io_element_cb: called with element and ASDU type, returns
            byte values

    """

    def __init__(self,
                 cause_size: common.CauseSize,
                 asdu_address_size: common.AsduAddressSize,
                 io_address_size: common.IoAddressSize,
                 asdu_type_time_sizes: AsduTypeTimeSizes,
                 inverted_sequence_bit: bool,
                 decode_io_element_cb: DecodeIoElementCb,
                 encode_io_element_cb: EncodeIoElementCb):
        self._cause_size = cause_size
        self._asdu_address_size = asdu_address_size
        self._io_address_size = io_address_size
        self._asdu_type_time_sizes = asdu_type_time_sizes
        self._inverted_sequence_bit = inverted_sequence_bit
        self._decode_io_element_cb = decode_io_element_cb
        self._encode_io_element_cb = encode_io_element_cb

    @property
    def cause_size(self) -> common.CauseSize:
        """Cause size passed to the constructor"""
        return self._cause_size

    @property
    def asdu_address_size(self) -> common.AsduAddressSize:
        """ASDU address size passed to the constructor"""
        return self._asdu_address_size

    @property
    def io_address_size(self) -> common.IoAddressSize:
        """IO address size passed to the constructor"""
        return self._io_address_size

    def decode_asdu(self,
                    asdu_bytes: util.Bytes
                    ) -> tuple[common.ASDU, util.Bytes]:
        """Decode a single ASDU from the start of `asdu_bytes`.

        Args:
            asdu_bytes: buffer starting with an encoded ASDU

        Returns:
            decoded ASDU and the bytes remaining after it

        """
        asdu_type = asdu_bytes[0]
        qualifier = asdu_bytes[1]
        number = qualifier & 0x7F

        is_sequence = bool(qualifier & 0x80)
        if self._inverted_sequence_bit:
            is_sequence = not is_sequence

        # a sequence is one IO with `number` elements, otherwise there
        # are `number` IOs with a single element each
        if is_sequence:
            io_count, element_count = 1, number
        else:
            io_count, element_count = number, 1

        rest = asdu_bytes[2:]
        cause, rest = _decode_int(rest, self._cause_size.value)
        address, rest = _decode_int(rest, self._asdu_address_size.value)

        ios = []
        for _ in range(io_count):
            io, rest = self._decode_io(asdu_type, element_count, rest)
            ios.append(io)

        asdu = common.ASDU(type=asdu_type,
                           cause=cause,
                           address=address,
                           ios=ios)
        return asdu, rest

    def encode_asdu(self, asdu: common.ASDU) -> util.Bytes:
        """Encode an ASDU.

        An ASDU with exactly one IO holding more than one element is
        encoded as a sequence and the count field holds the number of
        elements. Otherwise the count field holds the number of IOs and
        each IO has to contain exactly one element.

        Args:
            asdu: ASDU to encode

        Returns:
            encoded ASDU

        Raises:
            ValueError: if the number of IOs or elements does not fit the
                7 bit count field, or if an IO of a non-sequence ASDU
                does not contain exactly one element

        """
        data = collections.deque()
        data.append(asdu.type)

        is_sequence = len(asdu.ios) == 1 and len(asdu.ios[0].elements) > 1
        count = len(asdu.ios[0].elements) if is_sequence else len(asdu.ios)

        # the count shares its byte with the sequence flag (0x80); a larger
        # count would silently flip the flag instead of failing
        if count > 0x7F:
            raise ValueError('unsupported number of IOs or IO elements')

        # the flag is set for sequences unless its meaning is inverted
        flag_set = is_sequence != bool(self._inverted_sequence_bit)
        data.append((0x80 if flag_set else 0x00) | count)

        data.extend(_encode_int(asdu.cause, self._cause_size.value))
        data.extend(_encode_int(asdu.address, self._asdu_address_size.value))

        for io in asdu.ios:
            if not is_sequence and len(io.elements) != 1:
                raise ValueError('invalid number of IO elements')
            data.extend(self._encode_io(asdu.type, io))

        return bytes(data)

    def _decode_io(self, asdu_type, ioe_element_count, io_bytes):
        # decode IO address, `ioe_element_count` elements and, when the
        # ASDU type has a configured time size, the trailing time
        address, rest = _decode_int(io_bytes, self._io_address_size.value)

        elements = []
        for _ in range(ioe_element_count):
            element, rest = self._decode_io_element_cb(rest, asdu_type)
            elements.append(element)

        time = None
        time_size = self._asdu_type_time_sizes.get(asdu_type)
        if time_size:
            time = decode_time(rest, time_size)
            rest = rest[time_size.value:]

        io = common.IO(address=address,
                       elements=elements,
                       time=time)
        return io, rest

    def _encode_io(self, asdu_type, io):
        # lazily produce bytes of IO address, all elements and, when the
        # ASDU type has a configured time size, the time
        yield from _encode_int(io.address, self._io_address_size.value)

        for element in io.elements:
            yield from self._encode_io_element_cb(element, asdu_type)

        time_size = self._asdu_type_time_sizes.get(asdu_type)
        if time_size:
            yield from encode_time(io.time, time_size)
Encoder( cause_size: hat.drivers.iec60870.encodings.common.CauseSize, asdu_address_size: hat.drivers.iec60870.encodings.common.AsduAddressSize, io_address_size: hat.drivers.iec60870.encodings.common.IoAddressSize, asdu_type_time_sizes: dict[int, hat.drivers.iec60870.encodings.common.TimeSize], inverted_sequence_bit: bool, decode_io_element_cb: Callable[[bytes | bytearray | memoryview, int], tuple[Any, bytes | bytearray | memoryview]], encode_io_element_cb: Callable[[Any, int], Iterable[int]])
def __init__(self,
             cause_size: common.CauseSize,
             asdu_address_size: common.AsduAddressSize,
             io_address_size: common.IoAddressSize,
             asdu_type_time_sizes: AsduTypeTimeSizes,
             inverted_sequence_bit: bool,
             decode_io_element_cb: DecodeIoElementCb,
             encode_io_element_cb: EncodeIoElementCb):
    """Store the encoder configuration.

    Args:
        cause_size: size of the cause field
        asdu_address_size: size of the ASDU address field
        io_address_size: size of the IO address field
        asdu_type_time_sizes: time size for each ASDU type
        inverted_sequence_bit: whether the sequence bit is inverted
        decode_io_element_cb: callback decoding a single IO element
        encode_io_element_cb: callback encoding a single IO element

    """
    # IO element callbacks
    self._decode_io_element_cb = decode_io_element_cb
    self._encode_io_element_cb = encode_io_element_cb

    # field sizes
    self._cause_size = cause_size
    self._asdu_address_size = asdu_address_size
    self._io_address_size = io_address_size

    # ASDU structure options
    self._asdu_type_time_sizes = asdu_type_time_sizes
    self._inverted_sequence_bit = inverted_sequence_bit
cause_size: hat.drivers.iec60870.encodings.common.CauseSize
asdu_address_size: hat.drivers.iec60870.encodings.common.AsduAddressSize
io_address_size: hat.drivers.iec60870.encodings.common.IoAddressSize
def
decode_asdu( self, asdu_bytes: bytes | bytearray | memoryview) -> tuple[hat.drivers.iec60870.encodings.common.ASDU, bytes | bytearray | memoryview]:
def decode_asdu(self,
                asdu_bytes: util.Bytes
                ) -> tuple[common.ASDU, util.Bytes]:
    """Decode a single ASDU from the start of `asdu_bytes`.

    The first byte is the ASDU type. The second byte holds a 7 bit
    number and the sequence flag (0x80), followed by the cause, the ASDU
    address and the IOs.

    Args:
        asdu_bytes: buffer starting with an encoded ASDU

    Returns:
        decoded ASDU and the bytes remaining after it

    """
    asdu_type = asdu_bytes[0]
    qualifier = asdu_bytes[1]
    number = qualifier & 0x7F

    # flag meaning is flipped when the encoder is configured as inverted
    is_sequence = bool(qualifier & 0x80) ^ bool(self._inverted_sequence_bit)

    # a sequence is one IO with `number` elements, otherwise there are
    # `number` IOs with a single element each
    if is_sequence:
        io_count, element_count = 1, number
    else:
        io_count, element_count = number, 1

    remaining = asdu_bytes[2:]
    cause, remaining = _decode_int(remaining, self._cause_size.value)
    address, remaining = _decode_int(remaining,
                                     self._asdu_address_size.value)

    ios = []
    for _ in range(io_count):
        io, remaining = self._decode_io(asdu_type, element_count, remaining)
        ios.append(io)

    asdu = common.ASDU(type=asdu_type,
                       cause=cause,
                       address=address,
                       ios=ios)
    return asdu, remaining
def
encode_asdu( self, asdu: hat.drivers.iec60870.encodings.common.ASDU) -> bytes | bytearray | memoryview:
def encode_asdu(self, asdu: common.ASDU) -> util.Bytes:
    """Encode an ASDU.

    An ASDU with exactly one IO holding more than one element is encoded
    as a sequence and the count field holds the number of elements.
    Otherwise the count field holds the number of IOs and each IO has to
    contain exactly one element.

    Args:
        asdu: ASDU to encode

    Returns:
        encoded ASDU

    Raises:
        ValueError: if the number of IOs or elements does not fit the
            7 bit count field, or if an IO of a non-sequence ASDU does
            not contain exactly one element

    """
    data = collections.deque()
    data.append(asdu.type)

    is_sequence = len(asdu.ios) == 1 and len(asdu.ios[0].elements) > 1
    count = len(asdu.ios[0].elements) if is_sequence else len(asdu.ios)

    # the count shares its byte with the sequence flag (0x80); a larger
    # count would silently flip the flag instead of failing
    if count > 0x7F:
        raise ValueError('unsupported number of IOs or IO elements')

    # the flag is set for sequences unless its meaning is inverted
    flag_set = is_sequence != bool(self._inverted_sequence_bit)
    data.append((0x80 if flag_set else 0x00) | count)

    data.extend(_encode_int(asdu.cause, self._cause_size.value))
    data.extend(_encode_int(asdu.address, self._asdu_address_size.value))

    for io in asdu.ios:
        if not is_sequence and len(io.elements) != 1:
            raise ValueError('invalid number of IO elements')
        data.extend(self._encode_io(asdu.type, io))

    return bytes(data)