hat.drivers.copp
Connection oriented presentation protocol
1"""Connection oriented presentation protocol""" 2 3import asyncio 4import importlib.resources 5import logging 6import typing 7 8from hat import aio 9from hat import asn1 10from hat import json 11 12from hat.drivers import cosp 13from hat.drivers import tcp 14 15 16mlog = logging.getLogger(__name__) 17 18with importlib.resources.open_text(__package__, 'asn1_repo.json') as _f: 19 _encoder = asn1.ber.BerEncoder( 20 asn1.repository_from_json( 21 json.decode_stream(_f))) 22 23 24class ConnectionInfo(typing.NamedTuple): 25 local_addr: tcp.Address 26 local_tsel: int | None 27 local_ssel: int | None 28 local_psel: int | None 29 remote_addr: tcp.Address 30 remote_tsel: int | None 31 remote_ssel: int | None 32 remote_psel: int | None 33 34 35IdentifiedEntity: typing.TypeAlias = tuple[asn1.ObjectIdentifier, asn1.Entity] 36"""Identified entity""" 37 38ValidateCb: typing.TypeAlias = aio.AsyncCallable[['SyntaxNames', 39 IdentifiedEntity], 40 IdentifiedEntity | None] 41"""Validate callback""" 42 43ConnectionCb: typing.TypeAlias = aio.AsyncCallable[['Connection'], None] 44"""Connection callback""" 45 46 47class SyntaxNames: 48 """Syntax name registry 49 50 Args: 51 syntax_names: list of ASN.1 ObjectIdentifiers representing syntax names 52 53 """ 54 55 def __init__(self, syntax_names: list[asn1.ObjectIdentifier]): 56 self._syntax_id_names = {(i * 2 + 1): name 57 for i, name in enumerate(syntax_names)} 58 self._syntax_name_ids = {v: k 59 for k, v in self._syntax_id_names.items()} 60 61 def get_name(self, syntax_id: int) -> asn1.ObjectIdentifier: 62 """Get syntax name associated with id""" 63 return self._syntax_id_names[syntax_id] 64 65 def get_id(self, syntax_name: asn1.ObjectIdentifier) -> int: 66 """Get syntax id associated with name""" 67 return self._syntax_name_ids[syntax_name] 68 69 70async def connect(addr: tcp.Address, 71 syntax_names: SyntaxNames, 72 user_data: IdentifiedEntity | None = None, 73 *, 74 local_psel: int | None = None, 75 remote_psel: int | None = None, 76 copp_receive_queue_size: int = 1024, 77 copp_send_queue_size: int = 1024, 78 **kwargs 79 ) -> 'Connection': 80 """Connect to COPP server 81 82 Additional arguments are passed directly to `hat.drivers.cosp.connect`. 83 84 """ 85 cp_ppdu = _cp_ppdu(syntax_names, local_psel, remote_psel, user_data) 86 cp_ppdu_data = _encode('CP-type', cp_ppdu) 87 conn = await cosp.connect(addr, cp_ppdu_data, **kwargs) 88 89 try: 90 cpa_ppdu = _decode('CPA-PPDU', conn.conn_res_user_data) 91 _validate_connect_response(cp_ppdu, cpa_ppdu) 92 93 calling_psel, called_psel = _get_psels(cp_ppdu) 94 return Connection(conn, syntax_names, cp_ppdu, cpa_ppdu, 95 calling_psel, called_psel, 96 copp_receive_queue_size, copp_send_queue_size) 97 98 except Exception: 99 await aio.uncancellable(_close_cosp(conn, _arp_ppdu())) 100 raise 101 102 103async def listen(validate_cb: ValidateCb, 104 connection_cb: ConnectionCb, 105 addr: tcp.Address = tcp.Address('0.0.0.0', 102), 106 *, 107 bind_connections: bool = False, 108 copp_receive_queue_size: int = 1024, 109 copp_send_queue_size: int = 1024, 110 **kwargs 111 ) -> 'Server': 112 """Create COPP listening server 113 114 Additional arguments are passed directly to `hat.drivers.cosp.listen`. 115 116 Args: 117 validate_cb: callback function or coroutine called on new 118 incomming connection request prior to creating connection object 119 connection_cb: new connection callback 120 addr: local listening address 121 122 """ 123 server = Server() 124 server._validate_cb = validate_cb 125 server._connection_cb = connection_cb 126 server._bind_connections = bind_connections 127 server._receive_queue_size = copp_receive_queue_size 128 server._send_queue_size = copp_send_queue_size 129 130 server._srv = await cosp.listen(server._on_validate, 131 server._on_connection, 132 addr, 133 bind_connections=False, 134 **kwargs) 135 136 return server 137 138 139class Server(aio.Resource): 140 """COPP listening server 141 142 For creating new server see `listen`. 143 144 """ 145 146 @property 147 def async_group(self) -> aio.Group: 148 """Async group""" 149 return self._srv.async_group 150 151 @property 152 def addresses(self) -> list[tcp.Address]: 153 """Listening addresses""" 154 return self._srv.addresses 155 156 async def _on_validate(self, user_data): 157 cp_ppdu = _decode('CP-type', user_data) 158 cp_params = cp_ppdu['normal-mode-parameters'] 159 called_psel_data = cp_params.get('called-presentation-selector') 160 called_psel = (int.from_bytes(called_psel_data, 'big') 161 if called_psel_data else None) 162 cp_pdv_list = cp_params['user-data'][1][0] 163 syntax_names = _sytax_names_from_cp_ppdu(cp_ppdu) 164 cp_user_data = ( 165 syntax_names.get_name( 166 cp_pdv_list['presentation-context-identifier']), 167 cp_pdv_list['presentation-data-values'][1]) 168 169 cpa_user_data = await aio.call(self._validate_cb, syntax_names, 170 cp_user_data) 171 172 cpa_ppdu = _cpa_ppdu(syntax_names, called_psel, cpa_user_data) 173 cpa_ppdu_data = _encode('CPA-PPDU', cpa_ppdu) 174 return cpa_ppdu_data 175 176 async def _on_connection(self, cosp_conn): 177 try: 178 try: 179 cp_ppdu = _decode('CP-type', cosp_conn.conn_req_user_data) 180 cpa_ppdu = _decode('CPA-PPDU', cosp_conn.conn_res_user_data) 181 182 syntax_names = _sytax_names_from_cp_ppdu(cp_ppdu) 183 calling_psel, called_psel = _get_psels(cp_ppdu) 184 185 conn = Connection(cosp_conn, syntax_names, cp_ppdu, cpa_ppdu, 186 calling_psel, called_psel, 187 self._receive_queue_size, 188 self._send_queue_size) 189 190 except Exception: 191 await aio.uncancellable(_close_cosp(cosp_conn, _arp_ppdu())) 192 raise 193 194 try: 195 await aio.call(self._connection_cb, conn) 196 197 except BaseException: 198 await aio.uncancellable(conn.async_close()) 199 raise 200 201 except Exception as e: 202 mlog.error("error creating new incomming connection: %s", e, 203 exc_info=e) 204 return 205 206 if not self._bind_connections: 207 return 208 209 try: 210 await conn.wait_closed() 211 212 except BaseException: 213 await aio.uncancellable(conn.async_close()) 214 raise 215 216 217class Connection(aio.Resource): 218 """COPP connection 219 220 For creating new connection see `connect` or `listen`. 221 222 """ 223 224 def __init__(self, 225 conn: cosp.Connection, 226 syntax_names: SyntaxNames, 227 cp_ppdu: asn1.Value, 228 cpa_ppdu: asn1.Value, 229 local_psel: int | None, 230 remote_psel: int | None, 231 receive_queue_size: int, 232 send_queue_size: int): 233 cp_user_data = cp_ppdu['normal-mode-parameters']['user-data'] 234 cpa_user_data = cpa_ppdu['normal-mode-parameters']['user-data'] 235 236 conn_req_user_data = ( 237 syntax_names.get_name( 238 cp_user_data[1][0]['presentation-context-identifier']), 239 cp_user_data[1][0]['presentation-data-values'][1]) 240 conn_res_user_data = ( 241 syntax_names.get_name( 242 cpa_user_data[1][0]['presentation-context-identifier']), 243 cpa_user_data[1][0]['presentation-data-values'][1]) 244 245 self._conn = conn 246 self._syntax_names = syntax_names 247 self._conn_req_user_data = conn_req_user_data 248 self._conn_res_user_data = conn_res_user_data 249 self._loop = asyncio.get_running_loop() 250 self._info = ConnectionInfo(local_psel=local_psel, 251 remote_psel=remote_psel, 252 **conn.info._asdict()) 253 self._close_ppdu = _arp_ppdu() 254 self._receive_queue = aio.Queue(receive_queue_size) 255 self._send_queue = aio.Queue(send_queue_size) 256 self._async_group = aio.Group() 257 258 self.async_group.spawn(aio.call_on_cancel, self._on_close) 259 self.async_group.spawn(self._receive_loop) 260 self.async_group.spawn(self._send_loop) 261 self.async_group.spawn(aio.call_on_done, conn.wait_closing(), 262 self.close) 263 264 @property 265 def async_group(self) -> aio.Group: 266 """Async group""" 267 return self._async_group 268 269 @property 270 def info(self) -> ConnectionInfo: 271 """Connection info""" 272 return self._info 273 274 @property 275 def syntax_names(self) -> SyntaxNames: 276 """Syntax names""" 277 return self._syntax_names 278 279 @property 280 def conn_req_user_data(self) -> IdentifiedEntity: 281 """Connect request's user data""" 282 return self._conn_req_user_data 283 284 @property 285 def conn_res_user_data(self) -> IdentifiedEntity: 286 """Connect response's user data""" 287 return self._conn_res_user_data 288 289 def close(self, user_data: IdentifiedEntity | None = None): 290 """Close connection""" 291 self._close(_aru_ppdu(self._syntax_names, user_data)) 292 293 async def async_close(self, user_data: IdentifiedEntity | None = None): 294 """Async close""" 295 self.close(user_data) 296 await self.wait_closed() 297 298 async def receive(self) -> IdentifiedEntity: 299 """Receive data""" 300 try: 301 return await self._receive_queue.get() 302 303 except aio.QueueClosedError: 304 raise ConnectionError() 305 306 async def send(self, data: IdentifiedEntity): 307 """Send data""" 308 try: 309 await self._send_queue.put((data, None)) 310 311 except aio.QueueClosedError: 312 raise ConnectionError() 313 314 async def drain(self): 315 """Drain output buffer""" 316 try: 317 future = self._loop.create_future() 318 await self._send_queue.put((None, future)) 319 await future 320 321 except aio.QueueClosedError: 322 raise ConnectionError() 323 324 async def _on_close(self): 325 await _close_cosp(self._conn, self._close_ppdu) 326 327 def _close(self, ppdu): 328 if not self.is_open: 329 return 330 331 self._close_ppdu = ppdu 332 self._async_group.close() 333 334 async def _receive_loop(self): 335 try: 336 while True: 337 cosp_data = await self._conn.receive() 338 339 user_data = _decode('User-data', cosp_data) 340 pdv_list = user_data[1][0] 341 syntax_name = self._syntax_names.get_name( 342 pdv_list['presentation-context-identifier']) 343 data = pdv_list['presentation-data-values'][1] 344 345 await self._receive_queue.put((syntax_name, data)) 346 347 except ConnectionError: 348 pass 349 350 except Exception as e: 351 mlog.error("receive loop error: %s", e, exc_info=e) 352 353 finally: 354 self._close(_arp_ppdu()) 355 self._receive_queue.close() 356 357 async def _send_loop(self): 358 future = None 359 try: 360 while True: 361 data, future = await self._send_queue.get() 362 363 if data is None: 364 await self._conn.drain() 365 366 else: 367 ppdu_data = _encode('User-data', 368 _user_data(self._syntax_names, data)) 369 await self._conn.send(ppdu_data) 370 371 if future and not future.done(): 372 future.set_result(None) 373 374 except ConnectionError: 375 pass 376 377 except Exception as e: 378 mlog.error("send loop error: %s", e, exc_info=e) 379 380 finally: 381 self._close(_arp_ppdu()) 382 self._send_queue.close() 383 384 while True: 385 if future and not future.done(): 386 future.set_result(None) 387 if self._send_queue.empty(): 388 break 389 _, future = self._send_queue.get_nowait() 390 391 392async def _close_cosp(cosp_conn, ppdu): 393 try: 394 data = _encode('Abort-type', ppdu) 395 396 except Exception as e: 397 mlog.error("error encoding abort ppdu: %s", e, exc_info=e) 398 data = None 399 400 finally: 401 await cosp_conn.async_close(data) 402 403 404def _get_psels(cp_ppdu): 405 cp_params = cp_ppdu['normal-mode-parameters'] 406 calling_psel_data = cp_params.get('calling-presentation-selector') 407 calling_psel = (int.from_bytes(calling_psel_data, 'big') 408 if calling_psel_data else None) 409 called_psel_data = cp_params.get('called-presentation-selector') 410 called_psel = (int.from_bytes(called_psel_data, 'big') 411 if called_psel_data else None) 412 return calling_psel, called_psel 413 414 415def _validate_connect_response(cp_ppdu, cpa_ppdu): 416 cp_params = cp_ppdu['normal-mode-parameters'] 417 cpa_params = cpa_ppdu['normal-mode-parameters'] 418 called_psel_data = cp_params.get('called-presentation-selector') 419 responding_psel_data = cpa_params.get('responding-presentation-selector') 420 421 if called_psel_data and responding_psel_data: 422 called_psel = int.from_bytes(called_psel_data, 'big') 423 responding_psel = int.from_bytes(responding_psel_data, 'big') 424 425 if called_psel != responding_psel: 426 raise Exception('presentation selectors not matching') 427 428 result_list = cpa_params['presentation-context-definition-result-list'] 429 if any(i['result'] != 0 for i in result_list): 430 raise Exception('presentation context not accepted') 431 432 433def _cp_ppdu(syntax_names, calling_psel, called_psel, user_data): 434 cp_params = { 435 'presentation-context-definition-list': [ 436 {'presentation-context-identifier': i, 437 'abstract-syntax-name': name, 438 'transfer-syntax-name-list': [_encoder.syntax_name]} 439 for i, name in syntax_names._syntax_id_names.items()]} 440 441 if calling_psel is not None: 442 cp_params['calling-presentation-selector'] = \ 443 calling_psel.to_bytes(4, 'big') 444 445 if called_psel is not None: 446 cp_params['called-presentation-selector'] = \ 447 called_psel.to_bytes(4, 'big') 448 449 if user_data: 450 cp_params['user-data'] = _user_data(syntax_names, user_data) 451 452 return { 453 'mode-selector': { 454 'mode-value': 1}, 455 'normal-mode-parameters': cp_params} 456 457 458def _cpa_ppdu(syntax_names, responding_psel, user_data): 459 cpa_params = { 460 'presentation-context-definition-result-list': [ 461 {'result': 0, 462 'transfer-syntax-name': _encoder.syntax_name} 463 for _ in syntax_names._syntax_id_names.keys()]} 464 465 if responding_psel is not None: 466 cpa_params['responding-presentation-selector'] = \ 467 responding_psel.to_bytes(4, 'big') 468 469 if user_data: 470 cpa_params['user-data'] = _user_data(syntax_names, user_data) 471 472 return { 473 'mode-selector': { 474 'mode-value': 1}, 475 'normal-mode-parameters': cpa_params} 476 477 478def _aru_ppdu(syntax_names, user_data): 479 aru_params = {} 480 481 if user_data: 482 aru_params['user-data'] = _user_data(syntax_names, user_data) 483 484 return 'aru-ppdu', ('normal-mode-parameters', aru_params) 485 486 487def _arp_ppdu(): 488 return 'arp-ppdu', {} 489 490 491def _user_data(syntax_names, user_data): 492 return 'fully-encoded-data', [{ 493 'presentation-context-identifier': syntax_names.get_id(user_data[0]), 494 'presentation-data-values': ( 495 'single-ASN1-type', user_data[1])}] 496 497 498def _sytax_names_from_cp_ppdu(cp_ppdu): 499 cp_params = cp_ppdu['normal-mode-parameters'] 500 syntax_names = SyntaxNames([]) 501 syntax_names._syntax_id_names = { 502 i['presentation-context-identifier']: i['abstract-syntax-name'] 503 for i in cp_params['presentation-context-definition-list']} 504 syntax_names._syntax_name_ids = { 505 v: k for k, v in syntax_names._syntax_id_names.items()} 506 return syntax_names 507 508 509def _encode(name, value): 510 return _encoder.encode(asn1.TypeRef('ISO8823-PRESENTATION', name), value) 511 512 513def _decode(name, data): 514 res, _ = _encoder.decode(asn1.TypeRef('ISO8823-PRESENTATION', name), 515 memoryview(data)) 516 return res
25class ConnectionInfo(typing.NamedTuple): 26 local_addr: tcp.Address 27 local_tsel: int | None 28 local_ssel: int | None 29 local_psel: int | None 30 remote_addr: tcp.Address 31 remote_tsel: int | None 32 remote_ssel: int | None 33 remote_psel: int | None
ConnectionInfo(local_addr, local_tsel, local_ssel, local_psel, remote_addr, remote_tsel, remote_ssel, remote_psel)
Create new instance of ConnectionInfo(local_addr, local_tsel, local_ssel, local_psel, remote_addr, remote_tsel, remote_ssel, remote_psel)
Identified entity
Validate callback
Connection callback
48class SyntaxNames: 49 """Syntax name registry 50 51 Args: 52 syntax_names: list of ASN.1 ObjectIdentifiers representing syntax names 53 54 """ 55 56 def __init__(self, syntax_names: list[asn1.ObjectIdentifier]): 57 self._syntax_id_names = {(i * 2 + 1): name 58 for i, name in enumerate(syntax_names)} 59 self._syntax_name_ids = {v: k 60 for k, v in self._syntax_id_names.items()} 61 62 def get_name(self, syntax_id: int) -> asn1.ObjectIdentifier: 63 """Get syntax name associated with id""" 64 return self._syntax_id_names[syntax_id] 65 66 def get_id(self, syntax_name: asn1.ObjectIdentifier) -> int: 67 """Get syntax id associated with name""" 68 return self._syntax_name_ids[syntax_name]
Syntax name registry
Arguments:
- syntax_names: list of ASN.1 ObjectIdentifiers representing syntax names
71async def connect(addr: tcp.Address, 72 syntax_names: SyntaxNames, 73 user_data: IdentifiedEntity | None = None, 74 *, 75 local_psel: int | None = None, 76 remote_psel: int | None = None, 77 copp_receive_queue_size: int = 1024, 78 copp_send_queue_size: int = 1024, 79 **kwargs 80 ) -> 'Connection': 81 """Connect to COPP server 82 83 Additional arguments are passed directly to `hat.drivers.cosp.connect`. 84 85 """ 86 cp_ppdu = _cp_ppdu(syntax_names, local_psel, remote_psel, user_data) 87 cp_ppdu_data = _encode('CP-type', cp_ppdu) 88 conn = await cosp.connect(addr, cp_ppdu_data, **kwargs) 89 90 try: 91 cpa_ppdu = _decode('CPA-PPDU', conn.conn_res_user_data) 92 _validate_connect_response(cp_ppdu, cpa_ppdu) 93 94 calling_psel, called_psel = _get_psels(cp_ppdu) 95 return Connection(conn, syntax_names, cp_ppdu, cpa_ppdu, 96 calling_psel, called_psel, 97 copp_receive_queue_size, copp_send_queue_size) 98 99 except Exception: 100 await aio.uncancellable(_close_cosp(conn, _arp_ppdu())) 101 raise
Connect to COPP server
Additional arguments are passed directly to hat.drivers.cosp.connect
.
104async def listen(validate_cb: ValidateCb, 105 connection_cb: ConnectionCb, 106 addr: tcp.Address = tcp.Address('0.0.0.0', 102), 107 *, 108 bind_connections: bool = False, 109 copp_receive_queue_size: int = 1024, 110 copp_send_queue_size: int = 1024, 111 **kwargs 112 ) -> 'Server': 113 """Create COPP listening server 114 115 Additional arguments are passed directly to `hat.drivers.cosp.listen`. 116 117 Args: 118 validate_cb: callback function or coroutine called on new 119 incomming connection request prior to creating connection object 120 connection_cb: new connection callback 121 addr: local listening address 122 123 """ 124 server = Server() 125 server._validate_cb = validate_cb 126 server._connection_cb = connection_cb 127 server._bind_connections = bind_connections 128 server._receive_queue_size = copp_receive_queue_size 129 server._send_queue_size = copp_send_queue_size 130 131 server._srv = await cosp.listen(server._on_validate, 132 server._on_connection, 133 addr, 134 bind_connections=False, 135 **kwargs) 136 137 return server
Create COPP listening server
Additional arguments are passed directly to hat.drivers.cosp.listen
.
Arguments:
- validate_cb: callback function or coroutine called on new incomming connection request prior to creating connection object
- connection_cb: new connection callback
- addr: local listening address
140class Server(aio.Resource): 141 """COPP listening server 142 143 For creating new server see `listen`. 144 145 """ 146 147 @property 148 def async_group(self) -> aio.Group: 149 """Async group""" 150 return self._srv.async_group 151 152 @property 153 def addresses(self) -> list[tcp.Address]: 154 """Listening addresses""" 155 return self._srv.addresses 156 157 async def _on_validate(self, user_data): 158 cp_ppdu = _decode('CP-type', user_data) 159 cp_params = cp_ppdu['normal-mode-parameters'] 160 called_psel_data = cp_params.get('called-presentation-selector') 161 called_psel = (int.from_bytes(called_psel_data, 'big') 162 if called_psel_data else None) 163 cp_pdv_list = cp_params['user-data'][1][0] 164 syntax_names = _sytax_names_from_cp_ppdu(cp_ppdu) 165 cp_user_data = ( 166 syntax_names.get_name( 167 cp_pdv_list['presentation-context-identifier']), 168 cp_pdv_list['presentation-data-values'][1]) 169 170 cpa_user_data = await aio.call(self._validate_cb, syntax_names, 171 cp_user_data) 172 173 cpa_ppdu = _cpa_ppdu(syntax_names, called_psel, cpa_user_data) 174 cpa_ppdu_data = _encode('CPA-PPDU', cpa_ppdu) 175 return cpa_ppdu_data 176 177 async def _on_connection(self, cosp_conn): 178 try: 179 try: 180 cp_ppdu = _decode('CP-type', cosp_conn.conn_req_user_data) 181 cpa_ppdu = _decode('CPA-PPDU', cosp_conn.conn_res_user_data) 182 183 syntax_names = _sytax_names_from_cp_ppdu(cp_ppdu) 184 calling_psel, called_psel = _get_psels(cp_ppdu) 185 186 conn = Connection(cosp_conn, syntax_names, cp_ppdu, cpa_ppdu, 187 calling_psel, called_psel, 188 self._receive_queue_size, 189 self._send_queue_size) 190 191 except Exception: 192 await aio.uncancellable(_close_cosp(cosp_conn, _arp_ppdu())) 193 raise 194 195 try: 196 await aio.call(self._connection_cb, conn) 197 198 except BaseException: 199 await aio.uncancellable(conn.async_close()) 200 raise 201 202 except Exception as e: 203 mlog.error("error creating new incomming connection: %s", e, 204 exc_info=e) 205 return 206 207 if not self._bind_connections: 208 return 209 210 try: 211 await conn.wait_closed() 212 213 except BaseException: 214 await aio.uncancellable(conn.async_close()) 215 raise
COPP listening server
For creating new server see listen
.
147 @property 148 def async_group(self) -> aio.Group: 149 """Async group""" 150 return self._srv.async_group
Async group
152 @property 153 def addresses(self) -> list[tcp.Address]: 154 """Listening addresses""" 155 return self._srv.addresses
Listening addresses
218class Connection(aio.Resource): 219 """COPP connection 220 221 For creating new connection see `connect` or `listen`. 222 223 """ 224 225 def __init__(self, 226 conn: cosp.Connection, 227 syntax_names: SyntaxNames, 228 cp_ppdu: asn1.Value, 229 cpa_ppdu: asn1.Value, 230 local_psel: int | None, 231 remote_psel: int | None, 232 receive_queue_size: int, 233 send_queue_size: int): 234 cp_user_data = cp_ppdu['normal-mode-parameters']['user-data'] 235 cpa_user_data = cpa_ppdu['normal-mode-parameters']['user-data'] 236 237 conn_req_user_data = ( 238 syntax_names.get_name( 239 cp_user_data[1][0]['presentation-context-identifier']), 240 cp_user_data[1][0]['presentation-data-values'][1]) 241 conn_res_user_data = ( 242 syntax_names.get_name( 243 cpa_user_data[1][0]['presentation-context-identifier']), 244 cpa_user_data[1][0]['presentation-data-values'][1]) 245 246 self._conn = conn 247 self._syntax_names = syntax_names 248 self._conn_req_user_data = conn_req_user_data 249 self._conn_res_user_data = conn_res_user_data 250 self._loop = asyncio.get_running_loop() 251 self._info = ConnectionInfo(local_psel=local_psel, 252 remote_psel=remote_psel, 253 **conn.info._asdict()) 254 self._close_ppdu = _arp_ppdu() 255 self._receive_queue = aio.Queue(receive_queue_size) 256 self._send_queue = aio.Queue(send_queue_size) 257 self._async_group = aio.Group() 258 259 self.async_group.spawn(aio.call_on_cancel, self._on_close) 260 self.async_group.spawn(self._receive_loop) 261 self.async_group.spawn(self._send_loop) 262 self.async_group.spawn(aio.call_on_done, conn.wait_closing(), 263 self.close) 264 265 @property 266 def async_group(self) -> aio.Group: 267 """Async group""" 268 return self._async_group 269 270 @property 271 def info(self) -> ConnectionInfo: 272 """Connection info""" 273 return self._info 274 275 @property 276 def syntax_names(self) -> SyntaxNames: 277 """Syntax names""" 278 return self._syntax_names 279 280 @property 281 def conn_req_user_data(self) -> IdentifiedEntity: 282 """Connect request's user data""" 283 return self._conn_req_user_data 284 285 @property 286 def conn_res_user_data(self) -> IdentifiedEntity: 287 """Connect response's user data""" 288 return self._conn_res_user_data 289 290 def close(self, user_data: IdentifiedEntity | None = None): 291 """Close connection""" 292 self._close(_aru_ppdu(self._syntax_names, user_data)) 293 294 async def async_close(self, user_data: IdentifiedEntity | None = None): 295 """Async close""" 296 self.close(user_data) 297 await self.wait_closed() 298 299 async def receive(self) -> IdentifiedEntity: 300 """Receive data""" 301 try: 302 return await self._receive_queue.get() 303 304 except aio.QueueClosedError: 305 raise ConnectionError() 306 307 async def send(self, data: IdentifiedEntity): 308 """Send data""" 309 try: 310 await self._send_queue.put((data, None)) 311 312 except aio.QueueClosedError: 313 raise ConnectionError() 314 315 async def drain(self): 316 """Drain output buffer""" 317 try: 318 future = self._loop.create_future() 319 await self._send_queue.put((None, future)) 320 await future 321 322 except aio.QueueClosedError: 323 raise ConnectionError() 324 325 async def _on_close(self): 326 await _close_cosp(self._conn, self._close_ppdu) 327 328 def _close(self, ppdu): 329 if not self.is_open: 330 return 331 332 self._close_ppdu = ppdu 333 self._async_group.close() 334 335 async def _receive_loop(self): 336 try: 337 while True: 338 cosp_data = await self._conn.receive() 339 340 user_data = _decode('User-data', cosp_data) 341 pdv_list = user_data[1][0] 342 syntax_name = self._syntax_names.get_name( 343 pdv_list['presentation-context-identifier']) 344 data = pdv_list['presentation-data-values'][1] 345 346 await self._receive_queue.put((syntax_name, data)) 347 348 except ConnectionError: 349 pass 350 351 except Exception as e: 352 mlog.error("receive loop error: %s", e, exc_info=e) 353 354 finally: 355 self._close(_arp_ppdu()) 356 self._receive_queue.close() 357 358 async def _send_loop(self): 359 future = None 360 try: 361 while True: 362 data, future = await self._send_queue.get() 363 364 if data is None: 365 await self._conn.drain() 366 367 else: 368 ppdu_data = _encode('User-data', 369 _user_data(self._syntax_names, data)) 370 await self._conn.send(ppdu_data) 371 372 if future and not future.done(): 373 future.set_result(None) 374 375 except ConnectionError: 376 pass 377 378 except Exception as e: 379 mlog.error("send loop error: %s", e, exc_info=e) 380 381 finally: 382 self._close(_arp_ppdu()) 383 self._send_queue.close() 384 385 while True: 386 if future and not future.done(): 387 future.set_result(None) 388 if self._send_queue.empty(): 389 break 390 _, future = self._send_queue.get_nowait()
225 def __init__(self, 226 conn: cosp.Connection, 227 syntax_names: SyntaxNames, 228 cp_ppdu: asn1.Value, 229 cpa_ppdu: asn1.Value, 230 local_psel: int | None, 231 remote_psel: int | None, 232 receive_queue_size: int, 233 send_queue_size: int): 234 cp_user_data = cp_ppdu['normal-mode-parameters']['user-data'] 235 cpa_user_data = cpa_ppdu['normal-mode-parameters']['user-data'] 236 237 conn_req_user_data = ( 238 syntax_names.get_name( 239 cp_user_data[1][0]['presentation-context-identifier']), 240 cp_user_data[1][0]['presentation-data-values'][1]) 241 conn_res_user_data = ( 242 syntax_names.get_name( 243 cpa_user_data[1][0]['presentation-context-identifier']), 244 cpa_user_data[1][0]['presentation-data-values'][1]) 245 246 self._conn = conn 247 self._syntax_names = syntax_names 248 self._conn_req_user_data = conn_req_user_data 249 self._conn_res_user_data = conn_res_user_data 250 self._loop = asyncio.get_running_loop() 251 self._info = ConnectionInfo(local_psel=local_psel, 252 remote_psel=remote_psel, 253 **conn.info._asdict()) 254 self._close_ppdu = _arp_ppdu() 255 self._receive_queue = aio.Queue(receive_queue_size) 256 self._send_queue = aio.Queue(send_queue_size) 257 self._async_group = aio.Group() 258 259 self.async_group.spawn(aio.call_on_cancel, self._on_close) 260 self.async_group.spawn(self._receive_loop) 261 self.async_group.spawn(self._send_loop) 262 self.async_group.spawn(aio.call_on_done, conn.wait_closing(), 263 self.close)
265 @property 266 def async_group(self) -> aio.Group: 267 """Async group""" 268 return self._async_group
Async group
275 @property 276 def syntax_names(self) -> SyntaxNames: 277 """Syntax names""" 278 return self._syntax_names
Syntax names
280 @property 281 def conn_req_user_data(self) -> IdentifiedEntity: 282 """Connect request's user data""" 283 return self._conn_req_user_data
Connect request's user data
285 @property 286 def conn_res_user_data(self) -> IdentifiedEntity: 287 """Connect response's user data""" 288 return self._conn_res_user_data
Connect response's user data
290 def close(self, user_data: IdentifiedEntity | None = None): 291 """Close connection""" 292 self._close(_aru_ppdu(self._syntax_names, user_data))
Close connection
294 async def async_close(self, user_data: IdentifiedEntity | None = None): 295 """Async close""" 296 self.close(user_data) 297 await self.wait_closed()
Async close
299 async def receive(self) -> IdentifiedEntity: 300 """Receive data""" 301 try: 302 return await self._receive_queue.get() 303 304 except aio.QueueClosedError: 305 raise ConnectionError()
Receive data