hat.drivers.acse
Association Control Service Element
"""Association Control Service Element"""

import asyncio
import importlib.resources
import logging
import typing

from hat import aio
from hat import asn1
from hat import json

from hat.drivers import copp
from hat.drivers import tcp


mlog = logging.getLogger(__name__)

# (joint-iso-itu-t, association-control, abstract-syntax, apdus, version1)
_acse_syntax_name = (2, 2, 1, 0, 1)

# BER encoder built from the ASN.1 repository shipped with this package
with importlib.resources.open_text(__package__, 'asn1_repo.json') as _f:
    _encoder = asn1.ber.BerEncoder(
        asn1.repository_from_json(
            json.decode_stream(_f)))


class ConnectionInfo(typing.NamedTuple):
    """ACSE connection info

    AP title and AE qualifier fields are provided by `Connection`, all other
    fields are copied from the underlying COPP connection info.

    """
    name: str | None
    local_addr: tcp.Address
    local_tsel: int | None
    local_ssel: int | None
    local_psel: int | None
    local_ap_title: asn1.ObjectIdentifier | None
    local_ae_qualifier: int | None
    remote_addr: tcp.Address
    remote_tsel: int | None
    remote_ssel: int | None
    remote_psel: int | None
    remote_ap_title: asn1.ObjectIdentifier | None
    remote_ae_qualifier: int | None


ValidateCb: typing.TypeAlias = aio.AsyncCallable[[copp.SyntaxNames,
                                                  copp.IdentifiedEntity],
                                                 copp.IdentifiedEntity | None]
"""Validate callback"""

ConnectionCb: typing.TypeAlias = aio.AsyncCallable[['Connection'], None]
"""Connection callback"""


async def connect(addr: tcp.Address,
                  syntax_name_list: list[asn1.ObjectIdentifier],
                  app_context_name: asn1.ObjectIdentifier,
                  user_data: copp.IdentifiedEntity | None = None,
                  *,
                  local_ap_title: asn1.ObjectIdentifier | None = None,
                  remote_ap_title: asn1.ObjectIdentifier | None = None,
                  local_ae_qualifier: int | None = None,
                  remote_ae_qualifier: int | None = None,
                  acse_receive_queue_size: int = 1024,
                  acse_send_queue_size: int = 1024,
                  **kwargs
                  ) -> 'Connection':
    """Connect to ACSE server

    Additional arguments are passed directly to `hat.drivers.copp.connect`
    (`syntax_names` is set by this coroutine).

    Args:
        addr: remote address
        syntax_name_list: syntax names offered in addition to the ACSE
            syntax name
        app_context_name: application context name put into the AARQ
        user_data: optional user data put into the AARQ
        local_ap_title: calling AP title
        remote_ap_title: called AP title
        local_ae_qualifier: calling AE qualifier
        remote_ae_qualifier: called AE qualifier
        acse_receive_queue_size: size of the connection's receive queue
        acse_send_queue_size: size of the connection's send queue

    Raises:
        Exception: if the response is not an accepting AARE encoded with
            the ACSE syntax (the COPP connection is aborted before
            the exception is propagated)

    """
    syntax_names = copp.SyntaxNames([_acse_syntax_name, *syntax_name_list])
    aarq_apdu = _aarq_apdu(syntax_names, app_context_name,
                           local_ap_title, remote_ap_title,
                           local_ae_qualifier, remote_ae_qualifier,
                           user_data)
    copp_user_data = _acse_syntax_name, _encode(aarq_apdu)
    conn = await copp.connect(addr, syntax_names, copp_user_data, **kwargs)

    try:
        aare_apdu_syntax_name, aare_apdu_entity = conn.conn_res_user_data
        if aare_apdu_syntax_name != _acse_syntax_name:
            raise Exception("invalid syntax name")

        aare_apdu = _decode(aare_apdu_entity)
        if aare_apdu[0] != 'aare' or aare_apdu[1]['result'] != 0:
            raise Exception("invalid apdu")

        # on the connecting side, "calling" is local and "called" is remote
        calling_ap_title, called_ap_title = _get_ap_titles(aarq_apdu)
        calling_ae_qualifier, called_ae_qualifier = _get_ae_qualifiers(
            aarq_apdu)
        return Connection(conn, aarq_apdu, aare_apdu,
                          calling_ap_title, called_ap_title,
                          calling_ae_qualifier, called_ae_qualifier,
                          acse_receive_queue_size, acse_send_queue_size)

    except Exception:
        await aio.uncancellable(_close_copp(conn, _abrt_apdu(1)))
        raise


async def listen(validate_cb: ValidateCb,
                 connection_cb: ConnectionCb,
                 addr: tcp.Address = tcp.Address('0.0.0.0', 102),
                 *,
                 bind_connections: bool = False,
                 acse_receive_queue_size: int = 1024,
                 acse_send_queue_size: int = 1024,
                 **kwargs
                 ) -> 'Server':
    """Create ACSE listening server

    Additional arguments are passed directly to `hat.drivers.copp.listen`.

    Args:
        validate_cb: callback function or coroutine called on new
            incoming connection request prior to creating connection object
        connection_cb: new connection callback
        addr: local listening address
        bind_connections: if set, the connection handler waits until
            the connection is closed and closes it when interrupted
        acse_receive_queue_size: size of each connection's receive queue
        acse_send_queue_size: size of each connection's send queue

    """
    server = Server()
    server._validate_cb = validate_cb
    server._connection_cb = connection_cb
    server._bind_connections = bind_connections
    server._receive_queue_size = acse_receive_queue_size
    server._send_queue_size = acse_send_queue_size
    # temporary logger without addresses - server info is not available
    # until the COPP server is created
    server._log = _create_server_logger(kwargs.get('name'), None)

    # connection binding is handled by `Server._on_connection`, so it is
    # always disabled on the COPP level
    server._srv = await copp.listen(server._on_validate,
                                    server._on_connection,
                                    addr,
                                    bind_connections=False,
                                    **kwargs)

    server._log = _create_server_logger(kwargs.get('name'), server._srv.info)

    return server


class Server(aio.Resource):
    """ACSE listening server

    For creating new server see `listen`.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._srv.async_group

    @property
    def info(self) -> tcp.ServerInfo:
        """Server info"""
        return self._srv.info

    async def _on_validate(self, syntax_names, user_data):
        """Validate received AARQ and create encoded AARE response"""
        aarq_apdu_syntax_name, aarq_apdu_entity = user_data
        if aarq_apdu_syntax_name != _acse_syntax_name:
            raise Exception('invalid acse syntax name')

        aarq_apdu = _decode(aarq_apdu_entity)
        if aarq_apdu[0] != 'aarq':
            raise Exception('not aarq message')

        aarq_external = aarq_apdu[1]['user-information'][0]
        if aarq_external.direct_ref is not None:
            if aarq_external.direct_ref != _encoder.syntax_name:
                raise Exception('invalid encoder identifier')

        # "called" values of the request are echoed as "responding" values
        _, called_ap_title = _get_ap_titles(aarq_apdu)
        _, called_ae_qualifier = _get_ae_qualifiers(aarq_apdu)
        _, called_ap_invocation_identifier = \
            _get_ap_invocation_identifiers(aarq_apdu)
        _, called_ae_invocation_identifier = \
            _get_ae_invocation_identifiers(aarq_apdu)

        aarq_user_data = (syntax_names.get_name(aarq_external.indirect_ref),
                          aarq_external.data)

        # NOTE(review): `ValidateCb` is typed as possibly returning None,
        # but `_aare_apdu` indexes the result unconditionally - confirm
        # how a None result should be handled
        user_validate_result = await aio.call(self._validate_cb, syntax_names,
                                              aarq_user_data)

        aare_apdu = _aare_apdu(syntax_names,
                               user_validate_result,
                               called_ap_title, called_ae_qualifier,
                               called_ap_invocation_identifier,
                               called_ae_invocation_identifier)
        return _acse_syntax_name, _encode(aare_apdu)

    async def _on_connection(self, copp_conn):
        """Wrap new COPP connection and notify connection callback"""
        try:
            try:
                aarq_apdu = _decode(copp_conn.conn_req_user_data[1])
                aare_apdu = _decode(copp_conn.conn_res_user_data[1])

                calling_ap_title, called_ap_title = _get_ap_titles(aarq_apdu)
                calling_ae_qualifier, called_ae_qualifier = _get_ae_qualifiers(
                    aarq_apdu)

                # on the listening side, "called" is local and "calling"
                # is remote
                conn = Connection(copp_conn, aarq_apdu, aare_apdu,
                                  called_ap_title, calling_ap_title,
                                  called_ae_qualifier, calling_ae_qualifier,
                                  self._receive_queue_size,
                                  self._send_queue_size)

            except Exception:
                await aio.uncancellable(_close_copp(copp_conn, _abrt_apdu(1)))
                raise

            try:
                await aio.call(self._connection_cb, conn)

            except BaseException:
                await aio.uncancellable(conn.async_close())
                raise

        except Exception as e:
            self._log.error("error creating new incomming connection: %s",
                            e, exc_info=e)
            return

        if not self._bind_connections:
            return

        try:
            await conn.wait_closed()

        except BaseException:
            await aio.uncancellable(conn.async_close())
            raise


class Connection(aio.Resource):
    """ACSE connection

    For creating new connection see `connect` or `listen`.

    """

    def __init__(self,
                 conn: copp.Connection,
                 aarq_apdu: asn1.Value,
                 aare_apdu: asn1.Value,
                 local_ap_title: asn1.ObjectIdentifier | None,
                 remote_ap_title: asn1.ObjectIdentifier | None,
                 local_ae_qualifier: int | None,
                 remote_ae_qualifier: int | None,
                 receive_queue_size: int,
                 send_queue_size: int):
        """Wrap an established COPP connection

        Must be called while an event loop is running. Receive and send
        loops are spawned immediately, and closing of `conn` closes this
        connection.

        """

        def get_user_data(apdu):
            # 'user-information' is optional - `_aarq_apdu` omits it when
            # `connect` is called without user data
            externals = apdu[1].get('user-information')
            if not externals:
                return None
            external = externals[0]
            return (conn.syntax_names.get_name(external.indirect_ref),
                    external.data)

        self._conn = conn
        self._conn_req_user_data = get_user_data(aarq_apdu)
        self._conn_res_user_data = get_user_data(aare_apdu)
        self._loop = asyncio.get_running_loop()
        self._info = ConnectionInfo(local_ap_title=local_ap_title,
                                    local_ae_qualifier=local_ae_qualifier,
                                    remote_ap_title=remote_ap_title,
                                    remote_ae_qualifier=remote_ae_qualifier,
                                    **conn.info._asdict())
        # APDU sent when this connection is closed; replaced by `_close`
        # when closing is initiated by the receive or send loop
        self._close_apdu = _abrt_apdu(0)
        self._receive_queue = aio.Queue(receive_queue_size)
        self._send_queue = aio.Queue(send_queue_size)
        self._async_group = aio.Group()
        self._log = _create_connection_logger(self._info)

        self.async_group.spawn(aio.call_on_cancel, self._on_close)
        self.async_group.spawn(self._receive_loop)
        self.async_group.spawn(self._send_loop)
        self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
                               self.close)

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    @property
    def info(self) -> ConnectionInfo:
        """Connection info"""
        return self._info

    @property
    def conn_req_user_data(self) -> copp.IdentifiedEntity | None:
        """Connect request's user data (``None`` if not provided)"""
        return self._conn_req_user_data

    @property
    def conn_res_user_data(self) -> copp.IdentifiedEntity | None:
        """Connect response's user data (``None`` if not provided)"""
        return self._conn_res_user_data

    async def receive(self) -> copp.IdentifiedEntity:
        """Receive data

        Raises:
            ConnectionError: if the receive queue is closed

        """
        try:
            return await self._receive_queue.get()

        except aio.QueueClosedError:
            raise ConnectionError()

    async def send(self, data: copp.IdentifiedEntity):
        """Send data

        Data is queued and sent by the send loop.

        Raises:
            ConnectionError: if the send queue is closed

        """
        try:
            await self._send_queue.put((data, None))

        except aio.QueueClosedError:
            raise ConnectionError()

    async def drain(self):
        """Drain output buffer

        Raises:
            ConnectionError: if the send queue is closed

        """
        try:
            # the send loop resolves the future once all previously queued
            # data is sent and the underlying connection is drained
            future = self._loop.create_future()
            await self._send_queue.put((None, future))
            await future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def _on_close(self):
        await _close_copp(self._conn, self._close_apdu)

    def _close(self, apdu):
        # only the first close request determines the closing APDU
        if not self.is_open:
            return

        self._close_apdu = apdu
        self._async_group.close()

    async def _receive_loop(self):
        try:
            while True:
                syntax_name, entity = await self._conn.receive()

                if syntax_name == _acse_syntax_name:
                    # entities received from COPP are encoded - decode
                    # before inspecting the APDU choice name (same as
                    # connect request/response handling)
                    apdu = _decode(entity)

                    if apdu[0] == 'abrt':
                        # remote abort is not answered
                        close_apdu = None

                    elif apdu[0] == 'rlrq':
                        close_apdu = _rlre_apdu()

                    else:
                        close_apdu = _abrt_apdu(1)

                    self._close(close_apdu)
                    break

                await self._receive_queue.put((syntax_name, entity))

        except ConnectionError:
            pass

        except Exception as e:
            self._log.error("receive loop error: %s", e, exc_info=e)

        finally:
            self._close(_abrt_apdu(1))
            self._receive_queue.close()

    async def _send_loop(self):
        future = None
        try:
            while True:
                data, future = await self._send_queue.get()

                # items without data are drain requests
                if data is None:
                    await self._conn.drain()

                else:
                    await self._conn.send(data)

                if future and not future.done():
                    future.set_result(None)

        except ConnectionError:
            pass

        except Exception as e:
            self._log.error("send loop error: %s", e, exc_info=e)

        finally:
            self._close(_abrt_apdu(1))
            self._send_queue.close()

            # resolve the current and all still queued drain futures so
            # that `drain` callers are not blocked forever
            while True:
                if future and not future.done():
                    future.set_result(None)
                if self._send_queue.empty():
                    break
                _, future = self._send_queue.get_nowait()


async def _close_copp(copp_conn, apdu):
    """Close COPP connection, optionally sending `apdu` as closing data"""
    data = (_acse_syntax_name, _encode(apdu)) if apdu else None
    await copp_conn.async_close(data)


def _get_ap_titles(aarq_apdu):
    """Get (calling, called) AP titles (only `ap-title-form2` is used)"""
    calling = None
    if 'calling-AP-title' in aarq_apdu[1]:
        if aarq_apdu[1]['calling-AP-title'][0] == 'ap-title-form2':
            calling = aarq_apdu[1]['calling-AP-title'][1]

    called = None
    if 'called-AP-title' in aarq_apdu[1]:
        if aarq_apdu[1]['called-AP-title'][0] == 'ap-title-form2':
            called = aarq_apdu[1]['called-AP-title'][1]

    return calling, called


def _get_ae_qualifiers(aarq_apdu):
    """Get (calling, called) AE qualifiers (only `ae-qualifier-form2`)"""
    calling = None
    if 'calling-AE-qualifier' in aarq_apdu[1]:
        if aarq_apdu[1]['calling-AE-qualifier'][0] == 'ae-qualifier-form2':
            calling = aarq_apdu[1]['calling-AE-qualifier'][1]

    called = None
    if 'called-AE-qualifier' in aarq_apdu[1]:
        if aarq_apdu[1]['called-AE-qualifier'][0] == 'ae-qualifier-form2':
            called = aarq_apdu[1]['called-AE-qualifier'][1]

    return calling, called


def _get_ap_invocation_identifiers(aarq_apdu):
    """Get (calling, called) AP invocation identifiers (None if absent)"""
    calling = aarq_apdu[1].get('calling-AP-invocation-identifier')
    called = aarq_apdu[1].get('called-AP-invocation-identifier')
    return calling, called


def _get_ae_invocation_identifiers(aarq_apdu):
    """Get (calling, called) AE invocation identifiers (None if absent)"""
    calling = aarq_apdu[1].get('calling-AE-invocation-identifier')
    called = aarq_apdu[1].get('called-AE-invocation-identifier')
    return calling, called


def _aarq_apdu(syntax_names, app_context_name,
               calling_ap_title, called_ap_title,
               calling_ae_qualifier, called_ae_qualifier,
               user_data):
    """Create AARQ APDU value (optional fields are added only if set)"""
    aarq_apdu = 'aarq', {'application-context-name': app_context_name}

    if calling_ap_title is not None:
        aarq_apdu[1]['calling-AP-title'] = 'ap-title-form2', calling_ap_title

    if called_ap_title is not None:
        aarq_apdu[1]['called-AP-title'] = 'ap-title-form2', called_ap_title

    if calling_ae_qualifier is not None:
        aarq_apdu[1]['calling-AE-qualifier'] = ('ae-qualifier-form2',
                                                calling_ae_qualifier)

    if called_ae_qualifier is not None:
        aarq_apdu[1]['called-AE-qualifier'] = ('ae-qualifier-form2',
                                               called_ae_qualifier)

    if user_data:
        aarq_apdu[1]['user-information'] = [
            asn1.External(direct_ref=_encoder.syntax_name,
                          indirect_ref=syntax_names.get_id(user_data[0]),
                          data=user_data[1])]

    return aarq_apdu


def _aare_apdu(syntax_names, user_data,
               responding_ap_title, responding_ae_qualifier,
               responding_ap_invocation_identifier,
               responding_ae_invocation_identifier):
    """Create accepting AARE APDU value carrying `user_data`"""
    # NOTE(review): `user_data` is indexed unconditionally and its syntax
    # name is also used as application context name - confirm both
    aare_apdu = 'aare', {
        'application-context-name': user_data[0],
        'result': 0,
        'result-source-diagnostic': ('acse-service-user', 0),
        'user-information': [
            asn1.External(direct_ref=_encoder.syntax_name,
                          indirect_ref=syntax_names.get_id(user_data[0]),
                          data=user_data[1])]}

    if responding_ap_title is not None:
        aare_apdu[1]['responding-AP-title'] = ('ap-title-form2',
                                               responding_ap_title)

    if responding_ae_qualifier is not None:
        aare_apdu[1]['responding-AE-qualifier'] = ('ae-qualifier-form2',
                                                   responding_ae_qualifier)

    if responding_ap_invocation_identifier is not None:
        aare_apdu[1]['responding-AP-invocation-identifier'] = \
            responding_ap_invocation_identifier

    if responding_ae_invocation_identifier is not None:
        aare_apdu[1]['responding-AE-invocation-identifier'] = \
            responding_ae_invocation_identifier

    return aare_apdu


def _abrt_apdu(source):
    """Create ABRT APDU value with the given abort source"""
    return 'abrt', {'abort-source': source}


def _rlre_apdu():
    """Create empty RLRE APDU value"""
    return 'rlre', {}


def _encode(value):
    """Encode ACSE APDU value"""
    return _encoder.encode_value(asn1.TypeRef('ACSE-1', 'ACSE-apdu'), value)


def _decode(entity):
    """Decode entity as ACSE APDU value"""
    return _encoder.decode_value(asn1.TypeRef('ACSE-1', 'ACSE-apdu'), entity)


def _create_server_logger(name, info):
    """Create server logger adapter (addresses are added if `info` is set)"""
    extra = {'meta': {'type': 'AcseServer',
                      'name': name}}

    if info is not None:
        extra['meta']['addresses'] = [{'host': addr.host,
                                       'port': addr.port}
                                      for addr in info.addresses]

    return logging.LoggerAdapter(mlog, extra)


def _create_connection_logger(info):
    """Create connection logger adapter based on connection info"""
    extra = {'meta': {'type': 'AcseConnection',
                      'name': info.name,
                      'local_addr': {'host': info.local_addr.host,
                                     'port': info.local_addr.port},
                      'remote_addr': {'host': info.remote_addr.host,
                                      'port': info.remote_addr.port}}}

    return logging.LoggerAdapter(mlog, extra)
class ConnectionInfo(typing.NamedTuple):
    """ACSE connection info

    Instances are created by `Connection`, which provides the AP title and
    AE qualifier fields explicitly and copies all other fields from
    the underlying COPP connection info.

    """
    # connection name
    name: str | None
    # local TCP address
    local_addr: tcp.Address
    # presumably local transport/session/presentation selectors - verify
    # against `hat.drivers.copp`
    local_tsel: int | None
    local_ssel: int | None
    local_psel: int | None
    # local AP title and AE qualifier
    local_ap_title: asn1.ObjectIdentifier | None
    local_ae_qualifier: int | None
    # remote TCP address
    remote_addr: tcp.Address
    # presumably remote transport/session/presentation selectors - verify
    # against `hat.drivers.copp`
    remote_tsel: int | None
    remote_ssel: int | None
    remote_psel: int | None
    # remote AP title and AE qualifier
    remote_ap_title: asn1.ObjectIdentifier | None
    remote_ae_qualifier: int | None
ConnectionInfo(name, local_addr, local_tsel, local_ssel, local_psel, local_ap_title, local_ae_qualifier, remote_addr, remote_tsel, remote_ssel, remote_psel, remote_ap_title, remote_ae_qualifier)
Create new instance of ConnectionInfo(name, local_addr, local_tsel, local_ssel, local_psel, local_ap_title, local_ae_qualifier, remote_addr, remote_tsel, remote_ssel, remote_psel, remote_ap_title, remote_ae_qualifier)
Validate callback
Connection callback
async def connect(addr: tcp.Address,
                  syntax_name_list: list[asn1.ObjectIdentifier],
                  app_context_name: asn1.ObjectIdentifier,
                  user_data: copp.IdentifiedEntity | None = None,
                  *,
                  local_ap_title: asn1.ObjectIdentifier | None = None,
                  remote_ap_title: asn1.ObjectIdentifier | None = None,
                  local_ae_qualifier: int | None = None,
                  remote_ae_qualifier: int | None = None,
                  acse_receive_queue_size: int = 1024,
                  acse_send_queue_size: int = 1024,
                  **kwargs
                  ) -> 'Connection':
    """Connect to ACSE server

    An AARQ is sent as COPP connect user data and the COPP connect response
    user data is expected to contain an accepting AARE. Additional arguments
    are passed directly to `hat.drivers.copp.connect` (`syntax_names` is set
    by this coroutine).

    Raises:
        Exception: if the response is not an accepting AARE encoded with
            the ACSE syntax (the COPP connection is aborted before
            the exception is propagated)

    """
    names = copp.SyntaxNames([_acse_syntax_name, *syntax_name_list])
    request = _aarq_apdu(names, app_context_name,
                         local_ap_title, remote_ap_title,
                         local_ae_qualifier, remote_ae_qualifier,
                         user_data)
    copp_conn = await copp.connect(addr, names,
                                   (_acse_syntax_name, _encode(request)),
                                   **kwargs)

    try:
        response_syntax_name, response_entity = copp_conn.conn_res_user_data
        if response_syntax_name != _acse_syntax_name:
            raise Exception("invalid syntax name")

        response = _decode(response_entity)
        is_accepted = response[0] == 'aare' and response[1]['result'] == 0
        if not is_accepted:
            raise Exception("invalid apdu")

        # on the connecting side, "calling" is local and "called" is remote
        local_title, remote_title = _get_ap_titles(request)
        local_qualifier, remote_qualifier = _get_ae_qualifiers(request)

        return Connection(copp_conn, request, response,
                          local_title, remote_title,
                          local_qualifier, remote_qualifier,
                          acse_receive_queue_size, acse_send_queue_size)

    except Exception:
        # abort the already established COPP connection
        await aio.uncancellable(_close_copp(copp_conn, _abrt_apdu(1)))
        raise
Connect to ACSE server
Additional arguments are passed directly to hat.drivers.copp.connect
(syntax_names is set by this coroutine).
async def listen(validate_cb: ValidateCb,
                 connection_cb: ConnectionCb,
                 addr: tcp.Address = tcp.Address('0.0.0.0', 102),
                 *,
                 bind_connections: bool = False,
                 acse_receive_queue_size: int = 1024,
                 acse_send_queue_size: int = 1024,
                 **kwargs
                 ) -> 'Server':
    """Create ACSE listening server

    Additional arguments are passed directly to `hat.drivers.copp.listen`.

    Args:
        validate_cb: callback function or coroutine called on new
            incoming connection request prior to creating connection object
        connection_cb: new connection callback
        addr: local listening address
        bind_connections: if set, the connection handler waits until
            the connection is closed and closes it when interrupted
        acse_receive_queue_size: size of each connection's receive queue
        acse_send_queue_size: size of each connection's send queue

    """
    name = kwargs.get('name')

    srv = Server()
    srv._validate_cb = validate_cb
    srv._connection_cb = connection_cb
    srv._bind_connections = bind_connections
    srv._receive_queue_size = acse_receive_queue_size
    srv._send_queue_size = acse_send_queue_size

    # logger without addresses is used until the COPP server info exists
    srv._log = _create_server_logger(name, None)

    # binding of connections is done by `Server._on_connection`, so it is
    # always disabled on the COPP level
    srv._srv = await copp.listen(srv._on_validate,
                                 srv._on_connection,
                                 addr,
                                 bind_connections=False,
                                 **kwargs)
    srv._log = _create_server_logger(name, srv._srv.info)

    return srv
Create ACSE listening server
Additional arguments are passed directly to hat.drivers.copp.listen.
Arguments:
- validate_cb: callback function or coroutine called on new incoming connection request prior to creating connection object
- connection_cb: new connection callback
- addr: local listening address
class Server(aio.Resource):
    """ACSE listening server

    Instances are configured and started by `listen`, which also assigns
    all private attributes used by this class.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group of the underlying COPP server"""
        return self._srv.async_group

    @property
    def info(self) -> tcp.ServerInfo:
        """Server info of the underlying COPP server"""
        return self._srv.info

    async def _on_validate(self, syntax_names, user_data):
        """Validate received AARQ and create encoded AARE response"""
        request_syntax_name, request_entity = user_data
        if request_syntax_name != _acse_syntax_name:
            raise Exception('invalid acse syntax name')

        request = _decode(request_entity)
        if request[0] != 'aarq':
            raise Exception('not aarq message')

        external = request[1]['user-information'][0]
        direct_ref = external.direct_ref
        if direct_ref is not None and direct_ref != _encoder.syntax_name:
            raise Exception('invalid encoder identifier')

        # "called" values of the request are echoed as "responding" values
        responding_ap_title = _get_ap_titles(request)[1]
        responding_ae_qualifier = _get_ae_qualifiers(request)[1]
        responding_ap_invocation = _get_ap_invocation_identifiers(request)[1]
        responding_ae_invocation = _get_ae_invocation_identifiers(request)[1]

        request_user_data = (syntax_names.get_name(external.indirect_ref),
                             external.data)

        # NOTE(review): `ValidateCb` is typed as possibly returning None,
        # but `_aare_apdu` indexes the result unconditionally - confirm
        response_user_data = await aio.call(self._validate_cb, syntax_names,
                                            request_user_data)

        response = _aare_apdu(syntax_names,
                              response_user_data,
                              responding_ap_title,
                              responding_ae_qualifier,
                              responding_ap_invocation,
                              responding_ae_invocation)
        return _acse_syntax_name, _encode(response)

    async def _on_connection(self, copp_conn):
        """Wrap new COPP connection and notify connection callback"""
        try:
            try:
                request = _decode(copp_conn.conn_req_user_data[1])
                response = _decode(copp_conn.conn_res_user_data[1])

                # on the listening side, "called" is local and "calling"
                # is remote
                remote_title, local_title = _get_ap_titles(request)
                remote_qualifier, local_qualifier = _get_ae_qualifiers(
                    request)

                conn = Connection(copp_conn, request, response,
                                  local_title, remote_title,
                                  local_qualifier, remote_qualifier,
                                  self._receive_queue_size,
                                  self._send_queue_size)

            except Exception:
                # connection object could not be created - abort COPP
                await aio.uncancellable(_close_copp(copp_conn, _abrt_apdu(1)))
                raise

            try:
                await aio.call(self._connection_cb, conn)

            except BaseException:
                await aio.uncancellable(conn.async_close())
                raise

        except Exception as e:
            self._log.error("error creating new incomming connection: %s",
                            e, exc_info=e)
            return

        if self._bind_connections:
            # keep this handler alive for the lifetime of the connection
            try:
                await conn.wait_closed()

            except BaseException:
                await aio.uncancellable(conn.async_close())
                raise
ACSE listening server
For creating new server see listen.
@property
def async_group(self) -> aio.Group:
    """Async group of the underlying COPP server"""
    copp_server = self._srv
    return copp_server.async_group
Async group
class Connection(aio.Resource):
    """ACSE connection

    For creating new connection see `connect` or `listen`.

    """

    def __init__(self,
                 conn: copp.Connection,
                 aarq_apdu: asn1.Value,
                 aare_apdu: asn1.Value,
                 local_ap_title: asn1.ObjectIdentifier | None,
                 remote_ap_title: asn1.ObjectIdentifier | None,
                 local_ae_qualifier: int | None,
                 remote_ae_qualifier: int | None,
                 receive_queue_size: int,
                 send_queue_size: int):
        """Wrap an established COPP connection

        Must be called while an event loop is running. Receive and send
        loops are spawned immediately, and closing of `conn` closes this
        connection.

        """

        def get_user_data(apdu):
            # 'user-information' is optional - the AARQ builder omits it
            # when `connect` is called without user data
            externals = apdu[1].get('user-information')
            if not externals:
                return None
            external = externals[0]
            return (conn.syntax_names.get_name(external.indirect_ref),
                    external.data)

        self._conn = conn
        self._conn_req_user_data = get_user_data(aarq_apdu)
        self._conn_res_user_data = get_user_data(aare_apdu)
        self._loop = asyncio.get_running_loop()
        self._info = ConnectionInfo(local_ap_title=local_ap_title,
                                    local_ae_qualifier=local_ae_qualifier,
                                    remote_ap_title=remote_ap_title,
                                    remote_ae_qualifier=remote_ae_qualifier,
                                    **conn.info._asdict())
        # APDU sent when this connection is closed; replaced by `_close`
        # when closing is initiated by the receive or send loop
        self._close_apdu = _abrt_apdu(0)
        self._receive_queue = aio.Queue(receive_queue_size)
        self._send_queue = aio.Queue(send_queue_size)
        self._async_group = aio.Group()
        self._log = _create_connection_logger(self._info)

        self.async_group.spawn(aio.call_on_cancel, self._on_close)
        self.async_group.spawn(self._receive_loop)
        self.async_group.spawn(self._send_loop)
        self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
                               self.close)

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    @property
    def info(self) -> ConnectionInfo:
        """Connection info"""
        return self._info

    @property
    def conn_req_user_data(self) -> copp.IdentifiedEntity | None:
        """Connect request's user data (``None`` if not provided)"""
        return self._conn_req_user_data

    @property
    def conn_res_user_data(self) -> copp.IdentifiedEntity | None:
        """Connect response's user data (``None`` if not provided)"""
        return self._conn_res_user_data

    async def receive(self) -> copp.IdentifiedEntity:
        """Receive data

        Raises:
            ConnectionError: if the receive queue is closed

        """
        try:
            return await self._receive_queue.get()

        except aio.QueueClosedError:
            raise ConnectionError()

    async def send(self, data: copp.IdentifiedEntity):
        """Send data

        Data is queued and sent by the send loop.

        Raises:
            ConnectionError: if the send queue is closed

        """
        try:
            await self._send_queue.put((data, None))

        except aio.QueueClosedError:
            raise ConnectionError()

    async def drain(self):
        """Drain output buffer

        Raises:
            ConnectionError: if the send queue is closed

        """
        try:
            # the send loop resolves the future once all previously queued
            # data is sent and the underlying connection is drained
            future = self._loop.create_future()
            await self._send_queue.put((None, future))
            await future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def _on_close(self):
        await _close_copp(self._conn, self._close_apdu)

    def _close(self, apdu):
        # only the first close request determines the closing APDU
        if not self.is_open:
            return

        self._close_apdu = apdu
        self._async_group.close()

    async def _receive_loop(self):
        try:
            while True:
                syntax_name, entity = await self._conn.receive()

                if syntax_name == _acse_syntax_name:
                    # entities received from COPP are encoded - decode
                    # before inspecting the APDU choice name (same as
                    # connect request/response handling)
                    apdu = _decode(entity)

                    if apdu[0] == 'abrt':
                        # remote abort is not answered
                        close_apdu = None

                    elif apdu[0] == 'rlrq':
                        close_apdu = _rlre_apdu()

                    else:
                        close_apdu = _abrt_apdu(1)

                    self._close(close_apdu)
                    break

                await self._receive_queue.put((syntax_name, entity))

        except ConnectionError:
            pass

        except Exception as e:
            self._log.error("receive loop error: %s", e, exc_info=e)

        finally:
            self._close(_abrt_apdu(1))
            self._receive_queue.close()

    async def _send_loop(self):
        future = None
        try:
            while True:
                data, future = await self._send_queue.get()

                # items without data are drain requests
                if data is None:
                    await self._conn.drain()

                else:
                    await self._conn.send(data)

                if future and not future.done():
                    future.set_result(None)

        except ConnectionError:
            pass

        except Exception as e:
            self._log.error("send loop error: %s", e, exc_info=e)

        finally:
            self._close(_abrt_apdu(1))
            self._send_queue.close()

            # resolve the current and all still queued drain futures so
            # that `drain` callers are not blocked forever
            while True:
                if future and not future.done():
                    future.set_result(None)
                if self._send_queue.empty():
                    break
                _, future = self._send_queue.get_nowait()
def __init__(self,
             conn: copp.Connection,
             aarq_apdu: asn1.Value,
             aare_apdu: asn1.Value,
             local_ap_title: asn1.ObjectIdentifier | None,
             remote_ap_title: asn1.ObjectIdentifier | None,
             local_ae_qualifier: int | None,
             remote_ae_qualifier: int | None,
             receive_queue_size: int,
             send_queue_size: int):
    """Wrap an established COPP connection

    Must be called while an event loop is running. Receive and send loops
    are spawned immediately, and closing of `conn` closes this connection.

    Args:
        conn: underlying COPP connection
        aarq_apdu: decoded association request
        aare_apdu: decoded association response
        local_ap_title: local AP title
        remote_ap_title: remote AP title
        local_ae_qualifier: local AE qualifier
        remote_ae_qualifier: remote AE qualifier
        receive_queue_size: receive queue size
        send_queue_size: send queue size

    """

    def get_user_data(apdu):
        # 'user-information' is optional - the AARQ builder omits it when
        # `connect` is called without user data, so a missing field results
        # in None instead of KeyError
        externals = apdu[1].get('user-information')
        if not externals:
            return None
        external = externals[0]
        return (conn.syntax_names.get_name(external.indirect_ref),
                external.data)

    self._conn = conn
    self._conn_req_user_data = get_user_data(aarq_apdu)
    self._conn_res_user_data = get_user_data(aare_apdu)
    self._loop = asyncio.get_running_loop()
    self._info = ConnectionInfo(local_ap_title=local_ap_title,
                                local_ae_qualifier=local_ae_qualifier,
                                remote_ap_title=remote_ap_title,
                                remote_ae_qualifier=remote_ae_qualifier,
                                **conn.info._asdict())
    # APDU sent when this connection is closed; replaced by `_close` when
    # closing is initiated by the receive or send loop
    self._close_apdu = _abrt_apdu(0)
    self._receive_queue = aio.Queue(receive_queue_size)
    self._send_queue = aio.Queue(send_queue_size)
    self._async_group = aio.Group()
    self._log = _create_connection_logger(self._info)

    self.async_group.spawn(aio.call_on_cancel, self._on_close)
    self.async_group.spawn(self._receive_loop)
    self.async_group.spawn(self._send_loop)
    self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
                           self.close)
@property
def async_group(self) -> aio.Group:
    """Async group owning this connection's background tasks"""
    group = self._async_group
    return group
Async group
@property
def conn_req_user_data(self) -> copp.IdentifiedEntity:
    """User data carried by the connect request"""
    request_user_data = self._conn_req_user_data
    return request_user_data
Connect request's user data
@property
def conn_res_user_data(self) -> copp.IdentifiedEntity:
    """User data carried by the connect response"""
    response_user_data = self._conn_res_user_data
    return response_user_data
Connect response's user data
async def receive(self) -> copp.IdentifiedEntity:
    """Receive data

    Waits for the next item of the receive queue.

    Raises:
        ConnectionError: if the receive queue is closed

    """
    try:
        received = await self._receive_queue.get()

    except aio.QueueClosedError:
        raise ConnectionError()

    return received
Receive data