hat.drivers.iec60870.encodings.encoder

  1import collections
  2import typing
  3
  4from hat import util
  5
  6from hat.drivers.iec60870.encodings import common
  7
  8
  9AsduType: typing.TypeAlias = int
 10AsduTypeTimeSizes: typing.TypeAlias = dict[AsduType, common.TimeSize]
 11DecodeIoElementCb: typing.TypeAlias = typing.Callable[[util.Bytes, AsduType],
 12                                                      tuple[typing.Any,
 13                                                            util.Bytes]]
 14EncodeIoElementCb: typing.TypeAlias = typing.Callable[[typing.Any, AsduType],
 15                                                      typing.Iterable[int]]
 16
 17
 18def decode_time(time_bytes: util.Bytes,
 19                time_size: common.TimeSize
 20                ) -> common.Time:
 21    milliseconds = (time_bytes[1] << 8) | time_bytes[0]
 22    invalid = (bool(time_bytes[2] & 0x80) if time_size.value > 2 else None)
 23    minutes = (time_bytes[2] & 0x3F if time_size.value > 2 else None)
 24    summer_time = (bool(time_bytes[3] & 0x80) if time_size.value > 3 else None)
 25    hours = (time_bytes[3] & 0x1F if time_size.value > 3 else None)
 26    day_of_week = (time_bytes[4] >> 5 if time_size.value > 4 else None)
 27    day_of_month = (time_bytes[4] & 0x1F if time_size.value > 4 else None)
 28    months = (time_bytes[5] & 0x0F if time_size.value > 4 else None)
 29    years = (time_bytes[6] & 0x7F if time_size.value > 4 else None)
 30
 31    return common.Time(size=time_size,
 32                       milliseconds=milliseconds,
 33                       invalid=invalid,
 34                       minutes=minutes,
 35                       summer_time=summer_time,
 36                       hours=hours,
 37                       day_of_week=day_of_week,
 38                       day_of_month=day_of_month,
 39                       months=months,
 40                       years=years)
 41
 42
 43def encode_time(time: common.Time,
 44                time_size: common.TimeSize
 45                ) -> typing.Iterable[int]:
 46    if time_size.value > time.size.value:
 47        raise ValueError('unsupported time size')
 48
 49    yield time.milliseconds & 0xFF
 50    yield (time.milliseconds >> 8) & 0xFF
 51
 52    if time_size.value > 2:
 53        yield ((0x80 if time.invalid else 0) |
 54               (time.minutes & 0x3F))
 55
 56    if time_size.value > 3:
 57        yield ((0x80 if time.summer_time else 0) |
 58               (time.hours & 0x1F))
 59
 60    if time_size.value > 4:
 61        yield (((time.day_of_week & 0x07) << 5) |
 62               (time.day_of_month & 0x1F))
 63        yield time.months & 0x0F
 64        yield time.years & 0x7F
 65
 66
 67class Encoder:
 68
 69    def __init__(self,
 70                 cause_size: common.CauseSize,
 71                 asdu_address_size: common.AsduAddressSize,
 72                 io_address_size: common.IoAddressSize,
 73                 asdu_type_time_sizes: AsduTypeTimeSizes,
 74                 inverted_sequence_bit: bool,
 75                 decode_io_element_cb: DecodeIoElementCb,
 76                 encode_io_element_cb: EncodeIoElementCb):
 77        self._cause_size = cause_size
 78        self._asdu_address_size = asdu_address_size
 79        self._io_address_size = io_address_size
 80        self._asdu_type_time_sizes = asdu_type_time_sizes
 81        self._inverted_sequence_bit = inverted_sequence_bit
 82        self._decode_io_element_cb = decode_io_element_cb
 83        self._encode_io_element_cb = encode_io_element_cb
 84
 85    @property
 86    def cause_size(self) -> common.CauseSize:
 87        return self._cause_size
 88
 89    @property
 90    def asdu_address_size(self) -> common.AsduAddressSize:
 91        return self._asdu_address_size
 92
 93    @property
 94    def io_address_size(self) -> common.IoAddressSize:
 95        return self._io_address_size
 96
 97    def decode_asdu(self,
 98                    asdu_bytes: util.Bytes
 99                    ) -> tuple[common.ASDU, util.Bytes]:
100        asdu_type = asdu_bytes[0]
101        io_number = asdu_bytes[1] & 0x7F
102        is_sequence = bool(asdu_bytes[1] & 0x80)
103        if self._inverted_sequence_bit:
104            is_sequence = not is_sequence
105        io_count = 1 if is_sequence else io_number
106        ioe_element_count = io_number if is_sequence else 1
107
108        rest = asdu_bytes[2:]
109        cause, rest = _decode_int(rest, self._cause_size.value)
110        address, rest = _decode_int(rest, self._asdu_address_size.value)
111
112        ios = collections.deque()
113        for _ in range(io_count):
114            io, rest = self._decode_io(asdu_type, ioe_element_count, rest)
115            ios.append(io)
116
117        asdu = common.ASDU(type=asdu_type,
118                           cause=cause,
119                           address=address,
120                           ios=list(ios))
121        return asdu, rest
122
123    def encode_asdu(self, asdu: common.ASDU) -> util.Bytes:
124        data = collections.deque()
125        data.append(asdu.type)
126
127        is_sequence = len(asdu.ios) == 1 and len(asdu.ios[0].elements) > 1
128        if is_sequence:
129            data.append(
130                (0x00 if self._inverted_sequence_bit else 0x80) |
131                len(asdu.ios[0].elements))
132        else:
133            data.append(
134                (0x80 if self._inverted_sequence_bit else 0x00) |
135                len(asdu.ios))
136
137        data.extend(_encode_int(asdu.cause, self._cause_size.value))
138        data.extend(_encode_int(asdu.address, self._asdu_address_size.value))
139
140        for io in asdu.ios:
141            if not is_sequence and len(io.elements) != 1:
142                raise ValueError('invalid number of IO elements')
143            data.extend(self._encode_io(asdu.type, io))
144
145        return bytes(data)
146
147    def _decode_io(self, asdu_type, ioe_element_count, io_bytes):
148        address, rest = _decode_int(io_bytes, self._io_address_size.value)
149
150        elements = collections.deque()
151        for _ in range(ioe_element_count):
152            element, rest = self._decode_io_element_cb(rest, asdu_type)
153            elements.append(element)
154
155        time_size = self._asdu_type_time_sizes.get(asdu_type)
156        if time_size:
157            time, rest = decode_time(rest, time_size), rest[time_size.value:]
158        else:
159            time = None
160
161        io = common.IO(address=address,
162                       elements=list(elements),
163                       time=time)
164        return io, rest
165
166    def _encode_io(self, asdu_type, io):
167        yield from _encode_int(io.address, self._io_address_size.value)
168
169        for element in io.elements:
170            yield from self._encode_io_element_cb(element, asdu_type)
171
172        time_size = self._asdu_type_time_sizes.get(asdu_type)
173        if time_size:
174            yield from encode_time(io.time, time_size)
175
176
177def _decode_int(data, size):
178    return int.from_bytes(data[:size], 'little'), data[size:]
179
180
181def _encode_int(x, size):
182    return x.to_bytes(size, 'little')
AsduType: TypeAlias = int
AsduTypeTimeSizes: TypeAlias = dict[int, hat.drivers.iec60870.encodings.common.TimeSize]
DecodeIoElementCb: TypeAlias = Callable[[bytes | bytearray | memoryview, int], tuple[Any, bytes | bytearray | memoryview]]
EncodeIoElementCb: TypeAlias = Callable[[Any, int], Iterable[int]]
def decode_time( time_bytes: bytes | bytearray | memoryview, time_size: hat.drivers.iec60870.encodings.common.TimeSize) -> hat.drivers.iec60870.encodings.common.Time:
19def decode_time(time_bytes: util.Bytes,
20                time_size: common.TimeSize
21                ) -> common.Time:
22    milliseconds = (time_bytes[1] << 8) | time_bytes[0]
23    invalid = (bool(time_bytes[2] & 0x80) if time_size.value > 2 else None)
24    minutes = (time_bytes[2] & 0x3F if time_size.value > 2 else None)
25    summer_time = (bool(time_bytes[3] & 0x80) if time_size.value > 3 else None)
26    hours = (time_bytes[3] & 0x1F if time_size.value > 3 else None)
27    day_of_week = (time_bytes[4] >> 5 if time_size.value > 4 else None)
28    day_of_month = (time_bytes[4] & 0x1F if time_size.value > 4 else None)
29    months = (time_bytes[5] & 0x0F if time_size.value > 4 else None)
30    years = (time_bytes[6] & 0x7F if time_size.value > 4 else None)
31
32    return common.Time(size=time_size,
33                       milliseconds=milliseconds,
34                       invalid=invalid,
35                       minutes=minutes,
36                       summer_time=summer_time,
37                       hours=hours,
38                       day_of_week=day_of_week,
39                       day_of_month=day_of_month,
40                       months=months,
41                       years=years)
def encode_time( time: hat.drivers.iec60870.encodings.common.Time, time_size: hat.drivers.iec60870.encodings.common.TimeSize) -> Iterable[int]:
44def encode_time(time: common.Time,
45                time_size: common.TimeSize
46                ) -> typing.Iterable[int]:
47    if time_size.value > time.size.value:
48        raise ValueError('unsupported time size')
49
50    yield time.milliseconds & 0xFF
51    yield (time.milliseconds >> 8) & 0xFF
52
53    if time_size.value > 2:
54        yield ((0x80 if time.invalid else 0) |
55               (time.minutes & 0x3F))
56
57    if time_size.value > 3:
58        yield ((0x80 if time.summer_time else 0) |
59               (time.hours & 0x1F))
60
61    if time_size.value > 4:
62        yield (((time.day_of_week & 0x07) << 5) |
63               (time.day_of_month & 0x1F))
64        yield time.months & 0x0F
65        yield time.years & 0x7F
class Encoder:
 68class Encoder:
 69
 70    def __init__(self,
 71                 cause_size: common.CauseSize,
 72                 asdu_address_size: common.AsduAddressSize,
 73                 io_address_size: common.IoAddressSize,
 74                 asdu_type_time_sizes: AsduTypeTimeSizes,
 75                 inverted_sequence_bit: bool,
 76                 decode_io_element_cb: DecodeIoElementCb,
 77                 encode_io_element_cb: EncodeIoElementCb):
 78        self._cause_size = cause_size
 79        self._asdu_address_size = asdu_address_size
 80        self._io_address_size = io_address_size
 81        self._asdu_type_time_sizes = asdu_type_time_sizes
 82        self._inverted_sequence_bit = inverted_sequence_bit
 83        self._decode_io_element_cb = decode_io_element_cb
 84        self._encode_io_element_cb = encode_io_element_cb
 85
 86    @property
 87    def cause_size(self) -> common.CauseSize:
 88        return self._cause_size
 89
 90    @property
 91    def asdu_address_size(self) -> common.AsduAddressSize:
 92        return self._asdu_address_size
 93
 94    @property
 95    def io_address_size(self) -> common.IoAddressSize:
 96        return self._io_address_size
 97
 98    def decode_asdu(self,
 99                    asdu_bytes: util.Bytes
100                    ) -> tuple[common.ASDU, util.Bytes]:
101        asdu_type = asdu_bytes[0]
102        io_number = asdu_bytes[1] & 0x7F
103        is_sequence = bool(asdu_bytes[1] & 0x80)
104        if self._inverted_sequence_bit:
105            is_sequence = not is_sequence
106        io_count = 1 if is_sequence else io_number
107        ioe_element_count = io_number if is_sequence else 1
108
109        rest = asdu_bytes[2:]
110        cause, rest = _decode_int(rest, self._cause_size.value)
111        address, rest = _decode_int(rest, self._asdu_address_size.value)
112
113        ios = collections.deque()
114        for _ in range(io_count):
115            io, rest = self._decode_io(asdu_type, ioe_element_count, rest)
116            ios.append(io)
117
118        asdu = common.ASDU(type=asdu_type,
119                           cause=cause,
120                           address=address,
121                           ios=list(ios))
122        return asdu, rest
123
124    def encode_asdu(self, asdu: common.ASDU) -> util.Bytes:
125        data = collections.deque()
126        data.append(asdu.type)
127
128        is_sequence = len(asdu.ios) == 1 and len(asdu.ios[0].elements) > 1
129        if is_sequence:
130            data.append(
131                (0x00 if self._inverted_sequence_bit else 0x80) |
132                len(asdu.ios[0].elements))
133        else:
134            data.append(
135                (0x80 if self._inverted_sequence_bit else 0x00) |
136                len(asdu.ios))
137
138        data.extend(_encode_int(asdu.cause, self._cause_size.value))
139        data.extend(_encode_int(asdu.address, self._asdu_address_size.value))
140
141        for io in asdu.ios:
142            if not is_sequence and len(io.elements) != 1:
143                raise ValueError('invalid number of IO elements')
144            data.extend(self._encode_io(asdu.type, io))
145
146        return bytes(data)
147
148    def _decode_io(self, asdu_type, ioe_element_count, io_bytes):
149        address, rest = _decode_int(io_bytes, self._io_address_size.value)
150
151        elements = collections.deque()
152        for _ in range(ioe_element_count):
153            element, rest = self._decode_io_element_cb(rest, asdu_type)
154            elements.append(element)
155
156        time_size = self._asdu_type_time_sizes.get(asdu_type)
157        if time_size:
158            time, rest = decode_time(rest, time_size), rest[time_size.value:]
159        else:
160            time = None
161
162        io = common.IO(address=address,
163                       elements=list(elements),
164                       time=time)
165        return io, rest
166
167    def _encode_io(self, asdu_type, io):
168        yield from _encode_int(io.address, self._io_address_size.value)
169
170        for element in io.elements:
171            yield from self._encode_io_element_cb(element, asdu_type)
172
173        time_size = self._asdu_type_time_sizes.get(asdu_type)
174        if time_size:
175            yield from encode_time(io.time, time_size)
Encoder( cause_size: hat.drivers.iec60870.encodings.common.CauseSize, asdu_address_size: hat.drivers.iec60870.encodings.common.AsduAddressSize, io_address_size: hat.drivers.iec60870.encodings.common.IoAddressSize, asdu_type_time_sizes: dict[int, hat.drivers.iec60870.encodings.common.TimeSize], inverted_sequence_bit: bool, decode_io_element_cb: Callable[[bytes | bytearray | memoryview, int], tuple[Any, bytes | bytearray | memoryview]], encode_io_element_cb: Callable[[Any, int], Iterable[int]])
70    def __init__(self,
71                 cause_size: common.CauseSize,
72                 asdu_address_size: common.AsduAddressSize,
73                 io_address_size: common.IoAddressSize,
74                 asdu_type_time_sizes: AsduTypeTimeSizes,
75                 inverted_sequence_bit: bool,
76                 decode_io_element_cb: DecodeIoElementCb,
77                 encode_io_element_cb: EncodeIoElementCb):
78        self._cause_size = cause_size
79        self._asdu_address_size = asdu_address_size
80        self._io_address_size = io_address_size
81        self._asdu_type_time_sizes = asdu_type_time_sizes
82        self._inverted_sequence_bit = inverted_sequence_bit
83        self._decode_io_element_cb = decode_io_element_cb
84        self._encode_io_element_cb = encode_io_element_cb
86    @property
87    def cause_size(self) -> common.CauseSize:
88        return self._cause_size
90    @property
91    def asdu_address_size(self) -> common.AsduAddressSize:
92        return self._asdu_address_size
94    @property
95    def io_address_size(self) -> common.IoAddressSize:
96        return self._io_address_size
def decode_asdu( self, asdu_bytes: bytes | bytearray | memoryview) -> tuple[hat.drivers.iec60870.encodings.common.ASDU, bytes | bytearray | memoryview]:
 98    def decode_asdu(self,
 99                    asdu_bytes: util.Bytes
100                    ) -> tuple[common.ASDU, util.Bytes]:
101        asdu_type = asdu_bytes[0]
102        io_number = asdu_bytes[1] & 0x7F
103        is_sequence = bool(asdu_bytes[1] & 0x80)
104        if self._inverted_sequence_bit:
105            is_sequence = not is_sequence
106        io_count = 1 if is_sequence else io_number
107        ioe_element_count = io_number if is_sequence else 1
108
109        rest = asdu_bytes[2:]
110        cause, rest = _decode_int(rest, self._cause_size.value)
111        address, rest = _decode_int(rest, self._asdu_address_size.value)
112
113        ios = collections.deque()
114        for _ in range(io_count):
115            io, rest = self._decode_io(asdu_type, ioe_element_count, rest)
116            ios.append(io)
117
118        asdu = common.ASDU(type=asdu_type,
119                           cause=cause,
120                           address=address,
121                           ios=list(ios))
122        return asdu, rest
def encode_asdu( self, asdu: hat.drivers.iec60870.encodings.common.ASDU) -> bytes | bytearray | memoryview:
124    def encode_asdu(self, asdu: common.ASDU) -> util.Bytes:
125        data = collections.deque()
126        data.append(asdu.type)
127
128        is_sequence = len(asdu.ios) == 1 and len(asdu.ios[0].elements) > 1
129        if is_sequence:
130            data.append(
131                (0x00 if self._inverted_sequence_bit else 0x80) |
132                len(asdu.ios[0].elements))
133        else:
134            data.append(
135                (0x80 if self._inverted_sequence_bit else 0x00) |
136                len(asdu.ios))
137
138        data.extend(_encode_int(asdu.cause, self._cause_size.value))
139        data.extend(_encode_int(asdu.address, self._asdu_address_size.value))
140
141        for io in asdu.ios:
142            if not is_sequence and len(io.elements) != 1:
143                raise ValueError('invalid number of IO elements')
144            data.extend(self._encode_io(asdu.type, io))
145
146        return bytes(data)