hat.drivers.cosp
Connection oriented session protocol
1"""Connection oriented session protocol""" 2 3from hat.drivers.cosp.connection import (ConnectionInfo, 4 ValidateCb, 5 ConnectionCb, 6 connect, 7 listen, 8 Server, 9 Connection) 10 11 12__all__ = ['ConnectionInfo', 13 'ValidateCb', 14 'ConnectionCb', 15 'connect', 16 'listen', 17 'Server', 18 'Connection']
class
ConnectionInfo(typing.NamedTuple):
class ConnectionInfo(typing.NamedTuple):
    """Addressing information of a single COSP connection

    `Connection` builds this from the underlying COTP connection's info
    fields together with the local and remote `ssel` values negotiated
    during connection establishment.

    """
    # optional connection name
    name: str | None
    # local side: TCP address, tsel and ssel
    local_addr: tcp.Address
    local_tsel: int | None
    local_ssel: int | None
    # remote side: TCP address, tsel and ssel
    remote_addr: tcp.Address
    remote_tsel: int | None
    remote_ssel: int | None
ConnectionInfo(name, local_addr, local_tsel, local_ssel, remote_addr, remote_tsel, remote_ssel)
ConnectionInfo( name: str | None, local_addr: hat.drivers.tcp.Address, local_tsel: int | None, local_ssel: int | None, remote_addr: hat.drivers.tcp.Address, remote_tsel: int | None, remote_ssel: int | None)
Create new instance of ConnectionInfo(name, local_addr, local_tsel, local_ssel, remote_addr, remote_tsel, remote_ssel)
ValidateCb =
typing.Callable[[bytes | bytearray | memoryview], bytes | bytearray | memoryview | None | collections.abc.Awaitable[bytes | bytearray | memoryview | None]]
ConnectionCb =
typing.Callable[[ForwardRef('Connection')], None | collections.abc.Awaitable[None]]
async def
connect( addr: hat.drivers.tcp.Address, user_data: bytes | bytearray | memoryview | None = None, *, local_ssel: int | None = None, remote_ssel: int | None = None, cosp_receive_queue_size: int = 1024, cosp_send_queue_size: int = 1024, **kwargs) -> Connection:
async def connect(addr: tcp.Address,
                  user_data: util.Bytes | None = None,
                  *,
                  local_ssel: int | None = None,
                  remote_ssel: int | None = None,
                  cosp_receive_queue_size: int = 1024,
                  cosp_send_queue_size: int = 1024,
                  **kwargs
                  ) -> 'Connection':
    """Connect to COSP server

    A COTP connection is opened first; a CN SPDU carrying `user_data` is
    then sent and the peer's response is decoded and validated before the
    resulting `Connection` is created. On any failure (including
    cancellation) the COTP connection is closed and the error is re-raised.

    Additional arguments are passed directly to `hat.drivers.cotp.connect`.

    Args:
        addr: remote address
        user_data: optional user data included in the connect request
        local_ssel: value sent as the request's calling ssel
        remote_ssel: value sent as the request's called ssel
        cosp_receive_queue_size: capacity of the connection's receive queue
        cosp_send_queue_size: capacity of the connection's send queue

    """
    cotp_conn = await cotp.connect(addr, **kwargs)

    try:
        request = common.Spdu(type=common.SpduType.CN,
                              extended_spdus=False,
                              version_number=_params_version,
                              requirements=_params_requirements,
                              calling_ssel=local_ssel,
                              called_ssel=remote_ssel,
                              user_data=user_data)
        await cotp_conn.send(encoder.encode(request))

        response_bytes = await cotp_conn.receive()
        response = encoder.decode(memoryview(response_bytes))
        _validate_connect_response(request, response)

        calling_ssel, called_ssel = _get_ssels(request, response)
        return Connection(cotp_conn, request, response,
                          calling_ssel, called_ssel,
                          cosp_receive_queue_size, cosp_send_queue_size)

    except BaseException:
        # cleanup must complete even if this task is being cancelled
        await aio.uncancellable(
            _close_cotp(cotp_conn, _ab_spdu, mlog, mlog))
        raise
Connect to COSP server
Additional arguments are passed directly to hat.drivers.cotp.connect.
async def
listen( validate_cb: Callable[[bytes | bytearray | memoryview], bytes | bytearray | memoryview | None | Awaitable[bytes | bytearray | memoryview | None]], connection_cb: Callable[[Connection], None | Awaitable[None]], addr: hat.drivers.tcp.Address = Address(host='0.0.0.0', port=102), *, bind_connections: bool = False, cosp_receive_queue_size: int = 1024, cosp_send_queue_size: int = 1024, **kwargs) -> Server:
async def listen(validate_cb: ValidateCb,
                 connection_cb: ConnectionCb,
                 addr: tcp.Address = tcp.Address('0.0.0.0', 102),
                 *,
                 bind_connections: bool = False,
                 cosp_receive_queue_size: int = 1024,
                 cosp_send_queue_size: int = 1024,
                 **kwargs
                 ) -> 'Server':
    """Create COSP listening server

    Additional arguments are passed directly to `hat.drivers.cotp.listen`.

    Args:
        validate_cb: callback function or coroutine called on new
            incoming connection request prior to creating new connection
        connection_cb: new connection callback
        addr: local listening address
        bind_connections: if ``True``, the server's connection handler
            keeps running until the connection is closed and closes the
            connection if the handler itself is interrupted
        cosp_receive_queue_size: receive queue capacity of each accepted
            connection
        cosp_send_queue_size: send queue capacity of each accepted
            connection

    """
    srv = Server()

    # module logger is used until server info is available
    srv._log = mlog
    srv._validate_cb = validate_cb
    srv._connection_cb = connection_cb
    srv._bind_connections = bind_connections
    srv._receive_queue_size = cosp_receive_queue_size
    srv._send_queue_size = cosp_send_queue_size

    # connection binding is handled by `Server._on_connection`, so the
    # COTP server is always created with binding disabled
    srv._srv = await cotp.listen(srv._on_connection, addr,
                                 bind_connections=False,
                                 **kwargs)

    srv._log = _create_server_logger_adapter(srv._srv.info)

    return srv
Create COSP listening server
Additional arguments are passed directly to hat.drivers.cotp.listen.
Arguments:
- validate_cb: callback function or coroutine called on new incoming connection request prior to creating new connection
- connection_cb: new connection callback
- addr: local listening address
class
Server(hat.aio.group.Resource):
class Server(aio.Resource):
    """COSP listening server

    Instances are configured by `listen`, which assigns the callbacks,
    queue sizes, logger and the underlying COTP server.

    For creating new server see `listen`.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._srv.async_group

    @property
    def info(self) -> tcp.ServerInfo:
        """Server info"""
        return self._srv.info

    async def _on_connection(self, cotp_conn):
        # Handles one incoming COTP connection: reads and validates the
        # connect request, asks `validate_cb` for the response user data,
        # replies with an AC SPDU and hands the new `Connection` to
        # `connection_cb`.
        try:
            try:
                request_bytes = await cotp_conn.receive()
                request = encoder.decode(memoryview(request_bytes))
                _validate_connect_request(request)

                response_user_data = await aio.call(self._validate_cb,
                                                    request.user_data)

                response = common.Spdu(type=common.SpduType.AC,
                                       extended_spdus=False,
                                       version_number=_params_version,
                                       requirements=_params_requirements,
                                       calling_ssel=request.calling_ssel,
                                       called_ssel=request.called_ssel,
                                       user_data=response_user_data)
                await cotp_conn.send(encoder.encode(response))

                calling_ssel, called_ssel = _get_ssels(request, response)

                # on the listening side the called ssel is the local one
                conn = Connection(cotp_conn, request, response,
                                  called_ssel, calling_ssel,
                                  self._receive_queue_size,
                                  self._send_queue_size)

            except BaseException:
                # no COSP connection exists yet - close COTP directly
                await aio.uncancellable(
                    _close_cotp(cotp_conn, _ab_spdu, self._log, self._log))
                raise

            try:
                await aio.call(self._connection_cb, conn)

            except BaseException:
                await aio.uncancellable(conn.async_close())
                raise

        except Exception as e:
            self._log.error("error creating new incomming connection: %s",
                            e, exc_info=e)
            return

        if self._bind_connections:
            # keep this handler alive for the connection's lifetime
            try:
                await conn.wait_closed()

            except BaseException:
                await aio.uncancellable(conn.async_close())
                raise
COSP listening server
For creating new server see listen.
async_group: hat.aio.group.Group
@property
def async_group(self) -> aio.Group:
    """Async group of the underlying COTP server"""
    return self._srv.async_group
Async group
class
Connection(hat.aio.group.Resource):
class Connection(aio.Resource):
    """COSP connection

    Wraps an established COTP connection. Received data is delivered
    through an internal receive queue and outgoing data is serialized
    through an internal send queue; both are serviced by background tasks
    running in the connection's async group.

    For creating new connection see `connect` or `listen`.

    """

    def __init__(self,
                 conn: cotp.Connection,
                 cn_spdu: common.Spdu,
                 ac_spdu: common.Spdu,
                 local_ssel: int | None,
                 remote_ssel: int | None,
                 receive_queue_size: int,
                 send_queue_size: int):
        self._conn = conn
        self._conn_req_user_data = cn_spdu.user_data
        self._conn_res_user_data = ac_spdu.user_data
        self._loop = asyncio.get_running_loop()
        self._info = ConnectionInfo(local_ssel=local_ssel,
                                    remote_ssel=remote_ssel,
                                    **conn.info._asdict())
        # SPDU handed to `_close_cotp` when the connection is closed
        self._close_spdu = None
        self._receive_queue = aio.Queue(receive_queue_size)
        self._send_queue = aio.Queue(send_queue_size)
        self._async_group = aio.Group()
        self._log = _create_connection_logger_adapter(False, self._info)
        self._comm_log = _create_connection_logger_adapter(True, self._info)

        self.async_group.spawn(aio.call_on_cancel, self._on_close)
        self.async_group.spawn(self._receive_loop)
        self.async_group.spawn(self._send_loop)
        # close this connection once the COTP connection starts closing
        self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
                               self.close)

        self._comm_log.debug('connection established')

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    @property
    def info(self) -> ConnectionInfo:
        """Connection info"""
        return self._info

    @property
    def conn_req_user_data(self) -> util.Bytes | None:
        """Connect request's user data (``None`` if request had none)"""
        return self._conn_req_user_data

    @property
    def conn_res_user_data(self) -> util.Bytes | None:
        """Connect response's user data (``None`` if response had none)"""
        return self._conn_res_user_data

    def close(self, user_data: util.Bytes | None = None):
        """Close connection

        Has no effect if the connection is no longer open.

        Args:
            user_data: optional user data included in the FN SPDU

        """
        self._close(common.Spdu(common.SpduType.FN,
                                transport_disconnect=True,
                                user_data=user_data))

    async def async_close(self, user_data: util.Bytes | None = None):
        """Async close

        Calls `close` and waits until the connection is closed.

        """
        self.close(user_data)
        await self.wait_closed()

    async def receive(self) -> util.Bytes:
        """Receive data

        Raises:
            ConnectionError: if the receive queue is closed

        """
        try:
            return await self._receive_queue.get()

        except aio.QueueClosedError:
            raise ConnectionError()

    async def send(self, data: util.Bytes):
        """Send data

        Data is queued and transmitted by the background send loop.

        Raises:
            ConnectionError: if the send queue is closed

        """
        try:
            await self._send_queue.put((data, None))

        except aio.QueueClosedError:
            raise ConnectionError()

    async def drain(self):
        """Drain output buffer

        Queues a drain request behind previously queued data and waits
        until the send loop has processed it.

        Raises:
            ConnectionError: if the send queue is closed

        """
        try:
            future = self._loop.create_future()
            # `None` data marks a drain request for the send loop
            await self._send_queue.put((None, future))
            await future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def _on_close(self):
        await _close_cotp(self._conn, self._close_spdu, self._log,
                          self._comm_log)

        self._comm_log.debug('connection closed')

    def _close(self, spdu):
        # only the first close request selects the closing SPDU
        if not self.is_open:
            return

        self._close_spdu = spdu
        self._async_group.close()

    async def _receive_loop(self):
        try:
            # accumulates DT payloads until the SPDU marks the end
            data = bytearray()
            while True:
                spdu_bytes = await self._conn.receive()
                spdu = encoder.decode(memoryview(spdu_bytes))

                self._comm_log.debug('received %s', spdu)

                if spdu.type == common.SpduType.DT:
                    data.extend(spdu.data)

                    # missing `end` is treated the same as a set `end`
                    if spdu.end is None or spdu.end:
                        await self._receive_queue.put(data)
                        data = bytearray()

                elif spdu.type == common.SpduType.FN:
                    self._close(_dn_spdu)
                    break

                elif spdu.type == common.SpduType.AB:
                    self._close(None)
                    break

                else:
                    # any other SPDU type closes with `_ab_spdu`
                    self._close(_ab_spdu)
                    break

        except ConnectionError:
            pass

        except Exception as e:
            self._log.error("receive loop error: %s", e, exc_info=e)

        finally:
            self.close()
            self._receive_queue.close()

    async def _send_loop(self):
        future = None
        try:
            while True:
                # queue items are (data, future) pairs; see `send`/`drain`
                data, future = await self._send_queue.get()

                if data is None:
                    await self._conn.drain()

                else:
                    spdu = common.Spdu(type=common.SpduType.DT,
                                       data=data)
                    spdu_bytes = encoder.encode(spdu)

                    # each DT SPDU is sent prefixed with the give tokens
                    # SPDU bytes
                    msg = bytes(itertools.chain(common.give_tokens_spdu_bytes,
                                                spdu_bytes))

                    self._comm_log.debug('sending %s', spdu)

                    await self._conn.send(msg)

                if future and not future.done():
                    future.set_result(None)

        except ConnectionError:
            pass

        except Exception as e:
            self._log.error("send loop error: %s", e, exc_info=e)

        finally:
            self.close()
            self._send_queue.close()

            # release every pending `drain` caller, starting with the
            # item that was being processed when the loop stopped
            while True:
                if future and not future.done():
                    future.set_result(None)
                if self._send_queue.empty():
                    break
                _, future = self._send_queue.get_nowait()
Connection( conn: Connection, cn_spdu: hat.drivers.cosp.common.Spdu, ac_spdu: hat.drivers.cosp.common.Spdu, local_ssel: int | None, remote_ssel: int | None, receive_queue_size: int, send_queue_size: int)
def __init__(self,
             conn: cotp.Connection,
             cn_spdu: common.Spdu,
             ac_spdu: common.Spdu,
             local_ssel: int | None,
             remote_ssel: int | None,
             receive_queue_size: int,
             send_queue_size: int):
    # Initializes connection state on top of an established COTP
    # connection and starts the background tasks.
    self._loop = asyncio.get_running_loop()
    self._conn = conn

    # user data exchanged during connection establishment
    self._conn_req_user_data = cn_spdu.user_data
    self._conn_res_user_data = ac_spdu.user_data

    # COTP connection info extended with the ssel values
    self._info = ConnectionInfo(local_ssel=local_ssel,
                                remote_ssel=remote_ssel,
                                **conn.info._asdict())

    # SPDU handed to `_close_cotp` when the connection is closed
    self._close_spdu = None
    self._receive_queue = aio.Queue(receive_queue_size)
    self._send_queue = aio.Queue(send_queue_size)
    self._async_group = aio.Group()
    self._log = _create_connection_logger_adapter(False, self._info)
    self._comm_log = _create_connection_logger_adapter(True, self._info)

    group = self.async_group
    group.spawn(aio.call_on_cancel, self._on_close)
    group.spawn(self._receive_loop)
    group.spawn(self._send_loop)
    # close this connection once the COTP connection starts closing
    group.spawn(aio.call_on_done, conn.wait_closing(), self.close)

    self._comm_log.debug('connection established')
async_group: hat.aio.group.Group
@property
def async_group(self) -> aio.Group:
    """Async group owning this connection's background tasks"""
    return self._async_group
Async group
conn_req_user_data: bytes | bytearray | memoryview
@property
def conn_req_user_data(self) -> util.Bytes | None:
    """Connect request's user data

    ``None`` when the connect request carried no user data (`connect`
    defaults its `user_data` argument to ``None``).

    """
    return self._conn_req_user_data
Connect request's user data
conn_res_user_data: bytes | bytearray | memoryview
@property
def conn_res_user_data(self) -> util.Bytes | None:
    """Connect response's user data

    ``None`` when the connect response carried no user data (a
    `ValidateCb` is allowed to return ``None``).

    """
    return self._conn_res_user_data
Connect response's user data
def
close(self, user_data: bytes | bytearray | memoryview | None = None):
def close(self, user_data: util.Bytes | None = None):
    """Close connection

    Requests closing with an FN SPDU that carries the optional
    `user_data`.

    """
    fn_spdu = common.Spdu(common.SpduType.FN,
                          transport_disconnect=True,
                          user_data=user_data)
    self._close(fn_spdu)
Close connection
async def
async_close(self, user_data: bytes | bytearray | memoryview | None = None):
async def async_close(self, user_data: util.Bytes | None = None):
    """Async close

    Calls `close` with `user_data` and waits until the connection is
    closed.

    """
    self.close(user_data)
    await self.wait_closed()
Async close
async def
receive(self) -> bytes | bytearray | memoryview:
async def receive(self) -> util.Bytes:
    """Receive data

    Raises:
        ConnectionError: if the receive queue is closed

    """
    try:
        received = await self._receive_queue.get()

    except aio.QueueClosedError:
        raise ConnectionError()

    return received
Receive data