hat.drivers.copp
Connection oriented presentation protocol
1"""Connection oriented presentation protocol""" 2 3import asyncio 4import importlib.resources 5import logging 6import typing 7 8from hat import aio 9from hat import asn1 10 11from hat.drivers import cosp 12from hat.drivers import tcp 13 14 15mlog = logging.getLogger(__name__) 16 17with importlib.resources.as_file(importlib.resources.files(__package__) / 18 'asn1_repo.json') as _path: 19 _encoder = asn1.Encoder(asn1.Encoding.BER, 20 asn1.Repository.from_json(_path)) 21 22 23class ConnectionInfo(typing.NamedTuple): 24 local_addr: tcp.Address 25 local_tsel: int | None 26 local_ssel: int | None 27 local_psel: int | None 28 remote_addr: tcp.Address 29 remote_tsel: int | None 30 remote_ssel: int | None 31 remote_psel: int | None 32 33 34IdentifiedEntity: typing.TypeAlias = tuple[asn1.ObjectIdentifier, asn1.Entity] 35"""Identified entity""" 36 37ValidateCb: typing.TypeAlias = aio.AsyncCallable[['SyntaxNames', 38 IdentifiedEntity], 39 IdentifiedEntity | None] 40"""Validate callback""" 41 42ConnectionCb: typing.TypeAlias = aio.AsyncCallable[['Connection'], None] 43"""Connection callback""" 44 45 46class SyntaxNames: 47 """Syntax name registry 48 49 Args: 50 syntax_names: list of ASN.1 ObjectIdentifiers representing syntax names 51 52 """ 53 54 def __init__(self, syntax_names: list[asn1.ObjectIdentifier]): 55 self._syntax_id_names = {(i * 2 + 1): name 56 for i, name in enumerate(syntax_names)} 57 self._syntax_name_ids = {v: k 58 for k, v in self._syntax_id_names.items()} 59 60 def get_name(self, syntax_id: int) -> asn1.ObjectIdentifier: 61 """Get syntax name associated with id""" 62 return self._syntax_id_names[syntax_id] 63 64 def get_id(self, syntax_name: asn1.ObjectIdentifier) -> int: 65 """Get syntax id associated with name""" 66 return self._syntax_name_ids[syntax_name] 67 68 69async def connect(addr: tcp.Address, 70 syntax_names: SyntaxNames, 71 user_data: IdentifiedEntity | None = None, 72 *, 73 local_psel: int | None = None, 74 remote_psel: int | None = None, 75 copp_receive_queue_size: int = 1024, 76 copp_send_queue_size: int = 1024, 77 **kwargs 78 ) -> 'Connection': 79 """Connect to COPP server 80 81 Additional arguments are passed directly to `hat.drivers.cosp.connect`. 82 83 """ 84 cp_ppdu = _cp_ppdu(syntax_names, local_psel, remote_psel, user_data) 85 cp_ppdu_data = _encode('CP-type', cp_ppdu) 86 conn = await cosp.connect(addr, cp_ppdu_data, **kwargs) 87 88 try: 89 cpa_ppdu = _decode('CPA-PPDU', conn.conn_res_user_data) 90 _validate_connect_response(cp_ppdu, cpa_ppdu) 91 92 calling_psel, called_psel = _get_psels(cp_ppdu) 93 return Connection(conn, syntax_names, cp_ppdu, cpa_ppdu, 94 calling_psel, called_psel, 95 copp_receive_queue_size, copp_send_queue_size) 96 97 except Exception: 98 await aio.uncancellable(_close_cosp(conn, _arp_ppdu())) 99 raise 100 101 102async def listen(validate_cb: ValidateCb, 103 connection_cb: ConnectionCb, 104 addr: tcp.Address = tcp.Address('0.0.0.0', 102), 105 *, 106 bind_connections: bool = False, 107 copp_receive_queue_size: int = 1024, 108 copp_send_queue_size: int = 1024, 109 **kwargs 110 ) -> 'Server': 111 """Create COPP listening server 112 113 Additional arguments are passed directly to `hat.drivers.cosp.listen`. 114 115 Args: 116 validate_cb: callback function or coroutine called on new 117 incomming connection request prior to creating connection object 118 connection_cb: new connection callback 119 addr: local listening address 120 121 """ 122 server = Server() 123 server._validate_cb = validate_cb 124 server._connection_cb = connection_cb 125 server._bind_connections = bind_connections 126 server._receive_queue_size = copp_receive_queue_size 127 server._send_queue_size = copp_send_queue_size 128 129 server._srv = await cosp.listen(server._on_validate, 130 server._on_connection, 131 addr, 132 bind_connections=False, 133 **kwargs) 134 135 return server 136 137 138class Server(aio.Resource): 139 """COPP listening server 140 141 For creating new server see `listen`. 142 143 """ 144 145 @property 146 def async_group(self) -> aio.Group: 147 """Async group""" 148 return self._srv.async_group 149 150 @property 151 def addresses(self) -> list[tcp.Address]: 152 """Listening addresses""" 153 return self._srv.addresses 154 155 async def _on_validate(self, user_data): 156 cp_ppdu = _decode('CP-type', user_data) 157 cp_params = cp_ppdu['normal-mode-parameters'] 158 called_psel_data = cp_params.get('called-presentation-selector') 159 called_psel = (int.from_bytes(called_psel_data, 'big') 160 if called_psel_data else None) 161 cp_pdv_list = cp_params['user-data'][1][0] 162 syntax_names = _sytax_names_from_cp_ppdu(cp_ppdu) 163 cp_user_data = ( 164 syntax_names.get_name( 165 cp_pdv_list['presentation-context-identifier']), 166 cp_pdv_list['presentation-data-values'][1]) 167 168 cpa_user_data = await aio.call(self._validate_cb, syntax_names, 169 cp_user_data) 170 171 cpa_ppdu = _cpa_ppdu(syntax_names, called_psel, cpa_user_data) 172 cpa_ppdu_data = _encode('CPA-PPDU', cpa_ppdu) 173 return cpa_ppdu_data 174 175 async def _on_connection(self, cosp_conn): 176 try: 177 try: 178 cp_ppdu = _decode('CP-type', cosp_conn.conn_req_user_data) 179 cpa_ppdu = _decode('CPA-PPDU', cosp_conn.conn_res_user_data) 180 181 syntax_names = _sytax_names_from_cp_ppdu(cp_ppdu) 182 calling_psel, called_psel = _get_psels(cp_ppdu) 183 184 conn = Connection(cosp_conn, syntax_names, cp_ppdu, cpa_ppdu, 185 calling_psel, called_psel, 186 self._receive_queue_size, 187 self._send_queue_size) 188 189 except Exception: 190 await aio.uncancellable(_close_cosp(cosp_conn, _arp_ppdu())) 191 raise 192 193 try: 194 await aio.call(self._connection_cb, conn) 195 196 except BaseException: 197 await aio.uncancellable(conn.async_close()) 198 raise 199 200 except Exception as e: 201 mlog.error("error creating new incomming connection: %s", e, 202 exc_info=e) 203 return 204 205 if not self._bind_connections: 206 return 207 208 try: 209 await conn.wait_closed() 210 211 except BaseException: 212 await aio.uncancellable(conn.async_close()) 213 raise 214 215 216class Connection(aio.Resource): 217 """COPP connection 218 219 For creating new connection see `connect` or `listen`. 220 221 """ 222 223 def __init__(self, 224 conn: cosp.Connection, 225 syntax_names: SyntaxNames, 226 cp_ppdu: asn1.Value, 227 cpa_ppdu: asn1.Value, 228 local_psel: int | None, 229 remote_psel: int | None, 230 receive_queue_size: int, 231 send_queue_size: int): 232 cp_user_data = cp_ppdu['normal-mode-parameters']['user-data'] 233 cpa_user_data = cpa_ppdu['normal-mode-parameters']['user-data'] 234 235 conn_req_user_data = ( 236 syntax_names.get_name( 237 cp_user_data[1][0]['presentation-context-identifier']), 238 cp_user_data[1][0]['presentation-data-values'][1]) 239 conn_res_user_data = ( 240 syntax_names.get_name( 241 cpa_user_data[1][0]['presentation-context-identifier']), 242 cpa_user_data[1][0]['presentation-data-values'][1]) 243 244 self._conn = conn 245 self._syntax_names = syntax_names 246 self._conn_req_user_data = conn_req_user_data 247 self._conn_res_user_data = conn_res_user_data 248 self._loop = asyncio.get_running_loop() 249 self._info = ConnectionInfo(local_psel=local_psel, 250 remote_psel=remote_psel, 251 **conn.info._asdict()) 252 self._close_ppdu = _arp_ppdu() 253 self._receive_queue = aio.Queue(receive_queue_size) 254 self._send_queue = aio.Queue(send_queue_size) 255 self._async_group = aio.Group() 256 257 self.async_group.spawn(aio.call_on_cancel, self._on_close) 258 self.async_group.spawn(self._receive_loop) 259 self.async_group.spawn(self._send_loop) 260 self.async_group.spawn(aio.call_on_done, conn.wait_closing(), 261 self.close) 262 263 @property 264 def async_group(self) -> aio.Group: 265 """Async group""" 266 return self._async_group 267 268 @property 269 def info(self) -> ConnectionInfo: 270 """Connection info""" 271 return self._info 272 273 @property 274 def syntax_names(self) -> SyntaxNames: 275 """Syntax names""" 276 return self._syntax_names 277 278 @property 279 def conn_req_user_data(self) -> IdentifiedEntity: 280 """Connect request's user data""" 281 return self._conn_req_user_data 282 283 @property 284 def conn_res_user_data(self) -> IdentifiedEntity: 285 """Connect response's user data""" 286 return self._conn_res_user_data 287 288 def close(self, user_data: IdentifiedEntity | None = None): 289 """Close connection""" 290 self._close(_aru_ppdu(self._syntax_names, user_data)) 291 292 async def async_close(self, user_data: IdentifiedEntity | None = None): 293 """Async close""" 294 self.close(user_data) 295 await self.wait_closed() 296 297 async def receive(self) -> IdentifiedEntity: 298 """Receive data""" 299 try: 300 return await self._receive_queue.get() 301 302 except aio.QueueClosedError: 303 raise ConnectionError() 304 305 async def send(self, data: IdentifiedEntity): 306 """Send data""" 307 try: 308 await self._send_queue.put((data, None)) 309 310 except aio.QueueClosedError: 311 raise ConnectionError() 312 313 async def drain(self): 314 """Drain output buffer""" 315 try: 316 future = self._loop.create_future() 317 await self._send_queue.put((None, future)) 318 await future 319 320 except aio.QueueClosedError: 321 raise ConnectionError() 322 323 async def _on_close(self): 324 await _close_cosp(self._conn, self._close_ppdu) 325 326 def _close(self, ppdu): 327 if not self.is_open: 328 return 329 330 self._close_ppdu = ppdu 331 self._async_group.close() 332 333 async def _receive_loop(self): 334 try: 335 while True: 336 cosp_data = await self._conn.receive() 337 338 user_data = _decode('User-data', cosp_data) 339 pdv_list = user_data[1][0] 340 syntax_name = self._syntax_names.get_name( 341 pdv_list['presentation-context-identifier']) 342 data = pdv_list['presentation-data-values'][1] 343 344 await self._receive_queue.put((syntax_name, data)) 345 346 except ConnectionError: 347 pass 348 349 except Exception as e: 350 mlog.error("receive loop error: %s", e, exc_info=e) 351 352 finally: 353 self._close(_arp_ppdu()) 354 self._receive_queue.close() 355 356 async def _send_loop(self): 357 future = None 358 try: 359 while True: 360 data, future = await self._send_queue.get() 361 362 if data is None: 363 await self._conn.drain() 364 365 else: 366 ppdu_data = _encode('User-data', 367 _user_data(self._syntax_names, data)) 368 await self._conn.send(ppdu_data) 369 370 if future and not future.done(): 371 future.set_result(None) 372 373 except ConnectionError: 374 pass 375 376 except Exception as e: 377 mlog.error("send loop error: %s", e, exc_info=e) 378 379 finally: 380 self._close(_arp_ppdu()) 381 self._send_queue.close() 382 383 while True: 384 if future and not future.done(): 385 future.set_result(None) 386 if self._send_queue.empty(): 387 break 388 _, future = self._send_queue.get_nowait() 389 390 391async def _close_cosp(cosp_conn, ppdu): 392 try: 393 data = _encode('Abort-type', ppdu) 394 395 except Exception as e: 396 mlog.error("error encoding abort ppdu: %s", e, exc_info=e) 397 data = None 398 399 finally: 400 await cosp_conn.async_close(data) 401 402 403def _get_psels(cp_ppdu): 404 cp_params = cp_ppdu['normal-mode-parameters'] 405 calling_psel_data = cp_params.get('calling-presentation-selector') 406 calling_psel = (int.from_bytes(calling_psel_data, 'big') 407 if calling_psel_data else None) 408 called_psel_data = cp_params.get('called-presentation-selector') 409 called_psel = (int.from_bytes(called_psel_data, 'big') 410 if called_psel_data else None) 411 return calling_psel, called_psel 412 413 414def _validate_connect_response(cp_ppdu, cpa_ppdu): 415 cp_params = cp_ppdu['normal-mode-parameters'] 416 cpa_params = cpa_ppdu['normal-mode-parameters'] 417 called_psel_data = cp_params.get('called-presentation-selector') 418 responding_psel_data = cpa_params.get('responding-presentation-selector') 419 420 if called_psel_data and responding_psel_data: 421 called_psel = int.from_bytes(called_psel_data, 'big') 422 responding_psel = int.from_bytes(responding_psel_data, 'big') 423 424 if called_psel != responding_psel: 425 raise Exception('presentation selectors not matching') 426 427 result_list = cpa_params['presentation-context-definition-result-list'] 428 if any(i['result'] != 0 for i in result_list): 429 raise Exception('presentation context not accepted') 430 431 432def _cp_ppdu(syntax_names, calling_psel, called_psel, user_data): 433 cp_params = { 434 'presentation-context-definition-list': [ 435 {'presentation-context-identifier': i, 436 'abstract-syntax-name': name, 437 'transfer-syntax-name-list': [_encoder.syntax_name]} 438 for i, name in syntax_names._syntax_id_names.items()]} 439 440 if calling_psel is not None: 441 cp_params['calling-presentation-selector'] = \ 442 calling_psel.to_bytes(4, 'big') 443 444 if called_psel is not None: 445 cp_params['called-presentation-selector'] = \ 446 called_psel.to_bytes(4, 'big') 447 448 if user_data: 449 cp_params['user-data'] = _user_data(syntax_names, user_data) 450 451 return { 452 'mode-selector': { 453 'mode-value': 1}, 454 'normal-mode-parameters': cp_params} 455 456 457def _cpa_ppdu(syntax_names, responding_psel, user_data): 458 cpa_params = { 459 'presentation-context-definition-result-list': [ 460 {'result': 0, 461 'transfer-syntax-name': _encoder.syntax_name} 462 for _ in syntax_names._syntax_id_names.keys()]} 463 464 if responding_psel is not None: 465 cpa_params['responding-presentation-selector'] = \ 466 responding_psel.to_bytes(4, 'big') 467 468 if user_data: 469 cpa_params['user-data'] = _user_data(syntax_names, user_data) 470 471 return { 472 'mode-selector': { 473 'mode-value': 1}, 474 'normal-mode-parameters': cpa_params} 475 476 477def _aru_ppdu(syntax_names, user_data): 478 aru_params = {} 479 480 if user_data: 481 aru_params['user-data'] = _user_data(syntax_names, user_data) 482 483 return 'aru-ppdu', ('normal-mode-parameters', aru_params) 484 485 486def _arp_ppdu(): 487 return 'arp-ppdu', {} 488 489 490def _user_data(syntax_names, user_data): 491 return 'fully-encoded-data', [{ 492 'presentation-context-identifier': syntax_names.get_id(user_data[0]), 493 'presentation-data-values': ( 494 'single-ASN1-type', user_data[1])}] 495 496 497def _sytax_names_from_cp_ppdu(cp_ppdu): 498 cp_params = cp_ppdu['normal-mode-parameters'] 499 syntax_names = SyntaxNames([]) 500 syntax_names._syntax_id_names = { 501 i['presentation-context-identifier']: i['abstract-syntax-name'] 502 for i in cp_params['presentation-context-definition-list']} 503 syntax_names._syntax_name_ids = { 504 v: k for k, v in syntax_names._syntax_id_names.items()} 505 return syntax_names 506 507 508def _encode(name, value): 509 return _encoder.encode('ISO8823-PRESENTATION', name, value) 510 511 512def _decode(name, data): 513 res, _ = _encoder.decode('ISO8823-PRESENTATION', name, memoryview(data)) 514 return res
24class ConnectionInfo(typing.NamedTuple): 25 local_addr: tcp.Address 26 local_tsel: int | None 27 local_ssel: int | None 28 local_psel: int | None 29 remote_addr: tcp.Address 30 remote_tsel: int | None 31 remote_ssel: int | None 32 remote_psel: int | None
ConnectionInfo(local_addr, local_tsel, local_ssel, local_psel, remote_addr, remote_tsel, remote_ssel, remote_psel)
Create new instance of ConnectionInfo(local_addr, local_tsel, local_ssel, local_psel, remote_addr, remote_tsel, remote_ssel, remote_psel)
Identified entity
Validate callback
Connection callback
47class SyntaxNames: 48 """Syntax name registry 49 50 Args: 51 syntax_names: list of ASN.1 ObjectIdentifiers representing syntax names 52 53 """ 54 55 def __init__(self, syntax_names: list[asn1.ObjectIdentifier]): 56 self._syntax_id_names = {(i * 2 + 1): name 57 for i, name in enumerate(syntax_names)} 58 self._syntax_name_ids = {v: k 59 for k, v in self._syntax_id_names.items()} 60 61 def get_name(self, syntax_id: int) -> asn1.ObjectIdentifier: 62 """Get syntax name associated with id""" 63 return self._syntax_id_names[syntax_id] 64 65 def get_id(self, syntax_name: asn1.ObjectIdentifier) -> int: 66 """Get syntax id associated with name""" 67 return self._syntax_name_ids[syntax_name]
Syntax name registry
Arguments:
- syntax_names: list of ASN.1 ObjectIdentifiers representing syntax names
70async def connect(addr: tcp.Address, 71 syntax_names: SyntaxNames, 72 user_data: IdentifiedEntity | None = None, 73 *, 74 local_psel: int | None = None, 75 remote_psel: int | None = None, 76 copp_receive_queue_size: int = 1024, 77 copp_send_queue_size: int = 1024, 78 **kwargs 79 ) -> 'Connection': 80 """Connect to COPP server 81 82 Additional arguments are passed directly to `hat.drivers.cosp.connect`. 83 84 """ 85 cp_ppdu = _cp_ppdu(syntax_names, local_psel, remote_psel, user_data) 86 cp_ppdu_data = _encode('CP-type', cp_ppdu) 87 conn = await cosp.connect(addr, cp_ppdu_data, **kwargs) 88 89 try: 90 cpa_ppdu = _decode('CPA-PPDU', conn.conn_res_user_data) 91 _validate_connect_response(cp_ppdu, cpa_ppdu) 92 93 calling_psel, called_psel = _get_psels(cp_ppdu) 94 return Connection(conn, syntax_names, cp_ppdu, cpa_ppdu, 95 calling_psel, called_psel, 96 copp_receive_queue_size, copp_send_queue_size) 97 98 except Exception: 99 await aio.uncancellable(_close_cosp(conn, _arp_ppdu())) 100 raise
Connect to COPP server
Additional arguments are passed directly to hat.drivers.cosp.connect
.
103async def listen(validate_cb: ValidateCb, 104 connection_cb: ConnectionCb, 105 addr: tcp.Address = tcp.Address('0.0.0.0', 102), 106 *, 107 bind_connections: bool = False, 108 copp_receive_queue_size: int = 1024, 109 copp_send_queue_size: int = 1024, 110 **kwargs 111 ) -> 'Server': 112 """Create COPP listening server 113 114 Additional arguments are passed directly to `hat.drivers.cosp.listen`. 115 116 Args: 117 validate_cb: callback function or coroutine called on new 118 incomming connection request prior to creating connection object 119 connection_cb: new connection callback 120 addr: local listening address 121 122 """ 123 server = Server() 124 server._validate_cb = validate_cb 125 server._connection_cb = connection_cb 126 server._bind_connections = bind_connections 127 server._receive_queue_size = copp_receive_queue_size 128 server._send_queue_size = copp_send_queue_size 129 130 server._srv = await cosp.listen(server._on_validate, 131 server._on_connection, 132 addr, 133 bind_connections=False, 134 **kwargs) 135 136 return server
Create COPP listening server
Additional arguments are passed directly to hat.drivers.cosp.listen
.
Arguments:
- validate_cb: callback function or coroutine called on new incomming connection request prior to creating connection object
- connection_cb: new connection callback
- addr: local listening address
139class Server(aio.Resource): 140 """COPP listening server 141 142 For creating new server see `listen`. 143 144 """ 145 146 @property 147 def async_group(self) -> aio.Group: 148 """Async group""" 149 return self._srv.async_group 150 151 @property 152 def addresses(self) -> list[tcp.Address]: 153 """Listening addresses""" 154 return self._srv.addresses 155 156 async def _on_validate(self, user_data): 157 cp_ppdu = _decode('CP-type', user_data) 158 cp_params = cp_ppdu['normal-mode-parameters'] 159 called_psel_data = cp_params.get('called-presentation-selector') 160 called_psel = (int.from_bytes(called_psel_data, 'big') 161 if called_psel_data else None) 162 cp_pdv_list = cp_params['user-data'][1][0] 163 syntax_names = _sytax_names_from_cp_ppdu(cp_ppdu) 164 cp_user_data = ( 165 syntax_names.get_name( 166 cp_pdv_list['presentation-context-identifier']), 167 cp_pdv_list['presentation-data-values'][1]) 168 169 cpa_user_data = await aio.call(self._validate_cb, syntax_names, 170 cp_user_data) 171 172 cpa_ppdu = _cpa_ppdu(syntax_names, called_psel, cpa_user_data) 173 cpa_ppdu_data = _encode('CPA-PPDU', cpa_ppdu) 174 return cpa_ppdu_data 175 176 async def _on_connection(self, cosp_conn): 177 try: 178 try: 179 cp_ppdu = _decode('CP-type', cosp_conn.conn_req_user_data) 180 cpa_ppdu = _decode('CPA-PPDU', cosp_conn.conn_res_user_data) 181 182 syntax_names = _sytax_names_from_cp_ppdu(cp_ppdu) 183 calling_psel, called_psel = _get_psels(cp_ppdu) 184 185 conn = Connection(cosp_conn, syntax_names, cp_ppdu, cpa_ppdu, 186 calling_psel, called_psel, 187 self._receive_queue_size, 188 self._send_queue_size) 189 190 except Exception: 191 await aio.uncancellable(_close_cosp(cosp_conn, _arp_ppdu())) 192 raise 193 194 try: 195 await aio.call(self._connection_cb, conn) 196 197 except BaseException: 198 await aio.uncancellable(conn.async_close()) 199 raise 200 201 except Exception as e: 202 mlog.error("error creating new incomming connection: %s", e, 203 exc_info=e) 204 return 205 206 if not self._bind_connections: 207 return 208 209 try: 210 await conn.wait_closed() 211 212 except BaseException: 213 await aio.uncancellable(conn.async_close()) 214 raise
COPP listening server
For creating new server see listen
.
146 @property 147 def async_group(self) -> aio.Group: 148 """Async group""" 149 return self._srv.async_group
Async group
151 @property 152 def addresses(self) -> list[tcp.Address]: 153 """Listening addresses""" 154 return self._srv.addresses
Listening addresses
217class Connection(aio.Resource): 218 """COPP connection 219 220 For creating new connection see `connect` or `listen`. 221 222 """ 223 224 def __init__(self, 225 conn: cosp.Connection, 226 syntax_names: SyntaxNames, 227 cp_ppdu: asn1.Value, 228 cpa_ppdu: asn1.Value, 229 local_psel: int | None, 230 remote_psel: int | None, 231 receive_queue_size: int, 232 send_queue_size: int): 233 cp_user_data = cp_ppdu['normal-mode-parameters']['user-data'] 234 cpa_user_data = cpa_ppdu['normal-mode-parameters']['user-data'] 235 236 conn_req_user_data = ( 237 syntax_names.get_name( 238 cp_user_data[1][0]['presentation-context-identifier']), 239 cp_user_data[1][0]['presentation-data-values'][1]) 240 conn_res_user_data = ( 241 syntax_names.get_name( 242 cpa_user_data[1][0]['presentation-context-identifier']), 243 cpa_user_data[1][0]['presentation-data-values'][1]) 244 245 self._conn = conn 246 self._syntax_names = syntax_names 247 self._conn_req_user_data = conn_req_user_data 248 self._conn_res_user_data = conn_res_user_data 249 self._loop = asyncio.get_running_loop() 250 self._info = ConnectionInfo(local_psel=local_psel, 251 remote_psel=remote_psel, 252 **conn.info._asdict()) 253 self._close_ppdu = _arp_ppdu() 254 self._receive_queue = aio.Queue(receive_queue_size) 255 self._send_queue = aio.Queue(send_queue_size) 256 self._async_group = aio.Group() 257 258 self.async_group.spawn(aio.call_on_cancel, self._on_close) 259 self.async_group.spawn(self._receive_loop) 260 self.async_group.spawn(self._send_loop) 261 self.async_group.spawn(aio.call_on_done, conn.wait_closing(), 262 self.close) 263 264 @property 265 def async_group(self) -> aio.Group: 266 """Async group""" 267 return self._async_group 268 269 @property 270 def info(self) -> ConnectionInfo: 271 """Connection info""" 272 return self._info 273 274 @property 275 def syntax_names(self) -> SyntaxNames: 276 """Syntax names""" 277 return self._syntax_names 278 279 @property 280 def conn_req_user_data(self) -> IdentifiedEntity: 281 """Connect request's user data""" 282 return self._conn_req_user_data 283 284 @property 285 def conn_res_user_data(self) -> IdentifiedEntity: 286 """Connect response's user data""" 287 return self._conn_res_user_data 288 289 def close(self, user_data: IdentifiedEntity | None = None): 290 """Close connection""" 291 self._close(_aru_ppdu(self._syntax_names, user_data)) 292 293 async def async_close(self, user_data: IdentifiedEntity | None = None): 294 """Async close""" 295 self.close(user_data) 296 await self.wait_closed() 297 298 async def receive(self) -> IdentifiedEntity: 299 """Receive data""" 300 try: 301 return await self._receive_queue.get() 302 303 except aio.QueueClosedError: 304 raise ConnectionError() 305 306 async def send(self, data: IdentifiedEntity): 307 """Send data""" 308 try: 309 await self._send_queue.put((data, None)) 310 311 except aio.QueueClosedError: 312 raise ConnectionError() 313 314 async def drain(self): 315 """Drain output buffer""" 316 try: 317 future = self._loop.create_future() 318 await self._send_queue.put((None, future)) 319 await future 320 321 except aio.QueueClosedError: 322 raise ConnectionError() 323 324 async def _on_close(self): 325 await _close_cosp(self._conn, self._close_ppdu) 326 327 def _close(self, ppdu): 328 if not self.is_open: 329 return 330 331 self._close_ppdu = ppdu 332 self._async_group.close() 333 334 async def _receive_loop(self): 335 try: 336 while True: 337 cosp_data = await self._conn.receive() 338 339 user_data = _decode('User-data', cosp_data) 340 pdv_list = user_data[1][0] 341 syntax_name = self._syntax_names.get_name( 342 pdv_list['presentation-context-identifier']) 343 data = pdv_list['presentation-data-values'][1] 344 345 await self._receive_queue.put((syntax_name, data)) 346 347 except ConnectionError: 348 pass 349 350 except Exception as e: 351 mlog.error("receive loop error: %s", e, exc_info=e) 352 353 finally: 354 self._close(_arp_ppdu()) 355 self._receive_queue.close() 356 357 async def _send_loop(self): 358 future = None 359 try: 360 while True: 361 data, future = await self._send_queue.get() 362 363 if data is None: 364 await self._conn.drain() 365 366 else: 367 ppdu_data = _encode('User-data', 368 _user_data(self._syntax_names, data)) 369 await self._conn.send(ppdu_data) 370 371 if future and not future.done(): 372 future.set_result(None) 373 374 except ConnectionError: 375 pass 376 377 except Exception as e: 378 mlog.error("send loop error: %s", e, exc_info=e) 379 380 finally: 381 self._close(_arp_ppdu()) 382 self._send_queue.close() 383 384 while True: 385 if future and not future.done(): 386 future.set_result(None) 387 if self._send_queue.empty(): 388 break 389 _, future = self._send_queue.get_nowait()
224 def __init__(self, 225 conn: cosp.Connection, 226 syntax_names: SyntaxNames, 227 cp_ppdu: asn1.Value, 228 cpa_ppdu: asn1.Value, 229 local_psel: int | None, 230 remote_psel: int | None, 231 receive_queue_size: int, 232 send_queue_size: int): 233 cp_user_data = cp_ppdu['normal-mode-parameters']['user-data'] 234 cpa_user_data = cpa_ppdu['normal-mode-parameters']['user-data'] 235 236 conn_req_user_data = ( 237 syntax_names.get_name( 238 cp_user_data[1][0]['presentation-context-identifier']), 239 cp_user_data[1][0]['presentation-data-values'][1]) 240 conn_res_user_data = ( 241 syntax_names.get_name( 242 cpa_user_data[1][0]['presentation-context-identifier']), 243 cpa_user_data[1][0]['presentation-data-values'][1]) 244 245 self._conn = conn 246 self._syntax_names = syntax_names 247 self._conn_req_user_data = conn_req_user_data 248 self._conn_res_user_data = conn_res_user_data 249 self._loop = asyncio.get_running_loop() 250 self._info = ConnectionInfo(local_psel=local_psel, 251 remote_psel=remote_psel, 252 **conn.info._asdict()) 253 self._close_ppdu = _arp_ppdu() 254 self._receive_queue = aio.Queue(receive_queue_size) 255 self._send_queue = aio.Queue(send_queue_size) 256 self._async_group = aio.Group() 257 258 self.async_group.spawn(aio.call_on_cancel, self._on_close) 259 self.async_group.spawn(self._receive_loop) 260 self.async_group.spawn(self._send_loop) 261 self.async_group.spawn(aio.call_on_done, conn.wait_closing(), 262 self.close)
264 @property 265 def async_group(self) -> aio.Group: 266 """Async group""" 267 return self._async_group
Async group
274 @property 275 def syntax_names(self) -> SyntaxNames: 276 """Syntax names""" 277 return self._syntax_names
Syntax names
279 @property 280 def conn_req_user_data(self) -> IdentifiedEntity: 281 """Connect request's user data""" 282 return self._conn_req_user_data
Connect request's user data
284 @property 285 def conn_res_user_data(self) -> IdentifiedEntity: 286 """Connect response's user data""" 287 return self._conn_res_user_data
Connect response's user data
289 def close(self, user_data: IdentifiedEntity | None = None): 290 """Close connection""" 291 self._close(_aru_ppdu(self._syntax_names, user_data))
Close connection
293 async def async_close(self, user_data: IdentifiedEntity | None = None): 294 """Async close""" 295 self.close(user_data) 296 await self.wait_closed()
Async close
298 async def receive(self) -> IdentifiedEntity: 299 """Receive data""" 300 try: 301 return await self._receive_queue.get() 302 303 except aio.QueueClosedError: 304 raise ConnectionError()
Receive data