hat.drivers.acse
Association Control Service Element
1"""Association Controll Service Element""" 2 3import asyncio 4import importlib.resources 5import logging 6import typing 7 8from hat import aio 9from hat import asn1 10from hat import json 11 12from hat.drivers import copp 13from hat.drivers import tcp 14 15 16mlog = logging.getLogger(__name__) 17 18# (joint-iso-itu-t, association-control, abstract-syntax, apdus, version1) 19_acse_syntax_name = (2, 2, 1, 0, 1) 20 21with importlib.resources.open_text(__package__, 'asn1_repo.json') as _f: 22 _encoder = asn1.ber.BerEncoder( 23 asn1.repository_from_json( 24 json.decode_stream(_f))) 25 26 27class ConnectionInfo(typing.NamedTuple): 28 local_addr: tcp.Address 29 local_tsel: int | None 30 local_ssel: int | None 31 local_psel: int | None 32 local_ap_title: asn1.ObjectIdentifier | None 33 local_ae_qualifier: int | None 34 remote_addr: tcp.Address 35 remote_tsel: int | None 36 remote_ssel: int | None 37 remote_psel: int | None 38 remote_ap_title: asn1.ObjectIdentifier | None 39 remote_ae_qualifier: int | None 40 41 42ValidateCb: typing.TypeAlias = aio.AsyncCallable[[copp.SyntaxNames, 43 copp.IdentifiedEntity], 44 copp.IdentifiedEntity | None] 45"""Validate callback""" 46 47ConnectionCb: typing.TypeAlias = aio.AsyncCallable[['Connection'], None] 48"""Connection callback""" 49 50 51async def connect(addr: tcp.Address, 52 syntax_name_list: list[asn1.ObjectIdentifier], 53 app_context_name: asn1.ObjectIdentifier, 54 user_data: copp.IdentifiedEntity | None = None, 55 *, 56 local_ap_title: asn1.ObjectIdentifier | None = None, 57 remote_ap_title: asn1.ObjectIdentifier | None = None, 58 local_ae_qualifier: int | None = None, 59 remote_ae_qualifier: int | None = None, 60 acse_receive_queue_size: int = 1024, 61 acse_send_queue_size: int = 1024, 62 **kwargs 63 ) -> 'Connection': 64 """Connect to ACSE server 65 66 Additional arguments are passed directly to `hat.drivers.copp.connect` 67 (`syntax_names` is set by this coroutine). 
68 69 """ 70 syntax_names = copp.SyntaxNames([_acse_syntax_name, *syntax_name_list]) 71 aarq_apdu = _aarq_apdu(syntax_names, app_context_name, 72 local_ap_title, remote_ap_title, 73 local_ae_qualifier, remote_ae_qualifier, 74 user_data) 75 copp_user_data = _acse_syntax_name, _encode(aarq_apdu) 76 conn = await copp.connect(addr, syntax_names, copp_user_data, **kwargs) 77 78 try: 79 aare_apdu_syntax_name, aare_apdu_entity = conn.conn_res_user_data 80 if aare_apdu_syntax_name != _acse_syntax_name: 81 raise Exception("invalid syntax name") 82 83 aare_apdu = _decode(aare_apdu_entity) 84 if aare_apdu[0] != 'aare' or aare_apdu[1]['result'] != 0: 85 raise Exception("invalid apdu") 86 87 calling_ap_title, called_ap_title = _get_ap_titles(aarq_apdu) 88 calling_ae_qualifier, called_ae_qualifier = _get_ae_qualifiers( 89 aarq_apdu) 90 return Connection(conn, aarq_apdu, aare_apdu, 91 calling_ap_title, called_ap_title, 92 calling_ae_qualifier, called_ae_qualifier, 93 acse_receive_queue_size, acse_send_queue_size) 94 95 except Exception: 96 await aio.uncancellable(_close_copp(conn, _abrt_apdu(1))) 97 raise 98 99 100async def listen(validate_cb: ValidateCb, 101 connection_cb: ConnectionCb, 102 addr: tcp.Address = tcp.Address('0.0.0.0', 102), 103 *, 104 bind_connections: bool = False, 105 acse_receive_queue_size: int = 1024, 106 acse_send_queue_size: int = 1024, 107 **kwargs 108 ) -> 'Server': 109 """Create ACSE listening server 110 111 Additional arguments are passed directly to `hat.drivers.copp.listen`. 
112 113 Args: 114 validate_cb: callback function or coroutine called on new 115 incomming connection request prior to creating connection object 116 connection_cb: new connection callback 117 addr: local listening address 118 119 """ 120 server = Server() 121 server._validate_cb = validate_cb 122 server._connection_cb = connection_cb 123 server._bind_connections = bind_connections 124 server._receive_queue_size = acse_receive_queue_size 125 server._send_queue_size = acse_send_queue_size 126 127 server._srv = await copp.listen(server._on_validate, 128 server._on_connection, 129 addr, 130 bind_connections=False, 131 **kwargs) 132 133 return server 134 135 136class Server(aio.Resource): 137 """ACSE listening server 138 139 For creating new server see `listen`. 140 141 """ 142 143 @property 144 def async_group(self) -> aio.Group: 145 """Async group""" 146 return self._srv.async_group 147 148 @property 149 def addresses(self) -> list[tcp.Address]: 150 """Listening addresses""" 151 return self._srv.addresses 152 153 async def _on_validate(self, syntax_names, user_data): 154 aarq_apdu_syntax_name, aarq_apdu_entity = user_data 155 if aarq_apdu_syntax_name != _acse_syntax_name: 156 raise Exception('invalid acse syntax name') 157 158 aarq_apdu = _decode(aarq_apdu_entity) 159 if aarq_apdu[0] != 'aarq': 160 raise Exception('not aarq message') 161 162 aarq_external = aarq_apdu[1]['user-information'][0] 163 if aarq_external.direct_ref is not None: 164 if aarq_external.direct_ref != _encoder.syntax_name: 165 raise Exception('invalid encoder identifier') 166 167 _, called_ap_title = _get_ap_titles(aarq_apdu) 168 _, called_ae_qualifier = _get_ae_qualifiers(aarq_apdu) 169 _, called_ap_invocation_identifier = \ 170 _get_ap_invocation_identifiers(aarq_apdu) 171 _, called_ae_invocation_identifier = \ 172 _get_ae_invocation_identifiers(aarq_apdu) 173 174 aarq_user_data = (syntax_names.get_name(aarq_external.indirect_ref), 175 aarq_external.data) 176 177 user_validate_result = await 
aio.call(self._validate_cb, syntax_names, 178 aarq_user_data) 179 180 aare_apdu = _aare_apdu(syntax_names, 181 user_validate_result, 182 called_ap_title, called_ae_qualifier, 183 called_ap_invocation_identifier, 184 called_ae_invocation_identifier) 185 return _acse_syntax_name, _encode(aare_apdu) 186 187 async def _on_connection(self, copp_conn): 188 try: 189 try: 190 aarq_apdu = _decode(copp_conn.conn_req_user_data[1]) 191 aare_apdu = _decode(copp_conn.conn_res_user_data[1]) 192 193 calling_ap_title, called_ap_title = _get_ap_titles(aarq_apdu) 194 calling_ae_qualifier, called_ae_qualifier = _get_ae_qualifiers( 195 aarq_apdu) 196 197 conn = Connection(copp_conn, aarq_apdu, aare_apdu, 198 calling_ap_title, called_ap_title, 199 calling_ae_qualifier, called_ae_qualifier, 200 self._receive_queue_size, 201 self._send_queue_size) 202 203 except Exception: 204 await aio.uncancellable(_close_copp(copp_conn, _abrt_apdu(1))) 205 raise 206 207 try: 208 await aio.call(self._connection_cb, conn) 209 210 except BaseException: 211 await aio.uncancellable(conn.async_close()) 212 raise 213 214 except Exception as e: 215 mlog.error("error creating new incomming connection: %s", e, 216 exc_info=e) 217 return 218 219 if not self._bind_connections: 220 return 221 222 try: 223 await conn.wait_closed() 224 225 except BaseException: 226 await aio.uncancellable(conn.async_close()) 227 raise 228 229 230class Connection(aio.Resource): 231 """ACSE connection 232 233 For creating new connection see `connect` or `listen`. 
234 235 """ 236 237 def __init__(self, 238 conn: copp.Connection, 239 aarq_apdu: asn1.Value, 240 aare_apdu: asn1.Value, 241 local_ap_title: asn1.ObjectIdentifier | None, 242 remote_ap_title: asn1.ObjectIdentifier | None, 243 local_ae_qualifier: int | None, 244 remote_ae_qualifier: int | None, 245 receive_queue_size: int, 246 send_queue_size: int): 247 aarq_external = aarq_apdu[1]['user-information'][0] 248 aare_external = aare_apdu[1]['user-information'][0] 249 250 conn_req_user_data = ( 251 conn.syntax_names.get_name(aarq_external.indirect_ref), 252 aarq_external.data) 253 conn_res_user_data = ( 254 conn.syntax_names.get_name(aare_external.indirect_ref), 255 aare_external.data) 256 257 self._conn = conn 258 self._conn_req_user_data = conn_req_user_data 259 self._conn_res_user_data = conn_res_user_data 260 self._loop = asyncio.get_running_loop() 261 self._info = ConnectionInfo(local_ap_title=local_ap_title, 262 local_ae_qualifier=local_ae_qualifier, 263 remote_ap_title=remote_ap_title, 264 remote_ae_qualifier=remote_ae_qualifier, 265 **conn.info._asdict()) 266 self._close_apdu = _abrt_apdu(0) 267 self._receive_queue = aio.Queue(receive_queue_size) 268 self._send_queue = aio.Queue(send_queue_size) 269 self._async_group = aio.Group() 270 271 self.async_group.spawn(aio.call_on_cancel, self._on_close) 272 self.async_group.spawn(self._receive_loop) 273 self.async_group.spawn(self._send_loop) 274 self.async_group.spawn(aio.call_on_done, conn.wait_closing(), 275 self.close) 276 277 @property 278 def async_group(self) -> aio.Group: 279 """Async group""" 280 return self._async_group 281 282 @property 283 def info(self) -> ConnectionInfo: 284 """Connection info""" 285 return self._info 286 287 @property 288 def conn_req_user_data(self) -> copp.IdentifiedEntity: 289 """Connect request's user data""" 290 return self._conn_req_user_data 291 292 @property 293 def conn_res_user_data(self) -> copp.IdentifiedEntity: 294 """Connect response's user data""" 295 return 
self._conn_res_user_data 296 297 async def receive(self) -> copp.IdentifiedEntity: 298 """Receive data""" 299 try: 300 return await self._receive_queue.get() 301 302 except aio.QueueClosedError: 303 raise ConnectionError() 304 305 async def send(self, data: copp.IdentifiedEntity): 306 """Send data""" 307 try: 308 await self._send_queue.put((data, None)) 309 310 except aio.QueueClosedError: 311 raise ConnectionError() 312 313 async def drain(self): 314 """Drain output buffer""" 315 try: 316 future = self._loop.create_future() 317 await self._send_queue.put((None, future)) 318 await future 319 320 except aio.QueueClosedError: 321 raise ConnectionError() 322 323 async def _on_close(self): 324 await _close_copp(self._conn, self._close_apdu) 325 326 def _close(self, apdu): 327 if not self.is_open: 328 return 329 330 self._close_apdu = apdu 331 self._async_group.close() 332 333 async def _receive_loop(self): 334 try: 335 while True: 336 syntax_name, entity = await self._conn.receive() 337 338 if syntax_name == _acse_syntax_name: 339 if entity[0] == 'abrt': 340 close_apdu = None 341 342 elif entity[0] == 'rlrq': 343 close_apdu = _rlre_apdu() 344 345 else: 346 close_apdu = _abrt_apdu(1) 347 348 self._close(close_apdu) 349 break 350 351 await self._receive_queue.put((syntax_name, entity)) 352 353 except ConnectionError: 354 pass 355 356 except Exception as e: 357 mlog.error("receive loop error: %s", e, exc_info=e) 358 359 finally: 360 self._close(_abrt_apdu(1)) 361 self._receive_queue.close() 362 363 async def _send_loop(self): 364 future = None 365 try: 366 while True: 367 data, future = await self._send_queue.get() 368 369 if data is None: 370 await self._conn.drain() 371 372 else: 373 await self._conn.send(data) 374 375 if future and not future.done(): 376 future.set_result(None) 377 378 except ConnectionError: 379 pass 380 381 except Exception as e: 382 mlog.error("send loop error: %s", e, exc_info=e) 383 384 finally: 385 self._close(_abrt_apdu(1)) 386 
self._send_queue.close() 387 388 while True: 389 if future and not future.done(): 390 future.set_result(None) 391 if self._send_queue.empty(): 392 break 393 _, future = self._send_queue.get_nowait() 394 395 396async def _close_copp(copp_conn, apdu): 397 data = (_acse_syntax_name, _encode(apdu)) if apdu else None 398 await copp_conn.async_close(data) 399 400 401def _get_ap_titles(aarq_apdu): 402 calling = None 403 if 'calling-AP-title' in aarq_apdu[1]: 404 if aarq_apdu[1]['calling-AP-title'][0] == 'ap-title-form2': 405 calling = aarq_apdu[1]['calling-AP-title'][1] 406 407 called = None 408 if 'called-AP-title' in aarq_apdu[1]: 409 if aarq_apdu[1]['called-AP-title'][0] == 'ap-title-form2': 410 called = aarq_apdu[1]['called-AP-title'][1] 411 412 return calling, called 413 414 415def _get_ae_qualifiers(aarq_apdu): 416 calling = None 417 if 'calling-AE-qualifier' in aarq_apdu[1]: 418 if aarq_apdu[1]['calling-AE-qualifier'][0] == 'ap-qualifier-form2': 419 calling = aarq_apdu[1]['calling-AE-qualifier'][1] 420 421 called = None 422 if 'called-AE-qualifier' in aarq_apdu[1]: 423 if aarq_apdu[1]['called-AE-qualifier'][0] == 'ap-qualifier-form2': 424 called = aarq_apdu[1]['called-AE-qualifier'][1] 425 426 return calling, called 427 428 429def _get_ap_invocation_identifiers(aarq_apdu): 430 calling = aarq_apdu[1].get('calling-AP-invocation-identifier') 431 called = aarq_apdu[1].get('called-AP-invocation-identifier') 432 return calling, called 433 434 435def _get_ae_invocation_identifiers(aarq_apdu): 436 calling = aarq_apdu[1].get('calling-AE-invocation-identifier') 437 called = aarq_apdu[1].get('called-AE-invocation-identifier') 438 return calling, called 439 440 441def _aarq_apdu(syntax_names, app_context_name, 442 calling_ap_title, called_ap_title, 443 calling_ae_qualifier, called_ae_qualifier, 444 user_data): 445 aarq_apdu = 'aarq', {'application-context-name': app_context_name} 446 447 if calling_ap_title is not None: 448 aarq_apdu[1]['calling-AP-title'] = 'ap-title-form2', 
calling_ap_title 449 450 if called_ap_title is not None: 451 aarq_apdu[1]['called-AP-title'] = 'ap-title-form2', called_ap_title 452 453 if calling_ae_qualifier is not None: 454 aarq_apdu[1]['calling-AE-qualifier'] = ('ae-qualifier-form2', 455 calling_ae_qualifier) 456 457 if called_ae_qualifier is not None: 458 aarq_apdu[1]['called-AE-qualifier'] = ('ae-qualifier-form2', 459 called_ae_qualifier) 460 461 if user_data: 462 aarq_apdu[1]['user-information'] = [ 463 asn1.External(direct_ref=_encoder.syntax_name, 464 indirect_ref=syntax_names.get_id(user_data[0]), 465 data=user_data[1])] 466 467 return aarq_apdu 468 469 470def _aare_apdu(syntax_names, user_data, 471 responding_ap_title, responding_ae_qualifier, 472 responding_ap_invocation_identifier, 473 responding_ae_invocation_identifier): 474 aare_apdu = 'aare', { 475 'application-context-name': user_data[0], 476 'result': 0, 477 'result-source-diagnostic': ('acse-service-user', 0), 478 'user-information': [ 479 asn1.External(direct_ref=_encoder.syntax_name, 480 indirect_ref=syntax_names.get_id(user_data[0]), 481 data=user_data[1])]} 482 483 if responding_ap_title is not None: 484 aare_apdu[1]['responding-AP-title'] = ('ap-title-form2', 485 responding_ap_title) 486 487 if responding_ae_qualifier is not None: 488 aare_apdu[1]['responding-AE-qualifier'] = ('ae-qualifier-form2', 489 responding_ae_qualifier) 490 491 if responding_ap_invocation_identifier is not None: 492 aare_apdu[1]['responding-AP-invocation-identifier'] = \ 493 responding_ap_invocation_identifier 494 495 if responding_ae_invocation_identifier is not None: 496 aare_apdu[1]['responding-AE-invocation-identifier'] = \ 497 responding_ae_invocation_identifier 498 499 return aare_apdu 500 501 502def _abrt_apdu(source): 503 return 'abrt', {'abort-source': source} 504 505 506def _rlre_apdu(): 507 return 'rlre', {} 508 509 510def _encode(value): 511 return _encoder.encode_value(asn1.TypeRef('ACSE-1', 'ACSE-apdu'), value) 512 513 514def _decode(entity): 515 
return _encoder.decode_value(asn1.TypeRef('ACSE-1', 'ACSE-apdu'), entity)
class ConnectionInfo(typing.NamedTuple):
    """ACSE connection info

    AP titles and AE qualifiers are supplied when the ACSE connection is
    created; the remaining fields are copied from the underlying COPP
    connection's info.

    """
    # local endpoint
    local_addr: tcp.Address
    # presumably transport/session/presentation selectors - TODO confirm
    local_tsel: int | None
    local_ssel: int | None
    local_psel: int | None
    local_ap_title: asn1.ObjectIdentifier | None
    local_ae_qualifier: int | None
    # remote endpoint
    remote_addr: tcp.Address
    remote_tsel: int | None
    remote_ssel: int | None
    remote_psel: int | None
    remote_ap_title: asn1.ObjectIdentifier | None
    remote_ae_qualifier: int | None
ConnectionInfo(local_addr, local_tsel, local_ssel, local_psel, local_ap_title, local_ae_qualifier, remote_addr, remote_tsel, remote_ssel, remote_psel, remote_ap_title, remote_ae_qualifier)
Create new instance of ConnectionInfo(local_addr, local_tsel, local_ssel, local_psel, local_ap_title, local_ae_qualifier, remote_addr, remote_tsel, remote_ssel, remote_psel, remote_ap_title, remote_ae_qualifier)
Validate callback
Connection callback
async def connect(addr: tcp.Address,
                  syntax_name_list: list[asn1.ObjectIdentifier],
                  app_context_name: asn1.ObjectIdentifier,
                  user_data: copp.IdentifiedEntity | None = None,
                  *,
                  local_ap_title: asn1.ObjectIdentifier | None = None,
                  remote_ap_title: asn1.ObjectIdentifier | None = None,
                  local_ae_qualifier: int | None = None,
                  remote_ae_qualifier: int | None = None,
                  acse_receive_queue_size: int = 1024,
                  acse_send_queue_size: int = 1024,
                  **kwargs
                  ) -> 'Connection':
    """Connect to ACSE server

    An AARQ APDU is sent as COPP connect user data and the response is
    required to be an AARE APDU with ``result`` equal to 0.

    Additional arguments are passed directly to `hat.drivers.copp.connect`
    (`syntax_names` is set by this coroutine).

    Args:
        addr: remote address
        syntax_name_list: syntax names offered in addition to the ACSE syntax
        app_context_name: application context name put into the AARQ
        user_data: optional user data carried by the AARQ
        local_ap_title: calling AP title
        remote_ap_title: called AP title
        local_ae_qualifier: calling AE qualifier
        remote_ae_qualifier: called AE qualifier
        acse_receive_queue_size: size of the connection's receive queue
        acse_send_queue_size: size of the connection's send queue

    Raises:
        Exception: response is not an accepted AARE in the ACSE syntax

    """
    syntax_names = copp.SyntaxNames([_acse_syntax_name, *syntax_name_list])
    aarq_apdu = _aarq_apdu(syntax_names, app_context_name,
                           local_ap_title, remote_ap_title,
                           local_ae_qualifier, remote_ae_qualifier,
                           user_data)
    copp_user_data = _acse_syntax_name, _encode(aarq_apdu)
    conn = await copp.connect(addr, syntax_names, copp_user_data, **kwargs)

    try:
        aare_apdu_syntax_name, aare_apdu_entity = conn.conn_res_user_data
        if aare_apdu_syntax_name != _acse_syntax_name:
            raise Exception("invalid syntax name")

        aare_apdu = _decode(aare_apdu_entity)
        if aare_apdu[0] != 'aare' or aare_apdu[1]['result'] != 0:
            raise Exception("invalid apdu")

        # The caller-supplied titles/qualifiers are exactly what was written
        # into the AARQ, so they are used directly instead of being parsed
        # back out of it (parsing lost the AE qualifiers because the helper
        # looks for a choice name different from the one written by
        # `_aarq_apdu`).
        return Connection(conn, aarq_apdu, aare_apdu,
                          local_ap_title, remote_ap_title,
                          local_ae_qualifier, remote_ae_qualifier,
                          acse_receive_queue_size, acse_send_queue_size)

    except Exception:
        # do not leak the already established COPP connection
        await aio.uncancellable(_close_copp(conn, _abrt_apdu(1)))
        raise
Connect to ACSE server
Additional arguments are passed directly to `hat.drivers.copp.connect` (`syntax_names` is set by this coroutine).
async def listen(validate_cb: ValidateCb,
                 connection_cb: ConnectionCb,
                 addr: tcp.Address = tcp.Address('0.0.0.0', 102),
                 *,
                 bind_connections: bool = False,
                 acse_receive_queue_size: int = 1024,
                 acse_send_queue_size: int = 1024,
                 **kwargs
                 ) -> 'Server':
    """Create ACSE listening server

    Additional arguments are passed directly to `hat.drivers.copp.listen`.

    Args:
        validate_cb: callback function or coroutine called on new
            incoming connection request prior to creating connection object
        connection_cb: new connection callback
        addr: local listening address
        bind_connections: stored on the server and consulted by its
            connection handler after `connection_cb` finishes
        acse_receive_queue_size: receive queue size of created connections
        acse_send_queue_size: send queue size of created connections

    """
    srv = Server()

    # queue sizes applied to every accepted connection
    srv._send_queue_size = acse_send_queue_size
    srv._receive_queue_size = acse_receive_queue_size

    srv._bind_connections = bind_connections
    srv._connection_cb = connection_cb
    srv._validate_cb = validate_cb

    # COPP server itself is always created with binding disabled
    srv._srv = await copp.listen(srv._on_validate, srv._on_connection, addr,
                                 bind_connections=False, **kwargs)
    return srv
Create ACSE listening server
Additional arguments are passed directly to `hat.drivers.copp.listen`.
Arguments:
- validate_cb: callback function or coroutine called on new incoming connection request prior to creating connection object
- connection_cb: new connection callback
- addr: local listening address
class Server(aio.Resource):
    """ACSE listening server

    For creating new server see `listen`.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._srv.async_group

    @property
    def addresses(self) -> list[tcp.Address]:
        """Listening addresses"""
        return self._srv.addresses

    async def _on_validate(self, syntax_names, user_data):
        """Validate COPP connect request and build the AARE response

        Raises an exception if the request's user data is not an AARQ APDU
        in the ACSE syntax or if it references an unexpected encoder.

        """
        aarq_apdu_syntax_name, aarq_apdu_entity = user_data
        if aarq_apdu_syntax_name != _acse_syntax_name:
            raise Exception('invalid acse syntax name')

        aarq_apdu = _decode(aarq_apdu_entity)
        if aarq_apdu[0] != 'aarq':
            raise Exception('not aarq message')

        aarq_external = aarq_apdu[1]['user-information'][0]
        if aarq_external.direct_ref is not None:
            if aarq_external.direct_ref != _encoder.syntax_name:
                raise Exception('invalid encoder identifier')

        # "called" values of the request are echoed as "responding" values
        _, called_ap_title = _get_ap_titles(aarq_apdu)
        _, called_ae_qualifier = _get_ae_qualifiers(aarq_apdu)
        _, called_ap_invocation_identifier = \
            _get_ap_invocation_identifiers(aarq_apdu)
        _, called_ae_invocation_identifier = \
            _get_ae_invocation_identifiers(aarq_apdu)

        aarq_user_data = (syntax_names.get_name(aarq_external.indirect_ref),
                          aarq_external.data)

        user_validate_result = await aio.call(self._validate_cb, syntax_names,
                                              aarq_user_data)

        aare_apdu = _aare_apdu(syntax_names,
                               user_validate_result,
                               called_ap_title, called_ae_qualifier,
                               called_ap_invocation_identifier,
                               called_ae_invocation_identifier)
        return _acse_syntax_name, _encode(aare_apdu)

    async def _on_connection(self, copp_conn):
        """Wrap accepted COPP connection and notify the connection callback"""
        try:
            try:
                aarq_apdu = _decode(copp_conn.conn_req_user_data[1])
                aare_apdu = _decode(copp_conn.conn_res_user_data[1])

                calling_ap_title, called_ap_title = _get_ap_titles(aarq_apdu)
                calling_ae_qualifier, called_ae_qualifier = _get_ae_qualifiers(
                    aarq_apdu)

                # on the listening side the called entity is the local one
                # and the calling entity is the remote one
                conn = Connection(copp_conn, aarq_apdu, aare_apdu,
                                  called_ap_title, calling_ap_title,
                                  called_ae_qualifier, calling_ae_qualifier,
                                  self._receive_queue_size,
                                  self._send_queue_size)

            except Exception:
                await aio.uncancellable(_close_copp(copp_conn, _abrt_apdu(1)))
                raise

            try:
                await aio.call(self._connection_cb, conn)

            except BaseException:
                await aio.uncancellable(conn.async_close())
                raise

        except Exception as e:
            mlog.error("error creating new incomming connection: %s", e,
                       exc_info=e)
            return

        if not self._bind_connections:
            return

        # bound connection: keep this handler alive until the connection is
        # closed and close the connection if the handler is interrupted
        try:
            await conn.wait_closed()

        except BaseException:
            await aio.uncancellable(conn.async_close())
            raise
ACSE listening server
For creating new server see `listen`.
@property
def async_group(self) -> aio.Group:
    """Async group of the wrapped COPP server"""
    copp_server = self._srv
    return copp_server.async_group
Async group
@property
def addresses(self) -> list[tcp.Address]:
    """Listening addresses reported by the wrapped COPP server"""
    copp_server = self._srv
    return copp_server.addresses
Listening addresses
class Connection(aio.Resource):
    """ACSE connection

    For creating new connection see `connect` or `listen`.

    """

    def __init__(self,
                 conn: copp.Connection,
                 aarq_apdu: asn1.Value,
                 aare_apdu: asn1.Value,
                 local_ap_title: asn1.ObjectIdentifier | None,
                 remote_ap_title: asn1.ObjectIdentifier | None,
                 local_ae_qualifier: int | None,
                 remote_ae_qualifier: int | None,
                 receive_queue_size: int,
                 send_queue_size: int):
        self._conn = conn
        # user data is ``None`` when the APDU carries no user information
        # (e.g. `connect` called without `user_data`)
        self._conn_req_user_data = self._get_user_data(conn.syntax_names,
                                                       aarq_apdu)
        self._conn_res_user_data = self._get_user_data(conn.syntax_names,
                                                       aare_apdu)
        self._loop = asyncio.get_running_loop()
        self._info = ConnectionInfo(local_ap_title=local_ap_title,
                                    local_ae_qualifier=local_ae_qualifier,
                                    remote_ap_title=remote_ap_title,
                                    remote_ae_qualifier=remote_ae_qualifier,
                                    **conn.info._asdict())
        # APDU sent to the peer when the connection is closed locally
        self._close_apdu = _abrt_apdu(0)
        self._receive_queue = aio.Queue(receive_queue_size)
        self._send_queue = aio.Queue(send_queue_size)
        self._async_group = aio.Group()

        self.async_group.spawn(aio.call_on_cancel, self._on_close)
        self.async_group.spawn(self._receive_loop)
        self.async_group.spawn(self._send_loop)
        self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
                               self.close)

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    @property
    def info(self) -> ConnectionInfo:
        """Connection info"""
        return self._info

    @property
    def conn_req_user_data(self) -> copp.IdentifiedEntity | None:
        """Connect request's user data"""
        return self._conn_req_user_data

    @property
    def conn_res_user_data(self) -> copp.IdentifiedEntity | None:
        """Connect response's user data"""
        return self._conn_res_user_data

    async def receive(self) -> copp.IdentifiedEntity:
        """Receive data

        Raises:
            ConnectionError: receive queue is closed

        """
        try:
            return await self._receive_queue.get()

        except aio.QueueClosedError:
            raise ConnectionError()

    async def send(self, data: copp.IdentifiedEntity):
        """Send data

        Raises:
            ConnectionError: send queue is closed

        """
        try:
            await self._send_queue.put((data, None))

        except aio.QueueClosedError:
            raise ConnectionError()

    async def drain(self):
        """Drain output buffer

        Raises:
            ConnectionError: send queue is closed

        """
        try:
            future = self._loop.create_future()
            # drain request is a queue item without data
            await self._send_queue.put((None, future))
            await future

        except aio.QueueClosedError:
            raise ConnectionError()

    @staticmethod
    def _get_user_data(syntax_names, apdu):
        # user data of AARQ/AARE APDU or ``None`` if APDU has no user data
        externals = apdu[1].get('user-information')
        if not externals:
            return None

        external = externals[0]
        return syntax_names.get_name(external.indirect_ref), external.data

    async def _on_close(self):
        await _close_copp(self._conn, self._close_apdu)

    def _close(self, apdu):
        # only the first close request selects the APDU sent on close
        if not self.is_open:
            return

        self._close_apdu = apdu
        self._async_group.close()

    async def _receive_loop(self):
        try:
            while True:
                syntax_name, entity = await self._conn.receive()

                if syntax_name == _acse_syntax_name:
                    # ACSE APDUs are exchanged as encoded entities and have
                    # to be decoded before their type can be inspected
                    apdu = _decode(entity)

                    if apdu[0] == 'abrt':
                        close_apdu = None

                    elif apdu[0] == 'rlrq':
                        close_apdu = _rlre_apdu()

                    else:
                        close_apdu = _abrt_apdu(1)

                    self._close(close_apdu)
                    break

                await self._receive_queue.put((syntax_name, entity))

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("receive loop error: %s", e, exc_info=e)

        finally:
            self._close(_abrt_apdu(1))
            self._receive_queue.close()

    async def _send_loop(self):
        future = None
        try:
            while True:
                data, future = await self._send_queue.get()

                if data is None:
                    await self._conn.drain()

                else:
                    await self._conn.send(data)

                if future and not future.done():
                    future.set_result(None)

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("send loop error: %s", e, exc_info=e)

        finally:
            self._close(_abrt_apdu(1))
            self._send_queue.close()

            # resolve all pending drain requests so that no `drain` caller
            # is left waiting forever
            while True:
                if future and not future.done():
                    future.set_result(None)
                if self._send_queue.empty():
                    break
                _, future = self._send_queue.get_nowait()
def __init__(self,
             conn: copp.Connection,
             aarq_apdu: asn1.Value,
             aare_apdu: asn1.Value,
             local_ap_title: asn1.ObjectIdentifier | None,
             remote_ap_title: asn1.ObjectIdentifier | None,
             local_ae_qualifier: int | None,
             remote_ae_qualifier: int | None,
             receive_queue_size: int,
             send_queue_size: int):
    """Wrap established COPP connection

    Args:
        conn: underlying COPP connection
        aarq_apdu: decoded AARQ APDU of the association
        aare_apdu: decoded AARE APDU of the association
        local_ap_title: local AP title stored in connection info
        remote_ap_title: remote AP title stored in connection info
        local_ae_qualifier: local AE qualifier stored in connection info
        remote_ae_qualifier: remote AE qualifier stored in connection info
        receive_queue_size: receive queue size
        send_queue_size: send queue size

    """

    def get_user_data(apdu):
        # user information is optional (e.g. `connect` builds an AARQ
        # without it when no user data is given) - report it as ``None``
        # instead of failing with `KeyError`
        externals = apdu[1].get('user-information')
        if not externals:
            return None

        external = externals[0]
        return (conn.syntax_names.get_name(external.indirect_ref),
                external.data)

    self._conn = conn
    self._conn_req_user_data = get_user_data(aarq_apdu)
    self._conn_res_user_data = get_user_data(aare_apdu)
    self._loop = asyncio.get_running_loop()
    self._info = ConnectionInfo(local_ap_title=local_ap_title,
                                local_ae_qualifier=local_ae_qualifier,
                                remote_ap_title=remote_ap_title,
                                remote_ae_qualifier=remote_ae_qualifier,
                                **conn.info._asdict())
    # APDU sent to the peer when the connection is closed locally
    self._close_apdu = _abrt_apdu(0)
    self._receive_queue = aio.Queue(receive_queue_size)
    self._send_queue = aio.Queue(send_queue_size)
    self._async_group = aio.Group()

    self.async_group.spawn(aio.call_on_cancel, self._on_close)
    self.async_group.spawn(self._receive_loop)
    self.async_group.spawn(self._send_loop)
    self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
                           self.close)
@property
def async_group(self) -> aio.Group:
    """Async group controlling this connection's lifetime"""
    group = self._async_group
    return group
Async group
@property
def conn_req_user_data(self) -> copp.IdentifiedEntity:
    """User data carried by the connect request (AARQ)"""
    request_user_data = self._conn_req_user_data
    return request_user_data
Connect request's user data
@property
def conn_res_user_data(self) -> copp.IdentifiedEntity:
    """User data carried by the connect response (AARE)"""
    response_user_data = self._conn_res_user_data
    return response_user_data
Connect response's user data
async def receive(self) -> copp.IdentifiedEntity:
    """Receive next data item from the receive queue

    Raises:
        ConnectionError: receive queue is closed

    """
    queue = self._receive_queue
    try:
        received = await queue.get()

    except aio.QueueClosedError:
        raise ConnectionError()

    else:
        return received
Receive data