hat.drivers.cotp
Connection-oriented transport protocol (COTP)
class
ConnectionInfo(typing.NamedTuple):
class ConnectionInfo(typing.NamedTuple):
    """Immutable description of a COTP connection

    Field order is part of the interface (positional construction and
    tuple unpacking): ``name``, ``local_addr``, ``local_tsel``,
    ``remote_addr``, ``remote_tsel``.

    """
    # connection name, or None
    name: str | None
    # local TCP address
    local_addr: tcp.Address
    # local TSEL, or None (presumably "transport selector" - confirm)
    local_tsel: int | None
    # remote TCP address
    remote_addr: tcp.Address
    # remote TSEL, or None
    remote_tsel: int | None
ConnectionInfo(name, local_addr, local_tsel, remote_addr, remote_tsel)
ConnectionInfo( name: str | None, local_addr: hat.drivers.tcp.Address, local_tsel: int | None, remote_addr: hat.drivers.tcp.Address, remote_tsel: int | None)
Create new instance of ConnectionInfo(name, local_addr, local_tsel, remote_addr, remote_tsel)
ConnectionCb =
typing.Callable[[ForwardRef('Connection')], None | collections.abc.Awaitable[None]]
async def
connect( addr: hat.drivers.tcp.Address, *, local_tsel: int | None = None, remote_tsel: int | None = None, cotp_receive_queue_size: int = 1024, cotp_send_queue_size: int = 1024, **kwargs) -> Connection:
async def connect(addr: tcp.Address,
                  *,
                  local_tsel: int | None = None,
                  remote_tsel: int | None = None,
                  cotp_receive_queue_size: int = 1024,
                  cotp_send_queue_size: int = 1024,
                  **kwargs
                  ) -> 'Connection':
    """Open a COTP connection to `addr`

    A TPKT connection is established first (additional keyword arguments
    are forwarded unchanged to `hat.drivers.tpkt.connect`). A CR TPDU
    carrying `local_tsel` as calling TSEL and `remote_tsel` as called TSEL
    is then sent, and the peer's reply is decoded and validated.

    `cotp_receive_queue_size` and `cotp_send_queue_size` bound the
    resulting connection's receive and send queues.

    If anything fails after the TPKT connection is opened (including
    cancellation), the TPKT connection is closed before the exception is
    re-raised.

    """
    tpkt_conn = await tpkt.connect(addr, **kwargs)

    try:
        request = common.CR(src=next(_next_srcs),
                            cls=0,
                            calling_tsel=local_tsel,
                            called_tsel=remote_tsel,
                            max_tpdu=2048,
                            pref_max_tpdu=None)
        await tpkt_conn.send(encoder.encode(request))

        response_bytes = await tpkt_conn.receive()
        response = encoder.decode(memoryview(response_bytes))
        _validate_connect_response(request, response)

        negotiated_max_tpdu = _calculate_max_tpdu(request, response)
        # on the connecting side the calling TSEL is the local one
        own_tsel, peer_tsel = _get_tsels(request, response)

        return Connection(tpkt_conn, negotiated_max_tpdu,
                          own_tsel, peer_tsel,
                          cotp_receive_queue_size, cotp_send_queue_size)

    except BaseException:
        # shielded so that close completes even when we are being cancelled
        await aio.uncancellable(tpkt_conn.async_close())
        raise
Create new COTP connection
Additional arguments are passed directly to hat.drivers.tpkt.connect.
async def
listen( connection_cb: Callable[[Connection], None | Awaitable[None]], addr: hat.drivers.tcp.Address = Address(host='0.0.0.0', port=102), *, cotp_receive_queue_size: int = 1024, cotp_send_queue_size: int = 1024, **kwargs) -> Server:
async def listen(connection_cb: ConnectionCb,
                 addr: tcp.Address = tcp.Address('0.0.0.0', 102),
                 *,
                 cotp_receive_queue_size: int = 1024,
                 cotp_send_queue_size: int = 1024,
                 **kwargs
                 ) -> 'Server':
    """Start a COTP server listening on `addr`

    `connection_cb` is called with each newly accepted `Connection`.
    `cotp_receive_queue_size` and `cotp_send_queue_size` bound the queues
    of every accepted connection.

    Additional keyword arguments are forwarded unchanged to
    `hat.drivers.tpkt.listen`.

    """
    srv = Server()
    srv._connection_cb = connection_cb
    srv._receive_queue_size = cotp_receive_queue_size
    srv._send_queue_size = cotp_send_queue_size
    # module logger is used until the server info (needed by the logger
    # adapter) becomes available
    srv._log = mlog

    srv._srv = await tpkt.listen(srv._on_connection, addr, **kwargs)

    srv._log = _create_server_logger_adapter(srv._srv.info)

    return srv
Create new COTP listening server
Additional arguments are passed directly to hat.drivers.tpkt.listen.
class
Server(hat.aio.group.Resource):
class Server(aio.Resource):
    """COTP listening server

    Instances are created by the `listen` coroutine, which also populates
    the private attributes used here.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group (shared with the underlying TPKT server)"""
        return self._srv.async_group

    @property
    def info(self) -> tcp.ServerInfo:
        """Server info (as reported by the underlying TPKT server)"""
        return self._srv.info

    async def _on_connection(self, tpkt_conn):
        # Callback for each new TPKT connection: perform the COTP handshake
        # and hand the resulting connection to the user callback. Errors
        # derived from `Exception` are logged; other `BaseException`s
        # (e.g. cancellation) propagate after cleanup.
        try:
            conn = await self._accept(tpkt_conn)

            try:
                await aio.call(self._connection_cb, conn)

            except BaseException:
                await aio.uncancellable(conn.async_close())
                raise

        except Exception as e:
            self._log.error("error creating new incomming connection: %s",
                            e, exc_info=e)

    async def _accept(self, tpkt_conn):
        # Receive and validate CR, reply with CC and wrap `tpkt_conn` in a
        # `Connection`. On any failure the TPKT connection is closed and
        # the exception is re-raised.
        try:
            request_bytes = await tpkt_conn.receive()
            request = encoder.decode(memoryview(request_bytes))
            _validate_connect_request(request)

            response = common.CC(dst=request.src,
                                 src=next(_next_srcs),
                                 cls=0,
                                 calling_tsel=request.calling_tsel,
                                 called_tsel=request.called_tsel,
                                 max_tpdu=_calculate_cc_max_tpdu(request),
                                 pref_max_tpdu=None)
            await tpkt_conn.send(encoder.encode(response))

            negotiated_max_tpdu = _calculate_max_tpdu(request, response)
            calling_tsel, called_tsel = _get_tsels(request, response)

            # on the listening side the called TSEL is the local one
            return Connection(tpkt_conn, negotiated_max_tpdu,
                              called_tsel, calling_tsel,
                              self._receive_queue_size,
                              self._send_queue_size)

        except BaseException:
            await aio.uncancellable(tpkt_conn.async_close())
            raise
COTP listening server
To create a new instance, use the listen coroutine.
async_group: hat.aio.group.Group
@property
def async_group(self) -> aio.Group:
    """Async group of the underlying TPKT server"""
    group = self._srv.async_group
    return group
Async group
class
Connection(hat.aio.group.Resource):
class Connection(aio.Resource):
    """COTP connection

    For creation of new instance see `connect` or `listen`.

    Two background tasks are spawned in the underlying TPKT connection's
    async group: a receive loop that reassembles DT TPDUs into complete
    messages, and a send loop that splits outgoing messages into DT TPDUs.

    """

    def __init__(self,
                 conn: tpkt.Connection,
                 max_tpdu: int,
                 local_tsel: int | None,
                 remote_tsel: int | None,
                 receive_queue_size: int,
                 send_queue_size: int):
        """Wrap an established TPKT connection

        Must be called while an event loop is running (the running loop
        is captured and both background loops are spawned here).

        Args:
            conn: underlying TPKT connection
            max_tpdu: negotiated maximum TPDU size
            local_tsel: local TSEL
            remote_tsel: remote TSEL
            receive_queue_size: capacity of the received message queue
            send_queue_size: capacity of the outgoing message queue

        """
        self._conn = conn
        self._max_tpdu = max_tpdu
        self._loop = asyncio.get_running_loop()
        # name and addresses are inherited from the TPKT connection info
        self._info = ConnectionInfo(local_tsel=local_tsel,
                                    remote_tsel=remote_tsel,
                                    **conn.info._asdict())
        self._receive_queue = aio.Queue(receive_queue_size)
        self._send_queue = aio.Queue(send_queue_size)
        self._log = _create_connection_logger_adapter(False, self._info)
        self._comm_log = _create_connection_logger_adapter(True, self._info)

        self.async_group.spawn(self._receive_loop)
        self.async_group.spawn(self._send_loop)

        self._comm_log.debug('connection established')

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._conn.async_group

    @property
    def info(self) -> ConnectionInfo:
        """Connection info"""
        return self._info

    async def receive(self) -> util.Bytes:
        """Receive data

        Returns the next complete (reassembled) message.

        Raises:
            ConnectionError: if the receive queue is closed

        """
        try:
            return await self._receive_queue.get()

        except aio.QueueClosedError:
            raise ConnectionError()

    async def send(self, data: util.Bytes):
        """Send data

        The data is queued for the send loop; returning from this method
        does not mean the data has been written.

        Raises:
            ConnectionError: if the send queue is closed

        """
        try:
            # queue entries are (data, future) pairs; no future for sends
            await self._send_queue.put((data, None))

        except aio.QueueClosedError:
            raise ConnectionError()

    async def drain(self):
        """Drain output buffer

        Waits until the send loop has processed everything queued before
        this call and has drained the underlying TPKT connection.

        Raises:
            ConnectionError: if the send queue is closed

        """
        try:
            future = self._loop.create_future()
            # a `None` payload marks a drain request for the send loop
            await self._send_queue.put((None, future))
            await future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def _receive_loop(self):
        # Reads TPDUs from the TPKT connection, collects DT payloads until
        # one with `eot` set arrives and publishes the joined message.
        try:
            data_queue = collections.deque()
            while True:
                tpdu_bytes = await self._conn.receive()
                tpdu = encoder.decode(memoryview(tpdu_bytes))

                self._comm_log.debug('received %s', tpdu)

                if isinstance(tpdu, (common.DR, common.ER)):
                    self._log.info("received disconnect request / error")
                    break

                # TPDUs other than DT carry no user data here - ignore them
                if not isinstance(tpdu, common.DT):
                    continue

                data_queue.append(tpdu.data)

                if not tpdu.eot:
                    continue

                # join segments with a buffer-level copy instead of
                # iterating over every single byte in Python
                data = b"".join(data_queue)
                data_queue.clear()

                await self._receive_queue.put(data)

        except ConnectionError:
            pass

        except Exception as e:
            self._log.error("receive loop error: %s", e, exc_info=e)

        finally:
            self.close()
            self._receive_queue.close()

            self._comm_log.debug('connection closed')

    async def _send_loop(self):
        # Takes (data, future) entries from the send queue. `data is None`
        # is a drain request; otherwise data is split into DT TPDUs.
        future = None
        try:
            while True:
                data, future = await self._send_queue.get()

                if data is None:
                    await self._conn.drain()

                else:
                    data = memoryview(data)
                    # payload capacity of a single DT TPDU (presumably
                    # 3 bytes are taken by the DT header - confirm)
                    max_size = self._max_tpdu - 3

                    # NOTE: empty data produces no TPDU at all
                    while data:
                        single_data, data = data[:max_size], data[max_size:]

                        # eot is set only on the last segment
                        tpdu = common.DT(eot=not data, data=single_data)
                        tpdu_bytes = encoder.encode(tpdu)

                        self._comm_log.debug('sending %s', tpdu)

                        await self._conn.send(tpdu_bytes)

                if future and not future.done():
                    future.set_result(None)

        except ConnectionError:
            pass

        except Exception as e:
            self._log.error("send loop error: %s", e, exc_info=e)

        finally:
            self.close()
            self._send_queue.close()

            # resolve the in-flight future and every future still queued
            # so that pending `drain` calls do not wait forever
            while True:
                if future and not future.done():
                    future.set_result(None)
                if self._send_queue.empty():
                    break
                _, future = self._send_queue.get_nowait()
Connection( conn: hat.drivers.tpkt.Connection, max_tpdu: int, local_tsel: int | None, remote_tsel: int | None, receive_queue_size: int, send_queue_size: int)
def __init__(self,
             conn: tpkt.Connection,
             max_tpdu: int,
             local_tsel: int | None,
             remote_tsel: int | None,
             receive_queue_size: int,
             send_queue_size: int):
    """Wrap an established TPKT connection

    Captures the running event loop, builds the connection info from the
    TPKT connection info plus the given TSELs, creates the bounded
    receive/send queues and spawns the receive and send loops in the
    connection's async group.

    """
    self._conn = conn
    self._max_tpdu = max_tpdu
    self._loop = asyncio.get_running_loop()

    # name and addresses come from the TPKT connection info
    tpkt_info = conn.info._asdict()
    self._info = ConnectionInfo(local_tsel=local_tsel,
                                remote_tsel=remote_tsel,
                                **tpkt_info)

    self._receive_queue = aio.Queue(receive_queue_size)
    self._send_queue = aio.Queue(send_queue_size)

    self._log = _create_connection_logger_adapter(False, self._info)
    self._comm_log = _create_connection_logger_adapter(True, self._info)

    group = self.async_group
    group.spawn(self._receive_loop)
    group.spawn(self._send_loop)

    self._comm_log.debug('connection established')
async_group: hat.aio.group.Group
@property
def async_group(self) -> aio.Group:
    """Async group of the underlying TPKT connection"""
    group = self._conn.async_group
    return group
Async group
async def
receive(self) -> bytes | bytearray | memoryview:
async def receive(self) -> util.Bytes:
    """Receive data

    Waits for the next entry of the receive queue and returns it.
    A closed queue is reported as `ConnectionError`.

    """
    try:
        received = await self._receive_queue.get()

    except aio.QueueClosedError:
        raise ConnectionError()

    return received
Receive data