hat.drivers.copp
Connection oriented presentation protocol
"""Connection oriented presentation protocol"""

import asyncio
import importlib.resources
import logging
import typing

from hat import aio
from hat import asn1
from hat import json

from hat.drivers import cosp
from hat.drivers import tcp


mlog = logging.getLogger(__name__)

# BER encoder built from the ASN.1 repository shipped with this package
with importlib.resources.open_text(__package__, 'asn1_repo.json') as _f:
    _encoder = asn1.ber.BerEncoder(
        asn1.repository_from_json(
            json.decode_stream(_f)))


class ConnectionInfo(typing.NamedTuple):
    # `local_psel` and `remote_psel` are set by `Connection`; all other
    # fields are copied from the underlying COSP connection info
    name: str | None
    local_addr: tcp.Address
    local_tsel: int | None
    local_ssel: int | None
    local_psel: int | None
    remote_addr: tcp.Address
    remote_tsel: int | None
    remote_ssel: int | None
    remote_psel: int | None


IdentifiedEntity: typing.TypeAlias = tuple[asn1.ObjectIdentifier, asn1.Entity]
"""Identified entity"""

ValidateCb: typing.TypeAlias = aio.AsyncCallable[['SyntaxNames',
                                                  IdentifiedEntity],
                                                 IdentifiedEntity | None]
"""Validate callback"""

ConnectionCb: typing.TypeAlias = aio.AsyncCallable[['Connection'], None]
"""Connection callback"""


class SyntaxNames:
    """Syntax name registry

    Each syntax name is associated with an odd integer id (1, 3, 5, ...)
    assigned in the order in which the names are provided.

    Args:
        syntax_names: list of ASN.1 ObjectIdentifiers representing syntax names

    """

    def __init__(self, syntax_names: list[asn1.ObjectIdentifier]):
        self._syntax_id_names = {(i * 2 + 1): name
                                 for i, name in enumerate(syntax_names)}
        self._syntax_name_ids = {v: k
                                 for k, v in self._syntax_id_names.items()}

    def get_name(self, syntax_id: int) -> asn1.ObjectIdentifier:
        """Get syntax name associated with id

        Raises:
            KeyError: if id is not registered

        """
        return self._syntax_id_names[syntax_id]

    def get_id(self, syntax_name: asn1.ObjectIdentifier) -> int:
        """Get syntax id associated with name

        Raises:
            KeyError: if name is not registered

        """
        return self._syntax_name_ids[syntax_name]


async def connect(addr: tcp.Address,
                  syntax_names: SyntaxNames,
                  user_data: IdentifiedEntity | None = None,
                  *,
                  local_psel: int | None = None,
                  remote_psel: int | None = None,
                  copp_receive_queue_size: int = 1024,
                  copp_send_queue_size: int = 1024,
                  **kwargs
                  ) -> 'Connection':
    """Connect to COPP server

    Additional arguments are passed directly to `hat.drivers.cosp.connect`.

    Args:
        addr: remote address
        syntax_names: syntax names offered as presentation contexts
        user_data: optional user data sent with connect request
        local_psel: optional calling presentation selector
        remote_psel: optional called presentation selector
        copp_receive_queue_size: size of connection's receive queue
        copp_send_queue_size: size of connection's send queue

    If the connect response can not be decoded or validated, the underlying
    COSP connection is closed and the exception is re-raised.

    """
    log = _create_connection_logger(kwargs.get('name'), None)
    cp_ppdu = _cp_ppdu(syntax_names, local_psel, remote_psel, user_data)
    cp_ppdu_data = _encode('CP-type', cp_ppdu)
    conn = await cosp.connect(addr, cp_ppdu_data, **kwargs)

    try:
        cpa_ppdu = _decode('CPA-PPDU', conn.conn_res_user_data)
        _validate_connect_response(cp_ppdu, cpa_ppdu)

        calling_psel, called_psel = _get_psels(cp_ppdu)
        return Connection(conn, syntax_names, cp_ppdu, cpa_ppdu,
                          calling_psel, called_psel,
                          copp_receive_queue_size, copp_send_queue_size)

    except Exception:
        await aio.uncancellable(_close_cosp(conn, _arp_ppdu(), log))
        raise


async def listen(validate_cb: ValidateCb,
                 connection_cb: ConnectionCb,
                 addr: tcp.Address = tcp.Address('0.0.0.0', 102),
                 *,
                 bind_connections: bool = False,
                 copp_receive_queue_size: int = 1024,
                 copp_send_queue_size: int = 1024,
                 **kwargs
                 ) -> 'Server':
    """Create COPP listening server

    Additional arguments are passed directly to `hat.drivers.cosp.listen`.

    Args:
        validate_cb: callback function or coroutine called on new
            incoming connection request prior to creating connection object
        connection_cb: new connection callback
        addr: local listening address
        bind_connections: if set, the connection handler keeps waiting until
            the connection is closed and closes the connection if that wait
            is interrupted
        copp_receive_queue_size: size of each connection's receive queue
        copp_send_queue_size: size of each connection's send queue

    """
    server = Server()
    server._validate_cb = validate_cb
    server._connection_cb = connection_cb
    server._bind_connections = bind_connections
    server._receive_queue_size = copp_receive_queue_size
    server._send_queue_size = copp_send_queue_size
    # provisional logger, available to callbacks while the server is starting
    server._log = _create_server_logger(kwargs.get('name'), None)

    # connection binding is implemented by `Server._on_connection`, so it is
    # always disabled on the COSP level
    server._srv = await cosp.listen(server._on_validate,
                                    server._on_connection,
                                    addr,
                                    bind_connections=False,
                                    **kwargs)

    # final logger including listening addresses
    server._log = _create_server_logger(kwargs.get('name'), server._srv.info)

    return server


class Server(aio.Resource):
    """COPP listening server

    For creating new server see `listen`.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._srv.async_group

    @property
    def info(self) -> tcp.ServerInfo:
        """Server info"""
        return self._srv.info

    async def _on_validate(self, user_data):
        # decode connect request, let the user callback validate its user
        # data and encode the connect response
        cp_ppdu = _decode('CP-type', user_data)
        cp_params = cp_ppdu['normal-mode-parameters']
        called_psel_data = cp_params.get('called-presentation-selector')
        called_psel = (int.from_bytes(called_psel_data, 'big')
                       if called_psel_data else None)
        # NOTE: a connect request without user data raises KeyError here
        cp_pdv_list = cp_params['user-data'][1][0]
        syntax_names = _sytax_names_from_cp_ppdu(cp_ppdu)
        cp_user_data = (
            syntax_names.get_name(
                cp_pdv_list['presentation-context-identifier']),
            cp_pdv_list['presentation-data-values'][1])

        cpa_user_data = await aio.call(self._validate_cb, syntax_names,
                                       cp_user_data)

        cpa_ppdu = _cpa_ppdu(syntax_names, called_psel, cpa_user_data)
        cpa_ppdu_data = _encode('CPA-PPDU', cpa_ppdu)
        return cpa_ppdu_data

    async def _on_connection(self, cosp_conn):
        try:
            try:
                cp_ppdu = _decode('CP-type', cosp_conn.conn_req_user_data)
                cpa_ppdu = _decode('CPA-PPDU', cosp_conn.conn_res_user_data)

                syntax_names = _sytax_names_from_cp_ppdu(cp_ppdu)
                calling_psel, called_psel = _get_psels(cp_ppdu)

                # on the server side the called selector is the local one
                conn = Connection(cosp_conn, syntax_names, cp_ppdu, cpa_ppdu,
                                  called_psel, calling_psel,
                                  self._receive_queue_size,
                                  self._send_queue_size)

            except Exception:
                await aio.uncancellable(
                    _close_cosp(cosp_conn, _arp_ppdu(), self._log))
                raise

            try:
                await aio.call(self._connection_cb, conn)

            except BaseException:
                await aio.uncancellable(conn.async_close())
                raise

        except Exception as e:
            self._log.error("error creating new incomming connection: %s",
                            e, exc_info=e)
            return

        if not self._bind_connections:
            return

        try:
            await conn.wait_closed()

        except BaseException:
            await aio.uncancellable(conn.async_close())
            raise


class Connection(aio.Resource):
    """COPP connection

    For creating new connection see `connect` or `listen`.

    """

    def __init__(self,
                 conn: cosp.Connection,
                 syntax_names: SyntaxNames,
                 cp_ppdu: asn1.Value,
                 cpa_ppdu: asn1.Value,
                 local_psel: int | None,
                 remote_psel: int | None,
                 receive_queue_size: int,
                 send_queue_size: int):

        def conn_user_data(ppdu):
            # user data is only present in a connect request/response when
            # the sender provided it (see `_cp_ppdu` and `_cpa_ppdu`), so
            # its absence must not prevent creation of the connection
            user_data = ppdu['normal-mode-parameters'].get('user-data')
            if user_data is None:
                return None

            pdv_list = user_data[1][0]
            return (
                syntax_names.get_name(
                    pdv_list['presentation-context-identifier']),
                pdv_list['presentation-data-values'][1])

        conn_req_user_data = conn_user_data(cp_ppdu)
        conn_res_user_data = conn_user_data(cpa_ppdu)

        self._conn = conn
        self._syntax_names = syntax_names
        self._conn_req_user_data = conn_req_user_data
        self._conn_res_user_data = conn_res_user_data
        self._loop = asyncio.get_running_loop()
        self._info = ConnectionInfo(local_psel=local_psel,
                                    remote_psel=remote_psel,
                                    **conn.info._asdict())
        # abort PPDU sent when the connection is closed; replaced in `_close`
        self._close_ppdu = _arp_ppdu()
        self._receive_queue = aio.Queue(receive_queue_size)
        self._send_queue = aio.Queue(send_queue_size)
        self._async_group = aio.Group()
        self._log = _create_connection_logger(self._info.name, self._info)

        self.async_group.spawn(aio.call_on_cancel, self._on_close)
        self.async_group.spawn(self._receive_loop)
        self.async_group.spawn(self._send_loop)
        # close this connection once the COSP connection starts closing
        self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
                               self.close)

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    @property
    def info(self) -> ConnectionInfo:
        """Connection info"""
        return self._info

    @property
    def syntax_names(self) -> SyntaxNames:
        """Syntax names"""
        return self._syntax_names

    @property
    def conn_req_user_data(self) -> IdentifiedEntity | None:
        """Connect request's user data (``None`` if request had none)"""
        return self._conn_req_user_data

    @property
    def conn_res_user_data(self) -> IdentifiedEntity | None:
        """Connect response's user data (``None`` if response had none)"""
        return self._conn_res_user_data

    def close(self, user_data: IdentifiedEntity | None = None):
        """Close connection

        Args:
            user_data: optional user data included in the abort PPDU

        """
        self._close(_aru_ppdu(self._syntax_names, user_data))

    async def async_close(self, user_data: IdentifiedEntity | None = None):
        """Async close"""
        self.close(user_data)
        await self.wait_closed()

    async def receive(self) -> IdentifiedEntity:
        """Receive data

        Raises:
            ConnectionError: if receive queue is closed

        """
        try:
            return await self._receive_queue.get()

        except aio.QueueClosedError:
            raise ConnectionError()

    async def send(self, data: IdentifiedEntity):
        """Send data

        Data is queued and encoded/sent by the send loop.

        Raises:
            ConnectionError: if send queue is closed

        """
        try:
            await self._send_queue.put((data, None))

        except aio.QueueClosedError:
            raise ConnectionError()

    async def drain(self):
        """Drain output buffer

        Waits until the send loop has processed everything queued before
        this call and drained the underlying COSP connection. The wait is
        also released, without error, when the send loop terminates.

        Raises:
            ConnectionError: if send queue is closed

        """
        try:
            future = self._loop.create_future()
            # `None` data marks a drain request for the send loop
            await self._send_queue.put((None, future))
            await future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def _on_close(self):
        await _close_cosp(self._conn, self._close_ppdu, self._log)

    def _close(self, ppdu):
        if not self.is_open:
            return

        # remember abort PPDU used by `_on_close`
        self._close_ppdu = ppdu
        self._async_group.close()

    async def _receive_loop(self):
        try:
            while True:
                cosp_data = await self._conn.receive()

                user_data = _decode('User-data', cosp_data)

                pdv_list = user_data[1][0]
                syntax_name = self._syntax_names.get_name(
                    pdv_list['presentation-context-identifier'])
                data = pdv_list['presentation-data-values'][1]

                await self._receive_queue.put((syntax_name, data))

        except ConnectionError:
            pass

        except Exception as e:
            self._log.error("receive loop error: %s", e, exc_info=e)

        finally:
            self._close(_arp_ppdu())
            self._receive_queue.close()

    async def _send_loop(self):
        future = None
        try:
            while True:
                data, future = await self._send_queue.get()

                if data is None:
                    await self._conn.drain()

                else:
                    user_data = _user_data(self._syntax_names, data)
                    ppdu_data = _encode('User-data', user_data)

                    await self._conn.send(ppdu_data)

                if future and not future.done():
                    future.set_result(None)

        except ConnectionError:
            pass

        except Exception as e:
            self._log.error("send loop error: %s", e, exc_info=e)

        finally:
            self._close(_arp_ppdu())
            self._send_queue.close()

            # release every `drain` call still waiting on its future
            while True:
                if future and not future.done():
                    future.set_result(None)
                if self._send_queue.empty():
                    break
                _, future = self._send_queue.get_nowait()


async def _close_cosp(cosp_conn, ppdu, log):
    # close COSP connection, passing encoded abort PPDU as user data when
    # encoding succeeds
    try:
        data = _encode('Abort-type', ppdu)

    except Exception as e:
        log.error("error encoding abort ppdu: %s", e, exc_info=e)
        data = None

    finally:
        await cosp_conn.async_close(data)


def _get_psels(cp_ppdu):
    # returns (calling_psel, called_psel) decoded from connect request
    cp_params = cp_ppdu['normal-mode-parameters']
    calling_psel_data = cp_params.get('calling-presentation-selector')
    calling_psel = (int.from_bytes(calling_psel_data, 'big')
                    if calling_psel_data else None)
    called_psel_data = cp_params.get('called-presentation-selector')
    called_psel = (int.from_bytes(called_psel_data, 'big')
                   if called_psel_data else None)
    return calling_psel, called_psel


def _validate_connect_response(cp_ppdu, cpa_ppdu):
    cp_params = cp_ppdu['normal-mode-parameters']
    cpa_params = cpa_ppdu['normal-mode-parameters']
    called_psel_data = cp_params.get('called-presentation-selector')
    responding_psel_data = cpa_params.get('responding-presentation-selector')

    # selectors are compared only when both sides provided them
    if called_psel_data and responding_psel_data:
        called_psel = int.from_bytes(called_psel_data, 'big')
        responding_psel = int.from_bytes(responding_psel_data, 'big')

        if called_psel != responding_psel:
            raise Exception('presentation selectors not matching')

    # every presentation context result must be 0
    result_list = cpa_params['presentation-context-definition-result-list']
    if any(i['result'] != 0 for i in result_list):
        raise Exception('presentation context not accepted')


def _cp_ppdu(syntax_names, calling_psel, called_psel, user_data):
    cp_params = {
        'presentation-context-definition-list': [
            {'presentation-context-identifier': i,
             'abstract-syntax-name': name,
             'transfer-syntax-name-list': [_encoder.syntax_name]}
            for i, name in syntax_names._syntax_id_names.items()]}

    if calling_psel is not None:
        cp_params['calling-presentation-selector'] = \
            calling_psel.to_bytes(4, 'big')

    if called_psel is not None:
        cp_params['called-presentation-selector'] = \
            called_psel.to_bytes(4, 'big')

    if user_data:
        cp_params['user-data'] = _user_data(syntax_names, user_data)

    return {
        'mode-selector': {
            'mode-value': 1},
        'normal-mode-parameters': cp_params}


def _cpa_ppdu(syntax_names, responding_psel, user_data):
    # one result entry with result 0 is generated for each syntax name
    cpa_params = {
        'presentation-context-definition-result-list': [
            {'result': 0,
             'transfer-syntax-name': _encoder.syntax_name}
            for _ in syntax_names._syntax_id_names.keys()]}

    if responding_psel is not None:
        cpa_params['responding-presentation-selector'] = \
            responding_psel.to_bytes(4, 'big')

    if user_data:
        cpa_params['user-data'] = _user_data(syntax_names, user_data)

    return {
        'mode-selector': {
            'mode-value': 1},
        'normal-mode-parameters': cpa_params}


def _aru_ppdu(syntax_names, user_data):
    aru_params = {}

    if user_data:
        aru_params['user-data'] = _user_data(syntax_names, user_data)

    return 'aru-ppdu', ('normal-mode-parameters', aru_params)


def _arp_ppdu():
    return 'arp-ppdu', {}


def _user_data(syntax_names, user_data):
    # wrap identified entity as fully encoded data with a single PDV entry
    return 'fully-encoded-data', [{
        'presentation-context-identifier': syntax_names.get_id(user_data[0]),
        'presentation-data-values': (
            'single-ASN1-type', user_data[1])}]


def _sytax_names_from_cp_ppdu(cp_ppdu):
    # build registry using ids announced by the peer instead of the locally
    # generated odd ids
    cp_params = cp_ppdu['normal-mode-parameters']
    syntax_names = SyntaxNames([])
    syntax_names._syntax_id_names = {
        i['presentation-context-identifier']: i['abstract-syntax-name']
        for i in cp_params['presentation-context-definition-list']}
    syntax_names._syntax_name_ids = {
        v: k for k, v in syntax_names._syntax_id_names.items()}
    return syntax_names


def _encode(name, value):
    return _encoder.encode(asn1.TypeRef('ISO8823-PRESENTATION', name), value)


def _decode(name, data):
    res, _ = _encoder.decode(asn1.TypeRef('ISO8823-PRESENTATION', name),
                             memoryview(data))
    return res


def _create_server_logger(name, info):
    extra = {'meta': {'type': 'CoppServer',
                      'name': name}}

    if info is not None:
        extra['meta']['addresses'] = [{'host': addr.host,
                                       'port': addr.port}
                                      for addr in info.addresses]

    return logging.LoggerAdapter(mlog, extra)


def _create_connection_logger(name, info):
    extra = {'meta': {'type': 'CoppConnection',
                      'name': name}}

    if info is not None:
        extra['meta']['local_addr'] = {'host': info.local_addr.host,
                                       'port': info.local_addr.port}
        extra['meta']['remote_addr'] = {'host': info.remote_addr.host,
                                        'port': info.remote_addr.port}

    return logging.LoggerAdapter(mlog, extra)
mlog =
<Logger hat.drivers.copp (WARNING)>
class
ConnectionInfo(typing.NamedTuple):
class ConnectionInfo(typing.NamedTuple):
    """COPP connection info

    `Connection` creates instances by combining its own local/remote
    presentation selectors with all fields of the underlying COSP
    connection's info.

    """
    # connection name (optional)
    name: str | None
    # local TCP address
    local_addr: tcp.Address
    # presumably local transport selector - TODO confirm against cosp/cotp
    local_tsel: int | None
    # presumably local session selector - TODO confirm against cosp
    local_ssel: int | None
    # local presentation selector
    local_psel: int | None
    # remote TCP address
    remote_addr: tcp.Address
    # presumably remote transport selector - TODO confirm against cosp/cotp
    remote_tsel: int | None
    # presumably remote session selector - TODO confirm against cosp
    remote_ssel: int | None
    # remote presentation selector
    remote_psel: int | None
ConnectionInfo(name, local_addr, local_tsel, local_ssel, local_psel, remote_addr, remote_tsel, remote_ssel, remote_psel)
ConnectionInfo( name: str | None, local_addr: hat.drivers.tcp.Address, local_tsel: int | None, local_ssel: int | None, local_psel: int | None, remote_addr: hat.drivers.tcp.Address, remote_tsel: int | None, remote_ssel: int | None, remote_psel: int | None)
Create new instance of ConnectionInfo(name, local_addr, local_tsel, local_ssel, local_psel, remote_addr, remote_tsel, remote_ssel, remote_psel)
IdentifiedEntity: TypeAlias =
tuple[tuple[int, ...], hat.asn1.common.Entity]
Identified entity
ValidateCb: TypeAlias =
Callable[[ForwardRef('SyntaxNames'), tuple[tuple[int, ...], hat.asn1.common.Entity]], tuple[tuple[int, ...], hat.asn1.common.Entity] | None | Awaitable[tuple[tuple[int, ...], hat.asn1.common.Entity] | None]]
Validate callback
ConnectionCb: TypeAlias =
Callable[[ForwardRef('Connection')], None | Awaitable[None]]
Connection callback
class
SyntaxNames:
class SyntaxNames:
    """Registry mapping syntax names to presentation context ids

    Names are numbered in the provided order with consecutive odd ids
    (1, 3, 5, ...).

    Args:
        syntax_names: list of ASN.1 ObjectIdentifiers representing syntax names

    """

    def __init__(self, syntax_names: list[asn1.ObjectIdentifier]):
        id_to_name = {}
        name_to_id = {}

        for position, syntax_name in enumerate(syntax_names):
            syntax_id = 2 * position + 1
            id_to_name[syntax_id] = syntax_name
            name_to_id[syntax_name] = syntax_id

        self._syntax_id_names = id_to_name
        self._syntax_name_ids = name_to_id

    def get_name(self, syntax_id: int) -> asn1.ObjectIdentifier:
        """Return syntax name registered under `syntax_id`

        Raises:
            KeyError: if `syntax_id` is not registered

        """
        names = self._syntax_id_names
        return names[syntax_id]

    def get_id(self, syntax_name: asn1.ObjectIdentifier) -> int:
        """Return syntax id registered for `syntax_name`

        Raises:
            KeyError: if `syntax_name` is not registered

        """
        ids = self._syntax_name_ids
        return ids[syntax_name]
Syntax name registry
Arguments:
- syntax_names: list of ASN.1 ObjectIdentifiers representing syntax names
async def
connect( addr: hat.drivers.tcp.Address, syntax_names: SyntaxNames, user_data: tuple[tuple[int, ...], hat.asn1.common.Entity] | None = None, *, local_psel: int | None = None, remote_psel: int | None = None, copp_receive_queue_size: int = 1024, copp_send_queue_size: int = 1024, **kwargs) -> Connection:
async def connect(addr: tcp.Address,
                  syntax_names: SyntaxNames,
                  user_data: IdentifiedEntity | None = None,
                  *,
                  local_psel: int | None = None,
                  remote_psel: int | None = None,
                  copp_receive_queue_size: int = 1024,
                  copp_send_queue_size: int = 1024,
                  **kwargs
                  ) -> 'Connection':
    """Establish COPP connection to a remote server

    Keyword arguments not listed here are forwarded unchanged to
    `hat.drivers.cosp.connect`.

    Args:
        addr: remote address
        syntax_names: syntax names offered as presentation contexts
        user_data: optional user data sent with connect request
        local_psel: optional calling presentation selector
        remote_psel: optional called presentation selector
        copp_receive_queue_size: size of connection's receive queue
        copp_send_queue_size: size of connection's send queue

    When the connect response can not be decoded or validated, or the
    connection object can not be created, the COSP connection is closed
    and the exception is propagated.

    """
    logger = _create_connection_logger(kwargs.get('name'), None)
    request = _cp_ppdu(syntax_names, local_psel, remote_psel, user_data)
    request_bytes = _encode('CP-type', request)
    cosp_conn = await cosp.connect(addr, request_bytes, **kwargs)

    try:
        response = _decode('CPA-PPDU', cosp_conn.conn_res_user_data)
        _validate_connect_response(request, response)

        # on the client side the calling selector is the local one
        psels = _get_psels(request)
        connection = Connection(cosp_conn, syntax_names, request, response,
                                psels[0], psels[1],
                                copp_receive_queue_size,
                                copp_send_queue_size)

    except Exception:
        abort = _close_cosp(cosp_conn, _arp_ppdu(), logger)
        await aio.uncancellable(abort)
        raise

    return connection
Connect to COPP server
Additional arguments are passed directly to hat.drivers.cosp.connect.
async def
listen( validate_cb: Callable[[SyntaxNames, tuple[tuple[int, ...], hat.asn1.common.Entity]], tuple[tuple[int, ...], hat.asn1.common.Entity] | None | Awaitable[tuple[tuple[int, ...], hat.asn1.common.Entity] | None]], connection_cb: Callable[[Connection], None | Awaitable[None]], addr: hat.drivers.tcp.Address = Address(host='0.0.0.0', port=102), *, bind_connections: bool = False, copp_receive_queue_size: int = 1024, copp_send_queue_size: int = 1024, **kwargs) -> Server:
async def listen(validate_cb: ValidateCb,
                 connection_cb: ConnectionCb,
                 addr: tcp.Address = tcp.Address('0.0.0.0', 102),
                 *,
                 bind_connections: bool = False,
                 copp_receive_queue_size: int = 1024,
                 copp_send_queue_size: int = 1024,
                 **kwargs
                 ) -> 'Server':
    """Start COPP listening server

    Keyword arguments not listed here are forwarded unchanged to
    `hat.drivers.cosp.listen`.

    Args:
        validate_cb: callback function or coroutine called on new
            incoming connection request prior to creating connection object
        connection_cb: new connection callback
        addr: local listening address
        bind_connections: if set, the connection handler keeps waiting until
            the connection is closed and closes the connection if that wait
            is interrupted
        copp_receive_queue_size: size of each connection's receive queue
        copp_send_queue_size: size of each connection's send queue

    """
    name = kwargs.get('name')

    srv = Server()
    srv._validate_cb = validate_cb
    srv._connection_cb = connection_cb
    srv._bind_connections = bind_connections
    srv._receive_queue_size = copp_receive_queue_size
    srv._send_queue_size = copp_send_queue_size
    # provisional logger used until listening addresses are known
    srv._log = _create_server_logger(name, None)

    # binding is handled by `Server._on_connection`, never by COSP itself
    cosp_server = await cosp.listen(srv._on_validate,
                                    srv._on_connection,
                                    addr,
                                    bind_connections=False,
                                    **kwargs)
    srv._srv = cosp_server

    srv._log = _create_server_logger(name, cosp_server.info)

    return srv
Create COPP listening server
Additional arguments are passed directly to hat.drivers.cosp.listen.
Arguments:
- validate_cb: callback function or coroutine called on new incoming connection request prior to creating connection object
- connection_cb: new connection callback
- addr: local listening address
class
Server(hat.aio.group.Resource):
class Server(aio.Resource):
    """COPP listening server

    Instances are set up by `listen`, which assigns callbacks, queue sizes,
    logger and the underlying COSP server.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group of the underlying COSP server"""
        return self._srv.async_group

    @property
    def info(self) -> tcp.ServerInfo:
        """Info of the underlying COSP server"""
        return self._srv.info

    async def _on_validate(self, user_data):
        # decode connect request, pass its user data to the validate
        # callback and return the encoded connect response
        request = _decode('CP-type', user_data)
        params = request['normal-mode-parameters']

        selector_bytes = params.get('called-presentation-selector')
        if selector_bytes:
            responding_psel = int.from_bytes(selector_bytes, 'big')
        else:
            responding_psel = None

        pdv = params['user-data'][1][0]
        names = _sytax_names_from_cp_ppdu(request)
        syntax_name = names.get_name(pdv['presentation-context-identifier'])
        request_entity = (syntax_name, pdv['presentation-data-values'][1])

        response_entity = await aio.call(self._validate_cb, names,
                                         request_entity)

        response = _cpa_ppdu(names, responding_psel, response_entity)
        return _encode('CPA-PPDU', response)

    async def _on_connection(self, cosp_conn):
        try:
            conn = await self._create_connection(cosp_conn)

            try:
                await aio.call(self._connection_cb, conn)

            except BaseException:
                await aio.uncancellable(conn.async_close())
                raise

        except Exception as e:
            self._log.error("error creating new incomming connection: %s",
                            e, exc_info=e)
            return

        if self._bind_connections:
            # keep this handler alive for the connection's lifetime
            try:
                await conn.wait_closed()

            except BaseException:
                await aio.uncancellable(conn.async_close())
                raise

    async def _create_connection(self, cosp_conn):
        # wrap accepted COSP connection; close it if wrapping fails
        try:
            request = _decode('CP-type', cosp_conn.conn_req_user_data)
            response = _decode('CPA-PPDU', cosp_conn.conn_res_user_data)

            names = _sytax_names_from_cp_ppdu(request)
            remote_psel, local_psel = _get_psels(request)

            return Connection(cosp_conn, names, request, response,
                              local_psel, remote_psel,
                              self._receive_queue_size,
                              self._send_queue_size)

        except Exception:
            await aio.uncancellable(
                _close_cosp(cosp_conn, _arp_ppdu(), self._log))
            raise
COPP listening server
For creating new server see listen.
async_group: hat.aio.group.Group
@property
def async_group(self) -> aio.Group:
    """Async group of the underlying COSP server"""
    cosp_server = self._srv
    return cosp_server.async_group
Async group
class
Connection(hat.aio.group.Resource):
class Connection(aio.Resource):
    """COPP connection

    For creating new connection see `connect` or `listen`.

    """

    def __init__(self,
                 conn: cosp.Connection,
                 syntax_names: SyntaxNames,
                 cp_ppdu: asn1.Value,
                 cpa_ppdu: asn1.Value,
                 local_psel: int | None,
                 remote_psel: int | None,
                 receive_queue_size: int,
                 send_queue_size: int):

        def conn_user_data(ppdu):
            # user data is only present in a connect request/response when
            # the sender provided it (see `_cp_ppdu` and `_cpa_ppdu`), so
            # its absence must not prevent creation of the connection
            user_data = ppdu['normal-mode-parameters'].get('user-data')
            if user_data is None:
                return None

            pdv_list = user_data[1][0]
            return (
                syntax_names.get_name(
                    pdv_list['presentation-context-identifier']),
                pdv_list['presentation-data-values'][1])

        conn_req_user_data = conn_user_data(cp_ppdu)
        conn_res_user_data = conn_user_data(cpa_ppdu)

        self._conn = conn
        self._syntax_names = syntax_names
        self._conn_req_user_data = conn_req_user_data
        self._conn_res_user_data = conn_res_user_data
        self._loop = asyncio.get_running_loop()
        self._info = ConnectionInfo(local_psel=local_psel,
                                    remote_psel=remote_psel,
                                    **conn.info._asdict())
        # abort PPDU sent when the connection is closed; replaced in `_close`
        self._close_ppdu = _arp_ppdu()
        self._receive_queue = aio.Queue(receive_queue_size)
        self._send_queue = aio.Queue(send_queue_size)
        self._async_group = aio.Group()
        self._log = _create_connection_logger(self._info.name, self._info)

        self.async_group.spawn(aio.call_on_cancel, self._on_close)
        self.async_group.spawn(self._receive_loop)
        self.async_group.spawn(self._send_loop)
        # close this connection once the COSP connection starts closing
        self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
                               self.close)

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    @property
    def info(self) -> ConnectionInfo:
        """Connection info"""
        return self._info

    @property
    def syntax_names(self) -> SyntaxNames:
        """Syntax names"""
        return self._syntax_names

    @property
    def conn_req_user_data(self) -> IdentifiedEntity | None:
        """Connect request's user data (``None`` if request had none)"""
        return self._conn_req_user_data

    @property
    def conn_res_user_data(self) -> IdentifiedEntity | None:
        """Connect response's user data (``None`` if response had none)"""
        return self._conn_res_user_data

    def close(self, user_data: IdentifiedEntity | None = None):
        """Close connection

        Args:
            user_data: optional user data included in the abort PPDU

        """
        self._close(_aru_ppdu(self._syntax_names, user_data))

    async def async_close(self, user_data: IdentifiedEntity | None = None):
        """Async close"""
        self.close(user_data)
        await self.wait_closed()

    async def receive(self) -> IdentifiedEntity:
        """Receive data

        Raises:
            ConnectionError: if receive queue is closed

        """
        try:
            return await self._receive_queue.get()

        except aio.QueueClosedError:
            raise ConnectionError()

    async def send(self, data: IdentifiedEntity):
        """Send data

        Data is queued and encoded/sent by the send loop.

        Raises:
            ConnectionError: if send queue is closed

        """
        try:
            await self._send_queue.put((data, None))

        except aio.QueueClosedError:
            raise ConnectionError()

    async def drain(self):
        """Drain output buffer

        Waits until the send loop has processed everything queued before
        this call and drained the underlying COSP connection. The wait is
        also released, without error, when the send loop terminates.

        Raises:
            ConnectionError: if send queue is closed

        """
        try:
            future = self._loop.create_future()
            # `None` data marks a drain request for the send loop
            await self._send_queue.put((None, future))
            await future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def _on_close(self):
        await _close_cosp(self._conn, self._close_ppdu, self._log)

    def _close(self, ppdu):
        if not self.is_open:
            return

        # remember abort PPDU used by `_on_close`
        self._close_ppdu = ppdu
        self._async_group.close()

    async def _receive_loop(self):
        try:
            while True:
                cosp_data = await self._conn.receive()

                user_data = _decode('User-data', cosp_data)

                pdv_list = user_data[1][0]
                syntax_name = self._syntax_names.get_name(
                    pdv_list['presentation-context-identifier'])
                data = pdv_list['presentation-data-values'][1]

                await self._receive_queue.put((syntax_name, data))

        except ConnectionError:
            pass

        except Exception as e:
            self._log.error("receive loop error: %s", e, exc_info=e)

        finally:
            self._close(_arp_ppdu())
            self._receive_queue.close()

    async def _send_loop(self):
        future = None
        try:
            while True:
                data, future = await self._send_queue.get()

                if data is None:
                    await self._conn.drain()

                else:
                    user_data = _user_data(self._syntax_names, data)
                    ppdu_data = _encode('User-data', user_data)

                    await self._conn.send(ppdu_data)

                if future and not future.done():
                    future.set_result(None)

        except ConnectionError:
            pass

        except Exception as e:
            self._log.error("send loop error: %s", e, exc_info=e)

        finally:
            self._close(_arp_ppdu())
            self._send_queue.close()

            # release every `drain` call still waiting on its future
            while True:
                if future and not future.done():
                    future.set_result(None)
                if self._send_queue.empty():
                    break
                _, future = self._send_queue.get_nowait()
Connection( conn: Connection, syntax_names: SyntaxNames, cp_ppdu: Union[bool, int, Collection[bool], bytes, bytearray, memoryview, NoneType, tuple[int, ...], str, float, Tuple[str, ForwardRef('Value')], Dict[str, ForwardRef('Value')], Collection['Value'], hat.asn1.common.Entity, hat.asn1.common.External, hat.asn1.common.EmbeddedPDV], cpa_ppdu: Union[bool, int, Collection[bool], bytes, bytearray, memoryview, NoneType, tuple[int, ...], str, float, Tuple[str, ForwardRef('Value')], Dict[str, ForwardRef('Value')], Collection['Value'], hat.asn1.common.Entity, hat.asn1.common.External, hat.asn1.common.EmbeddedPDV], local_psel: int | None, remote_psel: int | None, receive_queue_size: int, send_queue_size: int)
def __init__(self,
             conn: cosp.Connection,
             syntax_names: SyntaxNames,
             cp_ppdu: asn1.Value,
             cpa_ppdu: asn1.Value,
             local_psel: int | None,
             remote_psel: int | None,
             receive_queue_size: int,
             send_queue_size: int):
    """Wrap an established COSP connection

    Args:
        conn: established COSP connection
        syntax_names: syntax names used for resolving context identifiers
        cp_ppdu: decoded connect request
        cpa_ppdu: decoded connect response
        local_psel: local presentation selector
        remote_psel: remote presentation selector
        receive_queue_size: size of receive queue
        send_queue_size: size of send queue

    Connect request/response user data is stored as ``None`` when the
    corresponding PPDU carries no user data.

    """

    def conn_user_data(ppdu):
        # user data is only present in a connect request/response when
        # the sender provided it (see `_cp_ppdu` and `_cpa_ppdu`), so
        # its absence must not prevent creation of the connection
        user_data = ppdu['normal-mode-parameters'].get('user-data')
        if user_data is None:
            return None

        pdv_list = user_data[1][0]
        return (
            syntax_names.get_name(
                pdv_list['presentation-context-identifier']),
            pdv_list['presentation-data-values'][1])

    conn_req_user_data = conn_user_data(cp_ppdu)
    conn_res_user_data = conn_user_data(cpa_ppdu)

    self._conn = conn
    self._syntax_names = syntax_names
    self._conn_req_user_data = conn_req_user_data
    self._conn_res_user_data = conn_res_user_data
    self._loop = asyncio.get_running_loop()
    self._info = ConnectionInfo(local_psel=local_psel,
                                remote_psel=remote_psel,
                                **conn.info._asdict())
    # abort PPDU sent when the connection is closed; replaced in `_close`
    self._close_ppdu = _arp_ppdu()
    self._receive_queue = aio.Queue(receive_queue_size)
    self._send_queue = aio.Queue(send_queue_size)
    self._async_group = aio.Group()
    self._log = _create_connection_logger(self._info.name, self._info)

    self.async_group.spawn(aio.call_on_cancel, self._on_close)
    self.async_group.spawn(self._receive_loop)
    self.async_group.spawn(self._send_loop)
    # close this connection once the COSP connection starts closing
    self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
                           self.close)
async_group: hat.aio.group.Group
@property
def async_group(self) -> aio.Group:
    """Async group owned by this connection"""
    group = self._async_group
    return group
Async group
syntax_names: SyntaxNames
@property
def syntax_names(self) -> SyntaxNames:
    """Syntax names registry used by this connection"""
    registry = self._syntax_names
    return registry
Syntax names
conn_req_user_data: tuple[tuple[int, ...], hat.asn1.common.Entity]
@property
def conn_req_user_data(self) -> IdentifiedEntity:
    """User data carried by the connect request"""
    request_user_data = self._conn_req_user_data
    return request_user_data
Connect request's user data
conn_res_user_data: tuple[tuple[int, ...], hat.asn1.common.Entity]
@property
def conn_res_user_data(self) -> IdentifiedEntity:
    """User data carried by the connect response"""
    response_user_data = self._conn_res_user_data
    return response_user_data
Connect response's user data
def
close( self, user_data: tuple[tuple[int, ...], hat.asn1.common.Entity] | None = None):
def close(self, user_data: IdentifiedEntity | None = None):
    """Close connection

    Args:
        user_data: optional user data included in the abort PPDU

    """
    abort_ppdu = _aru_ppdu(self._syntax_names, user_data)
    self._close(abort_ppdu)
Close connection
async def
async_close( self, user_data: tuple[tuple[int, ...], hat.asn1.common.Entity] | None = None):
async def async_close(self, user_data: IdentifiedEntity | None = None):
    """Close connection and wait until it is closed

    Args:
        user_data: optional user data passed to `close`

    """
    self.close(user_data)
    closed = self.wait_closed()
    await closed
Async close
async def
receive(self) -> tuple[tuple[int, ...], hat.asn1.common.Entity]:
async def receive(self) -> IdentifiedEntity:
    """Wait for and return the next received identified entity

    Raises:
        ConnectionError: if receive queue is closed

    """
    queue = self._receive_queue

    try:
        received = await queue.get()

    except aio.QueueClosedError:
        raise ConnectionError()

    return received
Receive data