hat.drivers.acse

Association Control Service Element

  1"""Association Control Service Element"""
  2
  3import asyncio
  4import importlib.resources
  5import logging
  6import typing
  7
  8from hat import aio
  9from hat import asn1
 10
 11from hat.drivers import copp
 12from hat.drivers import tcp
 13
 14
# module level logger
mlog = logging.getLogger(__name__)

# (joint-iso-itu-t, association-control, abstract-syntax, apdus, version1)
_acse_syntax_name = (2, 2, 1, 0, 1)

# BER encoder built from the `asn1_repo.json` repository shipped inside this
# package; used by `_encode` and `_decode` for all ACSE APDUs
with importlib.resources.as_file(importlib.resources.files(__package__) /
                                 'asn1_repo.json') as _path:
    _encoder = asn1.Encoder(asn1.Encoding.BER,
                            asn1.Repository.from_json(_path))
 24
 25
class ConnectionInfo(typing.NamedTuple):
    """Addressing information of a single ACSE connection

    `Connection` fills the `*_ap_title` and `*_ae_qualifier` fields from its
    constructor arguments; all other fields are copied from the info of the
    underlying `hat.drivers.copp` connection.

    """
    local_addr: tcp.Address
    # presumably transport/session/presentation selectors - confirm against
    # `hat.drivers.copp`
    local_tsel: int | None
    local_ssel: int | None
    local_psel: int | None
    local_ap_title: asn1.ObjectIdentifier | None
    local_ae_qualifier: int | None
    remote_addr: tcp.Address
    # presumably transport/session/presentation selectors - confirm against
    # `hat.drivers.copp`
    remote_tsel: int | None
    remote_ssel: int | None
    remote_psel: int | None
    remote_ap_title: asn1.ObjectIdentifier | None
    remote_ae_qualifier: int | None
 39
 40
# Called by `Server._on_validate` with the syntax names received from COPP and
# the user data extracted from the incoming AARQ; the returned entity is used
# as the user information of the AARE response.
# NOTE(review): this alias permits `None`, but `_aare_apdu` indexes the
# result unconditionally - confirm whether `None` is meant to be supported.
ValidateCb: typing.TypeAlias = aio.AsyncCallable[[copp.SyntaxNames,
                                                  copp.IdentifiedEntity],
                                                 copp.IdentifiedEntity | None]
"""Validate callback"""

# Called by `Server._on_connection` with each newly created `Connection`.
ConnectionCb: typing.TypeAlias = aio.AsyncCallable[['Connection'], None]
"""Connection callback"""
 48
 49
 50async def connect(addr: tcp.Address,
 51                  syntax_name_list: list[asn1.ObjectIdentifier],
 52                  app_context_name: asn1.ObjectIdentifier,
 53                  user_data: copp.IdentifiedEntity | None = None,
 54                  *,
 55                  local_ap_title: asn1.ObjectIdentifier | None = None,
 56                  remote_ap_title: asn1.ObjectIdentifier | None = None,
 57                  local_ae_qualifier: int | None = None,
 58                  remote_ae_qualifier: int | None = None,
 59                  acse_receive_queue_size: int = 1024,
 60                  acse_send_queue_size: int = 1024,
 61                  **kwargs
 62                  ) -> 'Connection':
 63    """Connect to ACSE server
 64
 65    Additional arguments are passed directly to `hat.drivers.copp.connect`
 66    (`syntax_names` is set by this coroutine).
 67
 68    """
 69    syntax_names = copp.SyntaxNames([_acse_syntax_name, *syntax_name_list])
 70    aarq_apdu = _aarq_apdu(syntax_names, app_context_name,
 71                           local_ap_title, remote_ap_title,
 72                           local_ae_qualifier, remote_ae_qualifier,
 73                           user_data)
 74    copp_user_data = _acse_syntax_name, _encode(aarq_apdu)
 75    conn = await copp.connect(addr, syntax_names, copp_user_data, **kwargs)
 76
 77    try:
 78        aare_apdu_syntax_name, aare_apdu_entity = conn.conn_res_user_data
 79        if aare_apdu_syntax_name != _acse_syntax_name:
 80            raise Exception("invalid syntax name")
 81
 82        aare_apdu = _decode(aare_apdu_entity)
 83        if aare_apdu[0] != 'aare' or aare_apdu[1]['result'] != 0:
 84            raise Exception("invalid apdu")
 85
 86        calling_ap_title, called_ap_title = _get_ap_titles(aarq_apdu)
 87        calling_ae_qualifier, called_ae_qualifier = _get_ae_qualifiers(
 88            aarq_apdu)
 89        return Connection(conn, aarq_apdu, aare_apdu,
 90                          calling_ap_title, called_ap_title,
 91                          calling_ae_qualifier, called_ae_qualifier,
 92                          acse_receive_queue_size, acse_send_queue_size)
 93
 94    except Exception:
 95        await aio.uncancellable(_close_copp(conn, _abrt_apdu(1)))
 96        raise
 97
 98
 99async def listen(validate_cb: ValidateCb,
100                 connection_cb: ConnectionCb,
101                 addr: tcp.Address = tcp.Address('0.0.0.0', 102),
102                 *,
103                 bind_connections: bool = False,
104                 acse_receive_queue_size: int = 1024,
105                 acse_send_queue_size: int = 1024,
106                 **kwargs
107                 ) -> 'Server':
108    """Create ACSE listening server
109
110    Additional arguments are passed directly to `hat.drivers.copp.listen`.
111
112    Args:
113        validate_cb: callback function or coroutine called on new
114            incomming connection request prior to creating connection object
115        connection_cb: new connection callback
116        addr: local listening address
117
118    """
119    server = Server()
120    server._validate_cb = validate_cb
121    server._connection_cb = connection_cb
122    server._bind_connections = bind_connections
123    server._receive_queue_size = acse_receive_queue_size
124    server._send_queue_size = acse_send_queue_size
125
126    server._srv = await copp.listen(server._on_validate,
127                                    server._on_connection,
128                                    addr,
129                                    bind_connections=False,
130                                    **kwargs)
131
132    return server
133
134
class Server(aio.Resource):
    """ACSE listening server

    For creating new server see `listen`.

    Async group and listening addresses are those of the wrapped
    `hat.drivers.copp` server.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._srv.async_group

    @property
    def addresses(self) -> list[tcp.Address]:
        """Listening addresses"""
        return self._srv.addresses

    async def _on_validate(self, syntax_names, user_data):
        """COPP validate callback

        Checks that connect request user data is an AARQ APDU, passes its
        user information to the user's validate callback and returns the
        encoded AARE APDU used as connect response user data.

        Raises:
            Exception: request is not a valid ACSE AARQ

        """
        aarq_apdu_syntax_name, aarq_apdu_entity = user_data
        if aarq_apdu_syntax_name != _acse_syntax_name:
            raise Exception('invalid acse syntax name')

        aarq_apdu = _decode(aarq_apdu_entity)
        if aarq_apdu[0] != 'aarq':
            raise Exception('not aarq message')

        aarq_external = aarq_apdu[1]['user-information'][0]
        if aarq_external.direct_ref is not None:
            if aarq_external.direct_ref != _encoder.syntax_name:
                raise Exception('invalid encoder identifier')

        # called identifiers of the request are echoed as responding
        # identifiers of the response
        _, called_ap_title = _get_ap_titles(aarq_apdu)
        _, called_ae_qualifier = _get_ae_qualifiers(aarq_apdu)
        _, called_ap_invocation_identifier = \
            _get_ap_invocation_identifiers(aarq_apdu)
        _, called_ae_invocation_identifier = \
            _get_ae_invocation_identifiers(aarq_apdu)

        aarq_user_data = (syntax_names.get_name(aarq_external.indirect_ref),
                          aarq_external.data)

        user_validate_result = await aio.call(self._validate_cb, syntax_names,
                                              aarq_user_data)

        aare_apdu = _aare_apdu(syntax_names,
                               user_validate_result,
                               called_ap_title, called_ae_qualifier,
                               called_ap_invocation_identifier,
                               called_ae_invocation_identifier)
        return _acse_syntax_name, _encode(aare_apdu)

    async def _on_connection(self, copp_conn):
        """COPP connection callback

        Wraps the COPP connection into `Connection` and passes it to the
        user's connection callback. Errors are logged and not propagated.

        """
        try:
            try:
                aarq_apdu = _decode(copp_conn.conn_req_user_data[1])
                aare_apdu = _decode(copp_conn.conn_res_user_data[1])

                calling_ap_title, called_ap_title = _get_ap_titles(aarq_apdu)
                calling_ae_qualifier, called_ae_qualifier = _get_ae_qualifiers(
                    aarq_apdu)

                # on the listening side the called entity is the local one
                # and the calling entity is the remote one
                conn = Connection(copp_conn, aarq_apdu, aare_apdu,
                                  called_ap_title, calling_ap_title,
                                  called_ae_qualifier, calling_ae_qualifier,
                                  self._receive_queue_size,
                                  self._send_queue_size)

            except Exception:
                await aio.uncancellable(_close_copp(copp_conn, _abrt_apdu(1)))
                raise

            try:
                await aio.call(self._connection_cb, conn)

            except BaseException:
                await aio.uncancellable(conn.async_close())
                raise

        except Exception as e:
            mlog.error("error creating new incomming connection: %s", e,
                       exc_info=e)
            return

        if not self._bind_connections:
            return

        # bound connection: stay alive until the connection closes and
        # close it if this handler is interrupted
        try:
            await conn.wait_closed()

        except BaseException:
            await aio.uncancellable(conn.async_close())
            raise
227
228
class Connection(aio.Resource):
    """ACSE connection

    For creating new connection see `connect` or `listen`.

    """

    def __init__(self,
                 conn: copp.Connection,
                 aarq_apdu: asn1.Value,
                 aare_apdu: asn1.Value,
                 local_ap_title: asn1.ObjectIdentifier | None,
                 remote_ap_title: asn1.ObjectIdentifier | None,
                 local_ae_qualifier: int | None,
                 remote_ae_qualifier: int | None,
                 receive_queue_size: int,
                 send_queue_size: int):
        """Wrap established COPP connection

        Has to be called while an event loop is running. Both `aarq_apdu`
        and `aare_apdu` are decoded APDUs which have to contain
        `user-information`.

        """
        aarq_external = aarq_apdu[1]['user-information'][0]
        aare_external = aare_apdu[1]['user-information'][0]

        conn_req_user_data = (
            conn.syntax_names.get_name(aarq_external.indirect_ref),
            aarq_external.data)
        conn_res_user_data = (
            conn.syntax_names.get_name(aare_external.indirect_ref),
            aare_external.data)

        self._conn = conn
        self._conn_req_user_data = conn_req_user_data
        self._conn_res_user_data = conn_res_user_data
        self._loop = asyncio.get_running_loop()
        self._info = ConnectionInfo(local_ap_title=local_ap_title,
                                    local_ae_qualifier=local_ae_qualifier,
                                    remote_ap_title=remote_ap_title,
                                    remote_ae_qualifier=remote_ae_qualifier,
                                    **conn.info._asdict())
        # APDU sent while closing; replaced by `_close` when closing is
        # initiated by one of the loops
        self._close_apdu = _abrt_apdu(0)
        self._receive_queue = aio.Queue(receive_queue_size)
        self._send_queue = aio.Queue(send_queue_size)
        self._async_group = aio.Group()

        self.async_group.spawn(aio.call_on_cancel, self._on_close)
        self.async_group.spawn(self._receive_loop)
        self.async_group.spawn(self._send_loop)
        # close this connection as soon as the COPP connection is closing
        self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
                               self.close)

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    @property
    def info(self) -> ConnectionInfo:
        """Connection info"""
        return self._info

    @property
    def conn_req_user_data(self) -> copp.IdentifiedEntity:
        """Connect request's user data"""
        return self._conn_req_user_data

    @property
    def conn_res_user_data(self) -> copp.IdentifiedEntity:
        """Connect response's user data"""
        return self._conn_res_user_data

    async def receive(self) -> copp.IdentifiedEntity:
        """Receive data

        Raises:
            ConnectionError: receive queue is closed

        """
        try:
            return await self._receive_queue.get()

        except aio.QueueClosedError:
            raise ConnectionError()

    async def send(self, data: copp.IdentifiedEntity):
        """Send data

        Data is queued and written by the send loop.

        Raises:
            ConnectionError: send queue is closed

        """
        try:
            await self._send_queue.put((data, None))

        except aio.QueueClosedError:
            raise ConnectionError()

    async def drain(self):
        """Drain output buffer

        Returns once the send loop has processed all previously queued data
        and drained the COPP connection, or once the send loop has stopped.

        Raises:
            ConnectionError: send queue is closed

        """
        try:
            future = self._loop.create_future()
            # drain request is a queue item without data
            await self._send_queue.put((None, future))
            await future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def _on_close(self):
        # runs when the async group is cancelled
        await _close_copp(self._conn, self._close_apdu)

    def _close(self, apdu):
        # only the first close request selects the closing APDU
        if not self.is_open:
            return

        self._close_apdu = apdu
        self._async_group.close()

    async def _receive_loop(self):
        try:
            while True:
                syntax_name, entity = await self._conn.receive()

                if syntax_name == _acse_syntax_name:
                    # received entity is still encoded - decode it before
                    # inspecting the APDU type
                    apdu = _decode(entity)

                    if apdu[0] == 'abrt':
                        # peer aborted - nothing to send back
                        close_apdu = None

                    elif apdu[0] == 'rlrq':
                        # release request is answered with release response
                        close_apdu = _rlre_apdu()

                    else:
                        close_apdu = _abrt_apdu(1)

                    self._close(close_apdu)
                    break

                await self._receive_queue.put((syntax_name, entity))

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("receive loop error: %s", e, exc_info=e)

        finally:
            self._close(_abrt_apdu(1))
            self._receive_queue.close()

    async def _send_loop(self):
        future = None
        try:
            while True:
                data, future = await self._send_queue.get()

                if data is None:
                    await self._conn.drain()

                else:
                    await self._conn.send(data)

                if future and not future.done():
                    future.set_result(None)

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("send loop error: %s", e, exc_info=e)

        finally:
            self._close(_abrt_apdu(1))
            self._send_queue.close()

            # release all pending `drain` calls, including the one whose
            # item was being processed when the loop stopped
            while True:
                if future and not future.done():
                    future.set_result(None)
                if self._send_queue.empty():
                    break
                _, future = self._send_queue.get_nowait()
393
394
395async def _close_copp(copp_conn, apdu):
396    data = (_acse_syntax_name, _encode(apdu)) if apdu else None
397    await copp_conn.async_close(data)
398
399
400def _get_ap_titles(aarq_apdu):
401    calling = None
402    if 'calling-AP-title' in aarq_apdu[1]:
403        if aarq_apdu[1]['calling-AP-title'][0] == 'ap-title-form2':
404            calling = aarq_apdu[1]['calling-AP-title'][1]
405
406    called = None
407    if 'called-AP-title' in aarq_apdu[1]:
408        if aarq_apdu[1]['called-AP-title'][0] == 'ap-title-form2':
409            called = aarq_apdu[1]['called-AP-title'][1]
410
411    return calling, called
412
413
414def _get_ae_qualifiers(aarq_apdu):
415    calling = None
416    if 'calling-AE-qualifier' in aarq_apdu[1]:
417        if aarq_apdu[1]['calling-AE-qualifier'][0] == 'ap-qualifier-form2':
418            calling = aarq_apdu[1]['calling-AE-qualifier'][1]
419
420    called = None
421    if 'called-AE-qualifier' in aarq_apdu[1]:
422        if aarq_apdu[1]['called-AE-qualifier'][0] == 'ap-qualifier-form2':
423            called = aarq_apdu[1]['called-AE-qualifier'][1]
424
425    return calling, called
426
427
428def _get_ap_invocation_identifiers(aarq_apdu):
429    calling = aarq_apdu[1].get('calling-AP-invocation-identifier')
430    called = aarq_apdu[1].get('called-AP-invocation-identifier')
431    return calling, called
432
433
434def _get_ae_invocation_identifiers(aarq_apdu):
435    calling = aarq_apdu[1].get('calling-AE-invocation-identifier')
436    called = aarq_apdu[1].get('called-AE-invocation-identifier')
437    return calling, called
438
439
440def _aarq_apdu(syntax_names, app_context_name,
441               calling_ap_title, called_ap_title,
442               calling_ae_qualifier, called_ae_qualifier,
443               user_data):
444    aarq_apdu = 'aarq', {'application-context-name': app_context_name}
445
446    if calling_ap_title is not None:
447        aarq_apdu[1]['calling-AP-title'] = 'ap-title-form2', calling_ap_title
448
449    if called_ap_title is not None:
450        aarq_apdu[1]['called-AP-title'] = 'ap-title-form2', called_ap_title
451
452    if calling_ae_qualifier is not None:
453        aarq_apdu[1]['calling-AE-qualifier'] = ('ae-qualifier-form2',
454                                                calling_ae_qualifier)
455
456    if called_ae_qualifier is not None:
457        aarq_apdu[1]['called-AE-qualifier'] = ('ae-qualifier-form2',
458                                               called_ae_qualifier)
459
460    if user_data:
461        aarq_apdu[1]['user-information'] = [
462            asn1.External(direct_ref=_encoder.syntax_name,
463                          indirect_ref=syntax_names.get_id(user_data[0]),
464                          data=user_data[1])]
465
466    return aarq_apdu
467
468
def _aare_apdu(syntax_names, user_data,
               responding_ap_title, responding_ae_qualifier,
               responding_ap_invocation_identifier,
               responding_ae_invocation_identifier):
    """Build accepting AARE APDU (`result` ``0``)

    `user_data` is always written as `user-information`; responding
    identifiers are included only if they are not ``None``.

    NOTE(review): `application-context-name` is set to `user_data[0]`, the
    syntax name of the response user data - confirm this is intended.

    """
    fields = {'application-context-name': user_data[0],
              'result': 0,
              'result-source-diagnostic': ('acse-service-user', 0)}

    external = asn1.External(direct_ref=_encoder.syntax_name,
                             indirect_ref=syntax_names.get_id(user_data[0]),
                             data=user_data[1])
    fields['user-information'] = [external]

    if responding_ap_title is not None:
        fields['responding-AP-title'] = ('ap-title-form2',
                                         responding_ap_title)

    if responding_ae_qualifier is not None:
        fields['responding-AE-qualifier'] = ('ae-qualifier-form2',
                                             responding_ae_qualifier)

    # invocation identifiers are plain values, in the order they are added
    optional_identifiers = (
        ('responding-AP-invocation-identifier',
         responding_ap_invocation_identifier),
        ('responding-AE-invocation-identifier',
         responding_ae_invocation_identifier))
    for key, identifier in optional_identifiers:
        if identifier is not None:
            fields[key] = identifier

    return 'aare', fields
499
500
501def _abrt_apdu(source):
502    return 'abrt', {'abort-source': source}
503
504
505def _rlre_apdu():
506    return 'rlre', {}
507
508
def _encode(value):
    """Encode ACSE APDU value (type ``ACSE-apdu`` of module ``ACSE-1``)"""
    entity = _encoder.encode_value('ACSE-1', 'ACSE-apdu', value)
    return entity
511
512
def _decode(entity):
    """Decode entity as ACSE APDU (type ``ACSE-apdu`` of module ``ACSE-1``)"""
    value = _encoder.decode_value('ACSE-1', 'ACSE-apdu', entity)
    return value
mlog = <Logger hat.drivers.acse (WARNING)>
class ConnectionInfo(typing.NamedTuple):
class ConnectionInfo(typing.NamedTuple):
    """Addressing information of a single ACSE connection

    `Connection` fills the `*_ap_title` and `*_ae_qualifier` fields from its
    constructor arguments; all other fields are copied from the info of the
    underlying `hat.drivers.copp` connection.

    """
    local_addr: tcp.Address
    # presumably transport/session/presentation selectors - confirm against
    # `hat.drivers.copp`
    local_tsel: int | None
    local_ssel: int | None
    local_psel: int | None
    local_ap_title: asn1.ObjectIdentifier | None
    local_ae_qualifier: int | None
    remote_addr: tcp.Address
    # presumably transport/session/presentation selectors - confirm against
    # `hat.drivers.copp`
    remote_tsel: int | None
    remote_ssel: int | None
    remote_psel: int | None
    remote_ap_title: asn1.ObjectIdentifier | None
    remote_ae_qualifier: int | None

ConnectionInfo(local_addr, local_tsel, local_ssel, local_psel, local_ap_title, local_ae_qualifier, remote_addr, remote_tsel, remote_ssel, remote_psel, remote_ap_title, remote_ae_qualifier)

ConnectionInfo( local_addr: hat.drivers.tcp.Address, local_tsel: int | None, local_ssel: int | None, local_psel: int | None, local_ap_title: tuple[int, ...] | None, local_ae_qualifier: int | None, remote_addr: hat.drivers.tcp.Address, remote_tsel: int | None, remote_ssel: int | None, remote_psel: int | None, remote_ap_title: tuple[int, ...] | None, remote_ae_qualifier: int | None)

Create new instance of ConnectionInfo(local_addr, local_tsel, local_ssel, local_psel, local_ap_title, local_ae_qualifier, remote_addr, remote_tsel, remote_ssel, remote_psel, remote_ap_title, remote_ae_qualifier)

Alias for field number 0

local_tsel: int | None

Alias for field number 1

local_ssel: int | None

Alias for field number 2

local_psel: int | None

Alias for field number 3

local_ap_title: tuple[int, ...] | None

Alias for field number 4

local_ae_qualifier: int | None

Alias for field number 5

Alias for field number 6

remote_tsel: int | None

Alias for field number 7

remote_ssel: int | None

Alias for field number 8

remote_psel: int | None

Alias for field number 9

remote_ap_title: tuple[int, ...] | None

Alias for field number 10

remote_ae_qualifier: int | None

Alias for field number 11

ValidateCb: TypeAlias = Callable[[hat.drivers.copp.SyntaxNames, tuple[tuple[int, ...], hat.asn1.common.Entity]], Union[tuple[tuple[int, ...], hat.asn1.common.Entity], NoneType, Awaitable[tuple[tuple[int, ...], hat.asn1.common.Entity] | None]]]

Validate callback

ConnectionCb: TypeAlias = Callable[[ForwardRef('Connection')], Optional[Awaitable[NoneType]]]

Connection callback

async def connect( addr: hat.drivers.tcp.Address, syntax_name_list: list[tuple[int, ...]], app_context_name: tuple[int, ...], user_data: tuple[tuple[int, ...], hat.asn1.common.Entity] | None = None, *, local_ap_title: tuple[int, ...] | None = None, remote_ap_title: tuple[int, ...] | None = None, local_ae_qualifier: int | None = None, remote_ae_qualifier: int | None = None, acse_receive_queue_size: int = 1024, acse_send_queue_size: int = 1024, **kwargs) -> Connection:
async def connect(addr: tcp.Address,
                  syntax_name_list: list[asn1.ObjectIdentifier],
                  app_context_name: asn1.ObjectIdentifier,
                  user_data: copp.IdentifiedEntity | None = None,
                  *,
                  local_ap_title: asn1.ObjectIdentifier | None = None,
                  remote_ap_title: asn1.ObjectIdentifier | None = None,
                  local_ae_qualifier: int | None = None,
                  remote_ae_qualifier: int | None = None,
                  acse_receive_queue_size: int = 1024,
                  acse_send_queue_size: int = 1024,
                  **kwargs
                  ) -> 'Connection':
    """Connect to ACSE server

    An AARQ APDU is sent as user data of the COPP connect request and the
    COPP connect response has to carry an AARE APDU with `result` equal to
    ``0``. On any failure after the COPP connection is established, the
    COPP connection is closed with an ABRT APDU and the exception is
    re-raised.

    Additional arguments are passed directly to `hat.drivers.copp.connect`
    (`syntax_names` is set by this coroutine).

    Args:
        addr: remote address
        syntax_name_list: syntax names offered in addition to the ACSE
            syntax name
        app_context_name: application context name written to the AARQ
        user_data: user information written to the AARQ
        local_ap_title: sent as calling AP title
        remote_ap_title: sent as called AP title
        local_ae_qualifier: sent as calling AE qualifier
        remote_ae_qualifier: sent as called AE qualifier
        acse_receive_queue_size: size of connection's receive queue
        acse_send_queue_size: size of connection's send queue

    NOTE(review): when `user_data` is ``None`` the AARQ is built without
    `user-information`, which `Connection` reads unconditionally - confirm
    whether calling without user data is meant to be supported.

    """
    names = copp.SyntaxNames([_acse_syntax_name, *syntax_name_list])
    request = _aarq_apdu(names, app_context_name,
                         local_ap_title, remote_ap_title,
                         local_ae_qualifier, remote_ae_qualifier,
                         user_data)
    copp_conn = await copp.connect(addr, names,
                                   (_acse_syntax_name, _encode(request)),
                                   **kwargs)

    try:
        response_syntax_name, response_entity = copp_conn.conn_res_user_data
        if response_syntax_name != _acse_syntax_name:
            raise Exception("invalid syntax name")

        response = _decode(response_entity)
        accepted = response[0] == 'aare' and response[1]['result'] == 0
        if not accepted:
            raise Exception("invalid apdu")

        # on the connecting side, calling entity is the local one
        local_title, remote_title = _get_ap_titles(request)
        local_qualifier, remote_qualifier = _get_ae_qualifiers(request)

        return Connection(copp_conn, request, response,
                          local_title, remote_title,
                          local_qualifier, remote_qualifier,
                          acse_receive_queue_size, acse_send_queue_size)

    except Exception:
        await aio.uncancellable(_close_copp(copp_conn, _abrt_apdu(1)))
        raise

Connect to ACSE server

Additional arguments are passed directly to hat.drivers.copp.connect (syntax_names is set by this coroutine).

async def listen( validate_cb: Callable[[hat.drivers.copp.SyntaxNames, tuple[tuple[int, ...], hat.asn1.common.Entity]], Union[tuple[tuple[int, ...], hat.asn1.common.Entity], NoneType, Awaitable[tuple[tuple[int, ...], hat.asn1.common.Entity] | None]]], connection_cb: Callable[[Connection], Optional[Awaitable[NoneType]]], addr: hat.drivers.tcp.Address = Address(host='0.0.0.0', port=102), *, bind_connections: bool = False, acse_receive_queue_size: int = 1024, acse_send_queue_size: int = 1024, **kwargs) -> Server:
async def listen(validate_cb: ValidateCb,
                 connection_cb: ConnectionCb,
                 addr: tcp.Address = tcp.Address('0.0.0.0', 102),
                 *,
                 bind_connections: bool = False,
                 acse_receive_queue_size: int = 1024,
                 acse_send_queue_size: int = 1024,
                 **kwargs
                 ) -> 'Server':
    """Create ACSE listening server

    Additional arguments are passed directly to `hat.drivers.copp.listen`.
    The COPP server is always created with ``bind_connections=False``;
    `bind_connections` is applied by the ACSE server itself.

    Args:
        validate_cb: callback function or coroutine called on new
            incoming connection request prior to creating connection object
        connection_cb: new connection callback
        addr: local listening address
        bind_connections: if set, connection handler keeps running until
            the connection is closed and closes the connection when the
            handler itself is interrupted
        acse_receive_queue_size: size of each connection's receive queue
        acse_send_queue_size: size of each connection's send queue

    """
    srv = Server()
    srv._send_queue_size = acse_send_queue_size
    srv._receive_queue_size = acse_receive_queue_size
    srv._bind_connections = bind_connections
    srv._connection_cb = connection_cb
    srv._validate_cb = validate_cb

    copp_srv = await copp.listen(srv._on_validate,
                                 srv._on_connection,
                                 addr,
                                 bind_connections=False,
                                 **kwargs)
    srv._srv = copp_srv

    return srv

Create ACSE listening server

Additional arguments are passed directly to hat.drivers.copp.listen.

Arguments:
  • validate_cb: callback function or coroutine called on new incoming connection request prior to creating connection object
  • connection_cb: new connection callback
  • addr: local listening address
class Server(hat.aio.group.Resource):
class Server(aio.Resource):
    """ACSE listening server

    For creating new server see `listen`.

    Async group and listening addresses are those of the wrapped
    `hat.drivers.copp` server.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._srv.async_group

    @property
    def addresses(self) -> list[tcp.Address]:
        """Listening addresses"""
        return self._srv.addresses

    async def _on_validate(self, syntax_names, user_data):
        """COPP validate callback

        Checks that connect request user data is an AARQ APDU, passes its
        user information to the user's validate callback and returns the
        encoded AARE APDU used as connect response user data.

        Raises:
            Exception: request is not a valid ACSE AARQ

        """
        aarq_apdu_syntax_name, aarq_apdu_entity = user_data
        if aarq_apdu_syntax_name != _acse_syntax_name:
            raise Exception('invalid acse syntax name')

        aarq_apdu = _decode(aarq_apdu_entity)
        if aarq_apdu[0] != 'aarq':
            raise Exception('not aarq message')

        aarq_external = aarq_apdu[1]['user-information'][0]
        if aarq_external.direct_ref is not None:
            if aarq_external.direct_ref != _encoder.syntax_name:
                raise Exception('invalid encoder identifier')

        # called identifiers of the request are echoed as responding
        # identifiers of the response
        _, called_ap_title = _get_ap_titles(aarq_apdu)
        _, called_ae_qualifier = _get_ae_qualifiers(aarq_apdu)
        _, called_ap_invocation_identifier = \
            _get_ap_invocation_identifiers(aarq_apdu)
        _, called_ae_invocation_identifier = \
            _get_ae_invocation_identifiers(aarq_apdu)

        aarq_user_data = (syntax_names.get_name(aarq_external.indirect_ref),
                          aarq_external.data)

        user_validate_result = await aio.call(self._validate_cb, syntax_names,
                                              aarq_user_data)

        aare_apdu = _aare_apdu(syntax_names,
                               user_validate_result,
                               called_ap_title, called_ae_qualifier,
                               called_ap_invocation_identifier,
                               called_ae_invocation_identifier)
        return _acse_syntax_name, _encode(aare_apdu)

    async def _on_connection(self, copp_conn):
        """COPP connection callback

        Wraps the COPP connection into `Connection` and passes it to the
        user's connection callback. Errors are logged and not propagated.

        """
        try:
            try:
                aarq_apdu = _decode(copp_conn.conn_req_user_data[1])
                aare_apdu = _decode(copp_conn.conn_res_user_data[1])

                calling_ap_title, called_ap_title = _get_ap_titles(aarq_apdu)
                calling_ae_qualifier, called_ae_qualifier = _get_ae_qualifiers(
                    aarq_apdu)

                # on the listening side the called entity is the local one
                # and the calling entity is the remote one
                conn = Connection(copp_conn, aarq_apdu, aare_apdu,
                                  called_ap_title, calling_ap_title,
                                  called_ae_qualifier, calling_ae_qualifier,
                                  self._receive_queue_size,
                                  self._send_queue_size)

            except Exception:
                await aio.uncancellable(_close_copp(copp_conn, _abrt_apdu(1)))
                raise

            try:
                await aio.call(self._connection_cb, conn)

            except BaseException:
                await aio.uncancellable(conn.async_close())
                raise

        except Exception as e:
            mlog.error("error creating new incomming connection: %s", e,
                       exc_info=e)
            return

        if not self._bind_connections:
            return

        # bound connection: stay alive until the connection closes and
        # close it if this handler is interrupted
        try:
            await conn.wait_closed()

        except BaseException:
            await aio.uncancellable(conn.async_close())
            raise

ACSE listening server

For creating new server see listen.

async_group: hat.aio.group.Group
143    @property
144    def async_group(self) -> aio.Group:
145        """Async group"""
146        return self._srv.async_group

Async group

addresses: list[hat.drivers.tcp.Address]
148    @property
149    def addresses(self) -> list[tcp.Address]:
150        """Listening addresses"""
151        return self._srv.addresses

Listening addresses

class Connection(hat.aio.group.Resource):
class Connection(aio.Resource):
    """ACSE connection

    Wraps a COPP connection. Incoming and outgoing data pass through
    bounded queues serviced by background receive and send loops.

    For creating a new connection, see `connect` or `listen`.

    """

    def __init__(self,
                 conn: copp.Connection,
                 aarq_apdu: asn1.Value,
                 aare_apdu: asn1.Value,
                 local_ap_title: asn1.ObjectIdentifier | None,
                 remote_ap_title: asn1.ObjectIdentifier | None,
                 local_ae_qualifier: int | None,
                 remote_ae_qualifier: int | None,
                 receive_queue_size: int,
                 send_queue_size: int):
        """Create connection on top of an established COPP connection.

        Must be called while an event loop is running
        (uses `asyncio.get_running_loop`).

        Args:
            conn: underlying COPP connection
            aarq_apdu: decoded association request APDU
            aare_apdu: decoded association response APDU
            local_ap_title: local AP title
            remote_ap_title: remote AP title
            local_ae_qualifier: local AE qualifier
            remote_ae_qualifier: remote AE qualifier
            receive_queue_size: receive queue size
            send_queue_size: send queue size

        """
        # only the first user-information element of each APDU is used
        aarq_external = aarq_apdu[1]['user-information'][0]
        aare_external = aare_apdu[1]['user-information'][0]

        # user data is exposed as (syntax name, data) where the syntax name
        # is resolved from the external's indirect reference
        conn_req_user_data = (
            conn.syntax_names.get_name(aarq_external.indirect_ref),
            aarq_external.data)
        conn_res_user_data = (
            conn.syntax_names.get_name(aare_external.indirect_ref),
            aare_external.data)

        self._conn = conn
        self._conn_req_user_data = conn_req_user_data
        self._conn_res_user_data = conn_res_user_data
        self._loop = asyncio.get_running_loop()
        # ACSE specific fields extended with all fields of COPP info
        self._info = ConnectionInfo(local_ap_title=local_ap_title,
                                    local_ae_qualifier=local_ae_qualifier,
                                    remote_ap_title=remote_ap_title,
                                    remote_ae_qualifier=remote_ae_qualifier,
                                    **conn.info._asdict())
        # APDU passed to `_close_copp` on close, unless replaced by `_close`
        self._close_apdu = _abrt_apdu(0)
        self._receive_queue = aio.Queue(receive_queue_size)
        self._send_queue = aio.Queue(send_queue_size)
        self._async_group = aio.Group()

        self.async_group.spawn(aio.call_on_cancel, self._on_close)
        self.async_group.spawn(self._receive_loop)
        self.async_group.spawn(self._send_loop)
        # presumably closes this connection once the COPP connection starts
        # closing - confirm against hat.aio.call_on_done
        self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
                               self.close)

    @property
    def async_group(self) -> aio.Group:
        """Async group owning this connection's background tasks"""
        return self._async_group

    @property
    def info(self) -> ConnectionInfo:
        """Connection info"""
        return self._info

    @property
    def conn_req_user_data(self) -> copp.IdentifiedEntity:
        """Connect request's user data"""
        return self._conn_req_user_data

    @property
    def conn_res_user_data(self) -> copp.IdentifiedEntity:
        """Connect response's user data"""
        return self._conn_res_user_data

    async def receive(self) -> copp.IdentifiedEntity:
        """Receive data

        Raises:
            ConnectionError: if the receive queue is closed

        """
        try:
            return await self._receive_queue.get()

        except aio.QueueClosedError:
            raise ConnectionError()

    async def send(self, data: copp.IdentifiedEntity):
        """Send data

        Data is only queued here; the send loop passes it to the COPP
        connection.

        Raises:
            ConnectionError: if the send queue is closed

        """
        try:
            # queue entries are (data, future) - plain data has no future
            await self._send_queue.put((data, None))

        except aio.QueueClosedError:
            raise ConnectionError()

    async def drain(self):
        """Drain output buffer

        Queues a drain marker and waits until the send loop resolves it.

        Raises:
            ConnectionError: if the send queue is closed

        """
        try:
            future = self._loop.create_future()
            # entry with data set to None is a drain request
            await self._send_queue.put((None, future))
            await future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def _on_close(self) -> None:
        """Close the COPP connection using the currently selected APDU"""
        await _close_copp(self._conn, self._close_apdu)

    def _close(self, apdu) -> None:
        """Select the APDU used on close and close the async group.

        Does nothing if the connection is no longer open, so only the first
        call determines the close APDU.
        """
        if not self.is_open:
            return

        self._close_apdu = apdu
        self._async_group.close()

    async def _receive_loop(self) -> None:
        """Forward received entities to the receive queue.

        Any entity using the ACSE syntax ends the loop and closes the
        connection; all other entities are queued for `receive`.
        """
        try:
            while True:
                syntax_name, entity = await self._conn.receive()

                if syntax_name == _acse_syntax_name:
                    if entity[0] == 'abrt':
                        # peer aborted - no close APDU is selected
                        close_apdu = None

                    elif entity[0] == 'rlrq':
                        # release request - close with release response
                        close_apdu = _rlre_apdu()

                    else:
                        # any other ACSE APDU - close with abort
                        close_apdu = _abrt_apdu(1)

                    self._close(close_apdu)
                    break

                await self._receive_queue.put((syntax_name, entity))

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("receive loop error: %s", e, exc_info=e)

        finally:
            # no-op if the connection was already closed above
            self._close(_abrt_apdu(1))
            self._receive_queue.close()

    async def _send_loop(self) -> None:
        """Process queued send and drain requests.

        Queue entries are ``(data, future)`` tuples. ``data`` set to
        ``None`` requests drain of the COPP connection; otherwise data is
        sent. A provided future is resolved once its entry is processed.
        """
        # future of the most recently dequeued entry - kept outside of the
        # loop so that it can be resolved in `finally`
        future = None
        try:
            while True:
                data, future = await self._send_queue.get()

                if data is None:
                    await self._conn.drain()

                else:
                    await self._conn.send(data)

                if future and not future.done():
                    future.set_result(None)

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("send loop error: %s", e, exc_info=e)

        finally:
            self._close(_abrt_apdu(1))
            self._send_queue.close()

            # resolve (not fail) futures of the interrupted entry and of
            # all entries still queued, releasing pending `drain` callers
            while True:
                if future and not future.done():
                    future.set_result(None)
                if self._send_queue.empty():
                    break
                _, future = self._send_queue.get_nowait()

ACSE connection

For creating a new connection, see connect or listen.

Connection( conn: hat.drivers.copp.Connection, aarq_apdu: Union[bool, int, list[bool], bytes, bytearray, memoryview, NoneType, tuple[int, ...], str, hat.asn1.common.External, float, hat.asn1.common.EmbeddedPDV, Tuple[str, ForwardRef('Value')], Dict[str, ForwardRef('Value')], Iterable[ForwardRef('Value')], List[ForwardRef('Value')], hat.asn1.common.Entity], aare_apdu: Union[bool, int, list[bool], bytes, bytearray, memoryview, NoneType, tuple[int, ...], str, hat.asn1.common.External, float, hat.asn1.common.EmbeddedPDV, Tuple[str, ForwardRef('Value')], Dict[str, ForwardRef('Value')], Iterable[ForwardRef('Value')], List[ForwardRef('Value')], hat.asn1.common.Entity], local_ap_title: tuple[int, ...] | None, remote_ap_title: tuple[int, ...] | None, local_ae_qualifier: int | None, remote_ae_qualifier: int | None, receive_queue_size: int, send_queue_size: int)
    def __init__(self,
                 conn: copp.Connection,
                 aarq_apdu: asn1.Value,
                 aare_apdu: asn1.Value,
                 local_ap_title: asn1.ObjectIdentifier | None,
                 remote_ap_title: asn1.ObjectIdentifier | None,
                 local_ae_qualifier: int | None,
                 remote_ae_qualifier: int | None,
                 receive_queue_size: int,
                 send_queue_size: int):
        """Create connection on top of an established COPP connection.

        Must be called while an event loop is running
        (uses `asyncio.get_running_loop`).

        Args:
            conn: underlying COPP connection
            aarq_apdu: decoded association request APDU
            aare_apdu: decoded association response APDU
            local_ap_title: local AP title
            remote_ap_title: remote AP title
            local_ae_qualifier: local AE qualifier
            remote_ae_qualifier: remote AE qualifier
            receive_queue_size: receive queue size
            send_queue_size: send queue size

        """
        # only the first user-information element of each APDU is used
        aarq_external = aarq_apdu[1]['user-information'][0]
        aare_external = aare_apdu[1]['user-information'][0]

        # user data is exposed as (syntax name, data) where the syntax name
        # is resolved from the external's indirect reference
        conn_req_user_data = (
            conn.syntax_names.get_name(aarq_external.indirect_ref),
            aarq_external.data)
        conn_res_user_data = (
            conn.syntax_names.get_name(aare_external.indirect_ref),
            aare_external.data)

        self._conn = conn
        self._conn_req_user_data = conn_req_user_data
        self._conn_res_user_data = conn_res_user_data
        self._loop = asyncio.get_running_loop()
        # ACSE specific fields extended with all fields of COPP info
        self._info = ConnectionInfo(local_ap_title=local_ap_title,
                                    local_ae_qualifier=local_ae_qualifier,
                                    remote_ap_title=remote_ap_title,
                                    remote_ae_qualifier=remote_ae_qualifier,
                                    **conn.info._asdict())
        # APDU passed to `_close_copp` on close, unless replaced by `_close`
        self._close_apdu = _abrt_apdu(0)
        self._receive_queue = aio.Queue(receive_queue_size)
        self._send_queue = aio.Queue(send_queue_size)
        self._async_group = aio.Group()

        self.async_group.spawn(aio.call_on_cancel, self._on_close)
        self.async_group.spawn(self._receive_loop)
        self.async_group.spawn(self._send_loop)
        # presumably closes this connection once the COPP connection starts
        # closing - confirm against hat.aio.call_on_done
        self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
                               self.close)
async_group: hat.aio.group.Group
    @property
    def async_group(self) -> aio.Group:
        """Async group owning this connection's background tasks"""
        return self._async_group

Async group

info: ConnectionInfo
    @property
    def info(self) -> ConnectionInfo:
        """Connection info created during initialization"""
        return self._info

Connection info

conn_req_user_data: tuple[tuple[int, ...], hat.asn1.common.Entity]
    @property
    def conn_req_user_data(self) -> copp.IdentifiedEntity:
        """Connect request's user data as (syntax name, data)"""
        return self._conn_req_user_data

Connect request's user data

conn_res_user_data: tuple[tuple[int, ...], hat.asn1.common.Entity]
    @property
    def conn_res_user_data(self) -> copp.IdentifiedEntity:
        """Connect response's user data as (syntax name, data)"""
        return self._conn_res_user_data

Connect response's user data

async def receive(self) -> tuple[tuple[int, ...], hat.asn1.common.Entity]:
297    async def receive(self) -> copp.IdentifiedEntity:
298        """Receive data"""
299        try:
300            return await self._receive_queue.get()
301
302        except aio.QueueClosedError:
303            raise ConnectionError()

Receive data

async def send(self, data: tuple[tuple[int, ...], hat.asn1.common.Entity]):
305    async def send(self, data: copp.IdentifiedEntity):
306        """Send data"""
307        try:
308            await self._send_queue.put((data, None))
309
310        except aio.QueueClosedError:
311            raise ConnectionError()

Send data

async def drain(self):
313    async def drain(self):
314        """Drain output buffer"""
315        try:
316            future = self._loop.create_future()
317            await self._send_queue.put((None, future))
318            await future
319
320        except aio.QueueClosedError:
321            raise ConnectionError()

Drain output buffer