hat.drivers.copp

Connection oriented presentation protocol

  1"""Connection oriented presentation protocol"""
  2
  3import asyncio
  4import importlib.resources
  5import logging
  6import typing
  7
  8from hat import aio
  9from hat import asn1
 10from hat import json
 11
 12from hat.drivers import cosp
 13from hat.drivers import tcp
 14
 15
 16mlog = logging.getLogger(__name__)
 17
 18with importlib.resources.open_text(__package__, 'asn1_repo.json') as _f:
 19    _encoder = asn1.ber.BerEncoder(
 20        asn1.repository_from_json(
 21            json.decode_stream(_f)))
 22
 23
 24class ConnectionInfo(typing.NamedTuple):
 25    name: str | None
 26    local_addr: tcp.Address
 27    local_tsel: int | None
 28    local_ssel: int | None
 29    local_psel: int | None
 30    remote_addr: tcp.Address
 31    remote_tsel: int | None
 32    remote_ssel: int | None
 33    remote_psel: int | None
 34
 35
 36IdentifiedEntity: typing.TypeAlias = tuple[asn1.ObjectIdentifier, asn1.Entity]
 37"""Identified entity"""
 38
 39ValidateCb: typing.TypeAlias = aio.AsyncCallable[['SyntaxNames',
 40                                                  IdentifiedEntity],
 41                                                 IdentifiedEntity | None]
 42"""Validate callback"""
 43
 44ConnectionCb: typing.TypeAlias = aio.AsyncCallable[['Connection'], None]
 45"""Connection callback"""
 46
 47
 48class SyntaxNames:
 49    """Syntax name registry
 50
 51    Args:
 52        syntax_names: list of ASN.1 ObjectIdentifiers representing syntax names
 53
 54    """
 55
 56    def __init__(self, syntax_names: list[asn1.ObjectIdentifier]):
 57        self._syntax_id_names = {(i * 2 + 1): name
 58                                 for i, name in enumerate(syntax_names)}
 59        self._syntax_name_ids = {v: k
 60                                 for k, v in self._syntax_id_names.items()}
 61
 62    def get_name(self, syntax_id: int) -> asn1.ObjectIdentifier:
 63        """Get syntax name associated with id"""
 64        return self._syntax_id_names[syntax_id]
 65
 66    def get_id(self, syntax_name: asn1.ObjectIdentifier) -> int:
 67        """Get syntax id associated with name"""
 68        return self._syntax_name_ids[syntax_name]
 69
 70
 71async def connect(addr: tcp.Address,
 72                  syntax_names: SyntaxNames,
 73                  user_data: IdentifiedEntity | None = None,
 74                  *,
 75                  local_psel: int | None = None,
 76                  remote_psel: int | None = None,
 77                  copp_receive_queue_size: int = 1024,
 78                  copp_send_queue_size: int = 1024,
 79                  **kwargs
 80                  ) -> 'Connection':
 81    """Connect to COPP server
 82
 83    Additional arguments are passed directly to `hat.drivers.cosp.connect`.
 84
 85    """
 86    log = _create_connection_logger(kwargs.get('name'), None)
 87    cp_ppdu = _cp_ppdu(syntax_names, local_psel, remote_psel, user_data)
 88    cp_ppdu_data = _encode('CP-type', cp_ppdu)
 89    conn = await cosp.connect(addr, cp_ppdu_data, **kwargs)
 90
 91    try:
 92        cpa_ppdu = _decode('CPA-PPDU', conn.conn_res_user_data)
 93        _validate_connect_response(cp_ppdu, cpa_ppdu)
 94
 95        calling_psel, called_psel = _get_psels(cp_ppdu)
 96        return Connection(conn, syntax_names, cp_ppdu, cpa_ppdu,
 97                          calling_psel, called_psel,
 98                          copp_receive_queue_size, copp_send_queue_size)
 99
100    except Exception:
101        await aio.uncancellable(_close_cosp(conn, _arp_ppdu(), log))
102        raise
103
104
105async def listen(validate_cb: ValidateCb,
106                 connection_cb: ConnectionCb,
107                 addr: tcp.Address = tcp.Address('0.0.0.0', 102),
108                 *,
109                 bind_connections: bool = False,
110                 copp_receive_queue_size: int = 1024,
111                 copp_send_queue_size: int = 1024,
112                 **kwargs
113                 ) -> 'Server':
114    """Create COPP listening server
115
116    Additional arguments are passed directly to `hat.drivers.cosp.listen`.
117
118    Args:
119        validate_cb: callback function or coroutine called on new
120            incomming connection request prior to creating connection object
121        connection_cb: new connection callback
122        addr: local listening address
123
124    """
125    server = Server()
126    server._validate_cb = validate_cb
127    server._connection_cb = connection_cb
128    server._bind_connections = bind_connections
129    server._receive_queue_size = copp_receive_queue_size
130    server._send_queue_size = copp_send_queue_size
131    server._log = _create_server_logger(kwargs.get('name'), None)
132
133    server._srv = await cosp.listen(server._on_validate,
134                                    server._on_connection,
135                                    addr,
136                                    bind_connections=False,
137                                    **kwargs)
138
139    server._log = _create_server_logger(kwargs.get('name'), server._srv.info)
140
141    return server
142
143
144class Server(aio.Resource):
145    """COPP listening server
146
147    For creating new server see `listen`.
148
149    """
150
151    @property
152    def async_group(self) -> aio.Group:
153        """Async group"""
154        return self._srv.async_group
155
156    @property
157    def info(self) -> tcp.ServerInfo:
158        """Server info"""
159        return self._srv.info
160
161    async def _on_validate(self, user_data):
162        cp_ppdu = _decode('CP-type', user_data)
163        cp_params = cp_ppdu['normal-mode-parameters']
164        called_psel_data = cp_params.get('called-presentation-selector')
165        called_psel = (int.from_bytes(called_psel_data, 'big')
166                       if called_psel_data else None)
167        cp_pdv_list = cp_params['user-data'][1][0]
168        syntax_names = _sytax_names_from_cp_ppdu(cp_ppdu)
169        cp_user_data = (
170            syntax_names.get_name(
171                cp_pdv_list['presentation-context-identifier']),
172            cp_pdv_list['presentation-data-values'][1])
173
174        cpa_user_data = await aio.call(self._validate_cb, syntax_names,
175                                       cp_user_data)
176
177        cpa_ppdu = _cpa_ppdu(syntax_names, called_psel, cpa_user_data)
178        cpa_ppdu_data = _encode('CPA-PPDU', cpa_ppdu)
179        return cpa_ppdu_data
180
181    async def _on_connection(self, cosp_conn):
182        try:
183            try:
184                cp_ppdu = _decode('CP-type', cosp_conn.conn_req_user_data)
185                cpa_ppdu = _decode('CPA-PPDU', cosp_conn.conn_res_user_data)
186
187                syntax_names = _sytax_names_from_cp_ppdu(cp_ppdu)
188                calling_psel, called_psel = _get_psels(cp_ppdu)
189
190                conn = Connection(cosp_conn, syntax_names, cp_ppdu, cpa_ppdu,
191                                  called_psel, calling_psel,
192                                  self._receive_queue_size,
193                                  self._send_queue_size)
194
195            except Exception:
196                await aio.uncancellable(
197                    _close_cosp(cosp_conn, _arp_ppdu(), self._log))
198                raise
199
200            try:
201                await aio.call(self._connection_cb, conn)
202
203            except BaseException:
204                await aio.uncancellable(conn.async_close())
205                raise
206
207        except Exception as e:
208            self._log.error("error creating new incomming connection: %s",
209                            e, exc_info=e)
210            return
211
212        if not self._bind_connections:
213            return
214
215        try:
216            await conn.wait_closed()
217
218        except BaseException:
219            await aio.uncancellable(conn.async_close())
220            raise
221
222
223class Connection(aio.Resource):
224    """COPP connection
225
226    For creating new connection see `connect` or `listen`.
227
228    """
229
230    def __init__(self,
231                 conn: cosp.Connection,
232                 syntax_names: SyntaxNames,
233                 cp_ppdu: asn1.Value,
234                 cpa_ppdu: asn1.Value,
235                 local_psel: int | None,
236                 remote_psel: int | None,
237                 receive_queue_size: int,
238                 send_queue_size: int):
239        cp_user_data = cp_ppdu['normal-mode-parameters']['user-data']
240        cpa_user_data = cpa_ppdu['normal-mode-parameters']['user-data']
241
242        conn_req_user_data = (
243            syntax_names.get_name(
244                cp_user_data[1][0]['presentation-context-identifier']),
245            cp_user_data[1][0]['presentation-data-values'][1])
246        conn_res_user_data = (
247            syntax_names.get_name(
248                cpa_user_data[1][0]['presentation-context-identifier']),
249            cpa_user_data[1][0]['presentation-data-values'][1])
250
251        self._conn = conn
252        self._syntax_names = syntax_names
253        self._conn_req_user_data = conn_req_user_data
254        self._conn_res_user_data = conn_res_user_data
255        self._loop = asyncio.get_running_loop()
256        self._info = ConnectionInfo(local_psel=local_psel,
257                                    remote_psel=remote_psel,
258                                    **conn.info._asdict())
259        self._close_ppdu = _arp_ppdu()
260        self._receive_queue = aio.Queue(receive_queue_size)
261        self._send_queue = aio.Queue(send_queue_size)
262        self._async_group = aio.Group()
263        self._log = _create_connection_logger(self._info.name, self._info)
264
265        self.async_group.spawn(aio.call_on_cancel, self._on_close)
266        self.async_group.spawn(self._receive_loop)
267        self.async_group.spawn(self._send_loop)
268        self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
269                               self.close)
270
271    @property
272    def async_group(self) -> aio.Group:
273        """Async group"""
274        return self._async_group
275
276    @property
277    def info(self) -> ConnectionInfo:
278        """Connection info"""
279        return self._info
280
281    @property
282    def syntax_names(self) -> SyntaxNames:
283        """Syntax names"""
284        return self._syntax_names
285
286    @property
287    def conn_req_user_data(self) -> IdentifiedEntity:
288        """Connect request's user data"""
289        return self._conn_req_user_data
290
291    @property
292    def conn_res_user_data(self) -> IdentifiedEntity:
293        """Connect response's user data"""
294        return self._conn_res_user_data
295
296    def close(self, user_data: IdentifiedEntity | None = None):
297        """Close connection"""
298        self._close(_aru_ppdu(self._syntax_names, user_data))
299
300    async def async_close(self, user_data: IdentifiedEntity | None = None):
301        """Async close"""
302        self.close(user_data)
303        await self.wait_closed()
304
305    async def receive(self) -> IdentifiedEntity:
306        """Receive data"""
307        try:
308            return await self._receive_queue.get()
309
310        except aio.QueueClosedError:
311            raise ConnectionError()
312
313    async def send(self, data: IdentifiedEntity):
314        """Send data"""
315        try:
316            await self._send_queue.put((data, None))
317
318        except aio.QueueClosedError:
319            raise ConnectionError()
320
321    async def drain(self):
322        """Drain output buffer"""
323        try:
324            future = self._loop.create_future()
325            await self._send_queue.put((None, future))
326            await future
327
328        except aio.QueueClosedError:
329            raise ConnectionError()
330
331    async def _on_close(self):
332        await _close_cosp(self._conn, self._close_ppdu, self._log)
333
334    def _close(self, ppdu):
335        if not self.is_open:
336            return
337
338        self._close_ppdu = ppdu
339        self._async_group.close()
340
341    async def _receive_loop(self):
342        try:
343            while True:
344                cosp_data = await self._conn.receive()
345
346                user_data = _decode('User-data', cosp_data)
347
348                pdv_list = user_data[1][0]
349                syntax_name = self._syntax_names.get_name(
350                    pdv_list['presentation-context-identifier'])
351                data = pdv_list['presentation-data-values'][1]
352
353                await self._receive_queue.put((syntax_name, data))
354
355        except ConnectionError:
356            pass
357
358        except Exception as e:
359            self._log.error("receive loop error: %s", e, exc_info=e)
360
361        finally:
362            self._close(_arp_ppdu())
363            self._receive_queue.close()
364
365    async def _send_loop(self):
366        future = None
367        try:
368            while True:
369                data, future = await self._send_queue.get()
370
371                if data is None:
372                    await self._conn.drain()
373
374                else:
375                    user_data = _user_data(self._syntax_names, data)
376                    ppdu_data = _encode('User-data', user_data)
377
378                    await self._conn.send(ppdu_data)
379
380                if future and not future.done():
381                    future.set_result(None)
382
383        except ConnectionError:
384            pass
385
386        except Exception as e:
387            self._log.error("send loop error: %s", e, exc_info=e)
388
389        finally:
390            self._close(_arp_ppdu())
391            self._send_queue.close()
392
393            while True:
394                if future and not future.done():
395                    future.set_result(None)
396                if self._send_queue.empty():
397                    break
398                _, future = self._send_queue.get_nowait()
399
400
401async def _close_cosp(cosp_conn, ppdu, log):
402    try:
403        data = _encode('Abort-type', ppdu)
404
405    except Exception as e:
406        log.error("error encoding abort ppdu: %s", e, exc_info=e)
407        data = None
408
409    finally:
410        await cosp_conn.async_close(data)
411
412
413def _get_psels(cp_ppdu):
414    cp_params = cp_ppdu['normal-mode-parameters']
415    calling_psel_data = cp_params.get('calling-presentation-selector')
416    calling_psel = (int.from_bytes(calling_psel_data, 'big')
417                    if calling_psel_data else None)
418    called_psel_data = cp_params.get('called-presentation-selector')
419    called_psel = (int.from_bytes(called_psel_data, 'big')
420                   if called_psel_data else None)
421    return calling_psel, called_psel
422
423
424def _validate_connect_response(cp_ppdu, cpa_ppdu):
425    cp_params = cp_ppdu['normal-mode-parameters']
426    cpa_params = cpa_ppdu['normal-mode-parameters']
427    called_psel_data = cp_params.get('called-presentation-selector')
428    responding_psel_data = cpa_params.get('responding-presentation-selector')
429
430    if called_psel_data and responding_psel_data:
431        called_psel = int.from_bytes(called_psel_data, 'big')
432        responding_psel = int.from_bytes(responding_psel_data, 'big')
433
434        if called_psel != responding_psel:
435            raise Exception('presentation selectors not matching')
436
437    result_list = cpa_params['presentation-context-definition-result-list']
438    if any(i['result'] != 0 for i in result_list):
439        raise Exception('presentation context not accepted')
440
441
442def _cp_ppdu(syntax_names, calling_psel, called_psel, user_data):
443    cp_params = {
444        'presentation-context-definition-list': [
445            {'presentation-context-identifier': i,
446             'abstract-syntax-name': name,
447             'transfer-syntax-name-list': [_encoder.syntax_name]}
448            for i, name in syntax_names._syntax_id_names.items()]}
449
450    if calling_psel is not None:
451        cp_params['calling-presentation-selector'] = \
452            calling_psel.to_bytes(4, 'big')
453
454    if called_psel is not None:
455        cp_params['called-presentation-selector'] = \
456            called_psel.to_bytes(4, 'big')
457
458    if user_data:
459        cp_params['user-data'] = _user_data(syntax_names, user_data)
460
461    return {
462        'mode-selector': {
463            'mode-value': 1},
464        'normal-mode-parameters': cp_params}
465
466
467def _cpa_ppdu(syntax_names, responding_psel, user_data):
468    cpa_params = {
469        'presentation-context-definition-result-list': [
470            {'result': 0,
471             'transfer-syntax-name': _encoder.syntax_name}
472            for _ in syntax_names._syntax_id_names.keys()]}
473
474    if responding_psel is not None:
475        cpa_params['responding-presentation-selector'] = \
476            responding_psel.to_bytes(4, 'big')
477
478    if user_data:
479        cpa_params['user-data'] = _user_data(syntax_names, user_data)
480
481    return {
482        'mode-selector': {
483            'mode-value': 1},
484        'normal-mode-parameters': cpa_params}
485
486
487def _aru_ppdu(syntax_names, user_data):
488    aru_params = {}
489
490    if user_data:
491        aru_params['user-data'] = _user_data(syntax_names, user_data)
492
493    return 'aru-ppdu', ('normal-mode-parameters', aru_params)
494
495
496def _arp_ppdu():
497    return 'arp-ppdu', {}
498
499
500def _user_data(syntax_names, user_data):
501    return 'fully-encoded-data', [{
502        'presentation-context-identifier': syntax_names.get_id(user_data[0]),
503        'presentation-data-values': (
504            'single-ASN1-type', user_data[1])}]
505
506
507def _sytax_names_from_cp_ppdu(cp_ppdu):
508    cp_params = cp_ppdu['normal-mode-parameters']
509    syntax_names = SyntaxNames([])
510    syntax_names._syntax_id_names = {
511        i['presentation-context-identifier']: i['abstract-syntax-name']
512        for i in cp_params['presentation-context-definition-list']}
513    syntax_names._syntax_name_ids = {
514        v: k for k, v in syntax_names._syntax_id_names.items()}
515    return syntax_names
516
517
518def _encode(name, value):
519    return _encoder.encode(asn1.TypeRef('ISO8823-PRESENTATION', name), value)
520
521
522def _decode(name, data):
523    res, _ = _encoder.decode(asn1.TypeRef('ISO8823-PRESENTATION', name),
524                             memoryview(data))
525    return res
526
527
528def _create_server_logger(name, info):
529    extra = {'meta': {'type': 'CoppServer',
530                      'name': name}}
531
532    if info is not None:
533        extra['meta']['addresses'] = [{'host': addr.host,
534                                       'port': addr.port}
535                                      for addr in info.addresses]
536
537    return logging.LoggerAdapter(mlog, extra)
538
539
540def _create_connection_logger(name, info):
541    extra = {'meta': {'type': 'CoppConnection',
542                      'name': name}}
543
544    if info is not None:
545        extra['meta']['local_addr'] = {'host': info.local_addr.host,
546                                       'port': info.local_addr.port}
547        extra['meta']['remote_addr'] = {'host': info.remote_addr.host,
548                                        'port': info.remote_addr.port}
549
550    return logging.LoggerAdapter(mlog, extra)
mlog = <Logger hat.drivers.copp (WARNING)>
class ConnectionInfo(typing.NamedTuple):
25class ConnectionInfo(typing.NamedTuple):
26    name: str | None
27    local_addr: tcp.Address
28    local_tsel: int | None
29    local_ssel: int | None
30    local_psel: int | None
31    remote_addr: tcp.Address
32    remote_tsel: int | None
33    remote_ssel: int | None
34    remote_psel: int | None

ConnectionInfo(name, local_addr, local_tsel, local_ssel, local_psel, remote_addr, remote_tsel, remote_ssel, remote_psel)

ConnectionInfo( name: str | None, local_addr: hat.drivers.tcp.Address, local_tsel: int | None, local_ssel: int | None, local_psel: int | None, remote_addr: hat.drivers.tcp.Address, remote_tsel: int | None, remote_ssel: int | None, remote_psel: int | None)

Create new instance of ConnectionInfo(name, local_addr, local_tsel, local_ssel, local_psel, remote_addr, remote_tsel, remote_ssel, remote_psel)

name: str | None

Alias for field number 0

Alias for field number 1

local_tsel: int | None

Alias for field number 2

local_ssel: int | None

Alias for field number 3

local_psel: int | None

Alias for field number 4

Alias for field number 5

remote_tsel: int | None

Alias for field number 6

remote_ssel: int | None

Alias for field number 7

remote_psel: int | None

Alias for field number 8

IdentifiedEntity: TypeAlias = tuple[tuple[int, ...], hat.asn1.common.Entity]

Identified entity

ValidateCb: TypeAlias = Callable[[ForwardRef('SyntaxNames'), tuple[tuple[int, ...], hat.asn1.common.Entity]], tuple[tuple[int, ...], hat.asn1.common.Entity] | None | Awaitable[tuple[tuple[int, ...], hat.asn1.common.Entity] | None]]

Validate callback

ConnectionCb: TypeAlias = Callable[[ForwardRef('Connection')], None | Awaitable[None]]

Connection callback

class SyntaxNames:
49class SyntaxNames:
50    """Syntax name registry
51
52    Args:
53        syntax_names: list of ASN.1 ObjectIdentifiers representing syntax names
54
55    """
56
57    def __init__(self, syntax_names: list[asn1.ObjectIdentifier]):
58        self._syntax_id_names = {(i * 2 + 1): name
59                                 for i, name in enumerate(syntax_names)}
60        self._syntax_name_ids = {v: k
61                                 for k, v in self._syntax_id_names.items()}
62
63    def get_name(self, syntax_id: int) -> asn1.ObjectIdentifier:
64        """Get syntax name associated with id"""
65        return self._syntax_id_names[syntax_id]
66
67    def get_id(self, syntax_name: asn1.ObjectIdentifier) -> int:
68        """Get syntax id associated with name"""
69        return self._syntax_name_ids[syntax_name]

Syntax name registry

Arguments:
  • syntax_names: list of ASN.1 ObjectIdentifiers representing syntax names
SyntaxNames(syntax_names: list[tuple[int, ...]])
57    def __init__(self, syntax_names: list[asn1.ObjectIdentifier]):
58        self._syntax_id_names = {(i * 2 + 1): name
59                                 for i, name in enumerate(syntax_names)}
60        self._syntax_name_ids = {v: k
61                                 for k, v in self._syntax_id_names.items()}
def get_name(self, syntax_id: int) -> tuple[int, ...]:
63    def get_name(self, syntax_id: int) -> asn1.ObjectIdentifier:
64        """Get syntax name associated with id"""
65        return self._syntax_id_names[syntax_id]

Get syntax name associated with id

def get_id(self, syntax_name: tuple[int, ...]) -> int:
67    def get_id(self, syntax_name: asn1.ObjectIdentifier) -> int:
68        """Get syntax id associated with name"""
69        return self._syntax_name_ids[syntax_name]

Get syntax id associated with name

async def connect( addr: hat.drivers.tcp.Address, syntax_names: SyntaxNames, user_data: tuple[tuple[int, ...], hat.asn1.common.Entity] | None = None, *, local_psel: int | None = None, remote_psel: int | None = None, copp_receive_queue_size: int = 1024, copp_send_queue_size: int = 1024, **kwargs) -> Connection:
 72async def connect(addr: tcp.Address,
 73                  syntax_names: SyntaxNames,
 74                  user_data: IdentifiedEntity | None = None,
 75                  *,
 76                  local_psel: int | None = None,
 77                  remote_psel: int | None = None,
 78                  copp_receive_queue_size: int = 1024,
 79                  copp_send_queue_size: int = 1024,
 80                  **kwargs
 81                  ) -> 'Connection':
 82    """Connect to COPP server
 83
 84    Additional arguments are passed directly to `hat.drivers.cosp.connect`.
 85
 86    """
 87    log = _create_connection_logger(kwargs.get('name'), None)
 88    cp_ppdu = _cp_ppdu(syntax_names, local_psel, remote_psel, user_data)
 89    cp_ppdu_data = _encode('CP-type', cp_ppdu)
 90    conn = await cosp.connect(addr, cp_ppdu_data, **kwargs)
 91
 92    try:
 93        cpa_ppdu = _decode('CPA-PPDU', conn.conn_res_user_data)
 94        _validate_connect_response(cp_ppdu, cpa_ppdu)
 95
 96        calling_psel, called_psel = _get_psels(cp_ppdu)
 97        return Connection(conn, syntax_names, cp_ppdu, cpa_ppdu,
 98                          calling_psel, called_psel,
 99                          copp_receive_queue_size, copp_send_queue_size)
100
101    except Exception:
102        await aio.uncancellable(_close_cosp(conn, _arp_ppdu(), log))
103        raise

Connect to COPP server

Additional arguments are passed directly to hat.drivers.cosp.connect.

async def listen( validate_cb: Callable[[SyntaxNames, tuple[tuple[int, ...], hat.asn1.common.Entity]], tuple[tuple[int, ...], hat.asn1.common.Entity] | None | Awaitable[tuple[tuple[int, ...], hat.asn1.common.Entity] | None]], connection_cb: Callable[[Connection], None | Awaitable[None]], addr: hat.drivers.tcp.Address = Address(host='0.0.0.0', port=102), *, bind_connections: bool = False, copp_receive_queue_size: int = 1024, copp_send_queue_size: int = 1024, **kwargs) -> Server:
106async def listen(validate_cb: ValidateCb,
107                 connection_cb: ConnectionCb,
108                 addr: tcp.Address = tcp.Address('0.0.0.0', 102),
109                 *,
110                 bind_connections: bool = False,
111                 copp_receive_queue_size: int = 1024,
112                 copp_send_queue_size: int = 1024,
113                 **kwargs
114                 ) -> 'Server':
115    """Create COPP listening server
116
117    Additional arguments are passed directly to `hat.drivers.cosp.listen`.
118
119    Args:
120        validate_cb: callback function or coroutine called on new
121            incomming connection request prior to creating connection object
122        connection_cb: new connection callback
123        addr: local listening address
124
125    """
126    server = Server()
127    server._validate_cb = validate_cb
128    server._connection_cb = connection_cb
129    server._bind_connections = bind_connections
130    server._receive_queue_size = copp_receive_queue_size
131    server._send_queue_size = copp_send_queue_size
132    server._log = _create_server_logger(kwargs.get('name'), None)
133
134    server._srv = await cosp.listen(server._on_validate,
135                                    server._on_connection,
136                                    addr,
137                                    bind_connections=False,
138                                    **kwargs)
139
140    server._log = _create_server_logger(kwargs.get('name'), server._srv.info)
141
142    return server

Create COPP listening server

Additional arguments are passed directly to hat.drivers.cosp.listen.

Arguments:
  • validate_cb: callback function or coroutine called on new incomming connection request prior to creating connection object
  • connection_cb: new connection callback
  • addr: local listening address
class Server(hat.aio.group.Resource):
145class Server(aio.Resource):
146    """COPP listening server
147
148    For creating new server see `listen`.
149
150    """
151
152    @property
153    def async_group(self) -> aio.Group:
154        """Async group"""
155        return self._srv.async_group
156
157    @property
158    def info(self) -> tcp.ServerInfo:
159        """Server info"""
160        return self._srv.info
161
162    async def _on_validate(self, user_data):
163        cp_ppdu = _decode('CP-type', user_data)
164        cp_params = cp_ppdu['normal-mode-parameters']
165        called_psel_data = cp_params.get('called-presentation-selector')
166        called_psel = (int.from_bytes(called_psel_data, 'big')
167                       if called_psel_data else None)
168        cp_pdv_list = cp_params['user-data'][1][0]
169        syntax_names = _sytax_names_from_cp_ppdu(cp_ppdu)
170        cp_user_data = (
171            syntax_names.get_name(
172                cp_pdv_list['presentation-context-identifier']),
173            cp_pdv_list['presentation-data-values'][1])
174
175        cpa_user_data = await aio.call(self._validate_cb, syntax_names,
176                                       cp_user_data)
177
178        cpa_ppdu = _cpa_ppdu(syntax_names, called_psel, cpa_user_data)
179        cpa_ppdu_data = _encode('CPA-PPDU', cpa_ppdu)
180        return cpa_ppdu_data
181
182    async def _on_connection(self, cosp_conn):
183        try:
184            try:
185                cp_ppdu = _decode('CP-type', cosp_conn.conn_req_user_data)
186                cpa_ppdu = _decode('CPA-PPDU', cosp_conn.conn_res_user_data)
187
188                syntax_names = _sytax_names_from_cp_ppdu(cp_ppdu)
189                calling_psel, called_psel = _get_psels(cp_ppdu)
190
191                conn = Connection(cosp_conn, syntax_names, cp_ppdu, cpa_ppdu,
192                                  called_psel, calling_psel,
193                                  self._receive_queue_size,
194                                  self._send_queue_size)
195
196            except Exception:
197                await aio.uncancellable(
198                    _close_cosp(cosp_conn, _arp_ppdu(), self._log))
199                raise
200
201            try:
202                await aio.call(self._connection_cb, conn)
203
204            except BaseException:
205                await aio.uncancellable(conn.async_close())
206                raise
207
208        except Exception as e:
209            self._log.error("error creating new incomming connection: %s",
210                            e, exc_info=e)
211            return
212
213        if not self._bind_connections:
214            return
215
216        try:
217            await conn.wait_closed()
218
219        except BaseException:
220            await aio.uncancellable(conn.async_close())
221            raise

COPP listening server

For creating new server see listen.

async_group: hat.aio.group.Group
152    @property
153    def async_group(self) -> aio.Group:
154        """Async group"""
155        return self._srv.async_group

Async group

info: hat.drivers.tcp.ServerInfo
157    @property
158    def info(self) -> tcp.ServerInfo:
159        """Server info"""
160        return self._srv.info

Server info

class Connection(hat.aio.group.Resource):
224class Connection(aio.Resource):
225    """COPP connection
226
227    For creating new connection see `connect` or `listen`.
228
229    """
230
231    def __init__(self,
232                 conn: cosp.Connection,
233                 syntax_names: SyntaxNames,
234                 cp_ppdu: asn1.Value,
235                 cpa_ppdu: asn1.Value,
236                 local_psel: int | None,
237                 remote_psel: int | None,
238                 receive_queue_size: int,
239                 send_queue_size: int):
240        cp_user_data = cp_ppdu['normal-mode-parameters']['user-data']
241        cpa_user_data = cpa_ppdu['normal-mode-parameters']['user-data']
242
243        conn_req_user_data = (
244            syntax_names.get_name(
245                cp_user_data[1][0]['presentation-context-identifier']),
246            cp_user_data[1][0]['presentation-data-values'][1])
247        conn_res_user_data = (
248            syntax_names.get_name(
249                cpa_user_data[1][0]['presentation-context-identifier']),
250            cpa_user_data[1][0]['presentation-data-values'][1])
251
252        self._conn = conn
253        self._syntax_names = syntax_names
254        self._conn_req_user_data = conn_req_user_data
255        self._conn_res_user_data = conn_res_user_data
256        self._loop = asyncio.get_running_loop()
257        self._info = ConnectionInfo(local_psel=local_psel,
258                                    remote_psel=remote_psel,
259                                    **conn.info._asdict())
260        self._close_ppdu = _arp_ppdu()
261        self._receive_queue = aio.Queue(receive_queue_size)
262        self._send_queue = aio.Queue(send_queue_size)
263        self._async_group = aio.Group()
264        self._log = _create_connection_logger(self._info.name, self._info)
265
266        self.async_group.spawn(aio.call_on_cancel, self._on_close)
267        self.async_group.spawn(self._receive_loop)
268        self.async_group.spawn(self._send_loop)
269        self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
270                               self.close)
271
272    @property
273    def async_group(self) -> aio.Group:
274        """Async group"""
275        return self._async_group
276
277    @property
278    def info(self) -> ConnectionInfo:
279        """Connection info"""
280        return self._info
281
282    @property
283    def syntax_names(self) -> SyntaxNames:
284        """Syntax names"""
285        return self._syntax_names
286
287    @property
288    def conn_req_user_data(self) -> IdentifiedEntity:
289        """Connect request's user data"""
290        return self._conn_req_user_data
291
292    @property
293    def conn_res_user_data(self) -> IdentifiedEntity:
294        """Connect response's user data"""
295        return self._conn_res_user_data
296
297    def close(self, user_data: IdentifiedEntity | None = None):
298        """Close connection"""
299        self._close(_aru_ppdu(self._syntax_names, user_data))
300
301    async def async_close(self, user_data: IdentifiedEntity | None = None):
302        """Async close"""
303        self.close(user_data)
304        await self.wait_closed()
305
306    async def receive(self) -> IdentifiedEntity:
307        """Receive data"""
308        try:
309            return await self._receive_queue.get()
310
311        except aio.QueueClosedError:
312            raise ConnectionError()
313
314    async def send(self, data: IdentifiedEntity):
315        """Send data"""
316        try:
317            await self._send_queue.put((data, None))
318
319        except aio.QueueClosedError:
320            raise ConnectionError()
321
322    async def drain(self):
323        """Drain output buffer"""
324        try:
325            future = self._loop.create_future()
326            await self._send_queue.put((None, future))
327            await future
328
329        except aio.QueueClosedError:
330            raise ConnectionError()
331
332    async def _on_close(self):
333        await _close_cosp(self._conn, self._close_ppdu, self._log)
334
335    def _close(self, ppdu):
336        if not self.is_open:
337            return
338
339        self._close_ppdu = ppdu
340        self._async_group.close()
341
342    async def _receive_loop(self):
343        try:
344            while True:
345                cosp_data = await self._conn.receive()
346
347                user_data = _decode('User-data', cosp_data)
348
349                pdv_list = user_data[1][0]
350                syntax_name = self._syntax_names.get_name(
351                    pdv_list['presentation-context-identifier'])
352                data = pdv_list['presentation-data-values'][1]
353
354                await self._receive_queue.put((syntax_name, data))
355
356        except ConnectionError:
357            pass
358
359        except Exception as e:
360            self._log.error("receive loop error: %s", e, exc_info=e)
361
362        finally:
363            self._close(_arp_ppdu())
364            self._receive_queue.close()
365
366    async def _send_loop(self):
367        future = None
368        try:
369            while True:
370                data, future = await self._send_queue.get()
371
372                if data is None:
373                    await self._conn.drain()
374
375                else:
376                    user_data = _user_data(self._syntax_names, data)
377                    ppdu_data = _encode('User-data', user_data)
378
379                    await self._conn.send(ppdu_data)
380
381                if future and not future.done():
382                    future.set_result(None)
383
384        except ConnectionError:
385            pass
386
387        except Exception as e:
388            self._log.error("send loop error: %s", e, exc_info=e)
389
390        finally:
391            self._close(_arp_ppdu())
392            self._send_queue.close()
393
394            while True:
395                if future and not future.done():
396                    future.set_result(None)
397                if self._send_queue.empty():
398                    break
399                _, future = self._send_queue.get_nowait()

COPP connection

For creating new connection see connect or listen.

Connection( conn: Connection, syntax_names: SyntaxNames, cp_ppdu: Union[bool, int, Collection[bool], bytes, bytearray, memoryview, NoneType, tuple[int, ...], str, float, Tuple[str, ForwardRef('Value')], Dict[str, ForwardRef('Value')], Collection['Value'], hat.asn1.common.Entity, hat.asn1.common.External, hat.asn1.common.EmbeddedPDV], cpa_ppdu: Union[bool, int, Collection[bool], bytes, bytearray, memoryview, NoneType, tuple[int, ...], str, float, Tuple[str, ForwardRef('Value')], Dict[str, ForwardRef('Value')], Collection['Value'], hat.asn1.common.Entity, hat.asn1.common.External, hat.asn1.common.EmbeddedPDV], local_psel: int | None, remote_psel: int | None, receive_queue_size: int, send_queue_size: int)
231    def __init__(self,
232                 conn: cosp.Connection,
233                 syntax_names: SyntaxNames,
234                 cp_ppdu: asn1.Value,
235                 cpa_ppdu: asn1.Value,
236                 local_psel: int | None,
237                 remote_psel: int | None,
238                 receive_queue_size: int,
239                 send_queue_size: int):
240        cp_user_data = cp_ppdu['normal-mode-parameters']['user-data']
241        cpa_user_data = cpa_ppdu['normal-mode-parameters']['user-data']
242
243        conn_req_user_data = (
244            syntax_names.get_name(
245                cp_user_data[1][0]['presentation-context-identifier']),
246            cp_user_data[1][0]['presentation-data-values'][1])
247        conn_res_user_data = (
248            syntax_names.get_name(
249                cpa_user_data[1][0]['presentation-context-identifier']),
250            cpa_user_data[1][0]['presentation-data-values'][1])
251
252        self._conn = conn
253        self._syntax_names = syntax_names
254        self._conn_req_user_data = conn_req_user_data
255        self._conn_res_user_data = conn_res_user_data
256        self._loop = asyncio.get_running_loop()
257        self._info = ConnectionInfo(local_psel=local_psel,
258                                    remote_psel=remote_psel,
259                                    **conn.info._asdict())
260        self._close_ppdu = _arp_ppdu()
261        self._receive_queue = aio.Queue(receive_queue_size)
262        self._send_queue = aio.Queue(send_queue_size)
263        self._async_group = aio.Group()
264        self._log = _create_connection_logger(self._info.name, self._info)
265
266        self.async_group.spawn(aio.call_on_cancel, self._on_close)
267        self.async_group.spawn(self._receive_loop)
268        self.async_group.spawn(self._send_loop)
269        self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
270                               self.close)
async_group: hat.aio.group.Group
272    @property
273    def async_group(self) -> aio.Group:
274        """Async group"""
275        return self._async_group

Async group

info: ConnectionInfo
277    @property
278    def info(self) -> ConnectionInfo:
279        """Connection info"""
280        return self._info

Connection info

syntax_names: SyntaxNames
282    @property
283    def syntax_names(self) -> SyntaxNames:
284        """Syntax names"""
285        return self._syntax_names

Syntax names

conn_req_user_data: tuple[tuple[int, ...], hat.asn1.common.Entity]
287    @property
288    def conn_req_user_data(self) -> IdentifiedEntity:
289        """Connect request's user data"""
290        return self._conn_req_user_data

Connect request's user data

conn_res_user_data: tuple[tuple[int, ...], hat.asn1.common.Entity]
292    @property
293    def conn_res_user_data(self) -> IdentifiedEntity:
294        """Connect response's user data"""
295        return self._conn_res_user_data

Connect response's user data

def close( self, user_data: tuple[tuple[int, ...], hat.asn1.common.Entity] | None = None):
297    def close(self, user_data: IdentifiedEntity | None = None):
298        """Close connection"""
299        self._close(_aru_ppdu(self._syntax_names, user_data))

Close connection

async def async_close( self, user_data: tuple[tuple[int, ...], hat.asn1.common.Entity] | None = None):
301    async def async_close(self, user_data: IdentifiedEntity | None = None):
302        """Async close"""
303        self.close(user_data)
304        await self.wait_closed()

Async close

async def receive(self) -> tuple[tuple[int, ...], hat.asn1.common.Entity]:
306    async def receive(self) -> IdentifiedEntity:
307        """Receive data"""
308        try:
309            return await self._receive_queue.get()
310
311        except aio.QueueClosedError:
312            raise ConnectionError()

Receive data

async def send(self, data: tuple[tuple[int, ...], hat.asn1.common.Entity]):
314    async def send(self, data: IdentifiedEntity):
315        """Send data"""
316        try:
317            await self._send_queue.put((data, None))
318
319        except aio.QueueClosedError:
320            raise ConnectionError()

Send data

async def drain(self):
322    async def drain(self):
323        """Drain output buffer"""
324        try:
325            future = self._loop.create_future()
326            await self._send_queue.put((None, future))
327            await future
328
329        except aio.QueueClosedError:
330            raise ConnectionError()

Drain output buffer