hat.drivers.iec60870.apci

IEC 60870-5 APCI layer

 1"""IEC 60870-5 APCI layer"""
 2
 3from hat.drivers.iec60870.apci.common import SequenceNumber
 4from hat.drivers.iec60870.apci.connection import (ConnectionCb,
 5                                                  ConnectionDisabledError,
 6                                                  connect,
 7                                                  listen,
 8                                                  Connection)
 9
10
11__all__ = ['SequenceNumber',
12           'ConnectionCb',
13           'ConnectionDisabledError',
14           'connect',
15           'listen',
16           'Connection']
SequenceNumber = <class 'int'>
ConnectionCb = typing.Callable[[ForwardRef('Connection')], None | collections.abc.Awaitable[None]]
class ConnectionDisabledError(builtins.ConnectionError):
24class ConnectionDisabledError(ConnectionError):
25    pass

Connection error.

async def connect( addr: hat.drivers.tcp.Address, response_timeout: float = 15, supervisory_timeout: float = 10, test_timeout: float = 20, send_window_size: int = 12, receive_window_size: int = 8, *, send_queue_size: int = 1024, receive_queue_size: int = 1024, **kwargs) -> Connection:
28async def connect(addr: tcp.Address,
29                  response_timeout: float = 15,
30                  supervisory_timeout: float = 10,
31                  test_timeout: float = 20,
32                  send_window_size: int = 12,
33                  receive_window_size: int = 8,
34                  *,
35                  send_queue_size: int = 1024,
36                  receive_queue_size: int = 1024,
37                  **kwargs
38                  ) -> 'Connection':
39    """Connect to remote device
40
41    Additional arguments are passed directly to `hat.drivers.tcp.connect`.
42
43    Args:
44        addr: remote server's address
45        response_timeout: response timeout (t1) in seconds
46        supervisory_timeout: supervisory timeout (t2) in seconds
47        test_timeout: test timeout (t3) in seconds
48        send_window_size: send window size (k)
49        receive_window_size: receive window size (w)
50        send_queue_size: size of send queue
51        receive_queue_size: size of receive queue
52
53    """
54    conn = await tcp.connect(addr, **kwargs)
55
56    try:
57        apdu = common.APDUU(common.ApduFunction.STARTDT_ACT)
58        await _write_apdu(conn, apdu)
59        await aio.wait_for(_wait_startdt_con(conn), response_timeout)
60
61    except Exception:
62        await aio.uncancellable(conn.async_close())
63        raise
64
65    return Connection(conn=conn,
66                      always_enabled=True,
67                      response_timeout=response_timeout,
68                      supervisory_timeout=supervisory_timeout,
69                      test_timeout=test_timeout,
70                      send_window_size=send_window_size,
71                      receive_window_size=receive_window_size,
72                      send_queue_size=send_queue_size,
73                      receive_queue_size=receive_queue_size)

Connect to remote device

Additional arguments are passed directly to hat.drivers.tcp.connect.

Arguments:
  • addr: remote server's address
  • response_timeout: response timeout (t1) in seconds
  • supervisory_timeout: supervisory timeout (t2) in seconds
  • test_timeout: test timeout (t3) in seconds
  • send_window_size: send window size (k)
  • receive_window_size: receive window size (w)
  • send_queue_size: size of send queue
  • receive_queue_size: size of receive queue
async def listen( connection_cb: Callable[[hat.drivers.acse.Connection], None | Awaitable[None]], addr: hat.drivers.tcp.Address = Address(host='0.0.0.0', port=2404), response_timeout: float = 15, supervisory_timeout: float = 10, test_timeout: float = 20, send_window_size: int = 12, receive_window_size: int = 8, *, send_queue_size: int = 1024, receive_queue_size: int = 1024, bind_connections: bool = True, **kwargs) -> hat.drivers.tcp.Server:
 76async def listen(connection_cb: ConnectionCb,
 77                 addr: tcp.Address = tcp.Address('0.0.0.0', 2404),
 78                 response_timeout: float = 15,
 79                 supervisory_timeout: float = 10,
 80                 test_timeout: float = 20,
 81                 send_window_size: int = 12,
 82                 receive_window_size: int = 8,
 83                 *,
 84                 send_queue_size: int = 1024,
 85                 receive_queue_size: int = 1024,
 86                 bind_connections: bool = True,
 87                 **kwargs
 88                 ) -> tcp.Server:
 89    """Create new IEC104 slave and listen for incoming connections
 90
 91    Additional arguments are passed directly to `hat.drivers.tcp.listen`.
 92
 93    Args:
 94        connection_cb: new connection callback
 95        addr: listening socket address
 96        response_timeout: response timeout (t1) in seconds
 97        supervisory_timeout: supervisory timeout (t2) in seconds
 98        test_timeout: test timeout (t3) in seconds
 99        send_window_size: send window size (k)
100        receive_window_size: receive window size (w)
101        bind_connections: bind connections (see `hat.drivers.tcp.listen`)
102
103    """
104
105    async def on_connection(conn):
106        try:
107            try:
108                conn = Connection(conn=conn,
109                                  always_enabled=False,
110                                  response_timeout=response_timeout,
111                                  supervisory_timeout=supervisory_timeout,
112                                  test_timeout=test_timeout,
113                                  send_window_size=send_window_size,
114                                  receive_window_size=receive_window_size,
115                                  send_queue_size=send_queue_size,
116                                  receive_queue_size=receive_queue_size)
117
118                await aio.call(connection_cb, conn)
119
120            except BaseException:
121                await aio.uncancellable(conn.async_close())
122                raise
123
124        except Exception as e:
125            mlog.error("on connection error: %s", e, exc_info=e)
126
127    return await tcp.listen(on_connection, addr,
128                            bind_connections=bind_connections,
129                            **kwargs)

Create new IEC104 slave and listen for incoming connections

Additional arguments are passed directly to hat.drivers.tcp.listen.

Arguments:
  • connection_cb: new connection callback
  • addr: listening socket address
  • response_timeout: response timeout (t1) in seconds
  • supervisory_timeout: supervisory timeout (t2) in seconds
  • test_timeout: test timeout (t3) in seconds
  • send_window_size: send window size (k)
  • receive_window_size: receive window size (w)
  • bind_connections: bind connections (see hat.drivers.tcp.listen)
class Connection(hat.aio.group.Resource):
132class Connection(aio.Resource):
133    """Connection
134
135    For creating new Connection instances see `connect` or `listen` coroutine.
136
137    """
138
139    def __init__(self,
140                 conn: tcp.Connection,
141                 always_enabled: bool,
142                 response_timeout: float,
143                 supervisory_timeout: float,
144                 test_timeout: float,
145                 send_window_size: int,
146                 receive_window_size: int,
147                 send_queue_size: int,
148                 receive_queue_size: int):
149        self._conn = conn
150        self._always_enabled = always_enabled
151        self._is_enabled = always_enabled
152        self._enabled_cbs = util.CallbackRegistry()
153        self._response_timeout = response_timeout
154        self._supervisory_timeout = supervisory_timeout
155        self._test_timeout = test_timeout
156        self._send_window_size = send_window_size
157        self._receive_window_size = receive_window_size
158        self._receive_queue = aio.Queue(receive_queue_size)
159        self._send_queue = aio.Queue(send_queue_size)
160        self._test_event = asyncio.Event()
161        self._ssn = 0
162        self._rsn = 0
163        self._ack = 0
164        self._w = 0
165        self._supervisory_handle = None
166        self._waiting_ack_handles = {}
167        self._waiting_ack_cv = asyncio.Condition()
168        self._loop = asyncio.get_running_loop()
169
170        self.async_group.spawn(self._read_loop)
171        self.async_group.spawn(self._write_loop)
172        self.async_group.spawn(self._test_loop)
173
174    @property
175    def async_group(self) -> aio.Group:
176        """Async group"""
177        return self._conn.async_group
178
179    @property
180    def info(self) -> tcp.ConnectionInfo:
181        """Connection info"""
182        return self._conn.info
183
184    @property
185    def ssl_object(self) -> ssl.SSLObject | ssl.SSLSocket | None:
186        """SSL Object"""
187        return self._conn.ssl_object
188
189    @property
190    def is_enabled(self) -> bool:
191        """Is enabled"""
192        return self._is_enabled
193
194    def register_enabled_cb(self,
195                            cb: typing.Callable[[bool], None]
196                            ) -> util.RegisterCallbackHandle:
197        """Register enable callback"""
198        return self._enabled_cbs.register(cb)
199
200    async def send(self,
201                   data: util.Bytes,
202                   wait_ack: bool = False):
203        """Send data and optionally wait for acknowledgement
204
205        Raises:
206            ConnectionDisabledError
207            ConnectionError
208
209        """
210        future = self._loop.create_future() if wait_ack else None
211        entry = _SendQueueEntry(data, future, wait_ack)
212
213        try:
214            await self._send_queue.put(entry)
215
216            if wait_ack:
217                await future
218
219        except aio.QueueClosedError:
220            raise ConnectionError()
221
222    async def drain(self, wait_ack: bool = False):
223        """Drain and optionally wait for acknowledgement
224
225        Raises:
226            ConnectionError
227
228        """
229        future = self._loop.create_future()
230        entry = _SendQueueEntry(None, future, wait_ack)
231
232        try:
233            await self._send_queue.put(entry)
234            await future
235
236        except aio.QueueClosedError:
237            raise ConnectionError()
238
239    async def receive(self) -> util.Bytes:
240        """Receive data
241
242        Raises:
243            ConnectionError
244
245        """
246        try:
247            return await self._receive_queue.get()
248
249        except aio.QueueClosedError:
250            raise ConnectionError()
251
252    def _on_response_timeout(self):
253        mlog.warning("response timeout occured - closing connection")
254        self.close()
255
256    def _on_supervisory_timeout(self):
257        self.async_group.spawn(self._on_supervisory_timeout_async)
258
259    async def _on_supervisory_timeout_async(self):
260        try:
261            await self._write_apdus()
262
263        except Exception as e:
264            mlog.warning('supervisory timeout error: %s', e, exc_info=e)
265
266    async def _read_loop(self):
267        try:
268            while True:
269                apdu = await _read_apdu(self._conn)
270
271                if isinstance(apdu, common.APDUU):
272                    await self._process_apduu(apdu)
273
274                elif isinstance(apdu, common.APDUS):
275                    await self._process_apdus(apdu)
276
277                elif isinstance(apdu, common.APDUI):
278                    await self._process_apdui(apdu)
279
280                else:
281                    raise ValueError("unsupported APDU")
282
283        except (ConnectionError, aio.QueueClosedError):
284            pass
285
286        except Exception as e:
287            mlog.warning('read loop error: %s', e, exc_info=e)
288
289        finally:
290            self.close()
291            self._receive_queue.close()
292
293    async def _write_loop(self):
294        entry = None
295
296        try:
297            while True:
298                entry = await self._send_queue.get()
299
300                if entry.data is None:
301                    await self._conn.drain()
302                    ssn = (self._ssn or 0x8000) - 1
303                    handle = self._waiting_ack_handles.get(ssn)
304
305                else:
306                    handle = await self._write_apdui(entry.data)
307                    if not handle and entry.future and not entry.future.done():
308                        entry.future.set_exception(ConnectionDisabledError())
309
310                if not entry.future:
311                    continue
312
313                if entry.wait_ack and handle and not entry.future.done():
314                    self.async_group.spawn(self._wait_ack, handle,
315                                           entry.future)
316                    entry = None
317
318                elif not entry.future.done():
319                    entry.future.set_result(None)
320
321        except (ConnectionError, aio.QueueClosedError):
322            pass
323
324        except Exception as e:
325            mlog.warning('write loop error: %s', e, exc_info=e)
326
327        finally:
328            self.close()
329            self._stop_supervisory_timeout()
330            self._send_queue.close()
331
332            for f in self._waiting_ack_handles.values():
333                f.cancel()
334
335            while True:
336                if entry and entry.future and not entry.future.done():
337                    entry.future.set_exception(ConnectionError())
338                if self._send_queue.empty():
339                    break
340                entry = self._send_queue.get_nowait()
341
342    async def _test_loop(self):
343        # TODO: implement reset timeout on received frame (v2 5.2.)
344        try:
345            while True:
346                await asyncio.sleep(self._test_timeout)
347
348                self._test_event.clear()
349                await _write_apdu(self._conn,
350                                  common.APDUU(common.ApduFunction.TESTFR_ACT))
351
352                await aio.wait_for(self._test_event.wait(),
353                                   self._response_timeout)
354
355        except Exception as e:
356            mlog.warning('test loop error: %s', e, exc_info=e)
357
358        finally:
359            self.close()
360
361    async def _process_apduu(self, apdu):
362        if apdu.function == common.ApduFunction.STARTDT_ACT:
363            self._is_enabled = True
364            await _write_apdu(self._conn,
365                              common.APDUU(common.ApduFunction.STARTDT_CON))
366
367            mlog.debug("send data enabled")
368            self._enabled_cbs.notify(True)
369
370        elif apdu.function == common.ApduFunction.STOPDT_ACT:
371            if not self._always_enabled:
372                await self._write_apdus()
373                self._is_enabled = False
374                await _write_apdu(self._conn,
375                                  common.APDUU(common.ApduFunction.STOPDT_CON))
376
377                mlog.debug("send data disabled")
378                self._enabled_cbs.notify(False)
379
380        elif apdu.function == common.ApduFunction.TESTFR_ACT:
381            await _write_apdu(self._conn,
382                              common.APDUU(common.ApduFunction.TESTFR_CON))
383
384        elif apdu.function == common.ApduFunction.TESTFR_CON:
385            self._test_event.set()
386
387    async def _process_apdus(self, apdu):
388        await self._set_ack(apdu.rsn)
389
390    async def _process_apdui(self, apdu):
391        await self._set_ack(apdu.rsn)
392
393        if apdu.ssn != self._rsn:
394            raise Exception('missing apdu sequence number')
395
396        self._rsn = (self._rsn + 1) % 0x8000
397        self._start_supervisory_timeout()
398
399        if apdu.data:
400            await self._receive_queue.put(apdu.data)
401
402        self._w += 1
403        if self._w >= self._receive_window_size:
404            await self._write_apdus()
405
406    async def _write_apdui(self, data):
407        if self._ssn in self._waiting_ack_handles:
408            raise Exception("can not reuse already registered ssn")
409
410        async with self._waiting_ack_cv:
411            await self._waiting_ack_cv.wait_for(
412                lambda: (len(self._waiting_ack_handles) <
413                         self._send_window_size))
414
415        if not self._is_enabled:
416            mlog.debug("send data not enabled - discarding message")
417            return
418
419        await _write_apdu(self._conn, common.APDUI(ssn=self._ssn,
420                                                   rsn=self._rsn,
421                                                   data=data))
422        self._w = 0
423        self._stop_supervisory_timeout()
424
425        handle = self._loop.call_later(self._response_timeout,
426                                       self._on_response_timeout)
427        self._waiting_ack_handles[self._ssn] = handle
428        self._ssn = (self._ssn + 1) % 0x8000
429        return handle
430
431    async def _write_apdus(self):
432        await _write_apdu(self._conn, common.APDUS(self._rsn))
433        self._w = 0
434        self._stop_supervisory_timeout()
435
436    async def _wait_ack(self, handle, future):
437        try:
438            async with self._waiting_ack_cv:
439                await self._waiting_ack_cv.wait_for(handle.cancelled)
440
441            if not future.done():
442                future.set_result(None)
443
444        finally:
445            if not future.done():
446                future.set_exception(ConnectionError())
447
448    async def _set_ack(self, ack):
449        if ack >= self._ack:
450            ssns = range(self._ack, ack)
451        else:
452            ssns = itertools.chain(range(self._ack, 0x8000), range(ack))
453
454        for ssn in ssns:
455            handle = self._waiting_ack_handles.pop(ssn, None)
456            if not handle:
457                raise Exception("received ack for unsent sequence number")
458            handle.cancel()
459
460        self._ack = ack
461        async with self._waiting_ack_cv:
462            self._waiting_ack_cv.notify_all()
463
464    def _start_supervisory_timeout(self):
465        if self._supervisory_handle:
466            return
467
468        self._supervisory_handle = self._loop.call_later(
469            self._supervisory_timeout, self._on_supervisory_timeout)
470
471    def _stop_supervisory_timeout(self):
472        if not self._supervisory_handle:
473            return
474
475        self._supervisory_handle.cancel()
476        self._supervisory_handle = None

Connection

For creating new Connection instances see connect or listen coroutine.

Connection( conn: hat.drivers.tcp.Connection, always_enabled: bool, response_timeout: float, supervisory_timeout: float, test_timeout: float, send_window_size: int, receive_window_size: int, send_queue_size: int, receive_queue_size: int)
139    def __init__(self,
140                 conn: tcp.Connection,
141                 always_enabled: bool,
142                 response_timeout: float,
143                 supervisory_timeout: float,
144                 test_timeout: float,
145                 send_window_size: int,
146                 receive_window_size: int,
147                 send_queue_size: int,
148                 receive_queue_size: int):
149        self._conn = conn
150        self._always_enabled = always_enabled
151        self._is_enabled = always_enabled
152        self._enabled_cbs = util.CallbackRegistry()
153        self._response_timeout = response_timeout
154        self._supervisory_timeout = supervisory_timeout
155        self._test_timeout = test_timeout
156        self._send_window_size = send_window_size
157        self._receive_window_size = receive_window_size
158        self._receive_queue = aio.Queue(receive_queue_size)
159        self._send_queue = aio.Queue(send_queue_size)
160        self._test_event = asyncio.Event()
161        self._ssn = 0
162        self._rsn = 0
163        self._ack = 0
164        self._w = 0
165        self._supervisory_handle = None
166        self._waiting_ack_handles = {}
167        self._waiting_ack_cv = asyncio.Condition()
168        self._loop = asyncio.get_running_loop()
169
170        self.async_group.spawn(self._read_loop)
171        self.async_group.spawn(self._write_loop)
172        self.async_group.spawn(self._test_loop)
async_group: hat.aio.group.Group
174    @property
175    def async_group(self) -> aio.Group:
176        """Async group"""
177        return self._conn.async_group

Async group

info: hat.drivers.tcp.ConnectionInfo
179    @property
180    def info(self) -> tcp.ConnectionInfo:
181        """Connection info"""
182        return self._conn.info

Connection info

ssl_object: ssl.SSLObject | ssl.SSLSocket | None
184    @property
185    def ssl_object(self) -> ssl.SSLObject | ssl.SSLSocket | None:
186        """SSL Object"""
187        return self._conn.ssl_object

SSL Object

is_enabled: bool
189    @property
190    def is_enabled(self) -> bool:
191        """Is enabled"""
192        return self._is_enabled

Is enabled

def register_enabled_cb( self, cb: Callable[[bool], NoneType]) -> hat.util.callback.RegisterCallbackHandle:
194    def register_enabled_cb(self,
195                            cb: typing.Callable[[bool], None]
196                            ) -> util.RegisterCallbackHandle:
197        """Register enable callback"""
198        return self._enabled_cbs.register(cb)

Register enable callback

async def send(self, data: bytes | bytearray | memoryview, wait_ack: bool = False):
200    async def send(self,
201                   data: util.Bytes,
202                   wait_ack: bool = False):
203        """Send data and optionally wait for acknowledgement
204
205        Raises:
206            ConnectionDisabledError
207            ConnectionError
208
209        """
210        future = self._loop.create_future() if wait_ack else None
211        entry = _SendQueueEntry(data, future, wait_ack)
212
213        try:
214            await self._send_queue.put(entry)
215
216            if wait_ack:
217                await future
218
219        except aio.QueueClosedError:
220            raise ConnectionError()

Send data and optionally wait for acknowledgement

Raises:
  • ConnectionDisabledError
  • ConnectionError
async def drain(self, wait_ack: bool = False):
222    async def drain(self, wait_ack: bool = False):
223        """Drain and optionally wait for acknowledgement
224
225        Raises:
226            ConnectionError
227
228        """
229        future = self._loop.create_future()
230        entry = _SendQueueEntry(None, future, wait_ack)
231
232        try:
233            await self._send_queue.put(entry)
234            await future
235
236        except aio.QueueClosedError:
237            raise ConnectionError()

Drain and optionally wait for acknowledgement

Raises:
  • ConnectionError
async def receive(self) -> bytes | bytearray | memoryview:
239    async def receive(self) -> util.Bytes:
240        """Receive data
241
242        Raises:
243            ConnectionError
244
245        """
246        try:
247            return await self._receive_queue.get()
248
249        except aio.QueueClosedError:
250            raise ConnectionError()

Receive data

Raises:
  • ConnectionError