hat.drivers.acse
Association Controll Service Element
1"""Association Controll Service Element""" 2 3import asyncio 4import importlib.resources 5import logging 6import typing 7 8from hat import aio 9from hat import asn1 10 11from hat.drivers import copp 12from hat.drivers import tcp 13 14 15mlog = logging.getLogger(__name__) 16 17# (joint-iso-itu-t, association-control, abstract-syntax, apdus, version1) 18_acse_syntax_name = (2, 2, 1, 0, 1) 19 20with importlib.resources.as_file(importlib.resources.files(__package__) / 21 'asn1_repo.json') as _path: 22 _encoder = asn1.Encoder(asn1.Encoding.BER, 23 asn1.Repository.from_json(_path)) 24 25 26class ConnectionInfo(typing.NamedTuple): 27 local_addr: tcp.Address 28 local_tsel: int | None 29 local_ssel: int | None 30 local_psel: int | None 31 local_ap_title: asn1.ObjectIdentifier | None 32 local_ae_qualifier: int | None 33 remote_addr: tcp.Address 34 remote_tsel: int | None 35 remote_ssel: int | None 36 remote_psel: int | None 37 remote_ap_title: asn1.ObjectIdentifier | None 38 remote_ae_qualifier: int | None 39 40 41ValidateCb: typing.TypeAlias = aio.AsyncCallable[[copp.SyntaxNames, 42 copp.IdentifiedEntity], 43 copp.IdentifiedEntity | None] 44"""Validate callback""" 45 46ConnectionCb: typing.TypeAlias = aio.AsyncCallable[['Connection'], None] 47"""Connection callback""" 48 49 50async def connect(addr: tcp.Address, 51 syntax_name_list: list[asn1.ObjectIdentifier], 52 app_context_name: asn1.ObjectIdentifier, 53 user_data: copp.IdentifiedEntity | None = None, 54 *, 55 local_ap_title: asn1.ObjectIdentifier | None = None, 56 remote_ap_title: asn1.ObjectIdentifier | None = None, 57 local_ae_qualifier: int | None = None, 58 remote_ae_qualifier: int | None = None, 59 acse_receive_queue_size: int = 1024, 60 acse_send_queue_size: int = 1024, 61 **kwargs 62 ) -> 'Connection': 63 """Connect to ACSE server 64 65 Additional arguments are passed directly to `hat.drivers.copp.connect` 66 (`syntax_names` is set by this coroutine). 67 68 """ 69 syntax_names = copp.SyntaxNames([_acse_syntax_name, *syntax_name_list]) 70 aarq_apdu = _aarq_apdu(syntax_names, app_context_name, 71 local_ap_title, remote_ap_title, 72 local_ae_qualifier, remote_ae_qualifier, 73 user_data) 74 copp_user_data = _acse_syntax_name, _encode(aarq_apdu) 75 conn = await copp.connect(addr, syntax_names, copp_user_data, **kwargs) 76 77 try: 78 aare_apdu_syntax_name, aare_apdu_entity = conn.conn_res_user_data 79 if aare_apdu_syntax_name != _acse_syntax_name: 80 raise Exception("invalid syntax name") 81 82 aare_apdu = _decode(aare_apdu_entity) 83 if aare_apdu[0] != 'aare' or aare_apdu[1]['result'] != 0: 84 raise Exception("invalid apdu") 85 86 calling_ap_title, called_ap_title = _get_ap_titles(aarq_apdu) 87 calling_ae_qualifier, called_ae_qualifier = _get_ae_qualifiers( 88 aarq_apdu) 89 return Connection(conn, aarq_apdu, aare_apdu, 90 calling_ap_title, called_ap_title, 91 calling_ae_qualifier, called_ae_qualifier, 92 acse_receive_queue_size, acse_send_queue_size) 93 94 except Exception: 95 await aio.uncancellable(_close_copp(conn, _abrt_apdu(1))) 96 raise 97 98 99async def listen(validate_cb: ValidateCb, 100 connection_cb: ConnectionCb, 101 addr: tcp.Address = tcp.Address('0.0.0.0', 102), 102 *, 103 bind_connections: bool = False, 104 acse_receive_queue_size: int = 1024, 105 acse_send_queue_size: int = 1024, 106 **kwargs 107 ) -> 'Server': 108 """Create ACSE listening server 109 110 Additional arguments are passed directly to `hat.drivers.copp.listen`. 111 112 Args: 113 validate_cb: callback function or coroutine called on new 114 incomming connection request prior to creating connection object 115 connection_cb: new connection callback 116 addr: local listening address 117 118 """ 119 server = Server() 120 server._validate_cb = validate_cb 121 server._connection_cb = connection_cb 122 server._bind_connections = bind_connections 123 server._receive_queue_size = acse_receive_queue_size 124 server._send_queue_size = acse_send_queue_size 125 126 server._srv = await copp.listen(server._on_validate, 127 server._on_connection, 128 addr, 129 bind_connections=False, 130 **kwargs) 131 132 return server 133 134 135class Server(aio.Resource): 136 """ACSE listening server 137 138 For creating new server see `listen`. 139 140 """ 141 142 @property 143 def async_group(self) -> aio.Group: 144 """Async group""" 145 return self._srv.async_group 146 147 @property 148 def addresses(self) -> list[tcp.Address]: 149 """Listening addresses""" 150 return self._srv.addresses 151 152 async def _on_validate(self, syntax_names, user_data): 153 aarq_apdu_syntax_name, aarq_apdu_entity = user_data 154 if aarq_apdu_syntax_name != _acse_syntax_name: 155 raise Exception('invalid acse syntax name') 156 157 aarq_apdu = _decode(aarq_apdu_entity) 158 if aarq_apdu[0] != 'aarq': 159 raise Exception('not aarq message') 160 161 aarq_external = aarq_apdu[1]['user-information'][0] 162 if aarq_external.direct_ref is not None: 163 if aarq_external.direct_ref != _encoder.syntax_name: 164 raise Exception('invalid encoder identifier') 165 166 _, called_ap_title = _get_ap_titles(aarq_apdu) 167 _, called_ae_qualifier = _get_ae_qualifiers(aarq_apdu) 168 _, called_ap_invocation_identifier = \ 169 _get_ap_invocation_identifiers(aarq_apdu) 170 _, called_ae_invocation_identifier = \ 171 _get_ae_invocation_identifiers(aarq_apdu) 172 173 aarq_user_data = (syntax_names.get_name(aarq_external.indirect_ref), 174 aarq_external.data) 175 176 user_validate_result = await aio.call(self._validate_cb, syntax_names, 177 aarq_user_data) 178 179 aare_apdu = _aare_apdu(syntax_names, 180 user_validate_result, 181 called_ap_title, called_ae_qualifier, 182 called_ap_invocation_identifier, 183 called_ae_invocation_identifier) 184 return _acse_syntax_name, _encode(aare_apdu) 185 186 async def _on_connection(self, copp_conn): 187 try: 188 try: 189 aarq_apdu = _decode(copp_conn.conn_req_user_data[1]) 190 aare_apdu = _decode(copp_conn.conn_res_user_data[1]) 191 192 calling_ap_title, called_ap_title = _get_ap_titles(aarq_apdu) 193 calling_ae_qualifier, called_ae_qualifier = _get_ae_qualifiers( 194 aarq_apdu) 195 196 conn = Connection(copp_conn, aarq_apdu, aare_apdu, 197 calling_ap_title, called_ap_title, 198 calling_ae_qualifier, called_ae_qualifier, 199 self._receive_queue_size, 200 self._send_queue_size) 201 202 except Exception: 203 await aio.uncancellable(_close_copp(copp_conn, _abrt_apdu(1))) 204 raise 205 206 try: 207 await aio.call(self._connection_cb, conn) 208 209 except BaseException: 210 await aio.uncancellable(conn.async_close()) 211 raise 212 213 except Exception as e: 214 mlog.error("error creating new incomming connection: %s", e, 215 exc_info=e) 216 return 217 218 if not self._bind_connections: 219 return 220 221 try: 222 await conn.wait_closed() 223 224 except BaseException: 225 await aio.uncancellable(conn.async_close()) 226 raise 227 228 229class Connection(aio.Resource): 230 """ACSE connection 231 232 For creating new connection see `connect` or `listen`. 233 234 """ 235 236 def __init__(self, 237 conn: copp.Connection, 238 aarq_apdu: asn1.Value, 239 aare_apdu: asn1.Value, 240 local_ap_title: asn1.ObjectIdentifier | None, 241 remote_ap_title: asn1.ObjectIdentifier | None, 242 local_ae_qualifier: int | None, 243 remote_ae_qualifier: int | None, 244 receive_queue_size: int, 245 send_queue_size: int): 246 aarq_external = aarq_apdu[1]['user-information'][0] 247 aare_external = aare_apdu[1]['user-information'][0] 248 249 conn_req_user_data = ( 250 conn.syntax_names.get_name(aarq_external.indirect_ref), 251 aarq_external.data) 252 conn_res_user_data = ( 253 conn.syntax_names.get_name(aare_external.indirect_ref), 254 aare_external.data) 255 256 self._conn = conn 257 self._conn_req_user_data = conn_req_user_data 258 self._conn_res_user_data = conn_res_user_data 259 self._loop = asyncio.get_running_loop() 260 self._info = ConnectionInfo(local_ap_title=local_ap_title, 261 local_ae_qualifier=local_ae_qualifier, 262 remote_ap_title=remote_ap_title, 263 remote_ae_qualifier=remote_ae_qualifier, 264 **conn.info._asdict()) 265 self._close_apdu = _abrt_apdu(0) 266 self._receive_queue = aio.Queue(receive_queue_size) 267 self._send_queue = aio.Queue(send_queue_size) 268 self._async_group = aio.Group() 269 270 self.async_group.spawn(aio.call_on_cancel, self._on_close) 271 self.async_group.spawn(self._receive_loop) 272 self.async_group.spawn(self._send_loop) 273 self.async_group.spawn(aio.call_on_done, conn.wait_closing(), 274 self.close) 275 276 @property 277 def async_group(self) -> aio.Group: 278 """Async group""" 279 return self._async_group 280 281 @property 282 def info(self) -> ConnectionInfo: 283 """Connection info""" 284 return self._info 285 286 @property 287 def conn_req_user_data(self) -> copp.IdentifiedEntity: 288 """Connect request's user data""" 289 return self._conn_req_user_data 290 291 @property 292 def conn_res_user_data(self) -> copp.IdentifiedEntity: 293 """Connect response's user data""" 294 return self._conn_res_user_data 295 296 async def receive(self) -> copp.IdentifiedEntity: 297 """Receive data""" 298 try: 299 return await self._receive_queue.get() 300 301 except aio.QueueClosedError: 302 raise ConnectionError() 303 304 async def send(self, data: copp.IdentifiedEntity): 305 """Send data""" 306 try: 307 await self._send_queue.put((data, None)) 308 309 except aio.QueueClosedError: 310 raise ConnectionError() 311 312 async def drain(self): 313 """Drain output buffer""" 314 try: 315 future = self._loop.create_future() 316 await self._send_queue.put((None, future)) 317 await future 318 319 except aio.QueueClosedError: 320 raise ConnectionError() 321 322 async def _on_close(self): 323 await _close_copp(self._conn, self._close_apdu) 324 325 def _close(self, apdu): 326 if not self.is_open: 327 return 328 329 self._close_apdu = apdu 330 self._async_group.close() 331 332 async def _receive_loop(self): 333 try: 334 while True: 335 syntax_name, entity = await self._conn.receive() 336 337 if syntax_name == _acse_syntax_name: 338 if entity[0] == 'abrt': 339 close_apdu = None 340 341 elif entity[0] == 'rlrq': 342 close_apdu = _rlre_apdu() 343 344 else: 345 close_apdu = _abrt_apdu(1) 346 347 self._close(close_apdu) 348 break 349 350 await self._receive_queue.put((syntax_name, entity)) 351 352 except ConnectionError: 353 pass 354 355 except Exception as e: 356 mlog.error("receive loop error: %s", e, exc_info=e) 357 358 finally: 359 self._close(_abrt_apdu(1)) 360 self._receive_queue.close() 361 362 async def _send_loop(self): 363 future = None 364 try: 365 while True: 366 data, future = await self._send_queue.get() 367 368 if data is None: 369 await self._conn.drain() 370 371 else: 372 await self._conn.send(data) 373 374 if future and not future.done(): 375 future.set_result(None) 376 377 except ConnectionError: 378 pass 379 380 except Exception as e: 381 mlog.error("send loop error: %s", e, exc_info=e) 382 383 finally: 384 self._close(_abrt_apdu(1)) 385 self._send_queue.close() 386 387 while True: 388 if future and not future.done(): 389 future.set_result(None) 390 if self._send_queue.empty(): 391 break 392 _, future = self._send_queue.get_nowait() 393 394 395async def _close_copp(copp_conn, apdu): 396 data = (_acse_syntax_name, _encode(apdu)) if apdu else None 397 await copp_conn.async_close(data) 398 399 400def _get_ap_titles(aarq_apdu): 401 calling = None 402 if 'calling-AP-title' in aarq_apdu[1]: 403 if aarq_apdu[1]['calling-AP-title'][0] == 'ap-title-form2': 404 calling = aarq_apdu[1]['calling-AP-title'][1] 405 406 called = None 407 if 'called-AP-title' in aarq_apdu[1]: 408 if aarq_apdu[1]['called-AP-title'][0] == 'ap-title-form2': 409 called = aarq_apdu[1]['called-AP-title'][1] 410 411 return calling, called 412 413 414def _get_ae_qualifiers(aarq_apdu): 415 calling = None 416 if 'calling-AE-qualifier' in aarq_apdu[1]: 417 if aarq_apdu[1]['calling-AE-qualifier'][0] == 'ap-qualifier-form2': 418 calling = aarq_apdu[1]['calling-AE-qualifier'][1] 419 420 called = None 421 if 'called-AE-qualifier' in aarq_apdu[1]: 422 if aarq_apdu[1]['called-AE-qualifier'][0] == 'ap-qualifier-form2': 423 called = aarq_apdu[1]['called-AE-qualifier'][1] 424 425 return calling, called 426 427 428def _get_ap_invocation_identifiers(aarq_apdu): 429 calling = aarq_apdu[1].get('calling-AP-invocation-identifier') 430 called = aarq_apdu[1].get('called-AP-invocation-identifier') 431 return calling, called 432 433 434def _get_ae_invocation_identifiers(aarq_apdu): 435 calling = aarq_apdu[1].get('calling-AE-invocation-identifier') 436 called = aarq_apdu[1].get('called-AE-invocation-identifier') 437 return calling, called 438 439 440def _aarq_apdu(syntax_names, app_context_name, 441 calling_ap_title, called_ap_title, 442 calling_ae_qualifier, called_ae_qualifier, 443 user_data): 444 aarq_apdu = 'aarq', {'application-context-name': app_context_name} 445 446 if calling_ap_title is not None: 447 aarq_apdu[1]['calling-AP-title'] = 'ap-title-form2', calling_ap_title 448 449 if called_ap_title is not None: 450 aarq_apdu[1]['called-AP-title'] = 'ap-title-form2', called_ap_title 451 452 if calling_ae_qualifier is not None: 453 aarq_apdu[1]['calling-AE-qualifier'] = ('ae-qualifier-form2', 454 calling_ae_qualifier) 455 456 if called_ae_qualifier is not None: 457 aarq_apdu[1]['called-AE-qualifier'] = ('ae-qualifier-form2', 458 called_ae_qualifier) 459 460 if user_data: 461 aarq_apdu[1]['user-information'] = [ 462 asn1.External(direct_ref=_encoder.syntax_name, 463 indirect_ref=syntax_names.get_id(user_data[0]), 464 data=user_data[1])] 465 466 return aarq_apdu 467 468 469def _aare_apdu(syntax_names, user_data, 470 responding_ap_title, responding_ae_qualifier, 471 responding_ap_invocation_identifier, 472 responding_ae_invocation_identifier): 473 aare_apdu = 'aare', { 474 'application-context-name': user_data[0], 475 'result': 0, 476 'result-source-diagnostic': ('acse-service-user', 0), 477 'user-information': [ 478 asn1.External(direct_ref=_encoder.syntax_name, 479 indirect_ref=syntax_names.get_id(user_data[0]), 480 data=user_data[1])]} 481 482 if responding_ap_title is not None: 483 aare_apdu[1]['responding-AP-title'] = ('ap-title-form2', 484 responding_ap_title) 485 486 if responding_ae_qualifier is not None: 487 aare_apdu[1]['responding-AE-qualifier'] = ('ae-qualifier-form2', 488 responding_ae_qualifier) 489 490 if responding_ap_invocation_identifier is not None: 491 aare_apdu[1]['responding-AP-invocation-identifier'] = \ 492 responding_ap_invocation_identifier 493 494 if responding_ae_invocation_identifier is not None: 495 aare_apdu[1]['responding-AE-invocation-identifier'] = \ 496 responding_ae_invocation_identifier 497 498 return aare_apdu 499 500 501def _abrt_apdu(source): 502 return 'abrt', {'abort-source': source} 503 504 505def _rlre_apdu(): 506 return 'rlre', {} 507 508 509def _encode(value): 510 return _encoder.encode_value('ACSE-1', 'ACSE-apdu', value) 511 512 513def _decode(entity): 514 return _encoder.decode_value('ACSE-1', 'ACSE-apdu', entity)
27class ConnectionInfo(typing.NamedTuple): 28 local_addr: tcp.Address 29 local_tsel: int | None 30 local_ssel: int | None 31 local_psel: int | None 32 local_ap_title: asn1.ObjectIdentifier | None 33 local_ae_qualifier: int | None 34 remote_addr: tcp.Address 35 remote_tsel: int | None 36 remote_ssel: int | None 37 remote_psel: int | None 38 remote_ap_title: asn1.ObjectIdentifier | None 39 remote_ae_qualifier: int | None
ConnectionInfo(local_addr, local_tsel, local_ssel, local_psel, local_ap_title, local_ae_qualifier, remote_addr, remote_tsel, remote_ssel, remote_psel, remote_ap_title, remote_ae_qualifier)
Create new instance of ConnectionInfo(local_addr, local_tsel, local_ssel, local_psel, local_ap_title, local_ae_qualifier, remote_addr, remote_tsel, remote_ssel, remote_psel, remote_ap_title, remote_ae_qualifier)
Validate callback
Connection callback
51async def connect(addr: tcp.Address, 52 syntax_name_list: list[asn1.ObjectIdentifier], 53 app_context_name: asn1.ObjectIdentifier, 54 user_data: copp.IdentifiedEntity | None = None, 55 *, 56 local_ap_title: asn1.ObjectIdentifier | None = None, 57 remote_ap_title: asn1.ObjectIdentifier | None = None, 58 local_ae_qualifier: int | None = None, 59 remote_ae_qualifier: int | None = None, 60 acse_receive_queue_size: int = 1024, 61 acse_send_queue_size: int = 1024, 62 **kwargs 63 ) -> 'Connection': 64 """Connect to ACSE server 65 66 Additional arguments are passed directly to `hat.drivers.copp.connect` 67 (`syntax_names` is set by this coroutine). 68 69 """ 70 syntax_names = copp.SyntaxNames([_acse_syntax_name, *syntax_name_list]) 71 aarq_apdu = _aarq_apdu(syntax_names, app_context_name, 72 local_ap_title, remote_ap_title, 73 local_ae_qualifier, remote_ae_qualifier, 74 user_data) 75 copp_user_data = _acse_syntax_name, _encode(aarq_apdu) 76 conn = await copp.connect(addr, syntax_names, copp_user_data, **kwargs) 77 78 try: 79 aare_apdu_syntax_name, aare_apdu_entity = conn.conn_res_user_data 80 if aare_apdu_syntax_name != _acse_syntax_name: 81 raise Exception("invalid syntax name") 82 83 aare_apdu = _decode(aare_apdu_entity) 84 if aare_apdu[0] != 'aare' or aare_apdu[1]['result'] != 0: 85 raise Exception("invalid apdu") 86 87 calling_ap_title, called_ap_title = _get_ap_titles(aarq_apdu) 88 calling_ae_qualifier, called_ae_qualifier = _get_ae_qualifiers( 89 aarq_apdu) 90 return Connection(conn, aarq_apdu, aare_apdu, 91 calling_ap_title, called_ap_title, 92 calling_ae_qualifier, called_ae_qualifier, 93 acse_receive_queue_size, acse_send_queue_size) 94 95 except Exception: 96 await aio.uncancellable(_close_copp(conn, _abrt_apdu(1))) 97 raise
Connect to ACSE server
Additional arguments are passed directly to hat.drivers.copp.connect
(syntax_names
is set by this coroutine).
100async def listen(validate_cb: ValidateCb, 101 connection_cb: ConnectionCb, 102 addr: tcp.Address = tcp.Address('0.0.0.0', 102), 103 *, 104 bind_connections: bool = False, 105 acse_receive_queue_size: int = 1024, 106 acse_send_queue_size: int = 1024, 107 **kwargs 108 ) -> 'Server': 109 """Create ACSE listening server 110 111 Additional arguments are passed directly to `hat.drivers.copp.listen`. 112 113 Args: 114 validate_cb: callback function or coroutine called on new 115 incomming connection request prior to creating connection object 116 connection_cb: new connection callback 117 addr: local listening address 118 119 """ 120 server = Server() 121 server._validate_cb = validate_cb 122 server._connection_cb = connection_cb 123 server._bind_connections = bind_connections 124 server._receive_queue_size = acse_receive_queue_size 125 server._send_queue_size = acse_send_queue_size 126 127 server._srv = await copp.listen(server._on_validate, 128 server._on_connection, 129 addr, 130 bind_connections=False, 131 **kwargs) 132 133 return server
Create ACSE listening server
Additional arguments are passed directly to hat.drivers.copp.listen
.
Arguments:
- validate_cb: callback function or coroutine called on new incomming connection request prior to creating connection object
- connection_cb: new connection callback
- addr: local listening address
136class Server(aio.Resource): 137 """ACSE listening server 138 139 For creating new server see `listen`. 140 141 """ 142 143 @property 144 def async_group(self) -> aio.Group: 145 """Async group""" 146 return self._srv.async_group 147 148 @property 149 def addresses(self) -> list[tcp.Address]: 150 """Listening addresses""" 151 return self._srv.addresses 152 153 async def _on_validate(self, syntax_names, user_data): 154 aarq_apdu_syntax_name, aarq_apdu_entity = user_data 155 if aarq_apdu_syntax_name != _acse_syntax_name: 156 raise Exception('invalid acse syntax name') 157 158 aarq_apdu = _decode(aarq_apdu_entity) 159 if aarq_apdu[0] != 'aarq': 160 raise Exception('not aarq message') 161 162 aarq_external = aarq_apdu[1]['user-information'][0] 163 if aarq_external.direct_ref is not None: 164 if aarq_external.direct_ref != _encoder.syntax_name: 165 raise Exception('invalid encoder identifier') 166 167 _, called_ap_title = _get_ap_titles(aarq_apdu) 168 _, called_ae_qualifier = _get_ae_qualifiers(aarq_apdu) 169 _, called_ap_invocation_identifier = \ 170 _get_ap_invocation_identifiers(aarq_apdu) 171 _, called_ae_invocation_identifier = \ 172 _get_ae_invocation_identifiers(aarq_apdu) 173 174 aarq_user_data = (syntax_names.get_name(aarq_external.indirect_ref), 175 aarq_external.data) 176 177 user_validate_result = await aio.call(self._validate_cb, syntax_names, 178 aarq_user_data) 179 180 aare_apdu = _aare_apdu(syntax_names, 181 user_validate_result, 182 called_ap_title, called_ae_qualifier, 183 called_ap_invocation_identifier, 184 called_ae_invocation_identifier) 185 return _acse_syntax_name, _encode(aare_apdu) 186 187 async def _on_connection(self, copp_conn): 188 try: 189 try: 190 aarq_apdu = _decode(copp_conn.conn_req_user_data[1]) 191 aare_apdu = _decode(copp_conn.conn_res_user_data[1]) 192 193 calling_ap_title, called_ap_title = _get_ap_titles(aarq_apdu) 194 calling_ae_qualifier, called_ae_qualifier = _get_ae_qualifiers( 195 aarq_apdu) 196 197 conn = Connection(copp_conn, aarq_apdu, aare_apdu, 198 calling_ap_title, called_ap_title, 199 calling_ae_qualifier, called_ae_qualifier, 200 self._receive_queue_size, 201 self._send_queue_size) 202 203 except Exception: 204 await aio.uncancellable(_close_copp(copp_conn, _abrt_apdu(1))) 205 raise 206 207 try: 208 await aio.call(self._connection_cb, conn) 209 210 except BaseException: 211 await aio.uncancellable(conn.async_close()) 212 raise 213 214 except Exception as e: 215 mlog.error("error creating new incomming connection: %s", e, 216 exc_info=e) 217 return 218 219 if not self._bind_connections: 220 return 221 222 try: 223 await conn.wait_closed() 224 225 except BaseException: 226 await aio.uncancellable(conn.async_close()) 227 raise
ACSE listening server
For creating new server see listen
.
143 @property 144 def async_group(self) -> aio.Group: 145 """Async group""" 146 return self._srv.async_group
Async group
148 @property 149 def addresses(self) -> list[tcp.Address]: 150 """Listening addresses""" 151 return self._srv.addresses
Listening addresses
230class Connection(aio.Resource): 231 """ACSE connection 232 233 For creating new connection see `connect` or `listen`. 234 235 """ 236 237 def __init__(self, 238 conn: copp.Connection, 239 aarq_apdu: asn1.Value, 240 aare_apdu: asn1.Value, 241 local_ap_title: asn1.ObjectIdentifier | None, 242 remote_ap_title: asn1.ObjectIdentifier | None, 243 local_ae_qualifier: int | None, 244 remote_ae_qualifier: int | None, 245 receive_queue_size: int, 246 send_queue_size: int): 247 aarq_external = aarq_apdu[1]['user-information'][0] 248 aare_external = aare_apdu[1]['user-information'][0] 249 250 conn_req_user_data = ( 251 conn.syntax_names.get_name(aarq_external.indirect_ref), 252 aarq_external.data) 253 conn_res_user_data = ( 254 conn.syntax_names.get_name(aare_external.indirect_ref), 255 aare_external.data) 256 257 self._conn = conn 258 self._conn_req_user_data = conn_req_user_data 259 self._conn_res_user_data = conn_res_user_data 260 self._loop = asyncio.get_running_loop() 261 self._info = ConnectionInfo(local_ap_title=local_ap_title, 262 local_ae_qualifier=local_ae_qualifier, 263 remote_ap_title=remote_ap_title, 264 remote_ae_qualifier=remote_ae_qualifier, 265 **conn.info._asdict()) 266 self._close_apdu = _abrt_apdu(0) 267 self._receive_queue = aio.Queue(receive_queue_size) 268 self._send_queue = aio.Queue(send_queue_size) 269 self._async_group = aio.Group() 270 271 self.async_group.spawn(aio.call_on_cancel, self._on_close) 272 self.async_group.spawn(self._receive_loop) 273 self.async_group.spawn(self._send_loop) 274 self.async_group.spawn(aio.call_on_done, conn.wait_closing(), 275 self.close) 276 277 @property 278 def async_group(self) -> aio.Group: 279 """Async group""" 280 return self._async_group 281 282 @property 283 def info(self) -> ConnectionInfo: 284 """Connection info""" 285 return self._info 286 287 @property 288 def conn_req_user_data(self) -> copp.IdentifiedEntity: 289 """Connect request's user data""" 290 return self._conn_req_user_data 291 292 @property 293 def conn_res_user_data(self) -> copp.IdentifiedEntity: 294 """Connect response's user data""" 295 return self._conn_res_user_data 296 297 async def receive(self) -> copp.IdentifiedEntity: 298 """Receive data""" 299 try: 300 return await self._receive_queue.get() 301 302 except aio.QueueClosedError: 303 raise ConnectionError() 304 305 async def send(self, data: copp.IdentifiedEntity): 306 """Send data""" 307 try: 308 await self._send_queue.put((data, None)) 309 310 except aio.QueueClosedError: 311 raise ConnectionError() 312 313 async def drain(self): 314 """Drain output buffer""" 315 try: 316 future = self._loop.create_future() 317 await self._send_queue.put((None, future)) 318 await future 319 320 except aio.QueueClosedError: 321 raise ConnectionError() 322 323 async def _on_close(self): 324 await _close_copp(self._conn, self._close_apdu) 325 326 def _close(self, apdu): 327 if not self.is_open: 328 return 329 330 self._close_apdu = apdu 331 self._async_group.close() 332 333 async def _receive_loop(self): 334 try: 335 while True: 336 syntax_name, entity = await self._conn.receive() 337 338 if syntax_name == _acse_syntax_name: 339 if entity[0] == 'abrt': 340 close_apdu = None 341 342 elif entity[0] == 'rlrq': 343 close_apdu = _rlre_apdu() 344 345 else: 346 close_apdu = _abrt_apdu(1) 347 348 self._close(close_apdu) 349 break 350 351 await self._receive_queue.put((syntax_name, entity)) 352 353 except ConnectionError: 354 pass 355 356 except Exception as e: 357 mlog.error("receive loop error: %s", e, exc_info=e) 358 359 finally: 360 self._close(_abrt_apdu(1)) 361 self._receive_queue.close() 362 363 async def _send_loop(self): 364 future = None 365 try: 366 while True: 367 data, future = await self._send_queue.get() 368 369 if data is None: 370 await self._conn.drain() 371 372 else: 373 await self._conn.send(data) 374 375 if future and not future.done(): 376 future.set_result(None) 377 378 except ConnectionError: 379 pass 380 381 except Exception as e: 382 mlog.error("send loop error: %s", e, exc_info=e) 383 384 finally: 385 self._close(_abrt_apdu(1)) 386 self._send_queue.close() 387 388 while True: 389 if future and not future.done(): 390 future.set_result(None) 391 if self._send_queue.empty(): 392 break 393 _, future = self._send_queue.get_nowait()
237 def __init__(self, 238 conn: copp.Connection, 239 aarq_apdu: asn1.Value, 240 aare_apdu: asn1.Value, 241 local_ap_title: asn1.ObjectIdentifier | None, 242 remote_ap_title: asn1.ObjectIdentifier | None, 243 local_ae_qualifier: int | None, 244 remote_ae_qualifier: int | None, 245 receive_queue_size: int, 246 send_queue_size: int): 247 aarq_external = aarq_apdu[1]['user-information'][0] 248 aare_external = aare_apdu[1]['user-information'][0] 249 250 conn_req_user_data = ( 251 conn.syntax_names.get_name(aarq_external.indirect_ref), 252 aarq_external.data) 253 conn_res_user_data = ( 254 conn.syntax_names.get_name(aare_external.indirect_ref), 255 aare_external.data) 256 257 self._conn = conn 258 self._conn_req_user_data = conn_req_user_data 259 self._conn_res_user_data = conn_res_user_data 260 self._loop = asyncio.get_running_loop() 261 self._info = ConnectionInfo(local_ap_title=local_ap_title, 262 local_ae_qualifier=local_ae_qualifier, 263 remote_ap_title=remote_ap_title, 264 remote_ae_qualifier=remote_ae_qualifier, 265 **conn.info._asdict()) 266 self._close_apdu = _abrt_apdu(0) 267 self._receive_queue = aio.Queue(receive_queue_size) 268 self._send_queue = aio.Queue(send_queue_size) 269 self._async_group = aio.Group() 270 271 self.async_group.spawn(aio.call_on_cancel, self._on_close) 272 self.async_group.spawn(self._receive_loop) 273 self.async_group.spawn(self._send_loop) 274 self.async_group.spawn(aio.call_on_done, conn.wait_closing(), 275 self.close)
277 @property 278 def async_group(self) -> aio.Group: 279 """Async group""" 280 return self._async_group
Async group
287 @property 288 def conn_req_user_data(self) -> copp.IdentifiedEntity: 289 """Connect request's user data""" 290 return self._conn_req_user_data
Connect request's user data
292 @property 293 def conn_res_user_data(self) -> copp.IdentifiedEntity: 294 """Connect response's user data""" 295 return self._conn_res_user_data
Connect response's user data
297 async def receive(self) -> copp.IdentifiedEntity: 298 """Receive data""" 299 try: 300 return await self._receive_queue.get() 301 302 except aio.QueueClosedError: 303 raise ConnectionError()
Receive data