hat.drivers.acse

Association Controll Service Element

  1"""Association Controll Service Element"""
  2
  3import asyncio
  4import importlib.resources
  5import logging
  6import typing
  7
  8from hat import aio
  9from hat import asn1
 10from hat import json
 11
 12from hat.drivers import copp
 13from hat.drivers import tcp
 14
 15
 16mlog = logging.getLogger(__name__)
 17
 18# (joint-iso-itu-t, association-control, abstract-syntax, apdus, version1)
 19_acse_syntax_name = (2, 2, 1, 0, 1)
 20
 21with importlib.resources.open_text(__package__, 'asn1_repo.json') as _f:
 22    _encoder = asn1.ber.BerEncoder(
 23        asn1.repository_from_json(
 24            json.decode_stream(_f)))
 25
 26
 27class ConnectionInfo(typing.NamedTuple):
 28    local_addr: tcp.Address
 29    local_tsel: int | None
 30    local_ssel: int | None
 31    local_psel: int | None
 32    local_ap_title: asn1.ObjectIdentifier | None
 33    local_ae_qualifier: int | None
 34    remote_addr: tcp.Address
 35    remote_tsel: int | None
 36    remote_ssel: int | None
 37    remote_psel: int | None
 38    remote_ap_title: asn1.ObjectIdentifier | None
 39    remote_ae_qualifier: int | None
 40
 41
 42ValidateCb: typing.TypeAlias = aio.AsyncCallable[[copp.SyntaxNames,
 43                                                  copp.IdentifiedEntity],
 44                                                 copp.IdentifiedEntity | None]
 45"""Validate callback"""
 46
 47ConnectionCb: typing.TypeAlias = aio.AsyncCallable[['Connection'], None]
 48"""Connection callback"""
 49
 50
 51async def connect(addr: tcp.Address,
 52                  syntax_name_list: list[asn1.ObjectIdentifier],
 53                  app_context_name: asn1.ObjectIdentifier,
 54                  user_data: copp.IdentifiedEntity | None = None,
 55                  *,
 56                  local_ap_title: asn1.ObjectIdentifier | None = None,
 57                  remote_ap_title: asn1.ObjectIdentifier | None = None,
 58                  local_ae_qualifier: int | None = None,
 59                  remote_ae_qualifier: int | None = None,
 60                  acse_receive_queue_size: int = 1024,
 61                  acse_send_queue_size: int = 1024,
 62                  **kwargs
 63                  ) -> 'Connection':
 64    """Connect to ACSE server
 65
 66    Additional arguments are passed directly to `hat.drivers.copp.connect`
 67    (`syntax_names` is set by this coroutine).
 68
 69    """
 70    syntax_names = copp.SyntaxNames([_acse_syntax_name, *syntax_name_list])
 71    aarq_apdu = _aarq_apdu(syntax_names, app_context_name,
 72                           local_ap_title, remote_ap_title,
 73                           local_ae_qualifier, remote_ae_qualifier,
 74                           user_data)
 75    copp_user_data = _acse_syntax_name, _encode(aarq_apdu)
 76    conn = await copp.connect(addr, syntax_names, copp_user_data, **kwargs)
 77
 78    try:
 79        aare_apdu_syntax_name, aare_apdu_entity = conn.conn_res_user_data
 80        if aare_apdu_syntax_name != _acse_syntax_name:
 81            raise Exception("invalid syntax name")
 82
 83        aare_apdu = _decode(aare_apdu_entity)
 84        if aare_apdu[0] != 'aare' or aare_apdu[1]['result'] != 0:
 85            raise Exception("invalid apdu")
 86
 87        calling_ap_title, called_ap_title = _get_ap_titles(aarq_apdu)
 88        calling_ae_qualifier, called_ae_qualifier = _get_ae_qualifiers(
 89            aarq_apdu)
 90        return Connection(conn, aarq_apdu, aare_apdu,
 91                          calling_ap_title, called_ap_title,
 92                          calling_ae_qualifier, called_ae_qualifier,
 93                          acse_receive_queue_size, acse_send_queue_size)
 94
 95    except Exception:
 96        await aio.uncancellable(_close_copp(conn, _abrt_apdu(1)))
 97        raise
 98
 99
100async def listen(validate_cb: ValidateCb,
101                 connection_cb: ConnectionCb,
102                 addr: tcp.Address = tcp.Address('0.0.0.0', 102),
103                 *,
104                 bind_connections: bool = False,
105                 acse_receive_queue_size: int = 1024,
106                 acse_send_queue_size: int = 1024,
107                 **kwargs
108                 ) -> 'Server':
109    """Create ACSE listening server
110
111    Additional arguments are passed directly to `hat.drivers.copp.listen`.
112
113    Args:
114        validate_cb: callback function or coroutine called on new
115            incomming connection request prior to creating connection object
116        connection_cb: new connection callback
117        addr: local listening address
118
119    """
120    server = Server()
121    server._validate_cb = validate_cb
122    server._connection_cb = connection_cb
123    server._bind_connections = bind_connections
124    server._receive_queue_size = acse_receive_queue_size
125    server._send_queue_size = acse_send_queue_size
126
127    server._srv = await copp.listen(server._on_validate,
128                                    server._on_connection,
129                                    addr,
130                                    bind_connections=False,
131                                    **kwargs)
132
133    return server
134
135
136class Server(aio.Resource):
137    """ACSE listening server
138
139    For creating new server see `listen`.
140
141    """
142
143    @property
144    def async_group(self) -> aio.Group:
145        """Async group"""
146        return self._srv.async_group
147
148    @property
149    def addresses(self) -> list[tcp.Address]:
150        """Listening addresses"""
151        return self._srv.addresses
152
153    async def _on_validate(self, syntax_names, user_data):
154        aarq_apdu_syntax_name, aarq_apdu_entity = user_data
155        if aarq_apdu_syntax_name != _acse_syntax_name:
156            raise Exception('invalid acse syntax name')
157
158        aarq_apdu = _decode(aarq_apdu_entity)
159        if aarq_apdu[0] != 'aarq':
160            raise Exception('not aarq message')
161
162        aarq_external = aarq_apdu[1]['user-information'][0]
163        if aarq_external.direct_ref is not None:
164            if aarq_external.direct_ref != _encoder.syntax_name:
165                raise Exception('invalid encoder identifier')
166
167        _, called_ap_title = _get_ap_titles(aarq_apdu)
168        _, called_ae_qualifier = _get_ae_qualifiers(aarq_apdu)
169        _, called_ap_invocation_identifier = \
170            _get_ap_invocation_identifiers(aarq_apdu)
171        _, called_ae_invocation_identifier = \
172            _get_ae_invocation_identifiers(aarq_apdu)
173
174        aarq_user_data = (syntax_names.get_name(aarq_external.indirect_ref),
175                          aarq_external.data)
176
177        user_validate_result = await aio.call(self._validate_cb, syntax_names,
178                                              aarq_user_data)
179
180        aare_apdu = _aare_apdu(syntax_names,
181                               user_validate_result,
182                               called_ap_title, called_ae_qualifier,
183                               called_ap_invocation_identifier,
184                               called_ae_invocation_identifier)
185        return _acse_syntax_name, _encode(aare_apdu)
186
187    async def _on_connection(self, copp_conn):
188        try:
189            try:
190                aarq_apdu = _decode(copp_conn.conn_req_user_data[1])
191                aare_apdu = _decode(copp_conn.conn_res_user_data[1])
192
193                calling_ap_title, called_ap_title = _get_ap_titles(aarq_apdu)
194                calling_ae_qualifier, called_ae_qualifier = _get_ae_qualifiers(
195                    aarq_apdu)
196
197                conn = Connection(copp_conn, aarq_apdu, aare_apdu,
198                                  calling_ap_title, called_ap_title,
199                                  calling_ae_qualifier, called_ae_qualifier,
200                                  self._receive_queue_size,
201                                  self._send_queue_size)
202
203            except Exception:
204                await aio.uncancellable(_close_copp(copp_conn, _abrt_apdu(1)))
205                raise
206
207            try:
208                await aio.call(self._connection_cb, conn)
209
210            except BaseException:
211                await aio.uncancellable(conn.async_close())
212                raise
213
214        except Exception as e:
215            mlog.error("error creating new incomming connection: %s", e,
216                       exc_info=e)
217            return
218
219        if not self._bind_connections:
220            return
221
222        try:
223            await conn.wait_closed()
224
225        except BaseException:
226            await aio.uncancellable(conn.async_close())
227            raise
228
229
230class Connection(aio.Resource):
231    """ACSE connection
232
233    For creating new connection see `connect` or `listen`.
234
235    """
236
237    def __init__(self,
238                 conn: copp.Connection,
239                 aarq_apdu: asn1.Value,
240                 aare_apdu: asn1.Value,
241                 local_ap_title: asn1.ObjectIdentifier | None,
242                 remote_ap_title: asn1.ObjectIdentifier | None,
243                 local_ae_qualifier: int | None,
244                 remote_ae_qualifier: int | None,
245                 receive_queue_size: int,
246                 send_queue_size: int):
247        aarq_external = aarq_apdu[1]['user-information'][0]
248        aare_external = aare_apdu[1]['user-information'][0]
249
250        conn_req_user_data = (
251            conn.syntax_names.get_name(aarq_external.indirect_ref),
252            aarq_external.data)
253        conn_res_user_data = (
254            conn.syntax_names.get_name(aare_external.indirect_ref),
255            aare_external.data)
256
257        self._conn = conn
258        self._conn_req_user_data = conn_req_user_data
259        self._conn_res_user_data = conn_res_user_data
260        self._loop = asyncio.get_running_loop()
261        self._info = ConnectionInfo(local_ap_title=local_ap_title,
262                                    local_ae_qualifier=local_ae_qualifier,
263                                    remote_ap_title=remote_ap_title,
264                                    remote_ae_qualifier=remote_ae_qualifier,
265                                    **conn.info._asdict())
266        self._close_apdu = _abrt_apdu(0)
267        self._receive_queue = aio.Queue(receive_queue_size)
268        self._send_queue = aio.Queue(send_queue_size)
269        self._async_group = aio.Group()
270
271        self.async_group.spawn(aio.call_on_cancel, self._on_close)
272        self.async_group.spawn(self._receive_loop)
273        self.async_group.spawn(self._send_loop)
274        self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
275                               self.close)
276
277    @property
278    def async_group(self) -> aio.Group:
279        """Async group"""
280        return self._async_group
281
282    @property
283    def info(self) -> ConnectionInfo:
284        """Connection info"""
285        return self._info
286
287    @property
288    def conn_req_user_data(self) -> copp.IdentifiedEntity:
289        """Connect request's user data"""
290        return self._conn_req_user_data
291
292    @property
293    def conn_res_user_data(self) -> copp.IdentifiedEntity:
294        """Connect response's user data"""
295        return self._conn_res_user_data
296
297    async def receive(self) -> copp.IdentifiedEntity:
298        """Receive data"""
299        try:
300            return await self._receive_queue.get()
301
302        except aio.QueueClosedError:
303            raise ConnectionError()
304
305    async def send(self, data: copp.IdentifiedEntity):
306        """Send data"""
307        try:
308            await self._send_queue.put((data, None))
309
310        except aio.QueueClosedError:
311            raise ConnectionError()
312
313    async def drain(self):
314        """Drain output buffer"""
315        try:
316            future = self._loop.create_future()
317            await self._send_queue.put((None, future))
318            await future
319
320        except aio.QueueClosedError:
321            raise ConnectionError()
322
323    async def _on_close(self):
324        await _close_copp(self._conn, self._close_apdu)
325
326    def _close(self, apdu):
327        if not self.is_open:
328            return
329
330        self._close_apdu = apdu
331        self._async_group.close()
332
333    async def _receive_loop(self):
334        try:
335            while True:
336                syntax_name, entity = await self._conn.receive()
337
338                if syntax_name == _acse_syntax_name:
339                    if entity[0] == 'abrt':
340                        close_apdu = None
341
342                    elif entity[0] == 'rlrq':
343                        close_apdu = _rlre_apdu()
344
345                    else:
346                        close_apdu = _abrt_apdu(1)
347
348                    self._close(close_apdu)
349                    break
350
351                await self._receive_queue.put((syntax_name, entity))
352
353        except ConnectionError:
354            pass
355
356        except Exception as e:
357            mlog.error("receive loop error: %s", e, exc_info=e)
358
359        finally:
360            self._close(_abrt_apdu(1))
361            self._receive_queue.close()
362
363    async def _send_loop(self):
364        future = None
365        try:
366            while True:
367                data, future = await self._send_queue.get()
368
369                if data is None:
370                    await self._conn.drain()
371
372                else:
373                    await self._conn.send(data)
374
375                if future and not future.done():
376                    future.set_result(None)
377
378        except ConnectionError:
379            pass
380
381        except Exception as e:
382            mlog.error("send loop error: %s", e, exc_info=e)
383
384        finally:
385            self._close(_abrt_apdu(1))
386            self._send_queue.close()
387
388            while True:
389                if future and not future.done():
390                    future.set_result(None)
391                if self._send_queue.empty():
392                    break
393                _, future = self._send_queue.get_nowait()
394
395
396async def _close_copp(copp_conn, apdu):
397    data = (_acse_syntax_name, _encode(apdu)) if apdu else None
398    await copp_conn.async_close(data)
399
400
401def _get_ap_titles(aarq_apdu):
402    calling = None
403    if 'calling-AP-title' in aarq_apdu[1]:
404        if aarq_apdu[1]['calling-AP-title'][0] == 'ap-title-form2':
405            calling = aarq_apdu[1]['calling-AP-title'][1]
406
407    called = None
408    if 'called-AP-title' in aarq_apdu[1]:
409        if aarq_apdu[1]['called-AP-title'][0] == 'ap-title-form2':
410            called = aarq_apdu[1]['called-AP-title'][1]
411
412    return calling, called
413
414
415def _get_ae_qualifiers(aarq_apdu):
416    calling = None
417    if 'calling-AE-qualifier' in aarq_apdu[1]:
418        if aarq_apdu[1]['calling-AE-qualifier'][0] == 'ap-qualifier-form2':
419            calling = aarq_apdu[1]['calling-AE-qualifier'][1]
420
421    called = None
422    if 'called-AE-qualifier' in aarq_apdu[1]:
423        if aarq_apdu[1]['called-AE-qualifier'][0] == 'ap-qualifier-form2':
424            called = aarq_apdu[1]['called-AE-qualifier'][1]
425
426    return calling, called
427
428
429def _get_ap_invocation_identifiers(aarq_apdu):
430    calling = aarq_apdu[1].get('calling-AP-invocation-identifier')
431    called = aarq_apdu[1].get('called-AP-invocation-identifier')
432    return calling, called
433
434
435def _get_ae_invocation_identifiers(aarq_apdu):
436    calling = aarq_apdu[1].get('calling-AE-invocation-identifier')
437    called = aarq_apdu[1].get('called-AE-invocation-identifier')
438    return calling, called
439
440
441def _aarq_apdu(syntax_names, app_context_name,
442               calling_ap_title, called_ap_title,
443               calling_ae_qualifier, called_ae_qualifier,
444               user_data):
445    aarq_apdu = 'aarq', {'application-context-name': app_context_name}
446
447    if calling_ap_title is not None:
448        aarq_apdu[1]['calling-AP-title'] = 'ap-title-form2', calling_ap_title
449
450    if called_ap_title is not None:
451        aarq_apdu[1]['called-AP-title'] = 'ap-title-form2', called_ap_title
452
453    if calling_ae_qualifier is not None:
454        aarq_apdu[1]['calling-AE-qualifier'] = ('ae-qualifier-form2',
455                                                calling_ae_qualifier)
456
457    if called_ae_qualifier is not None:
458        aarq_apdu[1]['called-AE-qualifier'] = ('ae-qualifier-form2',
459                                               called_ae_qualifier)
460
461    if user_data:
462        aarq_apdu[1]['user-information'] = [
463            asn1.External(direct_ref=_encoder.syntax_name,
464                          indirect_ref=syntax_names.get_id(user_data[0]),
465                          data=user_data[1])]
466
467    return aarq_apdu
468
469
470def _aare_apdu(syntax_names, user_data,
471               responding_ap_title, responding_ae_qualifier,
472               responding_ap_invocation_identifier,
473               responding_ae_invocation_identifier):
474    aare_apdu = 'aare', {
475        'application-context-name': user_data[0],
476        'result': 0,
477        'result-source-diagnostic': ('acse-service-user', 0),
478        'user-information': [
479            asn1.External(direct_ref=_encoder.syntax_name,
480                          indirect_ref=syntax_names.get_id(user_data[0]),
481                          data=user_data[1])]}
482
483    if responding_ap_title is not None:
484        aare_apdu[1]['responding-AP-title'] = ('ap-title-form2',
485                                               responding_ap_title)
486
487    if responding_ae_qualifier is not None:
488        aare_apdu[1]['responding-AE-qualifier'] = ('ae-qualifier-form2',
489                                                   responding_ae_qualifier)
490
491    if responding_ap_invocation_identifier is not None:
492        aare_apdu[1]['responding-AP-invocation-identifier'] = \
493            responding_ap_invocation_identifier
494
495    if responding_ae_invocation_identifier is not None:
496        aare_apdu[1]['responding-AE-invocation-identifier'] = \
497            responding_ae_invocation_identifier
498
499    return aare_apdu
500
501
502def _abrt_apdu(source):
503    return 'abrt', {'abort-source': source}
504
505
506def _rlre_apdu():
507    return 'rlre', {}
508
509
510def _encode(value):
511    return _encoder.encode_value(asn1.TypeRef('ACSE-1', 'ACSE-apdu'), value)
512
513
514def _decode(entity):
515    return _encoder.decode_value(asn1.TypeRef('ACSE-1', 'ACSE-apdu'), entity)
mlog = <Logger hat.drivers.acse (WARNING)>
class ConnectionInfo(typing.NamedTuple):
28class ConnectionInfo(typing.NamedTuple):
29    local_addr: tcp.Address
30    local_tsel: int | None
31    local_ssel: int | None
32    local_psel: int | None
33    local_ap_title: asn1.ObjectIdentifier | None
34    local_ae_qualifier: int | None
35    remote_addr: tcp.Address
36    remote_tsel: int | None
37    remote_ssel: int | None
38    remote_psel: int | None
39    remote_ap_title: asn1.ObjectIdentifier | None
40    remote_ae_qualifier: int | None

ConnectionInfo(local_addr, local_tsel, local_ssel, local_psel, local_ap_title, local_ae_qualifier, remote_addr, remote_tsel, remote_ssel, remote_psel, remote_ap_title, remote_ae_qualifier)

ConnectionInfo( local_addr: hat.drivers.tcp.Address, local_tsel: int | None, local_ssel: int | None, local_psel: int | None, local_ap_title: tuple[int, ...] | None, local_ae_qualifier: int | None, remote_addr: hat.drivers.tcp.Address, remote_tsel: int | None, remote_ssel: int | None, remote_psel: int | None, remote_ap_title: tuple[int, ...] | None, remote_ae_qualifier: int | None)

Create new instance of ConnectionInfo(local_addr, local_tsel, local_ssel, local_psel, local_ap_title, local_ae_qualifier, remote_addr, remote_tsel, remote_ssel, remote_psel, remote_ap_title, remote_ae_qualifier)

Alias for field number 0

local_tsel: int | None

Alias for field number 1

local_ssel: int | None

Alias for field number 2

local_psel: int | None

Alias for field number 3

local_ap_title: tuple[int, ...] | None

Alias for field number 4

local_ae_qualifier: int | None

Alias for field number 5

Alias for field number 6

remote_tsel: int | None

Alias for field number 7

remote_ssel: int | None

Alias for field number 8

remote_psel: int | None

Alias for field number 9

remote_ap_title: tuple[int, ...] | None

Alias for field number 10

remote_ae_qualifier: int | None

Alias for field number 11

ValidateCb: TypeAlias = Callable[[hat.drivers.copp.SyntaxNames, tuple[tuple[int, ...], hat.asn1.common.Entity]], tuple[tuple[int, ...], hat.asn1.common.Entity] | None | Awaitable[tuple[tuple[int, ...], hat.asn1.common.Entity] | None]]

Validate callback

ConnectionCb: TypeAlias = Callable[[ForwardRef('Connection')], None | Awaitable[None]]

Connection callback

async def connect( addr: hat.drivers.tcp.Address, syntax_name_list: list[tuple[int, ...]], app_context_name: tuple[int, ...], user_data: tuple[tuple[int, ...], hat.asn1.common.Entity] | None = None, *, local_ap_title: tuple[int, ...] | None = None, remote_ap_title: tuple[int, ...] | None = None, local_ae_qualifier: int | None = None, remote_ae_qualifier: int | None = None, acse_receive_queue_size: int = 1024, acse_send_queue_size: int = 1024, **kwargs) -> Connection:
52async def connect(addr: tcp.Address,
53                  syntax_name_list: list[asn1.ObjectIdentifier],
54                  app_context_name: asn1.ObjectIdentifier,
55                  user_data: copp.IdentifiedEntity | None = None,
56                  *,
57                  local_ap_title: asn1.ObjectIdentifier | None = None,
58                  remote_ap_title: asn1.ObjectIdentifier | None = None,
59                  local_ae_qualifier: int | None = None,
60                  remote_ae_qualifier: int | None = None,
61                  acse_receive_queue_size: int = 1024,
62                  acse_send_queue_size: int = 1024,
63                  **kwargs
64                  ) -> 'Connection':
65    """Connect to ACSE server
66
67    Additional arguments are passed directly to `hat.drivers.copp.connect`
68    (`syntax_names` is set by this coroutine).
69
70    """
71    syntax_names = copp.SyntaxNames([_acse_syntax_name, *syntax_name_list])
72    aarq_apdu = _aarq_apdu(syntax_names, app_context_name,
73                           local_ap_title, remote_ap_title,
74                           local_ae_qualifier, remote_ae_qualifier,
75                           user_data)
76    copp_user_data = _acse_syntax_name, _encode(aarq_apdu)
77    conn = await copp.connect(addr, syntax_names, copp_user_data, **kwargs)
78
79    try:
80        aare_apdu_syntax_name, aare_apdu_entity = conn.conn_res_user_data
81        if aare_apdu_syntax_name != _acse_syntax_name:
82            raise Exception("invalid syntax name")
83
84        aare_apdu = _decode(aare_apdu_entity)
85        if aare_apdu[0] != 'aare' or aare_apdu[1]['result'] != 0:
86            raise Exception("invalid apdu")
87
88        calling_ap_title, called_ap_title = _get_ap_titles(aarq_apdu)
89        calling_ae_qualifier, called_ae_qualifier = _get_ae_qualifiers(
90            aarq_apdu)
91        return Connection(conn, aarq_apdu, aare_apdu,
92                          calling_ap_title, called_ap_title,
93                          calling_ae_qualifier, called_ae_qualifier,
94                          acse_receive_queue_size, acse_send_queue_size)
95
96    except Exception:
97        await aio.uncancellable(_close_copp(conn, _abrt_apdu(1)))
98        raise

Connect to ACSE server

Additional arguments are passed directly to hat.drivers.copp.connect (syntax_names is set by this coroutine).

async def listen( validate_cb: Callable[[hat.drivers.copp.SyntaxNames, tuple[tuple[int, ...], hat.asn1.common.Entity]], tuple[tuple[int, ...], hat.asn1.common.Entity] | None | Awaitable[tuple[tuple[int, ...], hat.asn1.common.Entity] | None]], connection_cb: Callable[[Connection], None | Awaitable[None]], addr: hat.drivers.tcp.Address = Address(host='0.0.0.0', port=102), *, bind_connections: bool = False, acse_receive_queue_size: int = 1024, acse_send_queue_size: int = 1024, **kwargs) -> Server:
101async def listen(validate_cb: ValidateCb,
102                 connection_cb: ConnectionCb,
103                 addr: tcp.Address = tcp.Address('0.0.0.0', 102),
104                 *,
105                 bind_connections: bool = False,
106                 acse_receive_queue_size: int = 1024,
107                 acse_send_queue_size: int = 1024,
108                 **kwargs
109                 ) -> 'Server':
110    """Create ACSE listening server
111
112    Additional arguments are passed directly to `hat.drivers.copp.listen`.
113
114    Args:
115        validate_cb: callback function or coroutine called on new
116            incomming connection request prior to creating connection object
117        connection_cb: new connection callback
118        addr: local listening address
119
120    """
121    server = Server()
122    server._validate_cb = validate_cb
123    server._connection_cb = connection_cb
124    server._bind_connections = bind_connections
125    server._receive_queue_size = acse_receive_queue_size
126    server._send_queue_size = acse_send_queue_size
127
128    server._srv = await copp.listen(server._on_validate,
129                                    server._on_connection,
130                                    addr,
131                                    bind_connections=False,
132                                    **kwargs)
133
134    return server

Create ACSE listening server

Additional arguments are passed directly to hat.drivers.copp.listen.

Arguments:
  • validate_cb: callback function or coroutine called on new incomming connection request prior to creating connection object
  • connection_cb: new connection callback
  • addr: local listening address
class Server(hat.aio.group.Resource):
137class Server(aio.Resource):
138    """ACSE listening server
139
140    For creating new server see `listen`.
141
142    """
143
144    @property
145    def async_group(self) -> aio.Group:
146        """Async group"""
147        return self._srv.async_group
148
149    @property
150    def addresses(self) -> list[tcp.Address]:
151        """Listening addresses"""
152        return self._srv.addresses
153
154    async def _on_validate(self, syntax_names, user_data):
155        aarq_apdu_syntax_name, aarq_apdu_entity = user_data
156        if aarq_apdu_syntax_name != _acse_syntax_name:
157            raise Exception('invalid acse syntax name')
158
159        aarq_apdu = _decode(aarq_apdu_entity)
160        if aarq_apdu[0] != 'aarq':
161            raise Exception('not aarq message')
162
163        aarq_external = aarq_apdu[1]['user-information'][0]
164        if aarq_external.direct_ref is not None:
165            if aarq_external.direct_ref != _encoder.syntax_name:
166                raise Exception('invalid encoder identifier')
167
168        _, called_ap_title = _get_ap_titles(aarq_apdu)
169        _, called_ae_qualifier = _get_ae_qualifiers(aarq_apdu)
170        _, called_ap_invocation_identifier = \
171            _get_ap_invocation_identifiers(aarq_apdu)
172        _, called_ae_invocation_identifier = \
173            _get_ae_invocation_identifiers(aarq_apdu)
174
175        aarq_user_data = (syntax_names.get_name(aarq_external.indirect_ref),
176                          aarq_external.data)
177
178        user_validate_result = await aio.call(self._validate_cb, syntax_names,
179                                              aarq_user_data)
180
181        aare_apdu = _aare_apdu(syntax_names,
182                               user_validate_result,
183                               called_ap_title, called_ae_qualifier,
184                               called_ap_invocation_identifier,
185                               called_ae_invocation_identifier)
186        return _acse_syntax_name, _encode(aare_apdu)
187
188    async def _on_connection(self, copp_conn):
189        try:
190            try:
191                aarq_apdu = _decode(copp_conn.conn_req_user_data[1])
192                aare_apdu = _decode(copp_conn.conn_res_user_data[1])
193
194                calling_ap_title, called_ap_title = _get_ap_titles(aarq_apdu)
195                calling_ae_qualifier, called_ae_qualifier = _get_ae_qualifiers(
196                    aarq_apdu)
197
198                conn = Connection(copp_conn, aarq_apdu, aare_apdu,
199                                  calling_ap_title, called_ap_title,
200                                  calling_ae_qualifier, called_ae_qualifier,
201                                  self._receive_queue_size,
202                                  self._send_queue_size)
203
204            except Exception:
205                await aio.uncancellable(_close_copp(copp_conn, _abrt_apdu(1)))
206                raise
207
208            try:
209                await aio.call(self._connection_cb, conn)
210
211            except BaseException:
212                await aio.uncancellable(conn.async_close())
213                raise
214
215        except Exception as e:
216            mlog.error("error creating new incomming connection: %s", e,
217                       exc_info=e)
218            return
219
220        if not self._bind_connections:
221            return
222
223        try:
224            await conn.wait_closed()
225
226        except BaseException:
227            await aio.uncancellable(conn.async_close())
228            raise

ACSE listening server

For creating new server see listen.

async_group: hat.aio.group.Group
144    @property
145    def async_group(self) -> aio.Group:
146        """Async group"""
147        return self._srv.async_group

Async group

addresses: list[hat.drivers.tcp.Address]
149    @property
150    def addresses(self) -> list[tcp.Address]:
151        """Listening addresses"""
152        return self._srv.addresses

Listening addresses

class Connection(hat.aio.group.Resource):
231class Connection(aio.Resource):
232    """ACSE connection
233
234    For creating new connection see `connect` or `listen`.
235
236    """
237
238    def __init__(self,
239                 conn: copp.Connection,
240                 aarq_apdu: asn1.Value,
241                 aare_apdu: asn1.Value,
242                 local_ap_title: asn1.ObjectIdentifier | None,
243                 remote_ap_title: asn1.ObjectIdentifier | None,
244                 local_ae_qualifier: int | None,
245                 remote_ae_qualifier: int | None,
246                 receive_queue_size: int,
247                 send_queue_size: int):
248        aarq_external = aarq_apdu[1]['user-information'][0]
249        aare_external = aare_apdu[1]['user-information'][0]
250
251        conn_req_user_data = (
252            conn.syntax_names.get_name(aarq_external.indirect_ref),
253            aarq_external.data)
254        conn_res_user_data = (
255            conn.syntax_names.get_name(aare_external.indirect_ref),
256            aare_external.data)
257
258        self._conn = conn
259        self._conn_req_user_data = conn_req_user_data
260        self._conn_res_user_data = conn_res_user_data
261        self._loop = asyncio.get_running_loop()
262        self._info = ConnectionInfo(local_ap_title=local_ap_title,
263                                    local_ae_qualifier=local_ae_qualifier,
264                                    remote_ap_title=remote_ap_title,
265                                    remote_ae_qualifier=remote_ae_qualifier,
266                                    **conn.info._asdict())
267        self._close_apdu = _abrt_apdu(0)
268        self._receive_queue = aio.Queue(receive_queue_size)
269        self._send_queue = aio.Queue(send_queue_size)
270        self._async_group = aio.Group()
271
272        self.async_group.spawn(aio.call_on_cancel, self._on_close)
273        self.async_group.spawn(self._receive_loop)
274        self.async_group.spawn(self._send_loop)
275        self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
276                               self.close)
277
278    @property
279    def async_group(self) -> aio.Group:
280        """Async group"""
281        return self._async_group
282
283    @property
284    def info(self) -> ConnectionInfo:
285        """Connection info"""
286        return self._info
287
288    @property
289    def conn_req_user_data(self) -> copp.IdentifiedEntity:
290        """Connect request's user data"""
291        return self._conn_req_user_data
292
293    @property
294    def conn_res_user_data(self) -> copp.IdentifiedEntity:
295        """Connect response's user data"""
296        return self._conn_res_user_data
297
298    async def receive(self) -> copp.IdentifiedEntity:
299        """Receive data"""
300        try:
301            return await self._receive_queue.get()
302
303        except aio.QueueClosedError:
304            raise ConnectionError()
305
306    async def send(self, data: copp.IdentifiedEntity):
307        """Send data"""
308        try:
309            await self._send_queue.put((data, None))
310
311        except aio.QueueClosedError:
312            raise ConnectionError()
313
314    async def drain(self):
315        """Drain output buffer"""
316        try:
317            future = self._loop.create_future()
318            await self._send_queue.put((None, future))
319            await future
320
321        except aio.QueueClosedError:
322            raise ConnectionError()
323
324    async def _on_close(self):
325        await _close_copp(self._conn, self._close_apdu)
326
327    def _close(self, apdu):
328        if not self.is_open:
329            return
330
331        self._close_apdu = apdu
332        self._async_group.close()
333
334    async def _receive_loop(self):
335        try:
336            while True:
337                syntax_name, entity = await self._conn.receive()
338
339                if syntax_name == _acse_syntax_name:
340                    if entity[0] == 'abrt':
341                        close_apdu = None
342
343                    elif entity[0] == 'rlrq':
344                        close_apdu = _rlre_apdu()
345
346                    else:
347                        close_apdu = _abrt_apdu(1)
348
349                    self._close(close_apdu)
350                    break
351
352                await self._receive_queue.put((syntax_name, entity))
353
354        except ConnectionError:
355            pass
356
357        except Exception as e:
358            mlog.error("receive loop error: %s", e, exc_info=e)
359
360        finally:
361            self._close(_abrt_apdu(1))
362            self._receive_queue.close()
363
364    async def _send_loop(self):
365        future = None
366        try:
367            while True:
368                data, future = await self._send_queue.get()
369
370                if data is None:
371                    await self._conn.drain()
372
373                else:
374                    await self._conn.send(data)
375
376                if future and not future.done():
377                    future.set_result(None)
378
379        except ConnectionError:
380            pass
381
382        except Exception as e:
383            mlog.error("send loop error: %s", e, exc_info=e)
384
385        finally:
386            self._close(_abrt_apdu(1))
387            self._send_queue.close()
388
389            while True:
390                if future and not future.done():
391                    future.set_result(None)
392                if self._send_queue.empty():
393                    break
394                _, future = self._send_queue.get_nowait()

ACSE connection

For creating new connection see connect or listen.

Connection( conn: hat.drivers.copp.Connection, aarq_apdu: Union[bool, int, Collection[bool], bytes, bytearray, memoryview, NoneType, tuple[int, ...], str, float, Tuple[str, ForwardRef('Value')], Dict[str, ForwardRef('Value')], Collection['Value'], hat.asn1.common.Entity, hat.asn1.common.External, hat.asn1.common.EmbeddedPDV], aare_apdu: Union[bool, int, Collection[bool], bytes, bytearray, memoryview, NoneType, tuple[int, ...], str, float, Tuple[str, ForwardRef('Value')], Dict[str, ForwardRef('Value')], Collection['Value'], hat.asn1.common.Entity, hat.asn1.common.External, hat.asn1.common.EmbeddedPDV], local_ap_title: tuple[int, ...] | None, remote_ap_title: tuple[int, ...] | None, local_ae_qualifier: int | None, remote_ae_qualifier: int | None, receive_queue_size: int, send_queue_size: int)
238    def __init__(self,
239                 conn: copp.Connection,
240                 aarq_apdu: asn1.Value,
241                 aare_apdu: asn1.Value,
242                 local_ap_title: asn1.ObjectIdentifier | None,
243                 remote_ap_title: asn1.ObjectIdentifier | None,
244                 local_ae_qualifier: int | None,
245                 remote_ae_qualifier: int | None,
246                 receive_queue_size: int,
247                 send_queue_size: int):
248        aarq_external = aarq_apdu[1]['user-information'][0]
249        aare_external = aare_apdu[1]['user-information'][0]
250
251        conn_req_user_data = (
252            conn.syntax_names.get_name(aarq_external.indirect_ref),
253            aarq_external.data)
254        conn_res_user_data = (
255            conn.syntax_names.get_name(aare_external.indirect_ref),
256            aare_external.data)
257
258        self._conn = conn
259        self._conn_req_user_data = conn_req_user_data
260        self._conn_res_user_data = conn_res_user_data
261        self._loop = asyncio.get_running_loop()
262        self._info = ConnectionInfo(local_ap_title=local_ap_title,
263                                    local_ae_qualifier=local_ae_qualifier,
264                                    remote_ap_title=remote_ap_title,
265                                    remote_ae_qualifier=remote_ae_qualifier,
266                                    **conn.info._asdict())
267        self._close_apdu = _abrt_apdu(0)
268        self._receive_queue = aio.Queue(receive_queue_size)
269        self._send_queue = aio.Queue(send_queue_size)
270        self._async_group = aio.Group()
271
272        self.async_group.spawn(aio.call_on_cancel, self._on_close)
273        self.async_group.spawn(self._receive_loop)
274        self.async_group.spawn(self._send_loop)
275        self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
276                               self.close)
async_group: hat.aio.group.Group
278    @property
279    def async_group(self) -> aio.Group:
280        """Async group"""
281        return self._async_group

Async group

info: ConnectionInfo
283    @property
284    def info(self) -> ConnectionInfo:
285        """Connection info"""
286        return self._info

Connection info

conn_req_user_data: tuple[tuple[int, ...], hat.asn1.common.Entity]
288    @property
289    def conn_req_user_data(self) -> copp.IdentifiedEntity:
290        """Connect request's user data"""
291        return self._conn_req_user_data

Connect request's user data

conn_res_user_data: tuple[tuple[int, ...], hat.asn1.common.Entity]
293    @property
294    def conn_res_user_data(self) -> copp.IdentifiedEntity:
295        """Connect response's user data"""
296        return self._conn_res_user_data

Connect response's user data

async def receive(self) -> tuple[tuple[int, ...], hat.asn1.common.Entity]:
298    async def receive(self) -> copp.IdentifiedEntity:
299        """Receive data"""
300        try:
301            return await self._receive_queue.get()
302
303        except aio.QueueClosedError:
304            raise ConnectionError()

Receive data

async def send(self, data: tuple[tuple[int, ...], hat.asn1.common.Entity]):
306    async def send(self, data: copp.IdentifiedEntity):
307        """Send data"""
308        try:
309            await self._send_queue.put((data, None))
310
311        except aio.QueueClosedError:
312            raise ConnectionError()

Send data

async def drain(self):
314    async def drain(self):
315        """Drain output buffer"""
316        try:
317            future = self._loop.create_future()
318            await self._send_queue.put((None, future))
319            await future
320
321        except aio.QueueClosedError:
322            raise ConnectionError()

Drain output buffer