hat.drivers.cosp
Connection oriented session protocol
1"""Connection oriented session protocol""" 2 3from hat.drivers.cosp.connection import (ConnectionInfo, 4 ValidateCb, 5 ConnectionCb, 6 connect, 7 listen, 8 Server, 9 Connection) 10 11 12__all__ = ['ConnectionInfo', 13 'ValidateCb', 14 'ConnectionCb', 15 'connect', 16 'listen', 17 'Server', 18 'Connection']
class
ConnectionInfo(typing.NamedTuple):
class ConnectionInfo(typing.NamedTuple):
    # Immutable description of both endpoints of a COSP connection.
    #
    # `Connection.__init__` builds an instance from the `info` of the
    # underlying COTP connection (expanded with `**conn.info._asdict()`)
    # together with the `local_ssel`/`remote_ssel` values it receives.
    local_addr: tcp.Address
    local_tsel: int | None  # presumably transport selector - TODO confirm
    local_ssel: int | None  # local session selector, `None` if not set
    remote_addr: tcp.Address
    remote_tsel: int | None  # presumably transport selector - TODO confirm
    remote_ssel: int | None  # remote session selector, `None` if not set
ConnectionInfo(local_addr, local_tsel, local_ssel, remote_addr, remote_tsel, remote_ssel)
ConnectionInfo( local_addr: hat.drivers.tcp.Address, local_tsel: int | None, local_ssel: int | None, remote_addr: hat.drivers.tcp.Address, remote_tsel: int | None, remote_ssel: int | None)
Create new instance of ConnectionInfo(local_addr, local_tsel, local_ssel, remote_addr, remote_tsel, remote_ssel)
ValidateCb =
typing.Callable[[bytes | bytearray | memoryview], typing.Union[bytes, bytearray, memoryview, NoneType, typing.Awaitable[bytes | bytearray | memoryview | None]]]
ConnectionCb =
typing.Callable[[ForwardRef('Connection')], typing.Optional[typing.Awaitable[NoneType]]]
async def
connect( addr: hat.drivers.tcp.Address, user_data: bytes | bytearray | memoryview | None = None, *, local_ssel: int | None = None, remote_ssel: int | None = None, cosp_receive_queue_size: int = 1024, cosp_send_queue_size: int = 1024, **kwargs) -> Connection:
async def connect(addr: tcp.Address,
                  user_data: util.Bytes | None = None,
                  *,
                  local_ssel: int | None = None,
                  remote_ssel: int | None = None,
                  cosp_receive_queue_size: int = 1024,
                  cosp_send_queue_size: int = 1024,
                  **kwargs
                  ) -> 'Connection':
    """Connect to COSP server

    Opens a COTP connection to `addr`, sends a CN SPDU carrying `user_data`
    and the session selectors, then waits for a single response SPDU. The
    response is checked with `_validate_connect_response` before the
    resulting `Connection` is created.

    Additional arguments are passed directly to `hat.drivers.cotp.connect`.

    Args:
        addr: remote address
        user_data: optional user data sent as part of the connect request
        local_ssel: calling session selector put into the connect request
        remote_ssel: called session selector put into the connect request
        cosp_receive_queue_size: size of the connection's receive queue
        cosp_send_queue_size: size of the connection's send queue

    """
    cotp_conn = await cotp.connect(addr, **kwargs)

    try:
        request = common.Spdu(type=common.SpduType.CN,
                              extended_spdus=False,
                              version_number=_params_version,
                              requirements=_params_requirements,
                              calling_ssel=local_ssel,
                              called_ssel=remote_ssel,
                              user_data=user_data)
        await cotp_conn.send(encoder.encode(request))

        response_bytes = await cotp_conn.receive()
        response = encoder.decode(memoryview(response_bytes))
        _validate_connect_response(request, response)

        # on the connecting side, calling selector is the local one
        local, remote = _get_ssels(request, response)
        return Connection(cotp_conn, request, response, local, remote,
                          cosp_receive_queue_size, cosp_send_queue_size)

    except BaseException:
        # any failure (including cancellation) before the Connection takes
        # ownership must not leak the COTP connection; the close itself is
        # shielded from cancellation
        await aio.uncancellable(_close_cotp(cotp_conn, _ab_spdu))
        raise
Connect to COSP server
Additional arguments are passed directly to `hat.drivers.cotp.connect`.
async def
listen( validate_cb: Callable[[bytes | bytearray | memoryview], Union[bytes, bytearray, memoryview, NoneType, Awaitable[bytes | bytearray | memoryview | None]]], connection_cb: Callable[[Connection], Optional[Awaitable[NoneType]]], addr: hat.drivers.tcp.Address = Address(host='0.0.0.0', port=102), *, bind_connections: bool = False, cosp_receive_queue_size: int = 1024, cosp_send_queue_size: int = 1024, **kwargs) -> Server:
async def listen(validate_cb: ValidateCb,
                 connection_cb: ConnectionCb,
                 addr: tcp.Address = tcp.Address('0.0.0.0', 102),
                 *,
                 bind_connections: bool = False,
                 cosp_receive_queue_size: int = 1024,
                 cosp_send_queue_size: int = 1024,
                 **kwargs
                 ) -> 'Server':
    """Create COSP listening server

    Additional arguments are passed directly to `hat.drivers.cotp.listen`.

    Args:
        validate_cb: callback function or coroutine called on new
            incoming connection request prior to creating new connection;
            it receives the request's user data and its result is used as
            the response's user data
        connection_cb: new connection callback
        addr: local listening address
        bind_connections: if set, handling of each accepted connection
            lasts until that connection is closed
        cosp_receive_queue_size: receive queue size of accepted connections
        cosp_send_queue_size: send queue size of accepted connections

    """
    srv = Server()

    # configuration read later by `Server._on_connection`
    srv._receive_queue_size = cosp_receive_queue_size
    srv._send_queue_size = cosp_send_queue_size
    srv._bind_connections = bind_connections
    srv._validate_cb = validate_cb
    srv._connection_cb = connection_cb

    # binding is implemented by `Server._on_connection` itself (it waits for
    # the COSP connection to close), so the COTP server never binds
    srv._srv = await cotp.listen(srv._on_connection, addr,
                                 bind_connections=False,
                                 **kwargs)

    return srv
Create COSP listening server
Additional arguments are passed directly to `hat.drivers.cotp.listen`.
Arguments:
- validate_cb: callback function or coroutine called on new incoming connection request prior to creating new connection
- connection_cb: new connection callback
- addr: local listening address
class
Server(hat.aio.group.Resource):
class Server(aio.Resource):
    """COSP listening server

    Thin wrapper around the COTP server stored in `_srv`; every accepted
    COTP connection is passed to `_on_connection`, which performs the
    connect request/response exchange and creates a `Connection`.

    For creating new server see `listen`.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._srv.async_group

    @property
    def addresses(self) -> list[tcp.Address]:
        """Listening addresses"""
        return self._srv.addresses

    async def _on_connection(self, cotp_conn):
        # Handle a single accepted COTP connection: receive and validate the
        # connect request, let `_validate_cb` produce the response user data,
        # send the AC SPDU and hand the new Connection to `_connection_cb`.
        try:
            try:
                cn_spdu_bytes = await cotp_conn.receive()
                cn_spdu = encoder.decode(memoryview(cn_spdu_bytes))
                _validate_connect_request(cn_spdu)

                # callback may be a plain function or a coroutine
                res_user_data = await aio.call(self._validate_cb,
                                               cn_spdu.user_data)

                # selectors from the request are echoed back in the response
                ac_spdu = common.Spdu(type=common.SpduType.AC,
                                      extended_spdus=False,
                                      version_number=_params_version,
                                      requirements=_params_requirements,
                                      calling_ssel=cn_spdu.calling_ssel,
                                      called_ssel=cn_spdu.called_ssel,
                                      user_data=res_user_data)
                ac_spdu_bytes = encoder.encode(ac_spdu)
                await cotp_conn.send(ac_spdu_bytes)

                # on the listening side the called selector is the local one,
                # hence the swapped argument order
                calling_ssel, called_ssel = _get_ssels(cn_spdu, ac_spdu)
                conn = Connection(cotp_conn, cn_spdu, ac_spdu,
                                  called_ssel, calling_ssel,
                                  self._receive_queue_size,
                                  self._send_queue_size)

            except BaseException:
                # no Connection exists yet - close the COTP connection
                # directly, shielded from cancellation
                await aio.uncancellable(_close_cotp(cotp_conn, _ab_spdu))
                raise

            try:
                await aio.call(self._connection_cb, conn)

            except BaseException:
                # from here on the Connection owns the COTP connection
                await aio.uncancellable(conn.async_close())
                raise

        except Exception as e:
            # failures are logged and swallowed; BaseException subclasses
            # that are not Exception are not caught here and propagate
            mlog.error("error creating new incomming connection: %s", e,
                       exc_info=e)
            return

        if not self._bind_connections:
            return

        # bound connections: stay here until the connection closes and make
        # sure it is closed if this handler is interrupted
        try:
            await conn.wait_closed()

        except BaseException:
            await aio.uncancellable(conn.async_close())
            raise
COSP listening server
For creating new server see `listen`.
async_group: hat.aio.group.Group
@property
def async_group(self) -> aio.Group:
    """Async group (the one of the wrapped server `self._srv`)"""
    return self._srv.async_group
Async group
addresses: list[hat.drivers.tcp.Address]
@property
def addresses(self) -> list[tcp.Address]:
    """Listening addresses (as reported by the wrapped server `self._srv`)"""
    return self._srv.addresses
Listening addresses
class
Connection(hat.aio.group.Resource):
class Connection(aio.Resource):
    """COSP connection

    Wraps an established COTP connection. Received DT SPDUs are reassembled
    and made available through `receive`; data passed to `send` is queued
    and written by a background send loop.

    For creating new connection see `connect` or `listen`.

    """

    def __init__(self,
                 conn: cotp.Connection,
                 cn_spdu: common.Spdu,
                 ac_spdu: common.Spdu,
                 local_ssel: int | None,
                 remote_ssel: int | None,
                 receive_queue_size: int,
                 send_queue_size: int):
        """Wrap `conn` after a completed connect request/response exchange

        Must be called while an event loop is running.

        Args:
            conn: underlying COTP connection
            cn_spdu: connect request SPDU
            ac_spdu: connect response SPDU
            local_ssel: local session selector
            remote_ssel: remote session selector
            receive_queue_size: receive queue size
            send_queue_size: send queue size

        """
        self._conn = conn
        self._conn_req_user_data = cn_spdu.user_data
        self._conn_res_user_data = ac_spdu.user_data
        self._loop = asyncio.get_running_loop()
        self._info = ConnectionInfo(local_ssel=local_ssel,
                                    remote_ssel=remote_ssel,
                                    **conn.info._asdict())
        # SPDU handed to `_close_cotp` on close; set by `_close`
        self._close_spdu = None
        self._receive_queue = aio.Queue(receive_queue_size)
        self._send_queue = aio.Queue(send_queue_size)
        self._async_group = aio.Group()

        # `_on_close` closes the COTP connection once the group is closed
        self.async_group.spawn(aio.call_on_cancel, self._on_close)
        self.async_group.spawn(self._receive_loop)
        self.async_group.spawn(self._send_loop)
        # closing of the COTP connection closes this connection as well
        self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
                               self.close)

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    @property
    def info(self) -> ConnectionInfo:
        """Connection info"""
        return self._info

    @property
    def conn_req_user_data(self) -> util.Bytes | None:
        """Connect request's user data (``None`` if request carried none)"""
        return self._conn_req_user_data

    @property
    def conn_res_user_data(self) -> util.Bytes | None:
        """Connect response's user data (``None`` if response carried none)"""
        return self._conn_res_user_data

    def close(self, user_data: util.Bytes | None = None):
        """Close connection

        Requests closing with an FN SPDU carrying `user_data`. Calling it on
        a connection that is no longer open has no effect.

        """
        self._close(common.Spdu(common.SpduType.FN,
                                transport_disconnect=True,
                                user_data=user_data))

    async def async_close(self, user_data: util.Bytes | None = None):
        """Async close

        Same as `close`, additionally waiting until connection is closed.

        """
        self.close(user_data)
        await self.wait_closed()

    async def receive(self) -> util.Bytes:
        """Receive data

        Raises:
            ConnectionError: receive queue is closed

        """
        try:
            return await self._receive_queue.get()

        except aio.QueueClosedError:
            raise ConnectionError()

    async def send(self, data: util.Bytes):
        """Send data

        Data is queued for the send loop; returning from this method does
        not mean the data was written to the underlying connection (see
        `drain`).

        Raises:
            ConnectionError: send queue is closed

        """
        try:
            await self._send_queue.put((data, None))

        except aio.QueueClosedError:
            raise ConnectionError()

    async def drain(self):
        """Drain output buffer

        Raises:
            ConnectionError: send queue is closed

        """
        try:
            # a `(None, future)` entry makes the send loop drain the COTP
            # connection and then resolve the future
            future = self._loop.create_future()
            await self._send_queue.put((None, future))
            await future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def _on_close(self):
        # NOTE(review): `_close_cotp` is presumably responsible for sending
        # `_close_spdu` (when not None) before closing - confirm
        await _close_cotp(self._conn, self._close_spdu)

    def _close(self, spdu):
        # only the first close request decides which SPDU is used
        if not self.is_open:
            return

        self._close_spdu = spdu
        self._async_group.close()

    async def _receive_loop(self):
        try:
            # accumulates DT segments until the end of a data unit
            data = bytearray()
            while True:
                spdu_bytes = await self._conn.receive()
                spdu = encoder.decode(memoryview(spdu_bytes))

                if spdu.type == common.SpduType.DT:
                    data.extend(spdu.data)

                    # missing `end` is treated the same as end of data
                    if spdu.end is None or spdu.end:
                        await self._receive_queue.put(data)
                        data = bytearray()

                elif spdu.type == common.SpduType.FN:
                    self._close(_dn_spdu)
                    break

                elif spdu.type == common.SpduType.AB:
                    self._close(None)
                    break

                else:
                    # any other SPDU type closes the connection
                    self._close(_ab_spdu)
                    break

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("receive loop error: %s", e, exc_info=e)

        finally:
            # no effect if one of the branches above already closed
            self.close()
            self._receive_queue.close()

    async def _send_loop(self):
        future = None
        try:
            while True:
                data, future = await self._send_queue.get()

                if data is None:
                    # drain request
                    await self._conn.drain()

                else:
                    spdu = common.Spdu(type=common.SpduType.DT,
                                       data=data)
                    spdu_bytes = encoder.encode(spdu)

                    # DT SPDU is sent prefixed with the give tokens SPDU
                    msg = bytes(itertools.chain(common.give_tokens_spdu_bytes,
                                                spdu_bytes))

                    await self._conn.send(msg)

                if future and not future.done():
                    future.set_result(None)

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("send loop error: %s", e, exc_info=e)

        finally:
            self.close()
            self._send_queue.close()

            # resolve the future in flight and all still queued ones so that
            # callers waiting in `drain` are released
            while True:
                if future and not future.done():
                    future.set_result(None)
                if self._send_queue.empty():
                    break
                _, future = self._send_queue.get_nowait()
Connection( conn: Connection, cn_spdu: hat.drivers.cosp.common.Spdu, ac_spdu: hat.drivers.cosp.common.Spdu, local_ssel: int | None, remote_ssel: int | None, receive_queue_size: int, send_queue_size: int)
def __init__(self,
             conn: cotp.Connection,
             cn_spdu: common.Spdu,
             ac_spdu: common.Spdu,
             local_ssel: int | None,
             remote_ssel: int | None,
             receive_queue_size: int,
             send_queue_size: int):
    """Wrap `conn` after a completed connect request/response exchange

    Must be called while an event loop is running.

    Args:
        conn: underlying COTP connection
        cn_spdu: connect request SPDU
        ac_spdu: connect response SPDU
        local_ssel: local session selector
        remote_ssel: remote session selector
        receive_queue_size: receive queue size
        send_queue_size: send queue size

    """
    self._conn = conn
    self._conn_req_user_data = cn_spdu.user_data
    self._conn_res_user_data = ac_spdu.user_data
    self._loop = asyncio.get_running_loop()
    # address/tsel fields come from the COTP connection's info
    self._info = ConnectionInfo(local_ssel=local_ssel,
                                remote_ssel=remote_ssel,
                                **conn.info._asdict())
    self._close_spdu = None
    self._receive_queue = aio.Queue(receive_queue_size)
    self._send_queue = aio.Queue(send_queue_size)
    self._async_group = aio.Group()

    # `_on_close` is registered before the loops are started
    self.async_group.spawn(aio.call_on_cancel, self._on_close)
    self.async_group.spawn(self._receive_loop)
    self.async_group.spawn(self._send_loop)
    # closing of the COTP connection closes this connection as well
    self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
                           self.close)
async_group: hat.aio.group.Group
@property
def async_group(self) -> aio.Group:
    """Async group (owned by this connection)"""
    return self._async_group
Async group
conn_req_user_data: bytes | bytearray | memoryview
@property
def conn_req_user_data(self) -> util.Bytes | None:
    """Connect request's user data (``None`` if request carried none)"""
    return self._conn_req_user_data
Connect request's user data
conn_res_user_data: bytes | bytearray | memoryview
@property
def conn_res_user_data(self) -> util.Bytes | None:
    """Connect response's user data (``None`` if response carried none)"""
    return self._conn_res_user_data
Connect response's user data
def
close(self, user_data: bytes | bytearray | memoryview | None = None):
def close(self, user_data: util.Bytes | None = None):
    """Close connection

    Requests closing with an FN SPDU carrying `user_data`.

    """
    self._close(common.Spdu(common.SpduType.FN,
                            transport_disconnect=True,
                            user_data=user_data))
Close connection
async def
async_close(self, user_data: bytes | bytearray | memoryview | None = None):
async def async_close(self, user_data: util.Bytes | None = None):
    """Async close

    Calls `close` with `user_data` and waits until connection is closed.

    """
    self.close(user_data)
    await self.wait_closed()
Async close
async def
receive(self) -> bytes | bytearray | memoryview:
async def receive(self) -> util.Bytes:
    """Receive data

    Raises:
        ConnectionError: receive queue is closed

    """
    try:
        return await self._receive_queue.get()

    except aio.QueueClosedError:
        # queue errors are reported to callers as connection errors
        raise ConnectionError()
Receive data