hat.drivers.iec60870.apci

IEC 60870-5 APCI layer

 1"""IEC 60870-5 APCI layer"""
 2
 3from hat.drivers.iec60870.apci.common import SequenceNumber
 4from hat.drivers.iec60870.apci.connection import (ConnectionCb,
 5                                                  ConnectionDisabledError,
 6                                                  connect,
 7                                                  listen,
 8                                                  Connection)
 9
10
11__all__ = ['SequenceNumber',
12           'ConnectionCb',
13           'ConnectionDisabledError',
14           'connect',
15           'listen',
16           'Connection']
SequenceNumber = <class 'int'>
ConnectionCb = typing.Callable[[ForwardRef('Connection')], None | collections.abc.Awaitable[None]]
class ConnectionDisabledError(builtins.ConnectionError):
class ConnectionDisabledError(ConnectionError):
    """Connection error signaling that sending of data is not enabled

    Set as the exception of a pending `Connection.send` future when the
    data was discarded because the connection is not enabled.

    """
    pass

Connection error.

async def connect( addr: hat.drivers.tcp.Address, response_timeout: float = 15, supervisory_timeout: float = 10, test_timeout: float = 20, send_window_size: int = 12, receive_window_size: int = 8, *, send_queue_size: int = 1024, receive_queue_size: int = 1024, **kwargs) -> Connection:
async def connect(addr: tcp.Address,
                  response_timeout: float = 15,
                  supervisory_timeout: float = 10,
                  test_timeout: float = 20,
                  send_window_size: int = 12,
                  receive_window_size: int = 8,
                  *,
                  send_queue_size: int = 1024,
                  receive_queue_size: int = 1024,
                  **kwargs
                  ) -> 'Connection':
    """Open connection to remote device and start data transfer

    A TCP connection is established with `hat.drivers.tcp.connect` (which
    receives all additional keyword arguments). STARTDT_ACT is then written
    and `_wait_startdt_con` is awaited for at most `response_timeout`
    seconds. If anything fails during this startup, the TCP connection is
    closed and the exception is propagated. The resulting `Connection` is
    created as always enabled.

    Args:
        addr: remote server's address
        response_timeout: response timeout (t1) in seconds
        supervisory_timeout: supervisory timeout (t2) in seconds
        test_timeout: test timeout (t3) in seconds
        send_window_size: send window size (k)
        receive_window_size: receive window size (w)
        send_queue_size: size of send queue
        receive_queue_size: size of receive queue

    """
    tcp_conn = await tcp.connect(addr, **kwargs)

    try:
        transport = Transport(tcp_conn)

        startdt_act = common.APDUU(common.ApduFunction.STARTDT_ACT)
        await transport.write(startdt_act)

        await aio.wait_for(_wait_startdt_con(transport), response_timeout)

    except BaseException:
        # includes cancellation - closing must not be interrupted
        await aio.uncancellable(tcp_conn.async_close())
        raise

    conn_kwargs = {'transport': transport,
                   'always_enabled': True,
                   'response_timeout': response_timeout,
                   'supervisory_timeout': supervisory_timeout,
                   'test_timeout': test_timeout,
                   'send_window_size': send_window_size,
                   'receive_window_size': receive_window_size,
                   'send_queue_size': send_queue_size,
                   'receive_queue_size': receive_queue_size}
    return Connection(**conn_kwargs)

Connect to remote device

Additional arguments are passed directly to hat.drivers.tcp.connect.

Arguments:
  • addr: remote server's address
  • response_timeout: response timeout (t1) in seconds
  • supervisory_timeout: supervisory timeout (t2) in seconds
  • test_timeout: test timeout (t3) in seconds
  • send_window_size: send window size (k)
  • receive_window_size: receive window size (w)
  • send_queue_size: size of send queue
  • receive_queue_size: size of receive queue
async def listen( connection_cb: Callable[[Connection], None | Awaitable[None]], addr: hat.drivers.tcp.Address = Address(host='0.0.0.0', port=2404), response_timeout: float = 15, supervisory_timeout: float = 10, test_timeout: float = 20, send_window_size: int = 12, receive_window_size: int = 8, *, send_queue_size: int = 1024, receive_queue_size: int = 1024, bind_connections: bool = True, **kwargs) -> hat.drivers.tcp.Server:
 79async def listen(connection_cb: ConnectionCb,
 80                 addr: tcp.Address = tcp.Address('0.0.0.0', 2404),
 81                 response_timeout: float = 15,
 82                 supervisory_timeout: float = 10,
 83                 test_timeout: float = 20,
 84                 send_window_size: int = 12,
 85                 receive_window_size: int = 8,
 86                 *,
 87                 send_queue_size: int = 1024,
 88                 receive_queue_size: int = 1024,
 89                 bind_connections: bool = True,
 90                 **kwargs
 91                 ) -> tcp.Server:
 92    """Create new IEC104 slave and listen for incoming connections
 93
 94    Additional arguments are passed directly to `hat.drivers.tcp.listen`.
 95
 96    Args:
 97        connection_cb: new connection callback
 98        addr: listening socket address
 99        response_timeout: response timeout (t1) in seconds
100        supervisory_timeout: supervisory timeout (t2) in seconds
101        test_timeout: test timeout (t3) in seconds
102        send_window_size: send window size (k)
103        receive_window_size: receive window size (w)
104        bind_connections: bind connections (see `hat.drivers.tcp.listen`)
105
106    """
107
108    log = mlog
109
110    async def on_connection(conn):
111        try:
112            try:
113                conn = Connection(transport=Transport(conn),
114                                  always_enabled=False,
115                                  response_timeout=response_timeout,
116                                  supervisory_timeout=supervisory_timeout,
117                                  test_timeout=test_timeout,
118                                  send_window_size=send_window_size,
119                                  receive_window_size=receive_window_size,
120                                  send_queue_size=send_queue_size,
121                                  receive_queue_size=receive_queue_size)
122
123                await aio.call(connection_cb, conn)
124
125            except BaseException:
126                await aio.uncancellable(conn.async_close())
127                raise
128
129        except Exception as e:
130            log.error("on connection error: %s", e, exc_info=e)
131
132    server = await tcp.listen(on_connection, addr,
133                              bind_connections=bind_connections,
134                              **kwargs)
135
136    log = _create_server_logger_adapter(server.info)
137
138    return server

Create new IEC104 slave and listen for incoming connections

Additional arguments are passed directly to hat.drivers.tcp.listen.

Arguments:
  • connection_cb: new connection callback
  • addr: listening socket address
  • response_timeout: response timeout (t1) in seconds
  • supervisory_timeout: supervisory timeout (t2) in seconds
  • test_timeout: test timeout (t3) in seconds
  • send_window_size: send window size (k)
  • receive_window_size: receive window size (w)
  • send_queue_size: size of send queue
  • receive_queue_size: size of receive queue
  • bind_connections: bind connections (see hat.drivers.tcp.listen)
class Connection(hat.aio.group.Resource):
class Connection(aio.Resource):
    """Connection

    For creating new Connection instances see `connect` or `listen` coroutine.

    Wraps a `Transport` and runs three background loops in the transport's
    async group: a read loop (dispatching received APDUs), a write loop
    (consuming the send queue) and a test loop (periodic TESTFR_ACT).
    Termination of any of these loops closes the connection.

    """

    def __init__(self,
                 transport: Transport,
                 always_enabled: bool,
                 response_timeout: float,
                 supervisory_timeout: float,
                 test_timeout: float,
                 send_window_size: int,
                 receive_window_size: int,
                 send_queue_size: int,
                 receive_queue_size: int):
        self._transport = transport
        self._always_enabled = always_enabled
        # initial enabled state; changed by received STARTDT_ACT/STOPDT_ACT
        self._is_enabled = always_enabled
        self._enabled_cbs = util.CallbackRegistry()
        self._response_timeout = response_timeout
        self._supervisory_timeout = supervisory_timeout
        self._test_timeout = test_timeout
        self._send_window_size = send_window_size
        self._receive_window_size = receive_window_size
        self._receive_queue = aio.Queue(receive_queue_size)
        self._send_queue = aio.Queue(send_queue_size)
        # set when TESTFR_CON is received
        self._test_event = asyncio.Event()
        # sequence number of next APDUI to be written (wraps at 0x8000)
        self._ssn = 0
        # sequence number expected in next received APDUI (wraps at 0x8000)
        self._rsn = 0
        # last acknowledgement value received from remote
        self._ack = 0
        # number of APDUIs received since last written APDUI or APDUS
        self._w = 0
        # timer handle for delayed APDUS (None if not running)
        self._supervisory_handle = None
        # ssn -> response timeout timer handle of not yet acknowledged APDUI
        self._waiting_ack_handles = {}
        # notified each time acknowledgement is processed
        self._waiting_ack_cv = asyncio.Condition()
        self._loop = asyncio.get_running_loop()
        self._log = _create_connection_logger_adapter(transport.info)

        self.async_group.spawn(self._read_loop)
        self.async_group.spawn(self._write_loop)
        self.async_group.spawn(self._test_loop)

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._transport.async_group

    @property
    def info(self) -> tcp.ConnectionInfo:
        """Connection info"""
        return self._transport.info

    @property
    def ssl_object(self) -> ssl.SSLObject | ssl.SSLSocket | None:
        """SSL Object"""
        return self._transport.ssl_object

    @property
    def is_enabled(self) -> bool:
        """Is enabled"""
        return self._is_enabled

    def register_enabled_cb(self,
                            cb: typing.Callable[[bool], None]
                            ) -> util.RegisterCallbackHandle:
        """Register enable callback"""
        return self._enabled_cbs.register(cb)

    async def send(self,
                   data: util.Bytes,
                   wait_ack: bool = False):
        """Send data and optionally wait for acknowledgement

        Data is put on the send queue and written by the write loop. When
        `wait_ack` is ``False``, this coroutine returns as soon as the data
        is queued, so later failures (including discarding of data while
        the connection is not enabled) are not reported to the caller.

        Raises:
            ConnectionDisabledError
            ConnectionError

        """
        future = self._loop.create_future() if wait_ack else None
        entry = _SendQueueEntry(data, future, wait_ack)

        try:
            await self._send_queue.put(entry)

            if wait_ack:
                await future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def drain(self, wait_ack: bool = False):
        """Drain and optionally wait for acknowledgement

        Queues an entry without data; the write loop drains the transport
        once all previously queued entries are processed.

        Raises:
            ConnectionError

        """
        future = self._loop.create_future()
        entry = _SendQueueEntry(None, future, wait_ack)

        try:
            await self._send_queue.put(entry)
            await future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def receive(self) -> util.Bytes:
        """Receive data

        Raises:
            ConnectionError

        """
        try:
            return await self._receive_queue.get()

        except aio.QueueClosedError:
            raise ConnectionError()

    def _on_response_timeout(self):
        # timer callback: written APDUI was not acknowledged in time
        self._log.warning("response timeout occured - closing connection")
        self.close()

    def _on_supervisory_timeout(self):
        # timer callback is synchronous - writing is done in spawned task
        self.async_group.spawn(self._on_supervisory_timeout_async)

    async def _on_supervisory_timeout_async(self):
        try:
            await self._write_apdus()

        except Exception as e:
            self._log.warning('supervisory timeout error: %s', e, exc_info=e)

    async def _read_loop(self):
        # read APDUs from transport and dispatch them by type until error
        try:
            while True:
                apdu = await self._transport.read()

                if isinstance(apdu, common.APDUU):
                    await self._process_apduu(apdu)

                elif isinstance(apdu, common.APDUS):
                    await self._process_apdus(apdu)

                elif isinstance(apdu, common.APDUI):
                    await self._process_apdui(apdu)

                else:
                    raise ValueError("unsupported APDU")

        except (ConnectionError, aio.QueueClosedError):
            # closed connection/queue is not logged
            pass

        except Exception as e:
            self._log.warning('read loop error: %s', e, exc_info=e)

        finally:
            self.close()
            self._receive_queue.close()

    async def _write_loop(self):
        # process send queue entries created by `send` and `drain`
        entry = None

        try:
            while True:
                entry = await self._send_queue.get()

                if entry.data is None:
                    # drain entry - handle of last written APDUI is used for
                    # optional waiting (`or 0x8000` maps ssn 0 to 0x7fff)
                    await self._transport.drain()
                    ssn = (self._ssn or 0x8000) - 1
                    handle = self._waiting_ack_handles.get(ssn)

                else:
                    # no handle means that data was discarded
                    handle = await self._write_apdui(entry.data)
                    if not handle and entry.future and not entry.future.done():
                        entry.future.set_exception(ConnectionDisabledError())

                if not entry.future:
                    continue

                if entry.wait_ack and handle and not entry.future.done():
                    self.async_group.spawn(self._wait_ack, handle,
                                           entry.future)
                    # future is now resolved by `_wait_ack` - prevent
                    # `finally` block from setting exception on it
                    entry = None

                elif not entry.future.done():
                    entry.future.set_result(None)

        except (ConnectionError, aio.QueueClosedError):
            # closed connection/queue is not logged
            pass

        except Exception as e:
            self._log.warning('write loop error: %s', e, exc_info=e)

        finally:
            self.close()
            self._stop_supervisory_timeout()
            self._send_queue.close()

            # cancel all pending response timeout timers
            for f in self._waiting_ack_handles.values():
                f.cancel()

            # fail future of entry being processed and of all entries
            # remaining in send queue
            while True:
                if entry and entry.future and not entry.future.done():
                    entry.future.set_exception(ConnectionError())
                if self._send_queue.empty():
                    break
                entry = self._send_queue.get_nowait()

    async def _test_loop(self):
        # after each `test_timeout` seconds, write TESTFR_ACT and wait at
        # most `response_timeout` seconds for `_test_event` (set on TESTFR_CON)
        # TODO: implement reset timeout on received frame (v2 5.2.)
        try:
            while True:
                await asyncio.sleep(self._test_timeout)

                self._test_event.clear()
                await self._transport.write(
                    common.APDUU(common.ApduFunction.TESTFR_ACT))

                await aio.wait_for(self._test_event.wait(),
                                   self._response_timeout)

        except Exception as e:
            # NOTE(review): unlike read/write loop, ConnectionError is also
            # logged here - confirm that this is intended
            self._log.warning('test loop error: %s', e, exc_info=e)

        finally:
            self.close()

    async def _process_apduu(self, apdu):
        # handle received APDUU - other functions (e.g. STARTDT_CON) are
        # ignored
        if apdu.function == common.ApduFunction.STARTDT_ACT:
            self._is_enabled = True
            await self._transport.write(
                common.APDUU(common.ApduFunction.STARTDT_CON))

            self._log.debug("send data enabled")
            self._enabled_cbs.notify(True)

        elif apdu.function == common.ApduFunction.STOPDT_ACT:
            # always enabled connection ignores STOPDT_ACT (no confirmation)
            if not self._always_enabled:
                # acknowledge received APDUIs before confirming stop
                await self._write_apdus()
                self._is_enabled = False
                await self._transport.write(
                    common.APDUU(common.ApduFunction.STOPDT_CON))

                self._log.debug("send data disabled")
                self._enabled_cbs.notify(False)

        elif apdu.function == common.ApduFunction.TESTFR_ACT:
            await self._transport.write(
                common.APDUU(common.ApduFunction.TESTFR_CON))

        elif apdu.function == common.ApduFunction.TESTFR_CON:
            # wakes up `_test_loop`
            self._test_event.set()

    async def _process_apdus(self, apdu):
        # APDUS carries only acknowledgement
        await self._set_ack(apdu.rsn)

    async def _process_apdui(self, apdu):
        # handle received APDUI: process acknowledgement, verify sequence
        # number, queue data and acknowledge once receive window is filled
        await self._set_ack(apdu.rsn)

        if apdu.ssn != self._rsn:
            raise Exception('missing apdu sequence number')

        self._rsn = (self._rsn + 1) % 0x8000
        self._start_supervisory_timeout()

        # APDUI with empty data advances rsn but is not queued
        if apdu.data:
            await self._receive_queue.put(apdu.data)

        self._w += 1
        if self._w >= self._receive_window_size:
            await self._write_apdus()

    async def _write_apdui(self, data):
        # write single APDUI and return its response timeout timer handle
        # (None if data is discarded because connection is not enabled)
        if self._ssn in self._waiting_ack_handles:
            raise Exception("can not reuse already registered ssn")

        # block while number of unacknowledged APDUIs fills send window
        async with self._waiting_ack_cv:
            await self._waiting_ack_cv.wait_for(
                lambda: (len(self._waiting_ack_handles) <
                         self._send_window_size))

        if not self._is_enabled:
            self._log.debug("send data not enabled - discarding message")
            return

        await self._transport.write(common.APDUI(ssn=self._ssn,
                                                 rsn=self._rsn,
                                                 data=data))
        # written APDUI carries rsn, so pending APDUS is not needed
        self._w = 0
        self._stop_supervisory_timeout()

        handle = self._loop.call_later(self._response_timeout,
                                       self._on_response_timeout)
        self._waiting_ack_handles[self._ssn] = handle
        self._ssn = (self._ssn + 1) % 0x8000
        return handle

    async def _write_apdus(self):
        # acknowledge all received APDUIs
        await self._transport.write(common.APDUS(self._rsn))
        self._w = 0
        self._stop_supervisory_timeout()

    async def _wait_ack(self, handle, future):
        # resolve `future` once `handle` is cancelled (done by `_set_ack`
        # when associated APDUI is acknowledged)
        try:
            async with self._waiting_ack_cv:
                await self._waiting_ack_cv.wait_for(handle.cancelled)

            if not future.done():
                future.set_result(None)

        finally:
            # reached with unresolved future if waiting was interrupted
            if not future.done():
                future.set_exception(ConnectionError())

    async def _set_ack(self, ack):
        # sequence numbers in [self._ack, ack) are acknowledged, taking into
        # account wrap around at 0x8000
        if ack >= self._ack:
            ssns = range(self._ack, ack)
        else:
            ssns = itertools.chain(range(self._ack, 0x8000), range(ack))

        for ssn in ssns:
            handle = self._waiting_ack_handles.pop(ssn, None)
            if not handle:
                raise Exception("received ack for unsent sequence number")
            # stops response timeout timer
            handle.cancel()

        self._ack = ack
        # wake up `_write_apdui` (send window) and `_wait_ack` waiters
        async with self._waiting_ack_cv:
            self._waiting_ack_cv.notify_all()

    def _start_supervisory_timeout(self):
        # running timer is not restarted
        if self._supervisory_handle:
            return

        self._supervisory_handle = self._loop.call_later(
            self._supervisory_timeout, self._on_supervisory_timeout)

    def _stop_supervisory_timeout(self):
        if not self._supervisory_handle:
            return

        self._supervisory_handle.cancel()
        self._supervisory_handle = None

Connection

For creating new Connection instances see connect or listen coroutine.

Connection( transport: hat.drivers.iec60870.apci.transport.Transport, always_enabled: bool, response_timeout: float, supervisory_timeout: float, test_timeout: float, send_window_size: int, receive_window_size: int, send_queue_size: int, receive_queue_size: int)
148    def __init__(self,
149                 transport: Transport,
150                 always_enabled: bool,
151                 response_timeout: float,
152                 supervisory_timeout: float,
153                 test_timeout: float,
154                 send_window_size: int,
155                 receive_window_size: int,
156                 send_queue_size: int,
157                 receive_queue_size: int):
158        self._transport = transport
159        self._always_enabled = always_enabled
160        self._is_enabled = always_enabled
161        self._enabled_cbs = util.CallbackRegistry()
162        self._response_timeout = response_timeout
163        self._supervisory_timeout = supervisory_timeout
164        self._test_timeout = test_timeout
165        self._send_window_size = send_window_size
166        self._receive_window_size = receive_window_size
167        self._receive_queue = aio.Queue(receive_queue_size)
168        self._send_queue = aio.Queue(send_queue_size)
169        self._test_event = asyncio.Event()
170        self._ssn = 0
171        self._rsn = 0
172        self._ack = 0
173        self._w = 0
174        self._supervisory_handle = None
175        self._waiting_ack_handles = {}
176        self._waiting_ack_cv = asyncio.Condition()
177        self._loop = asyncio.get_running_loop()
178        self._log = _create_connection_logger_adapter(transport.info)
179
180        self.async_group.spawn(self._read_loop)
181        self.async_group.spawn(self._write_loop)
182        self.async_group.spawn(self._test_loop)
async_group: hat.aio.group.Group
184    @property
185    def async_group(self) -> aio.Group:
186        """Async group"""
187        return self._transport.async_group

Async group

info: hat.drivers.tcp.ConnectionInfo
189    @property
190    def info(self) -> tcp.ConnectionInfo:
191        """Connection info"""
192        return self._transport.info

Connection info

ssl_object: ssl.SSLObject | ssl.SSLSocket | None
194    @property
195    def ssl_object(self) -> ssl.SSLObject | ssl.SSLSocket | None:
196        """SSL Object"""
197        return self._transport.ssl_object

SSL Object

is_enabled: bool
199    @property
200    def is_enabled(self) -> bool:
201        """Is enabled"""
202        return self._is_enabled

Is enabled

def register_enabled_cb( self, cb: Callable[[bool], NoneType]) -> hat.util.callback.RegisterCallbackHandle:
204    def register_enabled_cb(self,
205                            cb: typing.Callable[[bool], None]
206                            ) -> util.RegisterCallbackHandle:
207        """Register enable callback"""
208        return self._enabled_cbs.register(cb)

Register enable callback

async def send(self, data: bytes | bytearray | memoryview, wait_ack: bool = False):
210    async def send(self,
211                   data: util.Bytes,
212                   wait_ack: bool = False):
213        """Send data and optionally wait for acknowledgement
214
215        Raises:
216            ConnectionDisabledError
217            ConnectionError
218
219        """
220        future = self._loop.create_future() if wait_ack else None
221        entry = _SendQueueEntry(data, future, wait_ack)
222
223        try:
224            await self._send_queue.put(entry)
225
226            if wait_ack:
227                await future
228
229        except aio.QueueClosedError:
230            raise ConnectionError()

Send data and optionally wait for acknowledgement

Raises:
  • ConnectionDisabledError
  • ConnectionError
async def drain(self, wait_ack: bool = False):
232    async def drain(self, wait_ack: bool = False):
233        """Drain and optionally wait for acknowledgement
234
235        Raises:
236            ConnectionError
237
238        """
239        future = self._loop.create_future()
240        entry = _SendQueueEntry(None, future, wait_ack)
241
242        try:
243            await self._send_queue.put(entry)
244            await future
245
246        except aio.QueueClosedError:
247            raise ConnectionError()

Drain and optionally wait for acknowledgement

Raises:
  • ConnectionError
async def receive(self) -> bytes | bytearray | memoryview:
249    async def receive(self) -> util.Bytes:
250        """Receive data
251
252        Raises:
253            ConnectionError
254
255        """
256        try:
257            return await self._receive_queue.get()
258
259        except aio.QueueClosedError:
260            raise ConnectionError()

Receive data

Raises:
  • ConnectionError