hat.drivers.acse

Association Control Service Element

  1"""Association Controll Service Element"""
  2
  3import asyncio
  4import importlib.resources
  5import logging
  6import typing
  7
  8from hat import aio
  9from hat import asn1
 10from hat import json
 11
 12from hat.drivers import copp
 13from hat.drivers import tcp
 14
 15
# module-level logger; server and connection loggers are adapters around it
mlog = logging.getLogger(__name__)

# object identifier of the ACSE abstract syntax:
# (joint-iso-itu-t, association-control, abstract-syntax, apdus, version1)
_acse_syntax_name = (2, 2, 1, 0, 1)

# BER encoder created once at import time from the ASN.1 repository
# (`asn1_repo.json`) read as a resource of this package
with importlib.resources.open_text(__package__, 'asn1_repo.json') as _f:
    _encoder = asn1.ber.BerEncoder(
        asn1.repository_from_json(
            json.decode_stream(_f)))
 25
 26
class ConnectionInfo(typing.NamedTuple):
    """ACSE connection info

    `Connection` builds this from the underlying COPP connection's info
    (all fields except AP titles and AE qualifiers) extended with the local
    and remote AP titles and AE qualifiers of the association.

    """
    # fields copied from the COPP connection info (local side)
    name: str | None
    local_addr: tcp.Address
    local_tsel: int | None
    local_ssel: int | None
    local_psel: int | None
    # ACSE specific fields (local side)
    local_ap_title: asn1.ObjectIdentifier | None
    local_ae_qualifier: int | None
    # fields copied from the COPP connection info (remote side)
    remote_addr: tcp.Address
    remote_tsel: int | None
    remote_ssel: int | None
    remote_psel: int | None
    # ACSE specific fields (remote side)
    remote_ap_title: asn1.ObjectIdentifier | None
    remote_ae_qualifier: int | None
 41
 42
# called by `Server` with the COPP syntax names and the user data carried by
# an incoming AARQ; the returned value is used as user data of the AARE
ValidateCb: typing.TypeAlias = aio.AsyncCallable[[copp.SyntaxNames,
                                                  copp.IdentifiedEntity],
                                                 copp.IdentifiedEntity | None]
"""Validate callback"""

# called by `Server` with each newly created `Connection`
ConnectionCb: typing.TypeAlias = aio.AsyncCallable[['Connection'], None]
"""Connection callback"""
 50
 51
 52async def connect(addr: tcp.Address,
 53                  syntax_name_list: list[asn1.ObjectIdentifier],
 54                  app_context_name: asn1.ObjectIdentifier,
 55                  user_data: copp.IdentifiedEntity | None = None,
 56                  *,
 57                  local_ap_title: asn1.ObjectIdentifier | None = None,
 58                  remote_ap_title: asn1.ObjectIdentifier | None = None,
 59                  local_ae_qualifier: int | None = None,
 60                  remote_ae_qualifier: int | None = None,
 61                  acse_receive_queue_size: int = 1024,
 62                  acse_send_queue_size: int = 1024,
 63                  **kwargs
 64                  ) -> 'Connection':
 65    """Connect to ACSE server
 66
 67    Additional arguments are passed directly to `hat.drivers.copp.connect`
 68    (`syntax_names` is set by this coroutine).
 69
 70    """
 71    syntax_names = copp.SyntaxNames([_acse_syntax_name, *syntax_name_list])
 72    aarq_apdu = _aarq_apdu(syntax_names, app_context_name,
 73                           local_ap_title, remote_ap_title,
 74                           local_ae_qualifier, remote_ae_qualifier,
 75                           user_data)
 76    copp_user_data = _acse_syntax_name, _encode(aarq_apdu)
 77    conn = await copp.connect(addr, syntax_names, copp_user_data, **kwargs)
 78
 79    try:
 80        aare_apdu_syntax_name, aare_apdu_entity = conn.conn_res_user_data
 81        if aare_apdu_syntax_name != _acse_syntax_name:
 82            raise Exception("invalid syntax name")
 83
 84        aare_apdu = _decode(aare_apdu_entity)
 85        if aare_apdu[0] != 'aare' or aare_apdu[1]['result'] != 0:
 86            raise Exception("invalid apdu")
 87
 88        calling_ap_title, called_ap_title = _get_ap_titles(aarq_apdu)
 89        calling_ae_qualifier, called_ae_qualifier = _get_ae_qualifiers(
 90            aarq_apdu)
 91        return Connection(conn, aarq_apdu, aare_apdu,
 92                          calling_ap_title, called_ap_title,
 93                          calling_ae_qualifier, called_ae_qualifier,
 94                          acse_receive_queue_size, acse_send_queue_size)
 95
 96    except Exception:
 97        await aio.uncancellable(_close_copp(conn, _abrt_apdu(1)))
 98        raise
 99
100
async def listen(validate_cb: ValidateCb,
                 connection_cb: ConnectionCb,
                 addr: tcp.Address = tcp.Address('0.0.0.0', 102),
                 *,
                 bind_connections: bool = False,
                 acse_receive_queue_size: int = 1024,
                 acse_send_queue_size: int = 1024,
                 **kwargs
                 ) -> 'Server':
    """Create ACSE listening server

    Additional arguments are passed directly to `hat.drivers.copp.listen`
    (`bind_connections` is always passed to COPP as ``False``; binding is
    handled by the ACSE server itself).

    Args:
        validate_cb: callback function or coroutine called on new
            incoming connection request prior to creating connection object
        connection_cb: new connection callback
        addr: local listening address
        bind_connections: if set, connection handling waits for the
            connection to close and closes it when handling is interrupted
        acse_receive_queue_size: size of each connection's receive queue
        acse_send_queue_size: size of each connection's send queue

    """
    name = kwargs.get('name')

    srv = Server()
    srv._validate_cb = validate_cb
    srv._connection_cb = connection_cb
    srv._bind_connections = bind_connections
    srv._receive_queue_size = acse_receive_queue_size
    srv._send_queue_size = acse_send_queue_size

    # provisional logger (without addresses) available while COPP server
    # is being created
    srv._log = _create_server_logger(name, None)

    srv._srv = await copp.listen(srv._on_validate,
                                 srv._on_connection,
                                 addr,
                                 bind_connections=False,
                                 **kwargs)

    # final logger includes listening addresses
    srv._log = _create_server_logger(name, srv._srv.info)

    return srv
138
139
class Server(aio.Resource):
    """ACSE listening server

    For creating new server see `listen`, which also assigns all private
    attributes used by this class.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group (shared with the underlying COPP server)"""
        return self._srv.async_group

    @property
    def info(self) -> tcp.ServerInfo:
        """Server info (as reported by the underlying COPP server)"""
        return self._srv.info

    async def _on_validate(self, syntax_names, user_data):
        """Validate COPP connect request and create encoded AARE response

        Raises exception if request's user data is not an AARQ APDU in ACSE
        syntax or if its external user information references a different
        encoding.

        """
        syntax_name, entity = user_data
        if syntax_name != _acse_syntax_name:
            raise Exception('invalid acse syntax name')

        request = _decode(entity)
        if request[0] != 'aarq':
            raise Exception('not aarq message')

        external = request[1]['user-information'][0]
        direct_ref = external.direct_ref
        if direct_ref is not None and direct_ref != _encoder.syntax_name:
            raise Exception('invalid encoder identifier')

        # called side identifiers are echoed as responding identifiers
        called_ap_title = _get_ap_titles(request)[1]
        called_ae_qualifier = _get_ae_qualifiers(request)[1]
        called_ap_invocation_id = _get_ap_invocation_identifiers(request)[1]
        called_ae_invocation_id = _get_ae_invocation_identifiers(request)[1]

        request_user_data = (syntax_names.get_name(external.indirect_ref),
                             external.data)

        response_user_data = await aio.call(self._validate_cb, syntax_names,
                                            request_user_data)

        response = _aare_apdu(syntax_names,
                              response_user_data,
                              called_ap_title, called_ae_qualifier,
                              called_ap_invocation_id,
                              called_ae_invocation_id)
        return _acse_syntax_name, _encode(response)

    async def _on_connection(self, copp_conn):
        """Wrap accepted COPP connection and notify connection callback"""
        try:
            try:
                request = _decode(copp_conn.conn_req_user_data[1])
                response = _decode(copp_conn.conn_res_user_data[1])

                ap_titles = _get_ap_titles(request)
                ae_qualifiers = _get_ae_qualifiers(request)

                # on server side, called identifiers are the local ones
                conn = Connection(copp_conn, request, response,
                                  ap_titles[1], ap_titles[0],
                                  ae_qualifiers[1], ae_qualifiers[0],
                                  self._receive_queue_size,
                                  self._send_queue_size)

            except Exception:
                await aio.uncancellable(_close_copp(copp_conn, _abrt_apdu(1)))
                raise

            try:
                await aio.call(self._connection_cb, conn)

            except BaseException:
                await aio.uncancellable(conn.async_close())
                raise

        except Exception as e:
            self._log.error("error creating new incomming connection: %s",
                            e, exc_info=e)
            return

        if self._bind_connections:
            # keep this handler alive for the lifetime of the connection
            try:
                await conn.wait_closed()

            except BaseException:
                await aio.uncancellable(conn.async_close())
                raise
232
233
class Connection(aio.Resource):
    """ACSE connection

    For creating new connection see `connect` or `listen`.

    Data is exchanged through bounded receive and send queues which are
    served by two background tasks running in connection's async group.
    Closing the connection closes the underlying COPP connection, passing
    the currently selected close APDU (ABRT with abort-source 0 unless
    changed by the receive/send loops) as user data.

    """

    def __init__(self,
                 conn: copp.Connection,
                 aarq_apdu: asn1.Value,
                 aare_apdu: asn1.Value,
                 local_ap_title: asn1.ObjectIdentifier | None,
                 remote_ap_title: asn1.ObjectIdentifier | None,
                 local_ae_qualifier: int | None,
                 remote_ae_qualifier: int | None,
                 receive_queue_size: int,
                 send_queue_size: int):
        """Wrap established COPP connection

        Must be called while an event loop is running.

        Args:
            conn: underlying COPP connection
            aarq_apdu: decoded AARQ APDU of this association
            aare_apdu: decoded AARE APDU of this association
            local_ap_title: local AP title
            remote_ap_title: remote AP title
            local_ae_qualifier: local AE qualifier
            remote_ae_qualifier: remote AE qualifier
            receive_queue_size: receive queue size
            send_queue_size: send queue size

        """
        conn_req_user_data = self._get_user_data(conn.syntax_names,
                                                 aarq_apdu)
        conn_res_user_data = self._get_user_data(conn.syntax_names,
                                                 aare_apdu)

        self._conn = conn
        self._conn_req_user_data = conn_req_user_data
        self._conn_res_user_data = conn_res_user_data
        self._loop = asyncio.get_running_loop()
        self._info = ConnectionInfo(local_ap_title=local_ap_title,
                                    local_ae_qualifier=local_ae_qualifier,
                                    remote_ap_title=remote_ap_title,
                                    remote_ae_qualifier=remote_ae_qualifier,
                                    **conn.info._asdict())
        self._close_apdu = _abrt_apdu(0)
        self._receive_queue = aio.Queue(receive_queue_size)
        self._send_queue = aio.Queue(send_queue_size)
        self._async_group = aio.Group()
        self._log = _create_connection_logger(self._info)

        self.async_group.spawn(aio.call_on_cancel, self._on_close)
        self.async_group.spawn(self._receive_loop)
        self.async_group.spawn(self._send_loop)
        # close this connection as soon as COPP connection starts closing
        self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
                               self.close)

    @staticmethod
    def _get_user_data(syntax_names, apdu):
        """Extract user data from decoded AARQ/AARE APDU

        Returns ``None`` if the APDU has no ``user-information`` (e.g. AARQ
        created by `connect` without user data), otherwise syntax name and
        data of the first external.

        """
        externals = apdu[1].get('user-information')
        if not externals:
            return None

        external = externals[0]
        return syntax_names.get_name(external.indirect_ref), external.data

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    @property
    def info(self) -> ConnectionInfo:
        """Connection info"""
        return self._info

    @property
    def conn_req_user_data(self) -> copp.IdentifiedEntity | None:
        """Connect request's user data (``None`` if request had none)"""
        return self._conn_req_user_data

    @property
    def conn_res_user_data(self) -> copp.IdentifiedEntity | None:
        """Connect response's user data (``None`` if response had none)"""
        return self._conn_res_user_data

    async def receive(self) -> copp.IdentifiedEntity:
        """Receive data

        Raises:
            ConnectionError: receive queue is closed

        """
        try:
            return await self._receive_queue.get()

        except aio.QueueClosedError:
            raise ConnectionError()

    async def send(self, data: copp.IdentifiedEntity):
        """Send data

        Data is put into the send queue and sent by the send loop.

        Raises:
            ConnectionError: send queue is closed

        """
        try:
            await self._send_queue.put((data, None))

        except aio.QueueClosedError:
            raise ConnectionError()

    async def drain(self):
        """Drain output buffer

        Waits until the send loop processes all previously queued data and
        drains the underlying COPP connection.

        Raises:
            ConnectionError: send queue is closed

        """
        try:
            # drain request is queued as (None, future) so that it is
            # processed after all data queued before it
            future = self._loop.create_future()
            await self._send_queue.put((None, future))
            await future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def _on_close(self):
        """Close COPP connection with the currently selected close APDU"""
        await _close_copp(self._conn, self._close_apdu)

    def _close(self, apdu):
        """Select close APDU and close async group (only while open)"""
        if not self.is_open:
            return

        self._close_apdu = apdu
        self._async_group.close()

    async def _receive_loop(self):
        """Forward received data to the receive queue

        Any data in ACSE syntax terminates the association: ABRT closes
        without close APDU, RLRQ is answered with RLRE and everything else
        with ABRT.

        """
        try:
            while True:
                syntax_name, entity = await self._conn.receive()

                if syntax_name == _acse_syntax_name:
                    # received entity is still encoded - APDU kind is
                    # available only after decoding
                    apdu = _decode(entity)

                    if apdu[0] == 'abrt':
                        close_apdu = None

                    elif apdu[0] == 'rlrq':
                        close_apdu = _rlre_apdu()

                    else:
                        close_apdu = _abrt_apdu(1)

                    self._close(close_apdu)
                    break

                await self._receive_queue.put((syntax_name, entity))

        except ConnectionError:
            pass

        except Exception as e:
            self._log.error("receive loop error: %s", e, exc_info=e)

        finally:
            # no-op if closing was already initiated above
            self._close(_abrt_apdu(1))
            self._receive_queue.close()

    async def _send_loop(self):
        """Process send queue items

        Items are ``(data, future)`` pairs - `send` queues ``(data, None)``
        and `drain` queues ``(None, future)``.

        """
        future = None
        try:
            while True:
                data, future = await self._send_queue.get()

                if data is None:
                    await self._conn.drain()

                else:
                    await self._conn.send(data)

                if future and not future.done():
                    future.set_result(None)

        except ConnectionError:
            pass

        except Exception as e:
            self._log.error("send loop error: %s", e, exc_info=e)

        finally:
            self._close(_abrt_apdu(1))
            self._send_queue.close()

            # resolve futures of the interrupted and all still queued items
            # so that pending `drain` calls do not wait forever
            while True:
                if future and not future.done():
                    future.set_result(None)
                if self._send_queue.empty():
                    break
                _, future = self._send_queue.get_nowait()
399
400
async def _close_copp(copp_conn, apdu):
    """Close COPP connection, sending `apdu` (if any) as close user data"""
    if apdu:
        close_data = _acse_syntax_name, _encode(apdu)
    else:
        close_data = None

    await copp_conn.async_close(close_data)
404
405
def _get_ap_titles(aarq_apdu):
    """Return (calling, called) AP titles of decoded AARQ APDU

    Title is ``None`` if it is absent or not encoded as `ap-title-form2`.

    """
    fields = aarq_apdu[1]

    def form2_title(key):
        if key not in fields:
            return None
        choice = fields[key]
        return choice[1] if choice[0] == 'ap-title-form2' else None

    return form2_title('calling-AP-title'), form2_title('called-AP-title')
418
419
def _get_ae_qualifiers(aarq_apdu):
    """Return (calling, called) AE qualifiers of decoded AARQ APDU

    Qualifier is ``None`` if it is absent or not encoded as
    `ae-qualifier-form2`.

    """
    fields = aarq_apdu[1]
    qualifiers = []

    for key in ('calling-AE-qualifier', 'called-AE-qualifier'):
        qualifier = None
        if key in fields:
            choice = fields[key]
            if choice[0] == 'ae-qualifier-form2':
                qualifier = choice[1]
        qualifiers.append(qualifier)

    return qualifiers[0], qualifiers[1]
432
433
def _get_ap_invocation_identifiers(aarq_apdu):
    """Return (calling, called) AP invocation identifiers (``None`` if absent)"""
    fields = aarq_apdu[1]
    return (fields.get('calling-AP-invocation-identifier'),
            fields.get('called-AP-invocation-identifier'))
438
439
def _get_ae_invocation_identifiers(aarq_apdu):
    """Return (calling, called) AE invocation identifiers (``None`` if absent)"""
    fields = aarq_apdu[1]
    return (fields.get('calling-AE-invocation-identifier'),
            fields.get('called-AE-invocation-identifier'))
444
445
def _aarq_apdu(syntax_names, app_context_name,
               calling_ap_title, called_ap_title,
               calling_ae_qualifier, called_ae_qualifier,
               user_data):
    """Create AARQ APDU value

    Titles and qualifiers are included (in their `form2` variants) only if
    they are not ``None``. User information is included only if `user_data`
    is truthy.

    """
    fields = {'application-context-name': app_context_name}

    optional_choices = (
        ('calling-AP-title', 'ap-title-form2', calling_ap_title),
        ('called-AP-title', 'ap-title-form2', called_ap_title),
        ('calling-AE-qualifier', 'ae-qualifier-form2', calling_ae_qualifier),
        ('called-AE-qualifier', 'ae-qualifier-form2', called_ae_qualifier))

    for key, form, value in optional_choices:
        if value is not None:
            fields[key] = form, value

    if user_data:
        user_syntax_name, user_entity = user_data[0], user_data[1]
        fields['user-information'] = [
            asn1.External(direct_ref=_encoder.syntax_name,
                          indirect_ref=syntax_names.get_id(user_syntax_name),
                          data=user_entity)]

    return 'aarq', fields
473
474
def _aare_apdu(syntax_names, user_data,
               responding_ap_title, responding_ae_qualifier,
               responding_ap_invocation_identifier,
               responding_ae_invocation_identifier):
    """Create accepting AARE APDU value (`result` 0) carrying `user_data`

    Responding identifiers are included only if they are not ``None``.

    """
    user_syntax_name, user_entity = user_data[0], user_data[1]

    # NOTE(review): application-context-name is set to user data's syntax
    # name - confirm this is intended instead of the context name of AARQ
    fields = {
        'application-context-name': user_syntax_name,
        'result': 0,
        'result-source-diagnostic': ('acse-service-user', 0),
        'user-information': [
            asn1.External(direct_ref=_encoder.syntax_name,
                          indirect_ref=syntax_names.get_id(user_syntax_name),
                          data=user_entity)]}

    optional_fields = (
        ('responding-AP-title', responding_ap_title,
         lambda title: ('ap-title-form2', title)),
        ('responding-AE-qualifier', responding_ae_qualifier,
         lambda qualifier: ('ae-qualifier-form2', qualifier)),
        ('responding-AP-invocation-identifier',
         responding_ap_invocation_identifier,
         lambda identifier: identifier),
        ('responding-AE-invocation-identifier',
         responding_ae_invocation_identifier,
         lambda identifier: identifier))

    for key, value, wrap in optional_fields:
        if value is not None:
            fields[key] = wrap(value)

    return 'aare', fields
505
506
def _abrt_apdu(source):
    """Create ABRT APDU value with provided abort source"""
    fields = {'abort-source': source}
    return 'abrt', fields
509
510
def _rlre_apdu():
    """Create RLRE APDU value without any fields"""
    fields = dict()
    return 'rlre', fields
513
514
def _encode(value):
    """Encode ACSE APDU value with the module's BER encoder"""
    apdu_type = asn1.TypeRef('ACSE-1', 'ACSE-apdu')
    return _encoder.encode_value(apdu_type, value)
517
518
def _decode(entity):
    """Decode entity as ACSE APDU value with the module's BER encoder"""
    apdu_type = asn1.TypeRef('ACSE-1', 'ACSE-apdu')
    return _encoder.decode_value(apdu_type, entity)
521
522
def _create_server_logger(name, info):
    """Create server logger adapter with `meta` extra

    Listening addresses are included only if `info` is available.

    """
    meta = {'type': 'AcseServer',
            'name': name}

    if info is not None:
        addresses = []
        for addr in info.addresses:
            addresses.append({'host': addr.host,
                              'port': addr.port})
        meta['addresses'] = addresses

    return logging.LoggerAdapter(mlog, {'meta': meta})
533
534
def _create_connection_logger(info):
    """Create connection logger adapter with `meta` extra built from `info`"""
    local_addr = {'host': info.local_addr.host,
                  'port': info.local_addr.port}
    remote_addr = {'host': info.remote_addr.host,
                   'port': info.remote_addr.port}

    meta = {'type': 'AcseConnection',
            'name': info.name,
            'local_addr': local_addr,
            'remote_addr': remote_addr}

    return logging.LoggerAdapter(mlog, {'meta': meta})
mlog = <Logger hat.drivers.acse (WARNING)>
class ConnectionInfo(typing.NamedTuple):
28class ConnectionInfo(typing.NamedTuple):
29    name: str | None
30    local_addr: tcp.Address
31    local_tsel: int | None
32    local_ssel: int | None
33    local_psel: int | None
34    local_ap_title: asn1.ObjectIdentifier | None
35    local_ae_qualifier: int | None
36    remote_addr: tcp.Address
37    remote_tsel: int | None
38    remote_ssel: int | None
39    remote_psel: int | None
40    remote_ap_title: asn1.ObjectIdentifier | None
41    remote_ae_qualifier: int | None

ConnectionInfo(name, local_addr, local_tsel, local_ssel, local_psel, local_ap_title, local_ae_qualifier, remote_addr, remote_tsel, remote_ssel, remote_psel, remote_ap_title, remote_ae_qualifier)

ConnectionInfo( name: str | None, local_addr: hat.drivers.tcp.Address, local_tsel: int | None, local_ssel: int | None, local_psel: int | None, local_ap_title: tuple[int, ...] | None, local_ae_qualifier: int | None, remote_addr: hat.drivers.tcp.Address, remote_tsel: int | None, remote_ssel: int | None, remote_psel: int | None, remote_ap_title: tuple[int, ...] | None, remote_ae_qualifier: int | None)

Create new instance of ConnectionInfo(name, local_addr, local_tsel, local_ssel, local_psel, local_ap_title, local_ae_qualifier, remote_addr, remote_tsel, remote_ssel, remote_psel, remote_ap_title, remote_ae_qualifier)

name: str | None

Alias for field number 0

Alias for field number 1

local_tsel: int | None

Alias for field number 2

local_ssel: int | None

Alias for field number 3

local_psel: int | None

Alias for field number 4

local_ap_title: tuple[int, ...] | None

Alias for field number 5

local_ae_qualifier: int | None

Alias for field number 6

Alias for field number 7

remote_tsel: int | None

Alias for field number 8

remote_ssel: int | None

Alias for field number 9

remote_psel: int | None

Alias for field number 10

remote_ap_title: tuple[int, ...] | None

Alias for field number 11

remote_ae_qualifier: int | None

Alias for field number 12

ValidateCb: TypeAlias = Callable[[hat.drivers.copp.SyntaxNames, tuple[tuple[int, ...], hat.asn1.common.Entity]], tuple[tuple[int, ...], hat.asn1.common.Entity] | None | Awaitable[tuple[tuple[int, ...], hat.asn1.common.Entity] | None]]

Validate callback

ConnectionCb: TypeAlias = Callable[[ForwardRef('Connection')], None | Awaitable[None]]

Connection callback

async def connect( addr: hat.drivers.tcp.Address, syntax_name_list: list[tuple[int, ...]], app_context_name: tuple[int, ...], user_data: tuple[tuple[int, ...], hat.asn1.common.Entity] | None = None, *, local_ap_title: tuple[int, ...] | None = None, remote_ap_title: tuple[int, ...] | None = None, local_ae_qualifier: int | None = None, remote_ae_qualifier: int | None = None, acse_receive_queue_size: int = 1024, acse_send_queue_size: int = 1024, **kwargs) -> Connection:
53async def connect(addr: tcp.Address,
54                  syntax_name_list: list[asn1.ObjectIdentifier],
55                  app_context_name: asn1.ObjectIdentifier,
56                  user_data: copp.IdentifiedEntity | None = None,
57                  *,
58                  local_ap_title: asn1.ObjectIdentifier | None = None,
59                  remote_ap_title: asn1.ObjectIdentifier | None = None,
60                  local_ae_qualifier: int | None = None,
61                  remote_ae_qualifier: int | None = None,
62                  acse_receive_queue_size: int = 1024,
63                  acse_send_queue_size: int = 1024,
64                  **kwargs
65                  ) -> 'Connection':
66    """Connect to ACSE server
67
68    Additional arguments are passed directly to `hat.drivers.copp.connect`
69    (`syntax_names` is set by this coroutine).
70
71    """
72    syntax_names = copp.SyntaxNames([_acse_syntax_name, *syntax_name_list])
73    aarq_apdu = _aarq_apdu(syntax_names, app_context_name,
74                           local_ap_title, remote_ap_title,
75                           local_ae_qualifier, remote_ae_qualifier,
76                           user_data)
77    copp_user_data = _acse_syntax_name, _encode(aarq_apdu)
78    conn = await copp.connect(addr, syntax_names, copp_user_data, **kwargs)
79
80    try:
81        aare_apdu_syntax_name, aare_apdu_entity = conn.conn_res_user_data
82        if aare_apdu_syntax_name != _acse_syntax_name:
83            raise Exception("invalid syntax name")
84
85        aare_apdu = _decode(aare_apdu_entity)
86        if aare_apdu[0] != 'aare' or aare_apdu[1]['result'] != 0:
87            raise Exception("invalid apdu")
88
89        calling_ap_title, called_ap_title = _get_ap_titles(aarq_apdu)
90        calling_ae_qualifier, called_ae_qualifier = _get_ae_qualifiers(
91            aarq_apdu)
92        return Connection(conn, aarq_apdu, aare_apdu,
93                          calling_ap_title, called_ap_title,
94                          calling_ae_qualifier, called_ae_qualifier,
95                          acse_receive_queue_size, acse_send_queue_size)
96
97    except Exception:
98        await aio.uncancellable(_close_copp(conn, _abrt_apdu(1)))
99        raise

Connect to ACSE server

Additional arguments are passed directly to hat.drivers.copp.connect (syntax_names is set by this coroutine).

async def listen( validate_cb: Callable[[hat.drivers.copp.SyntaxNames, tuple[tuple[int, ...], hat.asn1.common.Entity]], tuple[tuple[int, ...], hat.asn1.common.Entity] | None | Awaitable[tuple[tuple[int, ...], hat.asn1.common.Entity] | None]], connection_cb: Callable[[Connection], None | Awaitable[None]], addr: hat.drivers.tcp.Address = Address(host='0.0.0.0', port=102), *, bind_connections: bool = False, acse_receive_queue_size: int = 1024, acse_send_queue_size: int = 1024, **kwargs) -> Server:
102async def listen(validate_cb: ValidateCb,
103                 connection_cb: ConnectionCb,
104                 addr: tcp.Address = tcp.Address('0.0.0.0', 102),
105                 *,
106                 bind_connections: bool = False,
107                 acse_receive_queue_size: int = 1024,
108                 acse_send_queue_size: int = 1024,
109                 **kwargs
110                 ) -> 'Server':
111    """Create ACSE listening server
112
113    Additional arguments are passed directly to `hat.drivers.copp.listen`.
114
115    Args:
116        validate_cb: callback function or coroutine called on new
117            incomming connection request prior to creating connection object
118        connection_cb: new connection callback
119        addr: local listening address
120
121    """
122    server = Server()
123    server._validate_cb = validate_cb
124    server._connection_cb = connection_cb
125    server._bind_connections = bind_connections
126    server._receive_queue_size = acse_receive_queue_size
127    server._send_queue_size = acse_send_queue_size
128    server._log = _create_server_logger(kwargs.get('name'), None)
129
130    server._srv = await copp.listen(server._on_validate,
131                                    server._on_connection,
132                                    addr,
133                                    bind_connections=False,
134                                    **kwargs)
135
136    server._log = _create_server_logger(kwargs.get('name'), server._srv.info)
137
138    return server

Create ACSE listening server

Additional arguments are passed directly to hat.drivers.copp.listen.

Arguments:
  • validate_cb: callback function or coroutine called on new incoming connection request prior to creating connection object
  • connection_cb: new connection callback
  • addr: local listening address
class Server(hat.aio.group.Resource):
141class Server(aio.Resource):
142    """ACSE listening server
143
144    For creating new server see `listen`.
145
146    """
147
148    @property
149    def async_group(self) -> aio.Group:
150        """Async group"""
151        return self._srv.async_group
152
153    @property
154    def info(self) -> tcp.ServerInfo:
155        """Server info"""
156        return self._srv.info
157
158    async def _on_validate(self, syntax_names, user_data):
159        aarq_apdu_syntax_name, aarq_apdu_entity = user_data
160        if aarq_apdu_syntax_name != _acse_syntax_name:
161            raise Exception('invalid acse syntax name')
162
163        aarq_apdu = _decode(aarq_apdu_entity)
164        if aarq_apdu[0] != 'aarq':
165            raise Exception('not aarq message')
166
167        aarq_external = aarq_apdu[1]['user-information'][0]
168        if aarq_external.direct_ref is not None:
169            if aarq_external.direct_ref != _encoder.syntax_name:
170                raise Exception('invalid encoder identifier')
171
172        _, called_ap_title = _get_ap_titles(aarq_apdu)
173        _, called_ae_qualifier = _get_ae_qualifiers(aarq_apdu)
174        _, called_ap_invocation_identifier = \
175            _get_ap_invocation_identifiers(aarq_apdu)
176        _, called_ae_invocation_identifier = \
177            _get_ae_invocation_identifiers(aarq_apdu)
178
179        aarq_user_data = (syntax_names.get_name(aarq_external.indirect_ref),
180                          aarq_external.data)
181
182        user_validate_result = await aio.call(self._validate_cb, syntax_names,
183                                              aarq_user_data)
184
185        aare_apdu = _aare_apdu(syntax_names,
186                               user_validate_result,
187                               called_ap_title, called_ae_qualifier,
188                               called_ap_invocation_identifier,
189                               called_ae_invocation_identifier)
190        return _acse_syntax_name, _encode(aare_apdu)
191
192    async def _on_connection(self, copp_conn):
193        try:
194            try:
195                aarq_apdu = _decode(copp_conn.conn_req_user_data[1])
196                aare_apdu = _decode(copp_conn.conn_res_user_data[1])
197
198                calling_ap_title, called_ap_title = _get_ap_titles(aarq_apdu)
199                calling_ae_qualifier, called_ae_qualifier = _get_ae_qualifiers(
200                    aarq_apdu)
201
202                conn = Connection(copp_conn, aarq_apdu, aare_apdu,
203                                  called_ap_title, calling_ap_title,
204                                  called_ae_qualifier, calling_ae_qualifier,
205                                  self._receive_queue_size,
206                                  self._send_queue_size)
207
208            except Exception:
209                await aio.uncancellable(_close_copp(copp_conn, _abrt_apdu(1)))
210                raise
211
212            try:
213                await aio.call(self._connection_cb, conn)
214
215            except BaseException:
216                await aio.uncancellable(conn.async_close())
217                raise
218
219        except Exception as e:
220            self._log.error("error creating new incomming connection: %s",
221                            e, exc_info=e)
222            return
223
224        if not self._bind_connections:
225            return
226
227        try:
228            await conn.wait_closed()
229
230        except BaseException:
231            await aio.uncancellable(conn.async_close())
232            raise

ACSE listening server

For creating new server see listen.

async_group: hat.aio.group.Group
148    @property
149    def async_group(self) -> aio.Group:
150        """Async group"""
151        return self._srv.async_group

Async group

info: hat.drivers.tcp.ServerInfo
153    @property
154    def info(self) -> tcp.ServerInfo:
155        """Server info"""
156        return self._srv.info

Server info

class Connection(hat.aio.group.Resource):
class Connection(aio.Resource):
    """ACSE connection

    For creating new connection see `connect` or `listen`.

    The connection wraps a COPP connection. Application data is exchanged
    through `receive`, `send` and `drain`; a receive loop and a send loop
    run in the connection's async group and move data between the COPP
    connection and internal queues.

    Args:
        conn: underlying COPP connection
        aarq_apdu: decoded association request APDU
        aare_apdu: decoded association response APDU
        local_ap_title: local AP title
        remote_ap_title: remote AP title
        local_ae_qualifier: local AE qualifier
        remote_ae_qualifier: remote AE qualifier
        receive_queue_size: size of the queue holding received data
        send_queue_size: size of the queue holding data waiting to be sent

    """

    def __init__(self,
                 conn: copp.Connection,
                 aarq_apdu: asn1.Value,
                 aare_apdu: asn1.Value,
                 local_ap_title: asn1.ObjectIdentifier | None,
                 remote_ap_title: asn1.ObjectIdentifier | None,
                 local_ae_qualifier: int | None,
                 remote_ae_qualifier: int | None,
                 receive_queue_size: int,
                 send_queue_size: int):
        # user data of the association request/response is taken from the
        # first element of the APDU's 'user-information' field
        aarq_external = aarq_apdu[1]['user-information'][0]
        aare_external = aare_apdu[1]['user-information'][0]

        # resolve the indirect reference of each external to a syntax name
        conn_req_user_data = (
            conn.syntax_names.get_name(aarq_external.indirect_ref),
            aarq_external.data)
        conn_res_user_data = (
            conn.syntax_names.get_name(aare_external.indirect_ref),
            aare_external.data)

        self._conn = conn
        self._conn_req_user_data = conn_req_user_data
        self._conn_res_user_data = conn_res_user_data
        self._loop = asyncio.get_running_loop()
        # ACSE info extends the COPP connection info with AP titles and
        # AE qualifiers
        self._info = ConnectionInfo(local_ap_title=local_ap_title,
                                    local_ae_qualifier=local_ae_qualifier,
                                    remote_ap_title=remote_ap_title,
                                    remote_ae_qualifier=remote_ae_qualifier,
                                    **conn.info._asdict())
        # APDU passed to `_close_copp` on close; replaced by `_close`
        self._close_apdu = _abrt_apdu(0)
        self._receive_queue = aio.Queue(receive_queue_size)
        self._send_queue = aio.Queue(send_queue_size)
        self._async_group = aio.Group()
        self._log = _create_connection_logger(self._info)

        self.async_group.spawn(aio.call_on_cancel, self._on_close)
        self.async_group.spawn(self._receive_loop)
        self.async_group.spawn(self._send_loop)
        # close this connection once the COPP connection starts closing
        self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
                               self.close)

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    @property
    def info(self) -> ConnectionInfo:
        """Connection info"""
        return self._info

    @property
    def conn_req_user_data(self) -> copp.IdentifiedEntity:
        """Connect request's user data"""
        return self._conn_req_user_data

    @property
    def conn_res_user_data(self) -> copp.IdentifiedEntity:
        """Connect response's user data"""
        return self._conn_res_user_data

    async def receive(self) -> copp.IdentifiedEntity:
        """Receive data

        Raises:
            ConnectionError: if the receive queue is closed

        """
        try:
            return await self._receive_queue.get()

        except aio.QueueClosedError:
            raise ConnectionError()

    async def send(self, data: copp.IdentifiedEntity):
        """Send data

        Data is queued and sent by the send loop.

        Raises:
            ConnectionError: if the send queue is closed

        """
        try:
            await self._send_queue.put((data, None))

        except aio.QueueClosedError:
            raise ConnectionError()

    async def drain(self):
        """Drain output buffer

        Queues a drain request behind previously queued data and waits
        until the send loop has processed it.

        Raises:
            ConnectionError: if the send queue is closed

        """
        try:
            future = self._loop.create_future()
            # entry without data marks a drain request for the send loop
            await self._send_queue.put((None, future))
            await future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def _on_close(self):
        """Close the COPP connection using the recorded close APDU"""
        await _close_copp(self._conn, self._close_apdu)

    def _close(self, apdu):
        """Record `apdu` as the close APDU and close the async group

        Does nothing if the connection is no longer open, so only the first
        close request decides which APDU is used.

        """
        if not self.is_open:
            return

        self._close_apdu = apdu
        self._async_group.close()

    async def _receive_loop(self):
        """Move received data to the receive queue until the connection ends

        Data identified by the ACSE syntax name is not delivered to the
        user; it terminates the connection instead.

        """
        try:
            while True:
                syntax_name, entity = await self._conn.receive()

                if syntax_name == _acse_syntax_name:
                    # received entity is still encoded - decode it before
                    # inspecting the APDU name (a decoding failure is
                    # handled as a receive loop error below)
                    apdu_name = _decode(entity)[0]

                    if apdu_name == 'abrt':
                        # peer aborted - nothing to send back
                        close_apdu = None

                    elif apdu_name == 'rlrq':
                        # peer requested release - answer with release
                        # response
                        close_apdu = _rlre_apdu()

                    else:
                        close_apdu = _abrt_apdu(1)

                    self._close(close_apdu)
                    break

                await self._receive_queue.put((syntax_name, entity))

        except ConnectionError:
            pass

        except Exception as e:
            self._log.error("receive loop error: %s", e, exc_info=e)

        finally:
            # no-op if the connection was already closed above
            self._close(_abrt_apdu(1))
            self._receive_queue.close()

    async def _send_loop(self):
        """Process queued send and drain requests until the connection ends"""
        future = None
        try:
            while True:
                data, future = await self._send_queue.get()

                if data is None:
                    # drain request
                    await self._conn.drain()

                else:
                    await self._conn.send(data)

                if future and not future.done():
                    future.set_result(None)

        except ConnectionError:
            pass

        except Exception as e:
            self._log.error("send loop error: %s", e, exc_info=e)

        finally:
            self._close(_abrt_apdu(1))
            self._send_queue.close()

            # release every pending `drain` call: resolve the future of the
            # interrupted request and of all requests left in the queue
            while True:
                if future and not future.done():
                    future.set_result(None)
                if self._send_queue.empty():
                    break
                _, future = self._send_queue.get_nowait()

ACSE connection

To create a new connection, see connect or listen.

Connection( conn: hat.drivers.copp.Connection, aarq_apdu: Union[bool, int, Collection[bool], bytes, bytearray, memoryview, NoneType, tuple[int, ...], str, float, Tuple[str, ForwardRef('Value')], Dict[str, ForwardRef('Value')], Collection['Value'], hat.asn1.common.Entity, hat.asn1.common.External, hat.asn1.common.EmbeddedPDV], aare_apdu: Union[bool, int, Collection[bool], bytes, bytearray, memoryview, NoneType, tuple[int, ...], str, float, Tuple[str, ForwardRef('Value')], Dict[str, ForwardRef('Value')], Collection['Value'], hat.asn1.common.Entity, hat.asn1.common.External, hat.asn1.common.EmbeddedPDV], local_ap_title: tuple[int, ...] | None, remote_ap_title: tuple[int, ...] | None, local_ae_qualifier: int | None, remote_ae_qualifier: int | None, receive_queue_size: int, send_queue_size: int)
242    def __init__(self,
243                 conn: copp.Connection,
244                 aarq_apdu: asn1.Value,
245                 aare_apdu: asn1.Value,
246                 local_ap_title: asn1.ObjectIdentifier | None,
247                 remote_ap_title: asn1.ObjectIdentifier | None,
248                 local_ae_qualifier: int | None,
249                 remote_ae_qualifier: int | None,
250                 receive_queue_size: int,
251                 send_queue_size: int):
252        aarq_external = aarq_apdu[1]['user-information'][0]
253        aare_external = aare_apdu[1]['user-information'][0]
254
255        conn_req_user_data = (
256            conn.syntax_names.get_name(aarq_external.indirect_ref),
257            aarq_external.data)
258        conn_res_user_data = (
259            conn.syntax_names.get_name(aare_external.indirect_ref),
260            aare_external.data)
261
262        self._conn = conn
263        self._conn_req_user_data = conn_req_user_data
264        self._conn_res_user_data = conn_res_user_data
265        self._loop = asyncio.get_running_loop()
266        self._info = ConnectionInfo(local_ap_title=local_ap_title,
267                                    local_ae_qualifier=local_ae_qualifier,
268                                    remote_ap_title=remote_ap_title,
269                                    remote_ae_qualifier=remote_ae_qualifier,
270                                    **conn.info._asdict())
271        self._close_apdu = _abrt_apdu(0)
272        self._receive_queue = aio.Queue(receive_queue_size)
273        self._send_queue = aio.Queue(send_queue_size)
274        self._async_group = aio.Group()
275        self._log = _create_connection_logger(self._info)
276
277        self.async_group.spawn(aio.call_on_cancel, self._on_close)
278        self.async_group.spawn(self._receive_loop)
279        self.async_group.spawn(self._send_loop)
280        self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
281                               self.close)
async_group: hat.aio.group.Group
283    @property
284    def async_group(self) -> aio.Group:
285        """Async group"""
286        return self._async_group

Async group

info: ConnectionInfo
288    @property
289    def info(self) -> ConnectionInfo:
290        """Connection info"""
291        return self._info

Connection info

conn_req_user_data: tuple[tuple[int, ...], hat.asn1.common.Entity]
293    @property
294    def conn_req_user_data(self) -> copp.IdentifiedEntity:
295        """Connect request's user data"""
296        return self._conn_req_user_data

Connect request's user data

conn_res_user_data: tuple[tuple[int, ...], hat.asn1.common.Entity]
298    @property
299    def conn_res_user_data(self) -> copp.IdentifiedEntity:
300        """Connect response's user data"""
301        return self._conn_res_user_data

Connect response's user data

async def receive(self) -> tuple[tuple[int, ...], hat.asn1.common.Entity]:
303    async def receive(self) -> copp.IdentifiedEntity:
304        """Receive data"""
305        try:
306            return await self._receive_queue.get()
307
308        except aio.QueueClosedError:
309            raise ConnectionError()

Receive data

async def send(self, data: tuple[tuple[int, ...], hat.asn1.common.Entity]):
311    async def send(self, data: copp.IdentifiedEntity):
312        """Send data"""
313        try:
314            await self._send_queue.put((data, None))
315
316        except aio.QueueClosedError:
317            raise ConnectionError()

Send data

async def drain(self):
319    async def drain(self):
320        """Drain output buffer"""
321        try:
322            future = self._loop.create_future()
323            await self._send_queue.put((None, future))
324            await future
325
326        except aio.QueueClosedError:
327            raise ConnectionError()

Drain output buffer