hat.drivers.iec60870.apci
IEC 60870-5 APCI layer
1"""IEC 60870-5 APCI layer""" 2 3from hat.drivers.iec60870.apci.common import SequenceNumber 4from hat.drivers.iec60870.apci.connection import (ConnectionCb, 5 ConnectionDisabledError, 6 connect, 7 listen, 8 Connection) 9 10 11__all__ = ['SequenceNumber', 12 'ConnectionCb', 13 'ConnectionDisabledError', 14 'connect', 15 'listen', 16 'Connection']
SequenceNumber =
<class 'int'>
ConnectionCb =
typing.Callable[[ForwardRef('Connection')], None | collections.abc.Awaitable[None]]
class
ConnectionDisabledError(builtins.ConnectionError):
Connection error.
async def
connect( addr: hat.drivers.tcp.Address, response_timeout: float = 15, supervisory_timeout: float = 10, test_timeout: float = 20, send_window_size: int = 12, receive_window_size: int = 8, *, send_queue_size: int = 1024, receive_queue_size: int = 1024, **kwargs) -> Connection:
28async def connect(addr: tcp.Address, 29 response_timeout: float = 15, 30 supervisory_timeout: float = 10, 31 test_timeout: float = 20, 32 send_window_size: int = 12, 33 receive_window_size: int = 8, 34 *, 35 send_queue_size: int = 1024, 36 receive_queue_size: int = 1024, 37 **kwargs 38 ) -> 'Connection': 39 """Connect to remote device 40 41 Additional arguments are passed directly to `hat.drivers.tcp.connect`. 42 43 Args: 44 addr: remote server's address 45 response_timeout: response timeout (t1) in seconds 46 supervisory_timeout: supervisory timeout (t2) in seconds 47 test_timeout: test timeout (t3) in seconds 48 send_window_size: send window size (k) 49 receive_window_size: receive window size (w) 50 send_queue_size: size of send queue 51 receive_queue_size: size of receive queue 52 53 """ 54 conn = await tcp.connect(addr, **kwargs) 55 56 try: 57 transport = Transport(conn) 58 59 apdu = common.APDUU(common.ApduFunction.STARTDT_ACT) 60 await transport.write(apdu) 61 62 await aio.wait_for(_wait_startdt_con(transport), response_timeout) 63 64 except BaseException: 65 await aio.uncancellable(conn.async_close()) 66 raise 67 68 return Connection(transport=transport, 69 always_enabled=True, 70 response_timeout=response_timeout, 71 supervisory_timeout=supervisory_timeout, 72 test_timeout=test_timeout, 73 send_window_size=send_window_size, 74 receive_window_size=receive_window_size, 75 send_queue_size=send_queue_size, 76 receive_queue_size=receive_queue_size)
Connect to remote device
Additional arguments are passed directly to hat.drivers.tcp.connect.
Arguments:
- addr: remote server's address
- response_timeout: response timeout (t1) in seconds
- supervisory_timeout: supervisory timeout (t2) in seconds
- test_timeout: test timeout (t3) in seconds
- send_window_size: send window size (k)
- receive_window_size: receive window size (w)
- send_queue_size: size of send queue
- receive_queue_size: size of receive queue
async def
listen( connection_cb: Callable[[Connection], None | Awaitable[None]], addr: hat.drivers.tcp.Address = Address(host='0.0.0.0', port=2404), response_timeout: float = 15, supervisory_timeout: float = 10, test_timeout: float = 20, send_window_size: int = 12, receive_window_size: int = 8, *, send_queue_size: int = 1024, receive_queue_size: int = 1024, bind_connections: bool = True, **kwargs) -> hat.drivers.tcp.Server:
79async def listen(connection_cb: ConnectionCb, 80 addr: tcp.Address = tcp.Address('0.0.0.0', 2404), 81 response_timeout: float = 15, 82 supervisory_timeout: float = 10, 83 test_timeout: float = 20, 84 send_window_size: int = 12, 85 receive_window_size: int = 8, 86 *, 87 send_queue_size: int = 1024, 88 receive_queue_size: int = 1024, 89 bind_connections: bool = True, 90 **kwargs 91 ) -> tcp.Server: 92 """Create new IEC104 slave and listen for incoming connections 93 94 Additional arguments are passed directly to `hat.drivers.tcp.listen`. 95 96 Args: 97 connection_cb: new connection callback 98 addr: listening socket address 99 response_timeout: response timeout (t1) in seconds 100 supervisory_timeout: supervisory timeout (t2) in seconds 101 test_timeout: test timeout (t3) in seconds 102 send_window_size: send window size (k) 103 receive_window_size: receive window size (w) 104 bind_connections: bind connections (see `hat.drivers.tcp.listen`) 105 106 """ 107 108 log = mlog 109 110 async def on_connection(conn): 111 try: 112 try: 113 conn = Connection(transport=Transport(conn), 114 always_enabled=False, 115 response_timeout=response_timeout, 116 supervisory_timeout=supervisory_timeout, 117 test_timeout=test_timeout, 118 send_window_size=send_window_size, 119 receive_window_size=receive_window_size, 120 send_queue_size=send_queue_size, 121 receive_queue_size=receive_queue_size) 122 123 await aio.call(connection_cb, conn) 124 125 except BaseException: 126 await aio.uncancellable(conn.async_close()) 127 raise 128 129 except Exception as e: 130 log.error("on connection error: %s", e, exc_info=e) 131 132 server = await tcp.listen(on_connection, addr, 133 bind_connections=bind_connections, 134 **kwargs) 135 136 log = _create_server_logger_adapter(server.info) 137 138 return server
Create new IEC104 slave and listen for incoming connections
Additional arguments are passed directly to hat.drivers.tcp.listen.
Arguments:
- connection_cb: new connection callback
- addr: listening socket address
- response_timeout: response timeout (t1) in seconds
- supervisory_timeout: supervisory timeout (t2) in seconds
- test_timeout: test timeout (t3) in seconds
- send_window_size: send window size (k)
- receive_window_size: receive window size (w)
- bind_connections: bind connections (see
hat.drivers.tcp.listen)
class
Connection(hat.aio.group.Resource):
141class Connection(aio.Resource): 142 """Connection 143 144 For creating new Connection instances see `connect` or `listen` coroutine. 145 146 """ 147 148 def __init__(self, 149 transport: Transport, 150 always_enabled: bool, 151 response_timeout: float, 152 supervisory_timeout: float, 153 test_timeout: float, 154 send_window_size: int, 155 receive_window_size: int, 156 send_queue_size: int, 157 receive_queue_size: int): 158 self._transport = transport 159 self._always_enabled = always_enabled 160 self._is_enabled = always_enabled 161 self._enabled_cbs = util.CallbackRegistry() 162 self._response_timeout = response_timeout 163 self._supervisory_timeout = supervisory_timeout 164 self._test_timeout = test_timeout 165 self._send_window_size = send_window_size 166 self._receive_window_size = receive_window_size 167 self._receive_queue = aio.Queue(receive_queue_size) 168 self._send_queue = aio.Queue(send_queue_size) 169 self._test_event = asyncio.Event() 170 self._ssn = 0 171 self._rsn = 0 172 self._ack = 0 173 self._w = 0 174 self._supervisory_handle = None 175 self._waiting_ack_handles = {} 176 self._waiting_ack_cv = asyncio.Condition() 177 self._loop = asyncio.get_running_loop() 178 self._log = _create_connection_logger_adapter(transport.info) 179 180 self.async_group.spawn(self._read_loop) 181 self.async_group.spawn(self._write_loop) 182 self.async_group.spawn(self._test_loop) 183 184 @property 185 def async_group(self) -> aio.Group: 186 """Async group""" 187 return self._transport.async_group 188 189 @property 190 def info(self) -> tcp.ConnectionInfo: 191 """Connection info""" 192 return self._transport.info 193 194 @property 195 def ssl_object(self) -> ssl.SSLObject | ssl.SSLSocket | None: 196 """SSL Object""" 197 return self._transport.ssl_object 198 199 @property 200 def is_enabled(self) -> bool: 201 """Is enabled""" 202 return self._is_enabled 203 204 def register_enabled_cb(self, 205 cb: typing.Callable[[bool], None] 206 ) -> util.RegisterCallbackHandle: 207 """Register enable callback""" 208 return self._enabled_cbs.register(cb) 209 210 async def send(self, 211 data: util.Bytes, 212 wait_ack: bool = False): 213 """Send data and optionally wait for acknowledgement 214 215 Raises: 216 ConnectionDisabledError 217 ConnectionError 218 219 """ 220 future = self._loop.create_future() if wait_ack else None 221 entry = _SendQueueEntry(data, future, wait_ack) 222 223 try: 224 await self._send_queue.put(entry) 225 226 if wait_ack: 227 await future 228 229 except aio.QueueClosedError: 230 raise ConnectionError() 231 232 async def drain(self, wait_ack: bool = False): 233 """Drain and optionally wait for acknowledgement 234 235 Raises: 236 ConnectionError 237 238 """ 239 future = self._loop.create_future() 240 entry = _SendQueueEntry(None, future, wait_ack) 241 242 try: 243 await self._send_queue.put(entry) 244 await future 245 246 except aio.QueueClosedError: 247 raise ConnectionError() 248 249 async def receive(self) -> util.Bytes: 250 """Receive data 251 252 Raises: 253 ConnectionError 254 255 """ 256 try: 257 return await self._receive_queue.get() 258 259 except aio.QueueClosedError: 260 raise ConnectionError() 261 262 def _on_response_timeout(self): 263 self._log.warning("response timeout occured - closing connection") 264 self.close() 265 266 def _on_supervisory_timeout(self): 267 self.async_group.spawn(self._on_supervisory_timeout_async) 268 269 async def _on_supervisory_timeout_async(self): 270 try: 271 await self._write_apdus() 272 273 except Exception as e: 274 self._log.warning('supervisory timeout error: %s', e, exc_info=e) 275 276 async def _read_loop(self): 277 try: 278 while True: 279 apdu = await self._transport.read() 280 281 if isinstance(apdu, common.APDUU): 282 await self._process_apduu(apdu) 283 284 elif isinstance(apdu, common.APDUS): 285 await self._process_apdus(apdu) 286 287 elif isinstance(apdu, common.APDUI): 288 await self._process_apdui(apdu) 289 290 else: 291 raise ValueError("unsupported APDU") 292 293 except (ConnectionError, aio.QueueClosedError): 294 pass 295 296 except Exception as e: 297 self._log.warning('read loop error: %s', e, exc_info=e) 298 299 finally: 300 self.close() 301 self._receive_queue.close() 302 303 async def _write_loop(self): 304 entry = None 305 306 try: 307 while True: 308 entry = await self._send_queue.get() 309 310 if entry.data is None: 311 await self._transport.drain() 312 ssn = (self._ssn or 0x8000) - 1 313 handle = self._waiting_ack_handles.get(ssn) 314 315 else: 316 handle = await self._write_apdui(entry.data) 317 if not handle and entry.future and not entry.future.done(): 318 entry.future.set_exception(ConnectionDisabledError()) 319 320 if not entry.future: 321 continue 322 323 if entry.wait_ack and handle and not entry.future.done(): 324 self.async_group.spawn(self._wait_ack, handle, 325 entry.future) 326 entry = None 327 328 elif not entry.future.done(): 329 entry.future.set_result(None) 330 331 except (ConnectionError, aio.QueueClosedError): 332 pass 333 334 except Exception as e: 335 self._log.warning('write loop error: %s', e, exc_info=e) 336 337 finally: 338 self.close() 339 self._stop_supervisory_timeout() 340 self._send_queue.close() 341 342 for f in self._waiting_ack_handles.values(): 343 f.cancel() 344 345 while True: 346 if entry and entry.future and not entry.future.done(): 347 entry.future.set_exception(ConnectionError()) 348 if self._send_queue.empty(): 349 break 350 entry = self._send_queue.get_nowait() 351 352 async def _test_loop(self): 353 # TODO: implement reset timeout on received frame (v2 5.2.) 354 try: 355 while True: 356 await asyncio.sleep(self._test_timeout) 357 358 self._test_event.clear() 359 await self._transport.write( 360 common.APDUU(common.ApduFunction.TESTFR_ACT)) 361 362 await aio.wait_for(self._test_event.wait(), 363 self._response_timeout) 364 365 except Exception as e: 366 self._log.warning('test loop error: %s', e, exc_info=e) 367 368 finally: 369 self.close() 370 371 async def _process_apduu(self, apdu): 372 if apdu.function == common.ApduFunction.STARTDT_ACT: 373 self._is_enabled = True 374 await self._transport.write( 375 common.APDUU(common.ApduFunction.STARTDT_CON)) 376 377 self._log.debug("send data enabled") 378 self._enabled_cbs.notify(True) 379 380 elif apdu.function == common.ApduFunction.STOPDT_ACT: 381 if not self._always_enabled: 382 await self._write_apdus() 383 self._is_enabled = False 384 await self._transport.write( 385 common.APDUU(common.ApduFunction.STOPDT_CON)) 386 387 self._log.debug("send data disabled") 388 self._enabled_cbs.notify(False) 389 390 elif apdu.function == common.ApduFunction.TESTFR_ACT: 391 await self._transport.write( 392 common.APDUU(common.ApduFunction.TESTFR_CON)) 393 394 elif apdu.function == common.ApduFunction.TESTFR_CON: 395 self._test_event.set() 396 397 async def _process_apdus(self, apdu): 398 await self._set_ack(apdu.rsn) 399 400 async def _process_apdui(self, apdu): 401 await self._set_ack(apdu.rsn) 402 403 if apdu.ssn != self._rsn: 404 raise Exception('missing apdu sequence number') 405 406 self._rsn = (self._rsn + 1) % 0x8000 407 self._start_supervisory_timeout() 408 409 if apdu.data: 410 await self._receive_queue.put(apdu.data) 411 412 self._w += 1 413 if self._w >= self._receive_window_size: 414 await self._write_apdus() 415 416 async def _write_apdui(self, data): 417 if self._ssn in self._waiting_ack_handles: 418 raise Exception("can not reuse already registered ssn") 419 420 async with self._waiting_ack_cv: 421 await self._waiting_ack_cv.wait_for( 422 lambda: (len(self._waiting_ack_handles) < 423 self._send_window_size)) 424 425 if not self._is_enabled: 426 self._log.debug("send data not enabled - discarding message") 427 return 428 429 await self._transport.write(common.APDUI(ssn=self._ssn, 430 rsn=self._rsn, 431 data=data)) 432 self._w = 0 433 self._stop_supervisory_timeout() 434 435 handle = self._loop.call_later(self._response_timeout, 436 self._on_response_timeout) 437 self._waiting_ack_handles[self._ssn] = handle 438 self._ssn = (self._ssn + 1) % 0x8000 439 return handle 440 441 async def _write_apdus(self): 442 await self._transport.write(common.APDUS(self._rsn)) 443 self._w = 0 444 self._stop_supervisory_timeout() 445 446 async def _wait_ack(self, handle, future): 447 try: 448 async with self._waiting_ack_cv: 449 await self._waiting_ack_cv.wait_for(handle.cancelled) 450 451 if not future.done(): 452 future.set_result(None) 453 454 finally: 455 if not future.done(): 456 future.set_exception(ConnectionError()) 457 458 async def _set_ack(self, ack): 459 if ack >= self._ack: 460 ssns = range(self._ack, ack) 461 else: 462 ssns = itertools.chain(range(self._ack, 0x8000), range(ack)) 463 464 for ssn in ssns: 465 handle = self._waiting_ack_handles.pop(ssn, None) 466 if not handle: 467 raise Exception("received ack for unsent sequence number") 468 handle.cancel() 469 470 self._ack = ack 471 async with self._waiting_ack_cv: 472 self._waiting_ack_cv.notify_all() 473 474 def _start_supervisory_timeout(self): 475 if self._supervisory_handle: 476 return 477 478 self._supervisory_handle = self._loop.call_later( 479 self._supervisory_timeout, self._on_supervisory_timeout) 480 481 def _stop_supervisory_timeout(self): 482 if not self._supervisory_handle: 483 return 484 485 self._supervisory_handle.cancel() 486 self._supervisory_handle = None
Connection( transport: hat.drivers.iec60870.apci.transport.Transport, always_enabled: bool, response_timeout: float, supervisory_timeout: float, test_timeout: float, send_window_size: int, receive_window_size: int, send_queue_size: int, receive_queue_size: int)
148 def __init__(self, 149 transport: Transport, 150 always_enabled: bool, 151 response_timeout: float, 152 supervisory_timeout: float, 153 test_timeout: float, 154 send_window_size: int, 155 receive_window_size: int, 156 send_queue_size: int, 157 receive_queue_size: int): 158 self._transport = transport 159 self._always_enabled = always_enabled 160 self._is_enabled = always_enabled 161 self._enabled_cbs = util.CallbackRegistry() 162 self._response_timeout = response_timeout 163 self._supervisory_timeout = supervisory_timeout 164 self._test_timeout = test_timeout 165 self._send_window_size = send_window_size 166 self._receive_window_size = receive_window_size 167 self._receive_queue = aio.Queue(receive_queue_size) 168 self._send_queue = aio.Queue(send_queue_size) 169 self._test_event = asyncio.Event() 170 self._ssn = 0 171 self._rsn = 0 172 self._ack = 0 173 self._w = 0 174 self._supervisory_handle = None 175 self._waiting_ack_handles = {} 176 self._waiting_ack_cv = asyncio.Condition() 177 self._loop = asyncio.get_running_loop() 178 self._log = _create_connection_logger_adapter(transport.info) 179 180 self.async_group.spawn(self._read_loop) 181 self.async_group.spawn(self._write_loop) 182 self.async_group.spawn(self._test_loop)
async_group: hat.aio.group.Group
184 @property 185 def async_group(self) -> aio.Group: 186 """Async group""" 187 return self._transport.async_group
Async group
189 @property 190 def info(self) -> tcp.ConnectionInfo: 191 """Connection info""" 192 return self._transport.info
Connection info
ssl_object: ssl.SSLObject | ssl.SSLSocket | None
194 @property 195 def ssl_object(self) -> ssl.SSLObject | ssl.SSLSocket | None: 196 """SSL Object""" 197 return self._transport.ssl_object
SSL Object
def
register_enabled_cb( self, cb: Callable[[bool], NoneType]) -> hat.util.callback.RegisterCallbackHandle:
204 def register_enabled_cb(self, 205 cb: typing.Callable[[bool], None] 206 ) -> util.RegisterCallbackHandle: 207 """Register enable callback""" 208 return self._enabled_cbs.register(cb)
Register enable callback
async def
send(self, data: bytes | bytearray | memoryview, wait_ack: bool = False):
210 async def send(self, 211 data: util.Bytes, 212 wait_ack: bool = False): 213 """Send data and optionally wait for acknowledgement 214 215 Raises: 216 ConnectionDisabledError 217 ConnectionError 218 219 """ 220 future = self._loop.create_future() if wait_ack else None 221 entry = _SendQueueEntry(data, future, wait_ack) 222 223 try: 224 await self._send_queue.put(entry) 225 226 if wait_ack: 227 await future 228 229 except aio.QueueClosedError: 230 raise ConnectionError()
Send data and optionally wait for acknowledgement
Raises:
- ConnectionDisabledError
- ConnectionError
async def
drain(self, wait_ack: bool = False):
232 async def drain(self, wait_ack: bool = False): 233 """Drain and optionally wait for acknowledgement 234 235 Raises: 236 ConnectionError 237 238 """ 239 future = self._loop.create_future() 240 entry = _SendQueueEntry(None, future, wait_ack) 241 242 try: 243 await self._send_queue.put(entry) 244 await future 245 246 except aio.QueueClosedError: 247 raise ConnectionError()
Drain and optionally wait for acknowledgement
Raises:
- ConnectionError
async def
receive(self) -> bytes | bytearray | memoryview:
249 async def receive(self) -> util.Bytes: 250 """Receive data 251 252 Raises: 253 ConnectionError 254 255 """ 256 try: 257 return await self._receive_queue.get() 258 259 except aio.QueueClosedError: 260 raise ConnectionError()
Receive data
Raises:
- ConnectionError