hat.drivers.iec60870.apci
IEC 60870-5 APCI layer
1"""IEC 60870-5 APCI layer""" 2 3from hat.drivers.iec60870.apci.common import SequenceNumber 4from hat.drivers.iec60870.apci.connection import (ConnectionCb, 5 ConnectionDisabledError, 6 connect, 7 listen, 8 Connection) 9 10 11__all__ = ['SequenceNumber', 12 'ConnectionCb', 13 'ConnectionDisabledError', 14 'connect', 15 'listen', 16 'Connection']
SequenceNumber =
<class 'int'>
ConnectionCb =
typing.Callable[[ForwardRef('Connection')], None | collections.abc.Awaitable[None]]
class
ConnectionDisabledError(builtins.ConnectionError):
Connection error.
async def
connect( addr: hat.drivers.tcp.Address, response_timeout: float = 15, supervisory_timeout: float = 10, test_timeout: float = 20, send_window_size: int = 12, receive_window_size: int = 8, *, send_queue_size: int = 1024, receive_queue_size: int = 1024, **kwargs) -> Connection:
async def connect(addr: tcp.Address,
                  response_timeout: float = 15,
                  supervisory_timeout: float = 10,
                  test_timeout: float = 20,
                  send_window_size: int = 12,
                  receive_window_size: int = 8,
                  *,
                  send_queue_size: int = 1024,
                  receive_queue_size: int = 1024,
                  **kwargs
                  ) -> 'Connection':
    """Connect to remote device

    Opens a TCP connection with `hat.drivers.tcp.connect`, writes a
    STARTDT_ACT U-format APDU and waits at most `response_timeout` seconds
    for `_wait_startdt_con` to finish. The resulting `Connection` is created
    with ``always_enabled=True``.

    Additional arguments are passed directly to `hat.drivers.tcp.connect`.

    Args:
        addr: remote server's address
        response_timeout: response timeout (t1) in seconds
        supervisory_timeout: supervisory timeout (t2) in seconds
        test_timeout: test timeout (t3) in seconds
        send_window_size: send window size (k)
        receive_window_size: receive window size (w)
        send_queue_size: size of send queue
        receive_queue_size: size of receive queue

    Returns:
        established connection

    Raises:
        Exception: any error raised while connecting or during the STARTDT
            handshake (including the timeout raised by `aio.wait_for`) is
            propagated after the TCP connection has been closed

    """
    conn = await tcp.connect(addr, **kwargs)

    try:
        apdu = common.APDUU(common.ApduFunction.STARTDT_ACT)
        await _write_apdu(conn, apdu)
        await aio.wait_for(_wait_startdt_con(conn), response_timeout)

    except BaseException:
        # BaseException (not Exception) so that cancellation of this
        # coroutine during the handshake also closes the TCP connection
        # instead of leaking it - same cleanup policy as in `listen`
        await aio.uncancellable(conn.async_close())
        raise

    return Connection(conn=conn,
                      always_enabled=True,
                      response_timeout=response_timeout,
                      supervisory_timeout=supervisory_timeout,
                      test_timeout=test_timeout,
                      send_window_size=send_window_size,
                      receive_window_size=receive_window_size,
                      send_queue_size=send_queue_size,
                      receive_queue_size=receive_queue_size)
Connect to remote device
Additional arguments are passed directly to hat.drivers.tcp.connect.
Arguments:
- addr: remote server's address
- response_timeout: response timeout (t1) in seconds
- supervisory_timeout: supervisory timeout (t2) in seconds
- test_timeout: test timeout (t3) in seconds
- send_window_size: send window size (k)
- receive_window_size: receive window size (w)
- send_queue_size: size of send queue
- receive_queue_size: size of receive queue
async def
listen( connection_cb: Callable[[Connection], None | Awaitable[None]], addr: hat.drivers.tcp.Address = Address(host='0.0.0.0', port=2404), response_timeout: float = 15, supervisory_timeout: float = 10, test_timeout: float = 20, send_window_size: int = 12, receive_window_size: int = 8, *, send_queue_size: int = 1024, receive_queue_size: int = 1024, bind_connections: bool = True, **kwargs) -> hat.drivers.tcp.Server:
async def listen(connection_cb: ConnectionCb,
                 addr: tcp.Address = tcp.Address('0.0.0.0', 2404),
                 response_timeout: float = 15,
                 supervisory_timeout: float = 10,
                 test_timeout: float = 20,
                 send_window_size: int = 12,
                 receive_window_size: int = 8,
                 *,
                 send_queue_size: int = 1024,
                 receive_queue_size: int = 1024,
                 bind_connections: bool = True,
                 **kwargs
                 ) -> tcp.Server:
    """Create new IEC104 slave and listen for incoming connections

    Every accepted TCP connection is wrapped in a `Connection` created with
    ``always_enabled=False`` and handed to `connection_cb`. If wrapping or
    the callback fails, the connection is closed; errors derived from
    `Exception` are logged and not propagated to the server.

    Additional arguments are passed directly to `hat.drivers.tcp.listen`.

    Args:
        connection_cb: new connection callback
        addr: listening socket address
        response_timeout: response timeout (t1) in seconds
        supervisory_timeout: supervisory timeout (t2) in seconds
        test_timeout: test timeout (t3) in seconds
        send_window_size: send window size (k)
        receive_window_size: receive window size (w)
        send_queue_size: size of send queue
        receive_queue_size: size of receive queue
        bind_connections: bind connections (see `hat.drivers.tcp.listen`)

    """
    # arguments shared by every Connection created by this server
    connection_args = dict(always_enabled=False,
                           response_timeout=response_timeout,
                           supervisory_timeout=supervisory_timeout,
                           test_timeout=test_timeout,
                           send_window_size=send_window_size,
                           receive_window_size=receive_window_size,
                           send_queue_size=send_queue_size,
                           receive_queue_size=receive_queue_size)

    async def on_connection(conn):
        # object closed on failure: the raw TCP connection until wrapping
        # succeeds, the APCI connection afterwards
        closable = conn

        try:
            try:
                closable = Connection(conn=conn, **connection_args)
                await aio.call(connection_cb, closable)

            except BaseException:
                await aio.uncancellable(closable.async_close())
                raise

        except Exception as e:
            mlog.error("on connection error: %s", e, exc_info=e)

    server = await tcp.listen(on_connection, addr,
                              bind_connections=bind_connections,
                              **kwargs)
    return server
Create new IEC104 slave and listen for incoming connections
Additional arguments are passed directly to hat.drivers.tcp.listen.
Arguments:
- connection_cb: new connection callback
- addr: listening socket address
- response_timeout: response timeout (t1) in seconds
- supervisory_timeout: supervisory timeout (t2) in seconds
- test_timeout: test timeout (t3) in seconds
- send_window_size: send window size (k)
- receive_window_size: receive window size (w)
- send_queue_size: size of send queue
- receive_queue_size: size of receive queue
- bind_connections: bind connections (see hat.drivers.tcp.listen)
class
Connection(hat.aio.group.Resource):
class Connection(aio.Resource):
    """Connection

    Wraps a `tcp.Connection` and runs three background tasks in that
    connection's async group: a read loop dispatching received APDUs, a
    write loop consuming the send queue and a test loop periodically
    sending TESTFR_ACT.

    For creating new Connection instances see `connect` or `listen` coroutine.

    """

    def __init__(self,
                 conn: tcp.Connection,
                 always_enabled: bool,
                 response_timeout: float,
                 supervisory_timeout: float,
                 test_timeout: float,
                 send_window_size: int,
                 receive_window_size: int,
                 send_queue_size: int,
                 receive_queue_size: int):
        """Initialize state and spawn read, write and test loops

        Must be called while an event loop is running
        (`asyncio.get_running_loop` is used).

        Args:
            conn: underlying TCP connection
            always_enabled: initial enabled state; when ``True``, received
                STOPDT_ACT is ignored
            response_timeout: delay in seconds after which an
                unacknowledged APDUI or unanswered TESTFR_ACT closes the
                connection
            supervisory_timeout: delay in seconds between first
                unacknowledged received APDUI and sending of APDUS
            test_timeout: sleep in seconds between TESTFR_ACT requests
            send_window_size: maximum number of sent APDUIs waiting for
                acknowledgement
            receive_window_size: number of received APDUIs after which
                APDUS is sent
            send_queue_size: size of send queue
            receive_queue_size: size of receive queue

        """
        self._conn = conn
        self._always_enabled = always_enabled
        self._is_enabled = always_enabled
        self._enabled_cbs = util.CallbackRegistry()
        self._response_timeout = response_timeout
        self._supervisory_timeout = supervisory_timeout
        self._test_timeout = test_timeout
        self._send_window_size = send_window_size
        self._receive_window_size = receive_window_size
        self._receive_queue = aio.Queue(receive_queue_size)
        self._send_queue = aio.Queue(send_queue_size)
        # set when TESTFR_CON is received
        self._test_event = asyncio.Event()
        # sequence number used for next sent APDUI (modulo 0x8000)
        self._ssn = 0
        # sequence number expected in next received APDUI (modulo 0x8000)
        self._rsn = 0
        # last acknowledgement value received from peer
        self._ack = 0
        # number of APDUIs received since last sent APDUI/APDUS
        self._w = 0
        # pending supervisory timer handle (None if not running)
        self._supervisory_handle = None
        # response timer handles of sent APDUIs, keyed by their ssn
        self._waiting_ack_handles = {}
        # notified each time acknowledgement is processed
        self._waiting_ack_cv = asyncio.Condition()
        self._loop = asyncio.get_running_loop()

        self.async_group.spawn(self._read_loop)
        self.async_group.spawn(self._write_loop)
        self.async_group.spawn(self._test_loop)

    @property
    def async_group(self) -> aio.Group:
        """Async group (async group of the underlying TCP connection)"""
        return self._conn.async_group

    @property
    def info(self) -> tcp.ConnectionInfo:
        """Connection info of the underlying TCP connection"""
        return self._conn.info

    @property
    def ssl_object(self) -> ssl.SSLObject | ssl.SSLSocket | None:
        """SSL Object of the underlying TCP connection"""
        return self._conn.ssl_object

    @property
    def is_enabled(self) -> bool:
        """Is enabled

        Data sent while connection is not enabled is discarded.

        """
        return self._is_enabled

    def register_enabled_cb(self,
                            cb: typing.Callable[[bool], None]
                            ) -> util.RegisterCallbackHandle:
        """Register enable callback

        Callback is notified with ``True`` after received STARTDT_ACT is
        confirmed and with ``False`` after received STOPDT_ACT is confirmed.

        """
        return self._enabled_cbs.register(cb)

    async def send(self,
                   data: util.Bytes,
                   wait_ack: bool = False):
        """Send data and optionally wait for acknowledgement

        Data is put on the send queue and written by the write loop. With
        `wait_ack` set, this coroutine returns once the peer acknowledges
        the sent APDUI. Without `wait_ack`, it returns as soon as data is
        queued and data discarded because connection is not enabled is not
        reported.

        Raises:
            ConnectionDisabledError: `wait_ack` is set and data was
                discarded because connection is not enabled
            ConnectionError

        """
        future = self._loop.create_future() if wait_ack else None
        entry = _SendQueueEntry(data, future, wait_ack)

        try:
            await self._send_queue.put(entry)

            if wait_ack:
                await future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def drain(self, wait_ack: bool = False):
        """Drain and optionally wait for acknowledgement

        Queues a marker entry (without data); once the write loop reaches
        it, underlying connection is drained. With `wait_ack` set, this
        coroutine additionally waits for acknowledgement of the most
        recently sent APDUI, if it is still unacknowledged.

        Raises:
            ConnectionError

        """
        future = self._loop.create_future()
        entry = _SendQueueEntry(None, future, wait_ack)

        try:
            await self._send_queue.put(entry)
            await future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def receive(self) -> util.Bytes:
        """Receive data

        Returns data of next received APDUI (APDUIs without data are not
        queued).

        Raises:
            ConnectionError

        """
        try:
            return await self._receive_queue.get()

        except aio.QueueClosedError:
            raise ConnectionError()

    def _on_response_timeout(self):
        # called by timer started in _write_apdui if it was not cancelled
        # within response timeout
        mlog.warning("response timeout occured - closing connection")
        self.close()

    def _on_supervisory_timeout(self):
        # timer callback is synchronous - writing is delegated to a task
        self.async_group.spawn(self._on_supervisory_timeout_async)

    async def _on_supervisory_timeout_async(self):
        # acknowledge received APDUIs with APDUS
        try:
            await self._write_apdus()

        except Exception as e:
            mlog.warning('supervisory timeout error: %s', e, exc_info=e)

    async def _read_loop(self):
        # read APDUs and dispatch them by type until connection fails;
        # on exit, connection and receive queue are closed
        try:
            while True:
                apdu = await _read_apdu(self._conn)

                if isinstance(apdu, common.APDUU):
                    await self._process_apduu(apdu)

                elif isinstance(apdu, common.APDUS):
                    await self._process_apdus(apdu)

                elif isinstance(apdu, common.APDUI):
                    await self._process_apdui(apdu)

                else:
                    raise ValueError("unsupported APDU")

        except (ConnectionError, aio.QueueClosedError):
            pass

        except Exception as e:
            mlog.warning('read loop error: %s', e, exc_info=e)

        finally:
            self.close()
            self._receive_queue.close()

    async def _write_loop(self):
        # consume send queue entries created by `send` (entry with data)
        # and `drain` (entry without data) and resolve their futures
        entry = None

        try:
            while True:
                entry = await self._send_queue.get()

                if entry.data is None:
                    await self._conn.drain()
                    # ssn of the most recently sent APDUI (0 wraps to
                    # 0x7fff); handle is None if it is not waiting for ack
                    ssn = (self._ssn or 0x8000) - 1
                    handle = self._waiting_ack_handles.get(ssn)

                else:
                    # handle is None if data was discarded because
                    # connection is not enabled
                    handle = await self._write_apdui(entry.data)
                    if not handle and entry.future and not entry.future.done():
                        entry.future.set_exception(ConnectionDisabledError())

                if not entry.future:
                    continue

                if entry.wait_ack and handle and not entry.future.done():
                    self.async_group.spawn(self._wait_ack, handle,
                                           entry.future)
                    # future is now resolved by _wait_ack - prevent
                    # cleanup in finally from failing it
                    entry = None

                elif not entry.future.done():
                    entry.future.set_result(None)

        except (ConnectionError, aio.QueueClosedError):
            pass

        except Exception as e:
            mlog.warning('write loop error: %s', e, exc_info=e)

        finally:
            self.close()
            self._stop_supervisory_timeout()
            self._send_queue.close()

            # stop all pending response timers
            for f in self._waiting_ack_handles.values():
                f.cancel()

            # fail futures of the entry being processed and of all entries
            # still waiting in the send queue
            while True:
                if entry and entry.future and not entry.future.done():
                    entry.future.set_exception(ConnectionError())
                if self._send_queue.empty():
                    break
                entry = self._send_queue.get_nowait()

    async def _test_loop(self):
        # TODO: implement reset timeout on received frame (v2 5.2.)
        # periodically send TESTFR_ACT and wait for _test_event (set on
        # received TESTFR_CON) for at most response timeout; any exception,
        # including timeout raised by aio.wait_for, is logged and
        # connection is closed
        try:
            while True:
                await asyncio.sleep(self._test_timeout)

                self._test_event.clear()
                await _write_apdu(self._conn,
                                  common.APDUU(common.ApduFunction.TESTFR_ACT))

                await aio.wait_for(self._test_event.wait(),
                                   self._response_timeout)

        except Exception as e:
            mlog.warning('test loop error: %s', e, exc_info=e)

        finally:
            self.close()

    async def _process_apduu(self, apdu):
        # handle received U-format APDU; functions other than those listed
        # below are ignored
        if apdu.function == common.ApduFunction.STARTDT_ACT:
            self._is_enabled = True
            await _write_apdu(self._conn,
                              common.APDUU(common.ApduFunction.STARTDT_CON))

            mlog.debug("send data enabled")
            self._enabled_cbs.notify(True)

        elif apdu.function == common.ApduFunction.STOPDT_ACT:
            # connections created as always enabled ignore STOPDT_ACT
            if not self._always_enabled:
                # acknowledge received APDUIs before confirming stop
                await self._write_apdus()
                self._is_enabled = False
                await _write_apdu(self._conn,
                                  common.APDUU(common.ApduFunction.STOPDT_CON))

                mlog.debug("send data disabled")
                self._enabled_cbs.notify(False)

        elif apdu.function == common.ApduFunction.TESTFR_ACT:
            await _write_apdu(self._conn,
                              common.APDUU(common.ApduFunction.TESTFR_CON))

        elif apdu.function == common.ApduFunction.TESTFR_CON:
            self._test_event.set()

    async def _process_apdus(self, apdu):
        # S-format APDU only carries acknowledgement
        await self._set_ack(apdu.rsn)

    async def _process_apdui(self, apdu):
        # handle received I-format APDU: process piggybacked
        # acknowledgement, verify sequence number and queue data
        await self._set_ack(apdu.rsn)

        if apdu.ssn != self._rsn:
            raise Exception('missing apdu sequence number')

        self._rsn = (self._rsn + 1) % 0x8000
        self._start_supervisory_timeout()

        if apdu.data:
            await self._receive_queue.put(apdu.data)

        # send APDUS once receive window is filled
        self._w += 1
        if self._w >= self._receive_window_size:
            await self._write_apdus()

    async def _write_apdui(self, data):
        # write single APDUI; returns response timer handle of written
        # APDUI or None if data was discarded
        if self._ssn in self._waiting_ack_handles:
            raise Exception("can not reuse already registered ssn")

        # block while send window is full
        async with self._waiting_ack_cv:
            await self._waiting_ack_cv.wait_for(
                lambda: (len(self._waiting_ack_handles) <
                         self._send_window_size))

        if not self._is_enabled:
            mlog.debug("send data not enabled - discarding message")
            return

        await _write_apdu(self._conn, common.APDUI(ssn=self._ssn,
                                                   rsn=self._rsn,
                                                   data=data))
        # written APDUI carried current rsn - received APDUIs are
        # acknowledged
        self._w = 0
        self._stop_supervisory_timeout()

        # response timer - cancelled by _set_ack once peer acknowledges
        # this ssn
        handle = self._loop.call_later(self._response_timeout,
                                       self._on_response_timeout)
        self._waiting_ack_handles[self._ssn] = handle
        self._ssn = (self._ssn + 1) % 0x8000
        return handle

    async def _write_apdus(self):
        # acknowledge received APDUIs by writing APDUS with current rsn
        await _write_apdu(self._conn, common.APDUS(self._rsn))
        self._w = 0
        self._stop_supervisory_timeout()

    async def _wait_ack(self, handle, future):
        # resolve future once response timer handle is cancelled; if this
        # task ends in any other way (e.g. it is cancelled), future fails
        # with ConnectionError
        try:
            async with self._waiting_ack_cv:
                await self._waiting_ack_cv.wait_for(handle.cancelled)

            if not future.done():
                future.set_result(None)

        finally:
            if not future.done():
                future.set_exception(ConnectionError())

    async def _set_ack(self, ack):
        # process received acknowledgement: cancel response timers of all
        # sequence numbers from last acknowledgement up to (excluding) ack
        if ack >= self._ack:
            ssns = range(self._ack, ack)
        else:
            # acknowledged range wraps around 0x8000
            ssns = itertools.chain(range(self._ack, 0x8000), range(ack))

        for ssn in ssns:
            handle = self._waiting_ack_handles.pop(ssn, None)
            if not handle:
                raise Exception("received ack for unsent sequence number")
            handle.cancel()

        self._ack = ack
        # wake up writers blocked on full send window and _wait_ack tasks
        async with self._waiting_ack_cv:
            self._waiting_ack_cv.notify_all()

    def _start_supervisory_timeout(self):
        # keep already existing timer - it is not restarted
        if self._supervisory_handle:
            return

        self._supervisory_handle = self._loop.call_later(
            self._supervisory_timeout, self._on_supervisory_timeout)

    def _stop_supervisory_timeout(self):
        if not self._supervisory_handle:
            return

        self._supervisory_handle.cancel()
        self._supervisory_handle = None
Connection( conn: hat.drivers.tcp.Connection, always_enabled: bool, response_timeout: float, supervisory_timeout: float, test_timeout: float, send_window_size: int, receive_window_size: int, send_queue_size: int, receive_queue_size: int)
def __init__(self,
             conn: tcp.Connection,
             always_enabled: bool,
             response_timeout: float,
             supervisory_timeout: float,
             test_timeout: float,
             send_window_size: int,
             receive_window_size: int,
             send_queue_size: int,
             receive_queue_size: int):
    """Initialize state and spawn read, write and test loops

    Must be called while an event loop is running
    (`asyncio.get_running_loop` is used).

    Args:
        conn: underlying TCP connection
        always_enabled: initial enabled state
        response_timeout: response timeout in seconds
        supervisory_timeout: supervisory timeout in seconds
        test_timeout: test timeout in seconds
        send_window_size: send window size
        receive_window_size: receive window size
        send_queue_size: size of send queue
        receive_queue_size: size of receive queue

    """
    self._conn = conn
    self._always_enabled = always_enabled
    # enabled state starts as always_enabled
    self._is_enabled = always_enabled
    self._enabled_cbs = util.CallbackRegistry()
    self._response_timeout = response_timeout
    self._supervisory_timeout = supervisory_timeout
    self._test_timeout = test_timeout
    self._send_window_size = send_window_size
    self._receive_window_size = receive_window_size
    self._receive_queue = aio.Queue(receive_queue_size)
    self._send_queue = aio.Queue(send_queue_size)
    self._test_event = asyncio.Event()
    # sequence numbers and counters all start from zero
    self._ssn = 0
    self._rsn = 0
    self._ack = 0
    self._w = 0
    # no supervisory timer is running initially
    self._supervisory_handle = None
    self._waiting_ack_handles = {}
    self._waiting_ack_cv = asyncio.Condition()
    self._loop = asyncio.get_running_loop()

    # background tasks are spawned only after all state is initialized
    self.async_group.spawn(self._read_loop)
    self.async_group.spawn(self._write_loop)
    self.async_group.spawn(self._test_loop)
async_group: hat.aio.group.Group
@property
def async_group(self) -> aio.Group:
    """Async group (async group of the wrapped connection)"""
    # no separate group is created - wrapped connection's group is reused
    return self._conn.async_group
Async group
@property
def info(self) -> tcp.ConnectionInfo:
    """Connection info of the wrapped connection"""
    return self._conn.info
Connection info
ssl_object: ssl.SSLObject | ssl.SSLSocket | None
@property
def ssl_object(self) -> ssl.SSLObject | ssl.SSLSocket | None:
    """SSL Object of the wrapped connection"""
    return self._conn.ssl_object
SSL Object
def
register_enabled_cb( self, cb: Callable[[bool], NoneType]) -> hat.util.callback.RegisterCallbackHandle:
def register_enabled_cb(self,
                        cb: typing.Callable[[bool], None]
                        ) -> util.RegisterCallbackHandle:
    """Register enable callback

    Args:
        cb: callback called with single boolean argument

    Returns:
        handle returned by the callback registry's `register`

    """
    return self._enabled_cbs.register(cb)
Register enable callback
async def
send(self, data: bytes | bytearray | memoryview, wait_ack: bool = False):
async def send(self,
               data: util.Bytes,
               wait_ack: bool = False):
    """Queue data for sending and optionally wait for acknowledgement

    A future is created only when `wait_ack` is set; it is attached to the
    queued entry and awaited after the entry is put on the send queue.

    Raises:
        ConnectionDisabledError
        ConnectionError

    """
    ack_future = None
    if wait_ack:
        ack_future = self._loop.create_future()

    queue_entry = _SendQueueEntry(data, ack_future, wait_ack)

    try:
        await self._send_queue.put(queue_entry)

        if ack_future is not None:
            await ack_future

    except aio.QueueClosedError:
        # closed send queue is reported as connection error
        raise ConnectionError()
Send data and optionally wait for acknowledgement
Raises:
- ConnectionDisabledError
- ConnectionError
async def
drain(self, wait_ack: bool = False):
async def drain(self, wait_ack: bool = False):
    """Drain and optionally wait for acknowledgement

    An entry without data is put on the send queue together with a future
    which is awaited before returning.

    Raises:
        ConnectionError

    """
    done_future = self._loop.create_future()
    # entry with ``None`` as data
    marker = _SendQueueEntry(None, done_future, wait_ack)

    try:
        await self._send_queue.put(marker)
        await done_future

    except aio.QueueClosedError:
        # closed send queue is reported as connection error
        raise ConnectionError()
Drain and optionally wait for acknowledgement
Raises:
- ConnectionError
async def
receive(self) -> bytes | bytearray | memoryview:
async def receive(self) -> util.Bytes:
    """Receive data

    Waits for and returns next item from the receive queue.

    Raises:
        ConnectionError

    """
    try:
        received = await self._receive_queue.get()

    except aio.QueueClosedError:
        # closed receive queue is reported as connection error
        raise ConnectionError()

    return received
Receive data
Raises:
- ConnectionError