hat.drivers.cotp
Connection oriented transport protocol
class
ConnectionInfo(typing.NamedTuple):
22class ConnectionInfo(typing.NamedTuple): 23 local_addr: tcp.Address 24 local_tsel: int | None 25 remote_addr: tcp.Address 26 remote_tsel: int | None
ConnectionInfo(local_addr, local_tsel, remote_addr, remote_tsel)
ConnectionInfo( local_addr: hat.drivers.tcp.Address, local_tsel: int | None, remote_addr: hat.drivers.tcp.Address, remote_tsel: int | None)
Create new instance of ConnectionInfo(local_addr, local_tsel, remote_addr, remote_tsel)
ConnectionCb =
typing.Callable[[ForwardRef('Connection')], None | collections.abc.Awaitable[None]]
async def
connect( addr: hat.drivers.tcp.Address, *, local_tsel: int | None = None, remote_tsel: int | None = None, cotp_receive_queue_size: int = 1024, cotp_send_queue_size: int = 1024, **kwargs) -> Connection:
33async def connect(addr: tcp.Address, 34 *, 35 local_tsel: int | None = None, 36 remote_tsel: int | None = None, 37 cotp_receive_queue_size: int = 1024, 38 cotp_send_queue_size: int = 1024, 39 **kwargs 40 ) -> 'Connection': 41 """Create new COTP connection 42 43 Additional arguments are passed directly to `hat.drivers.tpkt.connect`. 44 45 """ 46 conn = await tpkt.connect(addr, **kwargs) 47 48 try: 49 cr_tpdu = common.CR(src=next(_next_srcs), 50 cls=0, 51 calling_tsel=local_tsel, 52 called_tsel=remote_tsel, 53 max_tpdu=2048, 54 pref_max_tpdu=None) 55 cr_tpdu_bytes = encoder.encode(cr_tpdu) 56 await conn.send(cr_tpdu_bytes) 57 58 cc_tpdu_bytes = await conn.receive() 59 cc_tpdu = encoder.decode(memoryview(cc_tpdu_bytes)) 60 _validate_connect_response(cr_tpdu, cc_tpdu) 61 62 max_tpdu = _calculate_max_tpdu(cr_tpdu, cc_tpdu) 63 calling_tsel, called_tsel = _get_tsels(cr_tpdu, cc_tpdu) 64 65 return Connection(conn, max_tpdu, calling_tsel, called_tsel, 66 cotp_receive_queue_size, cotp_send_queue_size) 67 68 except BaseException: 69 await aio.uncancellable(conn.async_close()) 70 raise
Create new COTP connection
Additional arguments are passed directly to hat.drivers.tpkt.connect
.
async def
listen( connection_cb: Callable[[hat.drivers.acse.Connection], None | Awaitable[None]], addr: hat.drivers.tcp.Address = Address(host='0.0.0.0', port=102), *, cotp_receive_queue_size: int = 1024, cotp_send_queue_size: int = 1024, **kwargs) -> Server:
73async def listen(connection_cb: ConnectionCb, 74 addr: tcp.Address = tcp.Address('0.0.0.0', 102), 75 *, 76 cotp_receive_queue_size: int = 1024, 77 cotp_send_queue_size: int = 1024, 78 **kwargs 79 ) -> 'Server': 80 """Create new COTP listening server 81 82 Additional arguments are passed directly to `hat.drivers.tpkt.listen`. 83 84 """ 85 server = Server() 86 server._connection_cb = connection_cb 87 server._receive_queue_size = cotp_receive_queue_size 88 server._send_queue_size = cotp_send_queue_size 89 90 server._srv = await tpkt.listen(server._on_connection, addr, **kwargs) 91 92 return server
Create new COTP listening server
Additional arguments are passed directly to hat.drivers.tpkt.listen
.
class
Server(hat.aio.group.Resource):
95class Server(aio.Resource): 96 """COTP listening server 97 98 For creation of new instance see `listen` coroutine. 99 100 """ 101 102 @property 103 def async_group(self) -> aio.Group: 104 """Async group""" 105 return self._srv.async_group 106 107 @property 108 def addresses(self) -> list[tcp.Address]: 109 """Listening addresses""" 110 return self._srv.addresses 111 112 async def _on_connection(self, tpkt_conn): 113 try: 114 try: 115 cr_tpdu_bytes = await tpkt_conn.receive() 116 cr_tpdu = encoder.decode(memoryview(cr_tpdu_bytes)) 117 _validate_connect_request(cr_tpdu) 118 119 cc_tpdu = common.CC(dst=cr_tpdu.src, 120 src=next(_next_srcs), 121 cls=0, 122 calling_tsel=cr_tpdu.calling_tsel, 123 called_tsel=cr_tpdu.called_tsel, 124 max_tpdu=_calculate_cc_max_tpdu(cr_tpdu), 125 pref_max_tpdu=None) 126 cc_tpdu_bytes = encoder.encode(cc_tpdu) 127 await tpkt_conn.send(cc_tpdu_bytes) 128 129 max_tpdu = _calculate_max_tpdu(cr_tpdu, cc_tpdu) 130 calling_tsel, called_tsel = _get_tsels(cr_tpdu, cc_tpdu) 131 conn = Connection(tpkt_conn, max_tpdu, 132 called_tsel, calling_tsel, 133 self._receive_queue_size, 134 self._send_queue_size) 135 136 except BaseException: 137 await aio.uncancellable(tpkt_conn.async_close()) 138 raise 139 140 try: 141 await aio.call(self._connection_cb, conn) 142 143 except BaseException: 144 await aio.uncancellable(conn.async_close()) 145 raise 146 147 except Exception as e: 148 mlog.error("error creating new incomming connection: %s", e, 149 exc_info=e)
COTP listening server
For creation of new instance see listen
coroutine.
async_group: hat.aio.group.Group
102 @property 103 def async_group(self) -> aio.Group: 104 """Async group""" 105 return self._srv.async_group
Async group
addresses: list[hat.drivers.tcp.Address]
107 @property 108 def addresses(self) -> list[tcp.Address]: 109 """Listening addresses""" 110 return self._srv.addresses
Listening addresses
class
Connection(hat.aio.group.Resource):
152class Connection(aio.Resource): 153 """COTP connection 154 155 For creation of new instance see `connect` or `listen`. 156 157 """ 158 159 def __init__(self, 160 conn: tpkt.Connection, 161 max_tpdu: int, 162 local_tsel: int | None, 163 remote_tsel: int | None, 164 receive_queue_size: int, 165 send_queue_size: int): 166 self._conn = conn 167 self._max_tpdu = max_tpdu 168 self._loop = asyncio.get_running_loop() 169 self._info = ConnectionInfo(local_tsel=local_tsel, 170 remote_tsel=remote_tsel, 171 **conn.info._asdict()) 172 self._receive_queue = aio.Queue(receive_queue_size) 173 self._send_queue = aio.Queue(send_queue_size) 174 175 self.async_group.spawn(self._receive_loop) 176 self.async_group.spawn(self._send_loop) 177 178 @property 179 def async_group(self) -> aio.Group: 180 """Async group""" 181 return self._conn.async_group 182 183 @property 184 def info(self) -> ConnectionInfo: 185 """Connection info""" 186 return self._info 187 188 async def receive(self) -> util.Bytes: 189 """Receive data""" 190 try: 191 return await self._receive_queue.get() 192 193 except aio.QueueClosedError: 194 raise ConnectionError() 195 196 async def send(self, data: util.Bytes): 197 """Send data""" 198 try: 199 await self._send_queue.put((data, None)) 200 201 except aio.QueueClosedError: 202 raise ConnectionError() 203 204 async def drain(self): 205 """Drain output buffer""" 206 try: 207 future = self._loop.create_future() 208 await self._send_queue.put((None, future)) 209 await future 210 211 except aio.QueueClosedError: 212 raise ConnectionError() 213 214 async def _receive_loop(self): 215 try: 216 data_queue = collections.deque() 217 while True: 218 tpdu_bytes = await self._conn.receive() 219 tpdu = encoder.decode(memoryview(tpdu_bytes)) 220 221 if isinstance(tpdu, (common.DR, common.ER)): 222 mlog.info("received disconnect request / error") 223 break 224 225 if not isinstance(tpdu, common.DT): 226 continue 227 228 data_queue.append(tpdu.data) 229 230 if not tpdu.eot: 231 continue 232 233 data = bytes(itertools.chain.from_iterable(data_queue)) 234 data_queue.clear() 235 236 await self._receive_queue.put(data) 237 238 except ConnectionError: 239 pass 240 241 except Exception as e: 242 mlog.error("receive loop error: %s", e, exc_info=e) 243 244 finally: 245 self.close() 246 self._receive_queue.close() 247 248 async def _send_loop(self): 249 future = None 250 try: 251 while True: 252 data, future = await self._send_queue.get() 253 254 if data is None: 255 await self._conn.drain() 256 257 else: 258 data = memoryview(data) 259 max_size = self._max_tpdu - 3 260 261 while data: 262 single_data, data = data[:max_size], data[max_size:] 263 264 tpdu = common.DT(eot=not data, data=single_data) 265 tpdu_bytes = encoder.encode(tpdu) 266 await self._conn.send(tpdu_bytes) 267 268 if future and not future.done(): 269 future.set_result(None) 270 271 except ConnectionError: 272 pass 273 274 except Exception as e: 275 mlog.error("send loop error: %s", e, exc_info=e) 276 277 finally: 278 self.close() 279 self._send_queue.close() 280 281 while True: 282 if future and not future.done(): 283 future.set_result(None) 284 if self._send_queue.empty(): 285 break 286 _, future = self._send_queue.get_nowait()
Connection( conn: hat.drivers.tpkt.Connection, max_tpdu: int, local_tsel: int | None, remote_tsel: int | None, receive_queue_size: int, send_queue_size: int)
159 def __init__(self, 160 conn: tpkt.Connection, 161 max_tpdu: int, 162 local_tsel: int | None, 163 remote_tsel: int | None, 164 receive_queue_size: int, 165 send_queue_size: int): 166 self._conn = conn 167 self._max_tpdu = max_tpdu 168 self._loop = asyncio.get_running_loop() 169 self._info = ConnectionInfo(local_tsel=local_tsel, 170 remote_tsel=remote_tsel, 171 **conn.info._asdict()) 172 self._receive_queue = aio.Queue(receive_queue_size) 173 self._send_queue = aio.Queue(send_queue_size) 174 175 self.async_group.spawn(self._receive_loop) 176 self.async_group.spawn(self._send_loop)
async_group: hat.aio.group.Group
178 @property 179 def async_group(self) -> aio.Group: 180 """Async group""" 181 return self._conn.async_group
Async group
async def
receive(self) -> bytes | bytearray | memoryview:
188 async def receive(self) -> util.Bytes: 189 """Receive data""" 190 try: 191 return await self._receive_queue.get() 192 193 except aio.QueueClosedError: 194 raise ConnectionError()
Receive data