hat.drivers.cosp
Connection oriented session protocol
1"""Connection oriented session protocol""" 2 3from hat.drivers.cosp.connection import (ConnectionInfo, 4 ValidateCb, 5 ConnectionCb, 6 connect, 7 listen, 8 Server, 9 Connection) 10 11 12__all__ = ['ConnectionInfo', 13 'ValidateCb', 14 'ConnectionCb', 15 'connect', 16 'listen', 17 'Server', 18 'Connection']
class
ConnectionInfo(typing.NamedTuple):
28class ConnectionInfo(typing.NamedTuple): 29 name: str | None 30 local_addr: tcp.Address 31 local_tsel: int | None 32 local_ssel: int | None 33 remote_addr: tcp.Address 34 remote_tsel: int | None 35 remote_ssel: int | None
ConnectionInfo(name, local_addr, local_tsel, local_ssel, remote_addr, remote_tsel, remote_ssel)
ConnectionInfo( name: str | None, local_addr: hat.drivers.tcp.Address, local_tsel: int | None, local_ssel: int | None, remote_addr: hat.drivers.tcp.Address, remote_tsel: int | None, remote_ssel: int | None)
Create new instance of ConnectionInfo(name, local_addr, local_tsel, local_ssel, remote_addr, remote_tsel, remote_ssel)
ValidateCb =
typing.Callable[[bytes | bytearray | memoryview], bytes | bytearray | memoryview | None | collections.abc.Awaitable[bytes | bytearray | memoryview | None]]
ConnectionCb =
typing.Callable[[ForwardRef('Connection')], None | collections.abc.Awaitable[None]]
async def
connect( addr: hat.drivers.tcp.Address, user_data: bytes | bytearray | memoryview | None = None, *, local_ssel: int | None = None, remote_ssel: int | None = None, cosp_receive_queue_size: int = 1024, cosp_send_queue_size: int = 1024, **kwargs) -> Connection:
46async def connect(addr: tcp.Address, 47 user_data: util.Bytes | None = None, 48 *, 49 local_ssel: int | None = None, 50 remote_ssel: int | None = None, 51 cosp_receive_queue_size: int = 1024, 52 cosp_send_queue_size: int = 1024, 53 **kwargs 54 ) -> 'Connection': 55 """Connect to COSP server 56 57 Additional arguments are passed directly to `hat.drivers.cotp.connect`. 58 59 """ 60 log = _create_connection_logger(kwargs.get('name'), None) 61 conn = await cotp.connect(addr, **kwargs) 62 63 try: 64 cn_spdu = common.Spdu(type=common.SpduType.CN, 65 extended_spdus=False, 66 version_number=_params_version, 67 requirements=_params_requirements, 68 calling_ssel=local_ssel, 69 called_ssel=remote_ssel, 70 user_data=user_data) 71 cn_spdu_bytes = encoder.encode(cn_spdu) 72 await conn.send(cn_spdu_bytes) 73 74 ac_spdu_bytes = await conn.receive() 75 ac_spdu = encoder.decode(memoryview(ac_spdu_bytes)) 76 _validate_connect_response(cn_spdu, ac_spdu) 77 78 calling_ssel, called_ssel = _get_ssels(cn_spdu, ac_spdu) 79 return Connection(conn, cn_spdu, ac_spdu, calling_ssel, called_ssel, 80 cosp_receive_queue_size, cosp_send_queue_size) 81 82 except BaseException: 83 await aio.uncancellable(_close_cotp(conn, _ab_spdu, log)) 84 raise
Connect to COSP server
Additional arguments are passed directly to hat.drivers.cotp.connect.
async def
listen( validate_cb: Callable[[bytes | bytearray | memoryview], bytes | bytearray | memoryview | None | Awaitable[bytes | bytearray | memoryview | None]], connection_cb: Callable[[Connection], None | Awaitable[None]], addr: hat.drivers.tcp.Address = Address(host='0.0.0.0', port=102), *, bind_connections: bool = False, cosp_receive_queue_size: int = 1024, cosp_send_queue_size: int = 1024, **kwargs) -> Server:
87async def listen(validate_cb: ValidateCb, 88 connection_cb: ConnectionCb, 89 addr: tcp.Address = tcp.Address('0.0.0.0', 102), 90 *, 91 bind_connections: bool = False, 92 cosp_receive_queue_size: int = 1024, 93 cosp_send_queue_size: int = 1024, 94 **kwargs 95 ) -> 'Server': 96 """Create COSP listening server 97 98 Additional arguments are passed directly to `hat.drivers.cotp.listen`. 99 100 Args: 101 validate_cb: callback function or coroutine called on new 102 incomming connection request prior to creating new connection 103 connection_cb: new connection callback 104 addr: local listening address 105 106 """ 107 server = Server() 108 server._validate_cb = validate_cb 109 server._connection_cb = connection_cb 110 server._bind_connections = bind_connections 111 server._receive_queue_size = cosp_receive_queue_size 112 server._send_queue_size = cosp_send_queue_size 113 server._log = _create_server_logger(kwargs.get('name'), None) 114 115 server._srv = await cotp.listen(server._on_connection, addr, 116 bind_connections=False, 117 **kwargs) 118 119 server._log = _create_server_logger(kwargs.get('name'), server._srv.info) 120 121 return server
Create COSP listening server
Additional arguments are passed directly to hat.drivers.cotp.listen.
Arguments:
- validate_cb: callback function or coroutine called on new incomming connection request prior to creating new connection
- connection_cb: new connection callback
- addr: local listening address
class
Server(hat.aio.group.Resource):
124class Server(aio.Resource): 125 """COSP listening server 126 127 For creating new server see `listen`. 128 129 """ 130 131 @property 132 def async_group(self) -> aio.Group: 133 """Async group""" 134 return self._srv.async_group 135 136 @property 137 def info(self) -> tcp.ServerInfo: 138 """Server info""" 139 return self._srv.info 140 141 async def _on_connection(self, cotp_conn): 142 try: 143 try: 144 cn_spdu_bytes = await cotp_conn.receive() 145 cn_spdu = encoder.decode(memoryview(cn_spdu_bytes)) 146 _validate_connect_request(cn_spdu) 147 148 res_user_data = await aio.call(self._validate_cb, 149 cn_spdu.user_data) 150 151 ac_spdu = common.Spdu(type=common.SpduType.AC, 152 extended_spdus=False, 153 version_number=_params_version, 154 requirements=_params_requirements, 155 calling_ssel=cn_spdu.calling_ssel, 156 called_ssel=cn_spdu.called_ssel, 157 user_data=res_user_data) 158 ac_spdu_bytes = encoder.encode(ac_spdu) 159 await cotp_conn.send(ac_spdu_bytes) 160 161 calling_ssel, called_ssel = _get_ssels(cn_spdu, ac_spdu) 162 conn = Connection(cotp_conn, cn_spdu, ac_spdu, 163 called_ssel, calling_ssel, 164 self._receive_queue_size, 165 self._send_queue_size) 166 167 except BaseException: 168 await aio.uncancellable( 169 _close_cotp(cotp_conn, _ab_spdu, self._log, self._log)) 170 raise 171 172 try: 173 await aio.call(self._connection_cb, conn) 174 175 except BaseException: 176 await aio.uncancellable(conn.async_close()) 177 raise 178 179 except Exception as e: 180 self._log.error("error creating new incomming connection: %s", 181 e, exc_info=e) 182 return 183 184 if not self._bind_connections: 185 return 186 187 try: 188 await conn.wait_closed() 189 190 except BaseException: 191 await aio.uncancellable(conn.async_close()) 192 raise
COSP listening server
For creating new server see listen.
async_group: hat.aio.group.Group
131 @property 132 def async_group(self) -> aio.Group: 133 """Async group""" 134 return self._srv.async_group
Async group
class
Connection(hat.aio.group.Resource):
195class Connection(aio.Resource): 196 """COSP connection 197 198 For creating new connection see `connect` or `listen`. 199 200 """ 201 202 def __init__(self, 203 conn: cotp.Connection, 204 cn_spdu: common.Spdu, 205 ac_spdu: common.Spdu, 206 local_ssel: int | None, 207 remote_ssel: int | None, 208 receive_queue_size: int, 209 send_queue_size: int): 210 self._conn = conn 211 self._conn_req_user_data = cn_spdu.user_data 212 self._conn_res_user_data = ac_spdu.user_data 213 self._loop = asyncio.get_running_loop() 214 self._info = ConnectionInfo(local_ssel=local_ssel, 215 remote_ssel=remote_ssel, 216 **conn.info._asdict()) 217 self._close_spdu = None 218 self._receive_queue = aio.Queue(receive_queue_size) 219 self._send_queue = aio.Queue(send_queue_size) 220 self._async_group = aio.Group() 221 self._log = _create_connection_logger(self._info.name, self._info) 222 223 self.async_group.spawn(aio.call_on_cancel, self._on_close) 224 self.async_group.spawn(self._receive_loop) 225 self.async_group.spawn(self._send_loop) 226 self.async_group.spawn(aio.call_on_done, conn.wait_closing(), 227 self.close) 228 229 @property 230 def async_group(self) -> aio.Group: 231 """Async group""" 232 return self._async_group 233 234 @property 235 def info(self) -> ConnectionInfo: 236 """Connection info""" 237 return self._info 238 239 @property 240 def conn_req_user_data(self) -> util.Bytes: 241 """Connect request's user data""" 242 return self._conn_req_user_data 243 244 @property 245 def conn_res_user_data(self) -> util.Bytes: 246 """Connect response's user data""" 247 return self._conn_res_user_data 248 249 def close(self, user_data: util.Bytes | None = None): 250 """Close connection""" 251 self._close(common.Spdu(common.SpduType.FN, 252 transport_disconnect=True, 253 user_data=user_data)) 254 255 async def async_close(self, user_data: util.Bytes | None = None): 256 """Async close""" 257 self.close(user_data) 258 await self.wait_closed() 259 260 async def receive(self) -> util.Bytes: 261 """Receive data""" 262 try: 263 return await self._receive_queue.get() 264 265 except aio.QueueClosedError: 266 raise ConnectionError() 267 268 async def send(self, data: util.Bytes): 269 """Send data""" 270 try: 271 await self._send_queue.put((data, None)) 272 273 except aio.QueueClosedError: 274 raise ConnectionError() 275 276 async def drain(self): 277 """Drain output buffer""" 278 try: 279 future = self._loop.create_future() 280 await self._send_queue.put((None, future)) 281 await future 282 283 except aio.QueueClosedError: 284 raise ConnectionError() 285 286 async def _on_close(self): 287 await _close_cotp(self._conn, self._close_spdu, self._log) 288 289 def _close(self, spdu): 290 if not self.is_open: 291 return 292 293 self._close_spdu = spdu 294 self._async_group.close() 295 296 async def _receive_loop(self): 297 try: 298 data = bytearray() 299 while True: 300 spdu_bytes = await self._conn.receive() 301 spdu = encoder.decode(memoryview(spdu_bytes)) 302 303 if spdu.type == common.SpduType.DT: 304 data.extend(spdu.data) 305 306 if spdu.end is None or spdu.end: 307 await self._receive_queue.put(data) 308 data = bytearray() 309 310 elif spdu.type == common.SpduType.FN: 311 self._close(_dn_spdu) 312 break 313 314 elif spdu.type == common.SpduType.AB: 315 self._close(None) 316 break 317 318 else: 319 self._close(_ab_spdu) 320 break 321 322 except ConnectionError: 323 pass 324 325 except Exception as e: 326 self._log.error("receive loop error: %s", e, exc_info=e) 327 328 finally: 329 self.close() 330 self._receive_queue.close() 331 332 async def _send_loop(self): 333 future = None 334 try: 335 while True: 336 data, future = await self._send_queue.get() 337 338 if data is None: 339 await self._conn.drain() 340 341 else: 342 spdu = common.Spdu(type=common.SpduType.DT, 343 data=data) 344 spdu_bytes = encoder.encode(spdu) 345 346 msg = bytes(itertools.chain(common.give_tokens_spdu_bytes, 347 spdu_bytes)) 348 349 await self._conn.send(msg) 350 351 if future and not future.done(): 352 future.set_result(None) 353 354 except ConnectionError: 355 pass 356 357 except Exception as e: 358 self._log.error("send loop error: %s", e, exc_info=e) 359 360 finally: 361 self.close() 362 self._send_queue.close() 363 364 while True: 365 if future and not future.done(): 366 future.set_result(None) 367 if self._send_queue.empty(): 368 break 369 _, future = self._send_queue.get_nowait()
Connection( conn: Connection, cn_spdu: hat.drivers.cosp.common.Spdu, ac_spdu: hat.drivers.cosp.common.Spdu, local_ssel: int | None, remote_ssel: int | None, receive_queue_size: int, send_queue_size: int)
202 def __init__(self, 203 conn: cotp.Connection, 204 cn_spdu: common.Spdu, 205 ac_spdu: common.Spdu, 206 local_ssel: int | None, 207 remote_ssel: int | None, 208 receive_queue_size: int, 209 send_queue_size: int): 210 self._conn = conn 211 self._conn_req_user_data = cn_spdu.user_data 212 self._conn_res_user_data = ac_spdu.user_data 213 self._loop = asyncio.get_running_loop() 214 self._info = ConnectionInfo(local_ssel=local_ssel, 215 remote_ssel=remote_ssel, 216 **conn.info._asdict()) 217 self._close_spdu = None 218 self._receive_queue = aio.Queue(receive_queue_size) 219 self._send_queue = aio.Queue(send_queue_size) 220 self._async_group = aio.Group() 221 self._log = _create_connection_logger(self._info.name, self._info) 222 223 self.async_group.spawn(aio.call_on_cancel, self._on_close) 224 self.async_group.spawn(self._receive_loop) 225 self.async_group.spawn(self._send_loop) 226 self.async_group.spawn(aio.call_on_done, conn.wait_closing(), 227 self.close)
async_group: hat.aio.group.Group
229 @property 230 def async_group(self) -> aio.Group: 231 """Async group""" 232 return self._async_group
Async group
conn_req_user_data: bytes | bytearray | memoryview
239 @property 240 def conn_req_user_data(self) -> util.Bytes: 241 """Connect request's user data""" 242 return self._conn_req_user_data
Connect request's user data
conn_res_user_data: bytes | bytearray | memoryview
244 @property 245 def conn_res_user_data(self) -> util.Bytes: 246 """Connect response's user data""" 247 return self._conn_res_user_data
Connect response's user data
def
close(self, user_data: bytes | bytearray | memoryview | None = None):
249 def close(self, user_data: util.Bytes | None = None): 250 """Close connection""" 251 self._close(common.Spdu(common.SpduType.FN, 252 transport_disconnect=True, 253 user_data=user_data))
Close connection
async def
async_close(self, user_data: bytes | bytearray | memoryview | None = None):
255 async def async_close(self, user_data: util.Bytes | None = None): 256 """Async close""" 257 self.close(user_data) 258 await self.wait_closed()
Async close
async def
receive(self) -> bytes | bytearray | memoryview:
260 async def receive(self) -> util.Bytes: 261 """Receive data""" 262 try: 263 return await self._receive_queue.get() 264 265 except aio.QueueClosedError: 266 raise ConnectionError()
Receive data