hat.drivers.iec60870.encodings.encoder

  1import collections
  2import typing
  3
  4from hat import util
  5
  6from hat.drivers.iec60870.encodings import common
  7
  8
  9AsduType: typing.TypeAlias = int
 10AsduTypeTimeSizes: typing.TypeAlias = dict[AsduType, common.TimeSize]
 11DecodeIoElementCb: typing.TypeAlias = typing.Callable[[util.Bytes, AsduType],
 12                                                      tuple[typing.Any,
 13                                                            util.Bytes]]
 14EncodeIoElementCb: typing.TypeAlias = typing.Callable[[typing.Any, AsduType],
 15                                                      typing.Iterable[int]]
 16
 17
 18def decode_time(time_bytes: util.Bytes,
 19                time_size: common.TimeSize
 20                ) -> common.Time:
 21    milliseconds = (time_bytes[1] << 8) | time_bytes[0]
 22    invalid = (bool(time_bytes[2] & 0x80) if time_size.value > 2 else None)
 23    substituted = (bool(time_bytes[2] & 0x40) if time_size.value > 2 else None)
 24    minutes = (time_bytes[2] & 0x3F if time_size.value > 2 else None)
 25    summer_time = (bool(time_bytes[3] & 0x80) if time_size.value > 3 else None)
 26    hours = (time_bytes[3] & 0x1F if time_size.value > 3 else None)
 27    day_of_week = (time_bytes[4] >> 5 if time_size.value > 4 else None)
 28    day_of_month = (time_bytes[4] & 0x1F if time_size.value > 4 else None)
 29    months = (time_bytes[5] & 0x0F if time_size.value > 4 else None)
 30    years = (time_bytes[6] & 0x7F if time_size.value > 4 else None)
 31
 32    return common.Time(size=time_size,
 33                       milliseconds=milliseconds,
 34                       invalid=invalid,
 35                       substituted=substituted,
 36                       minutes=minutes,
 37                       summer_time=summer_time,
 38                       hours=hours,
 39                       day_of_week=day_of_week,
 40                       day_of_month=day_of_month,
 41                       months=months,
 42                       years=years)
 43
 44
 45def encode_time(time: common.Time,
 46                time_size: common.TimeSize
 47                ) -> typing.Iterable[int]:
 48    if time_size.value > time.size.value:
 49        raise ValueError('unsupported time size')
 50
 51    yield time.milliseconds & 0xFF
 52    yield (time.milliseconds >> 8) & 0xFF
 53
 54    if time_size.value > 2:
 55        yield ((0x80 if time.invalid else 0) |
 56               (0x40 if time.substituted else 0) |
 57               (time.minutes & 0x3F))
 58
 59    if time_size.value > 3:
 60        yield ((0x80 if time.summer_time else 0) |
 61               (time.hours & 0x1F))
 62
 63    if time_size.value > 4:
 64        yield (((time.day_of_week & 0x07) << 5) |
 65               (time.day_of_month & 0x1F))
 66        yield time.months & 0x0F
 67        yield time.years & 0x7F
 68
 69
 70class Encoder:
 71
 72    def __init__(self,
 73                 cause_size: common.CauseSize,
 74                 asdu_address_size: common.AsduAddressSize,
 75                 io_address_size: common.IoAddressSize,
 76                 asdu_type_time_sizes: AsduTypeTimeSizes,
 77                 inverted_sequence_bit: bool,
 78                 decode_io_element_cb: DecodeIoElementCb,
 79                 encode_io_element_cb: EncodeIoElementCb):
 80        self._cause_size = cause_size
 81        self._asdu_address_size = asdu_address_size
 82        self._io_address_size = io_address_size
 83        self._asdu_type_time_sizes = asdu_type_time_sizes
 84        self._inverted_sequence_bit = inverted_sequence_bit
 85        self._decode_io_element_cb = decode_io_element_cb
 86        self._encode_io_element_cb = encode_io_element_cb
 87
 88    @property
 89    def cause_size(self) -> common.CauseSize:
 90        return self._cause_size
 91
 92    @property
 93    def asdu_address_size(self) -> common.AsduAddressSize:
 94        return self._asdu_address_size
 95
 96    @property
 97    def io_address_size(self) -> common.IoAddressSize:
 98        return self._io_address_size
 99
100    def decode_asdu(self,
101                    asdu_bytes: util.Bytes
102                    ) -> tuple[common.ASDU, util.Bytes]:
103        asdu_type = asdu_bytes[0]
104        io_number = asdu_bytes[1] & 0x7F
105        is_sequence = bool(asdu_bytes[1] & 0x80)
106        if self._inverted_sequence_bit:
107            is_sequence = not is_sequence
108        io_count = 1 if is_sequence else io_number
109        ioe_element_count = io_number if is_sequence else 1
110
111        rest = asdu_bytes[2:]
112        cause, rest = _decode_int(rest, self._cause_size.value)
113        address, rest = _decode_int(rest, self._asdu_address_size.value)
114
115        ios = collections.deque()
116        for _ in range(io_count):
117            io, rest = self._decode_io(asdu_type, ioe_element_count, rest)
118            ios.append(io)
119
120        asdu = common.ASDU(type=asdu_type,
121                           cause=cause,
122                           address=address,
123                           ios=list(ios))
124        return asdu, rest
125
126    def encode_asdu(self, asdu: common.ASDU) -> util.Bytes:
127        data = collections.deque()
128        data.append(asdu.type)
129
130        is_sequence = len(asdu.ios) == 1 and len(asdu.ios[0].elements) > 1
131        if is_sequence:
132            data.append(
133                (0x00 if self._inverted_sequence_bit else 0x80) |
134                len(asdu.ios[0].elements))
135        else:
136            data.append(
137                (0x80 if self._inverted_sequence_bit else 0x00) |
138                len(asdu.ios))
139
140        data.extend(_encode_int(asdu.cause, self._cause_size.value))
141        data.extend(_encode_int(asdu.address, self._asdu_address_size.value))
142
143        for io in asdu.ios:
144            if not is_sequence and len(io.elements) != 1:
145                raise ValueError('invalid number of IO elements')
146            data.extend(self._encode_io(asdu.type, io))
147
148        return bytes(data)
149
150    def _decode_io(self, asdu_type, ioe_element_count, io_bytes):
151        address, rest = _decode_int(io_bytes, self._io_address_size.value)
152
153        elements = collections.deque()
154        for _ in range(ioe_element_count):
155            element, rest = self._decode_io_element_cb(rest, asdu_type)
156            elements.append(element)
157
158        time_size = self._asdu_type_time_sizes.get(asdu_type)
159        if time_size:
160            time, rest = decode_time(rest, time_size), rest[time_size.value:]
161        else:
162            time = None
163
164        io = common.IO(address=address,
165                       elements=list(elements),
166                       time=time)
167        return io, rest
168
169    def _encode_io(self, asdu_type, io):
170        yield from _encode_int(io.address, self._io_address_size.value)
171
172        for element in io.elements:
173            yield from self._encode_io_element_cb(element, asdu_type)
174
175        time_size = self._asdu_type_time_sizes.get(asdu_type)
176        if time_size:
177            yield from encode_time(io.time, time_size)
178
179
180def _decode_int(data, size):
181    return int.from_bytes(data[:size], 'little'), data[size:]
182
183
184def _encode_int(x, size):
185    return x.to_bytes(size, 'little')
AsduType: TypeAlias = int
AsduTypeTimeSizes: TypeAlias = dict[int, hat.drivers.iec60870.encodings.common.TimeSize]
DecodeIoElementCb: TypeAlias = Callable[[bytes | bytearray | memoryview, int], tuple[Any, bytes | bytearray | memoryview]]
EncodeIoElementCb: TypeAlias = Callable[[Any, int], Iterable[int]]
def decode_time( time_bytes: bytes | bytearray | memoryview, time_size: hat.drivers.iec60870.encodings.common.TimeSize) -> hat.drivers.iec60870.encodings.common.Time:
19def decode_time(time_bytes: util.Bytes,
20                time_size: common.TimeSize
21                ) -> common.Time:
22    milliseconds = (time_bytes[1] << 8) | time_bytes[0]
23    invalid = (bool(time_bytes[2] & 0x80) if time_size.value > 2 else None)
24    substituted = (bool(time_bytes[2] & 0x40) if time_size.value > 2 else None)
25    minutes = (time_bytes[2] & 0x3F if time_size.value > 2 else None)
26    summer_time = (bool(time_bytes[3] & 0x80) if time_size.value > 3 else None)
27    hours = (time_bytes[3] & 0x1F if time_size.value > 3 else None)
28    day_of_week = (time_bytes[4] >> 5 if time_size.value > 4 else None)
29    day_of_month = (time_bytes[4] & 0x1F if time_size.value > 4 else None)
30    months = (time_bytes[5] & 0x0F if time_size.value > 4 else None)
31    years = (time_bytes[6] & 0x7F if time_size.value > 4 else None)
32
33    return common.Time(size=time_size,
34                       milliseconds=milliseconds,
35                       invalid=invalid,
36                       substituted=substituted,
37                       minutes=minutes,
38                       summer_time=summer_time,
39                       hours=hours,
40                       day_of_week=day_of_week,
41                       day_of_month=day_of_month,
42                       months=months,
43                       years=years)
def encode_time( time: hat.drivers.iec60870.encodings.common.Time, time_size: hat.drivers.iec60870.encodings.common.TimeSize) -> Iterable[int]:
46def encode_time(time: common.Time,
47                time_size: common.TimeSize
48                ) -> typing.Iterable[int]:
49    if time_size.value > time.size.value:
50        raise ValueError('unsupported time size')
51
52    yield time.milliseconds & 0xFF
53    yield (time.milliseconds >> 8) & 0xFF
54
55    if time_size.value > 2:
56        yield ((0x80 if time.invalid else 0) |
57               (0x40 if time.substituted else 0) |
58               (time.minutes & 0x3F))
59
60    if time_size.value > 3:
61        yield ((0x80 if time.summer_time else 0) |
62               (time.hours & 0x1F))
63
64    if time_size.value > 4:
65        yield (((time.day_of_week & 0x07) << 5) |
66               (time.day_of_month & 0x1F))
67        yield time.months & 0x0F
68        yield time.years & 0x7F
class Encoder:
 71class Encoder:
 72
 73    def __init__(self,
 74                 cause_size: common.CauseSize,
 75                 asdu_address_size: common.AsduAddressSize,
 76                 io_address_size: common.IoAddressSize,
 77                 asdu_type_time_sizes: AsduTypeTimeSizes,
 78                 inverted_sequence_bit: bool,
 79                 decode_io_element_cb: DecodeIoElementCb,
 80                 encode_io_element_cb: EncodeIoElementCb):
 81        self._cause_size = cause_size
 82        self._asdu_address_size = asdu_address_size
 83        self._io_address_size = io_address_size
 84        self._asdu_type_time_sizes = asdu_type_time_sizes
 85        self._inverted_sequence_bit = inverted_sequence_bit
 86        self._decode_io_element_cb = decode_io_element_cb
 87        self._encode_io_element_cb = encode_io_element_cb
 88
 89    @property
 90    def cause_size(self) -> common.CauseSize:
 91        return self._cause_size
 92
 93    @property
 94    def asdu_address_size(self) -> common.AsduAddressSize:
 95        return self._asdu_address_size
 96
 97    @property
 98    def io_address_size(self) -> common.IoAddressSize:
 99        return self._io_address_size
100
101    def decode_asdu(self,
102                    asdu_bytes: util.Bytes
103                    ) -> tuple[common.ASDU, util.Bytes]:
104        asdu_type = asdu_bytes[0]
105        io_number = asdu_bytes[1] & 0x7F
106        is_sequence = bool(asdu_bytes[1] & 0x80)
107        if self._inverted_sequence_bit:
108            is_sequence = not is_sequence
109        io_count = 1 if is_sequence else io_number
110        ioe_element_count = io_number if is_sequence else 1
111
112        rest = asdu_bytes[2:]
113        cause, rest = _decode_int(rest, self._cause_size.value)
114        address, rest = _decode_int(rest, self._asdu_address_size.value)
115
116        ios = collections.deque()
117        for _ in range(io_count):
118            io, rest = self._decode_io(asdu_type, ioe_element_count, rest)
119            ios.append(io)
120
121        asdu = common.ASDU(type=asdu_type,
122                           cause=cause,
123                           address=address,
124                           ios=list(ios))
125        return asdu, rest
126
127    def encode_asdu(self, asdu: common.ASDU) -> util.Bytes:
128        data = collections.deque()
129        data.append(asdu.type)
130
131        is_sequence = len(asdu.ios) == 1 and len(asdu.ios[0].elements) > 1
132        if is_sequence:
133            data.append(
134                (0x00 if self._inverted_sequence_bit else 0x80) |
135                len(asdu.ios[0].elements))
136        else:
137            data.append(
138                (0x80 if self._inverted_sequence_bit else 0x00) |
139                len(asdu.ios))
140
141        data.extend(_encode_int(asdu.cause, self._cause_size.value))
142        data.extend(_encode_int(asdu.address, self._asdu_address_size.value))
143
144        for io in asdu.ios:
145            if not is_sequence and len(io.elements) != 1:
146                raise ValueError('invalid number of IO elements')
147            data.extend(self._encode_io(asdu.type, io))
148
149        return bytes(data)
150
151    def _decode_io(self, asdu_type, ioe_element_count, io_bytes):
152        address, rest = _decode_int(io_bytes, self._io_address_size.value)
153
154        elements = collections.deque()
155        for _ in range(ioe_element_count):
156            element, rest = self._decode_io_element_cb(rest, asdu_type)
157            elements.append(element)
158
159        time_size = self._asdu_type_time_sizes.get(asdu_type)
160        if time_size:
161            time, rest = decode_time(rest, time_size), rest[time_size.value:]
162        else:
163            time = None
164
165        io = common.IO(address=address,
166                       elements=list(elements),
167                       time=time)
168        return io, rest
169
170    def _encode_io(self, asdu_type, io):
171        yield from _encode_int(io.address, self._io_address_size.value)
172
173        for element in io.elements:
174            yield from self._encode_io_element_cb(element, asdu_type)
175
176        time_size = self._asdu_type_time_sizes.get(asdu_type)
177        if time_size:
178            yield from encode_time(io.time, time_size)
Encoder( cause_size: hat.drivers.iec60870.encodings.common.CauseSize, asdu_address_size: hat.drivers.iec60870.encodings.common.AsduAddressSize, io_address_size: hat.drivers.iec60870.encodings.common.IoAddressSize, asdu_type_time_sizes: dict[int, hat.drivers.iec60870.encodings.common.TimeSize], inverted_sequence_bit: bool, decode_io_element_cb: Callable[[bytes | bytearray | memoryview, int], tuple[Any, bytes | bytearray | memoryview]], encode_io_element_cb: Callable[[Any, int], Iterable[int]])
73    def __init__(self,
74                 cause_size: common.CauseSize,
75                 asdu_address_size: common.AsduAddressSize,
76                 io_address_size: common.IoAddressSize,
77                 asdu_type_time_sizes: AsduTypeTimeSizes,
78                 inverted_sequence_bit: bool,
79                 decode_io_element_cb: DecodeIoElementCb,
80                 encode_io_element_cb: EncodeIoElementCb):
81        self._cause_size = cause_size
82        self._asdu_address_size = asdu_address_size
83        self._io_address_size = io_address_size
84        self._asdu_type_time_sizes = asdu_type_time_sizes
85        self._inverted_sequence_bit = inverted_sequence_bit
86        self._decode_io_element_cb = decode_io_element_cb
87        self._encode_io_element_cb = encode_io_element_cb
89    @property
90    def cause_size(self) -> common.CauseSize:
91        return self._cause_size
93    @property
94    def asdu_address_size(self) -> common.AsduAddressSize:
95        return self._asdu_address_size
97    @property
98    def io_address_size(self) -> common.IoAddressSize:
99        return self._io_address_size
def decode_asdu( self, asdu_bytes: bytes | bytearray | memoryview) -> tuple[hat.drivers.iec60870.encodings.common.ASDU, bytes | bytearray | memoryview]:
101    def decode_asdu(self,
102                    asdu_bytes: util.Bytes
103                    ) -> tuple[common.ASDU, util.Bytes]:
104        asdu_type = asdu_bytes[0]
105        io_number = asdu_bytes[1] & 0x7F
106        is_sequence = bool(asdu_bytes[1] & 0x80)
107        if self._inverted_sequence_bit:
108            is_sequence = not is_sequence
109        io_count = 1 if is_sequence else io_number
110        ioe_element_count = io_number if is_sequence else 1
111
112        rest = asdu_bytes[2:]
113        cause, rest = _decode_int(rest, self._cause_size.value)
114        address, rest = _decode_int(rest, self._asdu_address_size.value)
115
116        ios = collections.deque()
117        for _ in range(io_count):
118            io, rest = self._decode_io(asdu_type, ioe_element_count, rest)
119            ios.append(io)
120
121        asdu = common.ASDU(type=asdu_type,
122                           cause=cause,
123                           address=address,
124                           ios=list(ios))
125        return asdu, rest
def encode_asdu( self, asdu: hat.drivers.iec60870.encodings.common.ASDU) -> bytes | bytearray | memoryview:
127    def encode_asdu(self, asdu: common.ASDU) -> util.Bytes:
128        data = collections.deque()
129        data.append(asdu.type)
130
131        is_sequence = len(asdu.ios) == 1 and len(asdu.ios[0].elements) > 1
132        if is_sequence:
133            data.append(
134                (0x00 if self._inverted_sequence_bit else 0x80) |
135                len(asdu.ios[0].elements))
136        else:
137            data.append(
138                (0x80 if self._inverted_sequence_bit else 0x00) |
139                len(asdu.ios))
140
141        data.extend(_encode_int(asdu.cause, self._cause_size.value))
142        data.extend(_encode_int(asdu.address, self._asdu_address_size.value))
143
144        for io in asdu.ios:
145            if not is_sequence and len(io.elements) != 1:
146                raise ValueError('invalid number of IO elements')
147            data.extend(self._encode_io(asdu.type, io))
148
149        return bytes(data)