hat.drivers.copp

Connection oriented presentation protocol

  1"""Connection oriented presentation protocol"""
  2
  3import asyncio
  4import importlib.resources
  5import logging
  6import typing
  7
  8from hat import aio
  9from hat import asn1
 10
 11from hat.drivers import cosp
 12from hat.drivers import tcp
 13
 14
 15mlog = logging.getLogger(__name__)
 16
 17with importlib.resources.as_file(importlib.resources.files(__package__) /
 18                                 'asn1_repo.json') as _path:
 19    _encoder = asn1.Encoder(asn1.Encoding.BER,
 20                            asn1.Repository.from_json(_path))
 21
 22
 23class ConnectionInfo(typing.NamedTuple):
 24    local_addr: tcp.Address
 25    local_tsel: int | None
 26    local_ssel: int | None
 27    local_psel: int | None
 28    remote_addr: tcp.Address
 29    remote_tsel: int | None
 30    remote_ssel: int | None
 31    remote_psel: int | None
 32
 33
 34IdentifiedEntity: typing.TypeAlias = tuple[asn1.ObjectIdentifier, asn1.Entity]
 35"""Identified entity"""
 36
 37ValidateCb: typing.TypeAlias = aio.AsyncCallable[['SyntaxNames',
 38                                                  IdentifiedEntity],
 39                                                 IdentifiedEntity | None]
 40"""Validate callback"""
 41
 42ConnectionCb: typing.TypeAlias = aio.AsyncCallable[['Connection'], None]
 43"""Connection callback"""
 44
 45
 46class SyntaxNames:
 47    """Syntax name registry
 48
 49    Args:
 50        syntax_names: list of ASN.1 ObjectIdentifiers representing syntax names
 51
 52    """
 53
 54    def __init__(self, syntax_names: list[asn1.ObjectIdentifier]):
 55        self._syntax_id_names = {(i * 2 + 1): name
 56                                 for i, name in enumerate(syntax_names)}
 57        self._syntax_name_ids = {v: k
 58                                 for k, v in self._syntax_id_names.items()}
 59
 60    def get_name(self, syntax_id: int) -> asn1.ObjectIdentifier:
 61        """Get syntax name associated with id"""
 62        return self._syntax_id_names[syntax_id]
 63
 64    def get_id(self, syntax_name: asn1.ObjectIdentifier) -> int:
 65        """Get syntax id associated with name"""
 66        return self._syntax_name_ids[syntax_name]
 67
 68
 69async def connect(addr: tcp.Address,
 70                  syntax_names: SyntaxNames,
 71                  user_data: IdentifiedEntity | None = None,
 72                  *,
 73                  local_psel: int | None = None,
 74                  remote_psel: int | None = None,
 75                  copp_receive_queue_size: int = 1024,
 76                  copp_send_queue_size: int = 1024,
 77                  **kwargs
 78                  ) -> 'Connection':
 79    """Connect to COPP server
 80
 81    Additional arguments are passed directly to `hat.drivers.cosp.connect`.
 82
 83    """
 84    cp_ppdu = _cp_ppdu(syntax_names, local_psel, remote_psel, user_data)
 85    cp_ppdu_data = _encode('CP-type', cp_ppdu)
 86    conn = await cosp.connect(addr, cp_ppdu_data, **kwargs)
 87
 88    try:
 89        cpa_ppdu = _decode('CPA-PPDU', conn.conn_res_user_data)
 90        _validate_connect_response(cp_ppdu, cpa_ppdu)
 91
 92        calling_psel, called_psel = _get_psels(cp_ppdu)
 93        return Connection(conn, syntax_names, cp_ppdu, cpa_ppdu,
 94                          calling_psel, called_psel,
 95                          copp_receive_queue_size, copp_send_queue_size)
 96
 97    except Exception:
 98        await aio.uncancellable(_close_cosp(conn, _arp_ppdu()))
 99        raise
100
101
async def listen(validate_cb: ValidateCb,
                 connection_cb: ConnectionCb,
                 addr: tcp.Address = tcp.Address('0.0.0.0', 102),
                 *,
                 bind_connections: bool = False,
                 copp_receive_queue_size: int = 1024,
                 copp_send_queue_size: int = 1024,
                 **kwargs
                 ) -> 'Server':
    """Create COPP listening server

    Additional arguments are passed directly to `hat.drivers.cosp.listen`.

    Args:
        validate_cb: callback function or coroutine called on new
            incoming connection request prior to creating connection object
        connection_cb: new connection callback
        addr: local listening address
        bind_connections: if set, server's connection handling waits until
            connection is closed and closes the connection if that wait is
            interrupted
        copp_receive_queue_size: size of receive queue of each connection
        copp_send_queue_size: size of send queue of each connection

    """
    srv = Server()
    srv._validate_cb = validate_cb
    srv._connection_cb = connection_cb
    srv._bind_connections = bind_connections
    srv._receive_queue_size = copp_receive_queue_size
    srv._send_queue_size = copp_send_queue_size

    # connection binding is implemented by `Server._on_connection`, so it is
    # always disabled on COSP level
    srv._srv = await cosp.listen(srv._on_validate, srv._on_connection, addr,
                                 bind_connections=False, **kwargs)

    return srv
136
137
class Server(aio.Resource):
    """COPP listening server

    For creating new server see `listen`.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._srv.async_group

    @property
    def addresses(self) -> list[tcp.Address]:
        """Listening addresses"""
        return self._srv.addresses

    async def _on_validate(self, user_data):
        """Validate connect request and create encoded connect response

        Args:
            user_data: encoded CP PPDU received as COSP user data

        Returns:
            encoded CPA PPDU

        """
        cp_ppdu = _decode('CP-type', user_data)
        cp_params = cp_ppdu['normal-mode-parameters']

        # requested (called) selector is echoed as responding selector
        called_psel_data = cp_params.get('called-presentation-selector')
        called_psel = (int.from_bytes(called_psel_data, 'big')
                       if called_psel_data else None)

        # only first presentation data value of request is passed to
        # validate callback
        cp_pdv_list = cp_params['user-data'][1][0]
        syntax_names = _sytax_names_from_cp_ppdu(cp_ppdu)
        cp_user_data = (
            syntax_names.get_name(
                cp_pdv_list['presentation-context-identifier']),
            cp_pdv_list['presentation-data-values'][1])

        cpa_user_data = await aio.call(self._validate_cb, syntax_names,
                                       cp_user_data)

        cpa_ppdu = _cpa_ppdu(syntax_names, called_psel, cpa_user_data)
        return _encode('CPA-PPDU', cpa_ppdu)

    async def _on_connection(self, cosp_conn):
        """Create connection on top of accepted COSP connection

        Errors raised during connection creation or by connection callback
        are logged and not propagated.

        """
        try:
            try:
                cp_ppdu = _decode('CP-type', cosp_conn.conn_req_user_data)
                cpa_ppdu = _decode('CPA-PPDU', cosp_conn.conn_res_user_data)

                syntax_names = _sytax_names_from_cp_ppdu(cp_ppdu)
                calling_psel, called_psel = _get_psels(cp_ppdu)

                # on accepting side, local selector is the called one and
                # remote selector is the calling one
                conn = Connection(cosp_conn, syntax_names, cp_ppdu, cpa_ppdu,
                                  called_psel, calling_psel,
                                  self._receive_queue_size,
                                  self._send_queue_size)

            except Exception:
                # connection object was not created - abort COSP connection
                await aio.uncancellable(_close_cosp(cosp_conn, _arp_ppdu()))
                raise

            try:
                await aio.call(self._connection_cb, conn)

            except BaseException:
                await aio.uncancellable(conn.async_close())
                raise

        except Exception as e:
            mlog.error("error creating new incomming connection: %s", e,
                       exc_info=e)
            return

        if not self._bind_connections:
            return

        # bound connection - keep waiting until connection is closed and
        # close it if waiting is interrupted
        try:
            await conn.wait_closed()

        except BaseException:
            await aio.uncancellable(conn.async_close())
            raise
214
215
class Connection(aio.Resource):
    """COPP connection

    For creating new connection see `connect` or `listen`.

    Args:
        conn: underlying COSP connection
        syntax_names: syntax names used for translating presentation
            context identifiers
        cp_ppdu: decoded connect request (CP) PPDU
        cpa_ppdu: decoded connect response (CPA) PPDU
        local_psel: local presentation selector
        remote_psel: remote presentation selector
        receive_queue_size: size of receive queue
        send_queue_size: size of send queue

    """

    def __init__(self,
                 conn: cosp.Connection,
                 syntax_names: SyntaxNames,
                 cp_ppdu: asn1.Value,
                 cpa_ppdu: asn1.Value,
                 local_psel: int | None,
                 remote_psel: int | None,
                 receive_queue_size: int,
                 send_queue_size: int):
        # user data is optional in both PPDUs - it is extracted before any
        # task is spawned so that failure leaves nothing running
        conn_req_user_data = self._extract_user_data(syntax_names, cp_ppdu)
        conn_res_user_data = self._extract_user_data(syntax_names, cpa_ppdu)

        self._conn = conn
        self._syntax_names = syntax_names
        self._conn_req_user_data = conn_req_user_data
        self._conn_res_user_data = conn_res_user_data
        self._loop = asyncio.get_running_loop()
        self._info = ConnectionInfo(local_psel=local_psel,
                                    remote_psel=remote_psel,
                                    **conn.info._asdict())
        # abort PPDU sent on close (replaced by `close` with user's PPDU)
        self._close_ppdu = _arp_ppdu()
        self._receive_queue = aio.Queue(receive_queue_size)
        self._send_queue = aio.Queue(send_queue_size)
        self._async_group = aio.Group()

        self.async_group.spawn(aio.call_on_cancel, self._on_close)
        self.async_group.spawn(self._receive_loop)
        self.async_group.spawn(self._send_loop)
        # closing of COSP connection closes this connection
        self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
                               self.close)

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    @property
    def info(self) -> ConnectionInfo:
        """Connection info"""
        return self._info

    @property
    def syntax_names(self) -> SyntaxNames:
        """Syntax names"""
        return self._syntax_names

    @property
    def conn_req_user_data(self) -> IdentifiedEntity | None:
        """Connect request's user data (``None`` if request had none)"""
        return self._conn_req_user_data

    @property
    def conn_res_user_data(self) -> IdentifiedEntity | None:
        """Connect response's user data (``None`` if response had none)"""
        return self._conn_res_user_data

    def close(self, user_data: IdentifiedEntity | None = None):
        """Close connection

        If connection is open, abort PPDU containing optional `user_data`
        is sent to remote side as part of closing.

        """
        self._close(_aru_ppdu(self._syntax_names, user_data))

    async def async_close(self, user_data: IdentifiedEntity | None = None):
        """Async close"""
        self.close(user_data)
        await self.wait_closed()

    async def receive(self) -> IdentifiedEntity:
        """Receive data

        Raises:
            ConnectionError: if receive queue is closed

        """
        try:
            return await self._receive_queue.get()

        except aio.QueueClosedError:
            raise ConnectionError()

    async def send(self, data: IdentifiedEntity):
        """Send data

        Data is put into send queue and sent by connection's send loop.

        Raises:
            ConnectionError: if send queue is closed

        """
        try:
            await self._send_queue.put((data, None))

        except aio.QueueClosedError:
            raise ConnectionError()

    async def drain(self):
        """Drain output buffer

        Waits until all previously queued data is processed by send loop
        and underlying COSP connection is drained.

        Raises:
            ConnectionError: if send queue is closed

        """
        try:
            # entry without data marks drain request - future is resolved
            # by send loop
            future = self._loop.create_future()
            await self._send_queue.put((None, future))
            await future

        except aio.QueueClosedError:
            raise ConnectionError()

    @staticmethod
    def _extract_user_data(syntax_names, ppdu):
        """Get identified entity from PPDU's user data

        Only first presentation data value is used. Returns ``None`` if
        PPDU does not contain user data.

        """
        user_data = ppdu['normal-mode-parameters'].get('user-data')
        if user_data is None:
            return None

        pdv_list = user_data[1][0]
        return (
            syntax_names.get_name(
                pdv_list['presentation-context-identifier']),
            pdv_list['presentation-data-values'][1])

    async def _on_close(self):
        await _close_cosp(self._conn, self._close_ppdu)

    def _close(self, ppdu):
        if not self.is_open:
            return

        # remember PPDU which will be sent by `_on_close`
        self._close_ppdu = ppdu
        self._async_group.close()

    async def _receive_loop(self):
        try:
            while True:
                cosp_data = await self._conn.receive()

                user_data = _decode('User-data', cosp_data)
                # only first presentation data value is used
                pdv_list = user_data[1][0]
                syntax_name = self._syntax_names.get_name(
                    pdv_list['presentation-context-identifier'])
                data = pdv_list['presentation-data-values'][1]

                await self._receive_queue.put((syntax_name, data))

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("receive loop error: %s", e, exc_info=e)

        finally:
            self._close(_arp_ppdu())
            self._receive_queue.close()

    async def _send_loop(self):
        future = None
        try:
            while True:
                data, future = await self._send_queue.get()

                if data is None:
                    # drain request
                    await self._conn.drain()

                else:
                    ppdu_data = _encode('User-data',
                                        _user_data(self._syntax_names, data))
                    await self._conn.send(ppdu_data)

                if future and not future.done():
                    future.set_result(None)

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("send loop error: %s", e, exc_info=e)

        finally:
            self._close(_arp_ppdu())
            self._send_queue.close()

            # resolve futures of the interrupted entry and of all entries
            # remaining in queue so that pending `drain` calls finish
            while True:
                if future and not future.done():
                    future.set_result(None)
                if self._send_queue.empty():
                    break
                _, future = self._send_queue.get_nowait()
389
390
async def _close_cosp(cosp_conn, ppdu):
    """Close COSP connection with encoded abort PPDU as user data

    If PPDU can not be encoded, error is logged and connection is closed
    without user data.

    """
    # bound before `try` so that `finally` can always close the connection,
    # even when encoding or logging fails with unexpected error
    data = None

    try:
        data = _encode('Abort-type', ppdu)

    except Exception as e:
        mlog.error("error encoding abort ppdu: %s", e, exc_info=e)

    finally:
        await cosp_conn.async_close(data)
401
402
403def _get_psels(cp_ppdu):
404    cp_params = cp_ppdu['normal-mode-parameters']
405    calling_psel_data = cp_params.get('calling-presentation-selector')
406    calling_psel = (int.from_bytes(calling_psel_data, 'big')
407                    if calling_psel_data else None)
408    called_psel_data = cp_params.get('called-presentation-selector')
409    called_psel = (int.from_bytes(called_psel_data, 'big')
410                   if called_psel_data else None)
411    return calling_psel, called_psel
412
413
414def _validate_connect_response(cp_ppdu, cpa_ppdu):
415    cp_params = cp_ppdu['normal-mode-parameters']
416    cpa_params = cpa_ppdu['normal-mode-parameters']
417    called_psel_data = cp_params.get('called-presentation-selector')
418    responding_psel_data = cpa_params.get('responding-presentation-selector')
419
420    if called_psel_data and responding_psel_data:
421        called_psel = int.from_bytes(called_psel_data, 'big')
422        responding_psel = int.from_bytes(responding_psel_data, 'big')
423
424        if called_psel != responding_psel:
425            raise Exception('presentation selectors not matching')
426
427    result_list = cpa_params['presentation-context-definition-result-list']
428    if any(i['result'] != 0 for i in result_list):
429        raise Exception('presentation context not accepted')
430
431
432def _cp_ppdu(syntax_names, calling_psel, called_psel, user_data):
433    cp_params = {
434        'presentation-context-definition-list': [
435            {'presentation-context-identifier': i,
436             'abstract-syntax-name': name,
437             'transfer-syntax-name-list': [_encoder.syntax_name]}
438            for i, name in syntax_names._syntax_id_names.items()]}
439
440    if calling_psel is not None:
441        cp_params['calling-presentation-selector'] = \
442            calling_psel.to_bytes(4, 'big')
443
444    if called_psel is not None:
445        cp_params['called-presentation-selector'] = \
446            called_psel.to_bytes(4, 'big')
447
448    if user_data:
449        cp_params['user-data'] = _user_data(syntax_names, user_data)
450
451    return {
452        'mode-selector': {
453            'mode-value': 1},
454        'normal-mode-parameters': cp_params}
455
456
457def _cpa_ppdu(syntax_names, responding_psel, user_data):
458    cpa_params = {
459        'presentation-context-definition-result-list': [
460            {'result': 0,
461             'transfer-syntax-name': _encoder.syntax_name}
462            for _ in syntax_names._syntax_id_names.keys()]}
463
464    if responding_psel is not None:
465        cpa_params['responding-presentation-selector'] = \
466            responding_psel.to_bytes(4, 'big')
467
468    if user_data:
469        cpa_params['user-data'] = _user_data(syntax_names, user_data)
470
471    return {
472        'mode-selector': {
473            'mode-value': 1},
474        'normal-mode-parameters': cpa_params}
475
476
477def _aru_ppdu(syntax_names, user_data):
478    aru_params = {}
479
480    if user_data:
481        aru_params['user-data'] = _user_data(syntax_names, user_data)
482
483    return 'aru-ppdu', ('normal-mode-parameters', aru_params)
484
485
486def _arp_ppdu():
487    return 'arp-ppdu', {}
488
489
490def _user_data(syntax_names, user_data):
491    return 'fully-encoded-data', [{
492        'presentation-context-identifier': syntax_names.get_id(user_data[0]),
493        'presentation-data-values': (
494            'single-ASN1-type', user_data[1])}]
495
496
def _sytax_names_from_cp_ppdu(cp_ppdu):
    """Create syntax names registry from CP PPDU's presentation contexts

    Identifiers proposed by the remote side are used as they are (instead
    of identifiers which `SyntaxNames` would assign by itself).

    """
    context_definitions = cp_ppdu['normal-mode-parameters'][
        'presentation-context-definition-list']

    id_names = {}
    for definition in context_definitions:
        syntax_id = definition['presentation-context-identifier']
        id_names[syntax_id] = definition['abstract-syntax-name']

    registry = SyntaxNames([])
    registry._syntax_id_names = id_names
    registry._syntax_name_ids = {syntax_name: syntax_id
                                 for syntax_id, syntax_name
                                 in id_names.items()}
    return registry
506
507
def _encode(name, value):
    """Encode value as type `name` of ``ISO8823-PRESENTATION`` module"""
    module = 'ISO8823-PRESENTATION'
    return _encoder.encode(module, name, value)
510
511
def _decode(name, data):
    """Decode data as type `name` of ``ISO8823-PRESENTATION`` module

    Only decoded value is returned - second item of decoder's result is
    ignored.

    """
    module = 'ISO8823-PRESENTATION'
    value, _rest = _encoder.decode(module, name, memoryview(data))
    return value
mlog = <Logger hat.drivers.copp (WARNING)>
class ConnectionInfo(typing.NamedTuple):
24class ConnectionInfo(typing.NamedTuple):
25    local_addr: tcp.Address
26    local_tsel: int | None
27    local_ssel: int | None
28    local_psel: int | None
29    remote_addr: tcp.Address
30    remote_tsel: int | None
31    remote_ssel: int | None
32    remote_psel: int | None

ConnectionInfo(local_addr, local_tsel, local_ssel, local_psel, remote_addr, remote_tsel, remote_ssel, remote_psel)

ConnectionInfo( local_addr: hat.drivers.tcp.Address, local_tsel: int | None, local_ssel: int | None, local_psel: int | None, remote_addr: hat.drivers.tcp.Address, remote_tsel: int | None, remote_ssel: int | None, remote_psel: int | None)

Create new instance of ConnectionInfo(local_addr, local_tsel, local_ssel, local_psel, remote_addr, remote_tsel, remote_ssel, remote_psel)

Alias for field number 0

local_tsel: int | None

Alias for field number 1

local_ssel: int | None

Alias for field number 2

local_psel: int | None

Alias for field number 3

Alias for field number 4

remote_tsel: int | None

Alias for field number 5

remote_ssel: int | None

Alias for field number 6

remote_psel: int | None

Alias for field number 7

IdentifiedEntity: TypeAlias = tuple[tuple[int, ...], hat.asn1.common.Entity]

Identified entity

ValidateCb: TypeAlias = Callable[[ForwardRef('SyntaxNames'), tuple[tuple[int, ...], hat.asn1.common.Entity]], Union[tuple[tuple[int, ...], hat.asn1.common.Entity], NoneType, Awaitable[tuple[tuple[int, ...], hat.asn1.common.Entity] | None]]]

Validate callback

ConnectionCb: TypeAlias = Callable[[ForwardRef('Connection')], Optional[Awaitable[NoneType]]]

Connection callback

class SyntaxNames:
47class SyntaxNames:
48    """Syntax name registry
49
50    Args:
51        syntax_names: list of ASN.1 ObjectIdentifiers representing syntax names
52
53    """
54
55    def __init__(self, syntax_names: list[asn1.ObjectIdentifier]):
56        self._syntax_id_names = {(i * 2 + 1): name
57                                 for i, name in enumerate(syntax_names)}
58        self._syntax_name_ids = {v: k
59                                 for k, v in self._syntax_id_names.items()}
60
61    def get_name(self, syntax_id: int) -> asn1.ObjectIdentifier:
62        """Get syntax name associated with id"""
63        return self._syntax_id_names[syntax_id]
64
65    def get_id(self, syntax_name: asn1.ObjectIdentifier) -> int:
66        """Get syntax id associated with name"""
67        return self._syntax_name_ids[syntax_name]

Syntax name registry

Arguments:
  • syntax_names: list of ASN.1 ObjectIdentifiers representing syntax names
SyntaxNames(syntax_names: list[tuple[int, ...]])
55    def __init__(self, syntax_names: list[asn1.ObjectIdentifier]):
56        self._syntax_id_names = {(i * 2 + 1): name
57                                 for i, name in enumerate(syntax_names)}
58        self._syntax_name_ids = {v: k
59                                 for k, v in self._syntax_id_names.items()}
def get_name(self, syntax_id: int) -> tuple[int, ...]:
61    def get_name(self, syntax_id: int) -> asn1.ObjectIdentifier:
62        """Get syntax name associated with id"""
63        return self._syntax_id_names[syntax_id]

Get syntax name associated with id

def get_id(self, syntax_name: tuple[int, ...]) -> int:
65    def get_id(self, syntax_name: asn1.ObjectIdentifier) -> int:
66        """Get syntax id associated with name"""
67        return self._syntax_name_ids[syntax_name]

Get syntax id associated with name

async def connect( addr: hat.drivers.tcp.Address, syntax_names: SyntaxNames, user_data: tuple[tuple[int, ...], hat.asn1.common.Entity] | None = None, *, local_psel: int | None = None, remote_psel: int | None = None, copp_receive_queue_size: int = 1024, copp_send_queue_size: int = 1024, **kwargs) -> Connection:
 70async def connect(addr: tcp.Address,
 71                  syntax_names: SyntaxNames,
 72                  user_data: IdentifiedEntity | None = None,
 73                  *,
 74                  local_psel: int | None = None,
 75                  remote_psel: int | None = None,
 76                  copp_receive_queue_size: int = 1024,
 77                  copp_send_queue_size: int = 1024,
 78                  **kwargs
 79                  ) -> 'Connection':
 80    """Connect to COPP server
 81
 82    Additional arguments are passed directly to `hat.drivers.cosp.connect`.
 83
 84    """
 85    cp_ppdu = _cp_ppdu(syntax_names, local_psel, remote_psel, user_data)
 86    cp_ppdu_data = _encode('CP-type', cp_ppdu)
 87    conn = await cosp.connect(addr, cp_ppdu_data, **kwargs)
 88
 89    try:
 90        cpa_ppdu = _decode('CPA-PPDU', conn.conn_res_user_data)
 91        _validate_connect_response(cp_ppdu, cpa_ppdu)
 92
 93        calling_psel, called_psel = _get_psels(cp_ppdu)
 94        return Connection(conn, syntax_names, cp_ppdu, cpa_ppdu,
 95                          calling_psel, called_psel,
 96                          copp_receive_queue_size, copp_send_queue_size)
 97
 98    except Exception:
 99        await aio.uncancellable(_close_cosp(conn, _arp_ppdu()))
100        raise

Connect to COPP server

Additional arguments are passed directly to hat.drivers.cosp.connect.

async def listen( validate_cb: Callable[[SyntaxNames, tuple[tuple[int, ...], hat.asn1.common.Entity]], Union[tuple[tuple[int, ...], hat.asn1.common.Entity], NoneType, Awaitable[tuple[tuple[int, ...], hat.asn1.common.Entity] | None]]], connection_cb: Callable[[hat.drivers.acse.Connection], Optional[Awaitable[NoneType]]], addr: hat.drivers.tcp.Address = Address(host='0.0.0.0', port=102), *, bind_connections: bool = False, copp_receive_queue_size: int = 1024, copp_send_queue_size: int = 1024, **kwargs) -> Server:
103async def listen(validate_cb: ValidateCb,
104                 connection_cb: ConnectionCb,
105                 addr: tcp.Address = tcp.Address('0.0.0.0', 102),
106                 *,
107                 bind_connections: bool = False,
108                 copp_receive_queue_size: int = 1024,
109                 copp_send_queue_size: int = 1024,
110                 **kwargs
111                 ) -> 'Server':
112    """Create COPP listening server
113
114    Additional arguments are passed directly to `hat.drivers.cosp.listen`.
115
116    Args:
117        validate_cb: callback function or coroutine called on new
118            incomming connection request prior to creating connection object
119        connection_cb: new connection callback
120        addr: local listening address
121
122    """
123    server = Server()
124    server._validate_cb = validate_cb
125    server._connection_cb = connection_cb
126    server._bind_connections = bind_connections
127    server._receive_queue_size = copp_receive_queue_size
128    server._send_queue_size = copp_send_queue_size
129
130    server._srv = await cosp.listen(server._on_validate,
131                                    server._on_connection,
132                                    addr,
133                                    bind_connections=False,
134                                    **kwargs)
135
136    return server

Create COPP listening server

Additional arguments are passed directly to hat.drivers.cosp.listen.

Arguments:
  • validate_cb: callback function or coroutine called on new incoming connection request prior to creating connection object
  • connection_cb: new connection callback
  • addr: local listening address
class Server(hat.aio.group.Resource):
139class Server(aio.Resource):
140    """COPP listening server
141
142    For creating new server see `listen`.
143
144    """
145
146    @property
147    def async_group(self) -> aio.Group:
148        """Async group"""
149        return self._srv.async_group
150
151    @property
152    def addresses(self) -> list[tcp.Address]:
153        """Listening addresses"""
154        return self._srv.addresses
155
156    async def _on_validate(self, user_data):
157        cp_ppdu = _decode('CP-type', user_data)
158        cp_params = cp_ppdu['normal-mode-parameters']
159        called_psel_data = cp_params.get('called-presentation-selector')
160        called_psel = (int.from_bytes(called_psel_data, 'big')
161                       if called_psel_data else None)
162        cp_pdv_list = cp_params['user-data'][1][0]
163        syntax_names = _sytax_names_from_cp_ppdu(cp_ppdu)
164        cp_user_data = (
165            syntax_names.get_name(
166                cp_pdv_list['presentation-context-identifier']),
167            cp_pdv_list['presentation-data-values'][1])
168
169        cpa_user_data = await aio.call(self._validate_cb, syntax_names,
170                                       cp_user_data)
171
172        cpa_ppdu = _cpa_ppdu(syntax_names, called_psel, cpa_user_data)
173        cpa_ppdu_data = _encode('CPA-PPDU', cpa_ppdu)
174        return cpa_ppdu_data
175
176    async def _on_connection(self, cosp_conn):
177        try:
178            try:
179                cp_ppdu = _decode('CP-type', cosp_conn.conn_req_user_data)
180                cpa_ppdu = _decode('CPA-PPDU', cosp_conn.conn_res_user_data)
181
182                syntax_names = _sytax_names_from_cp_ppdu(cp_ppdu)
183                calling_psel, called_psel = _get_psels(cp_ppdu)
184
185                conn = Connection(cosp_conn, syntax_names, cp_ppdu, cpa_ppdu,
186                                  calling_psel, called_psel,
187                                  self._receive_queue_size,
188                                  self._send_queue_size)
189
190            except Exception:
191                await aio.uncancellable(_close_cosp(cosp_conn, _arp_ppdu()))
192                raise
193
194            try:
195                await aio.call(self._connection_cb, conn)
196
197            except BaseException:
198                await aio.uncancellable(conn.async_close())
199                raise
200
201        except Exception as e:
202            mlog.error("error creating new incomming connection: %s", e,
203                       exc_info=e)
204            return
205
206        if not self._bind_connections:
207            return
208
209        try:
210            await conn.wait_closed()
211
212        except BaseException:
213            await aio.uncancellable(conn.async_close())
214            raise

COPP listening server

For creating new server see listen.

async_group: hat.aio.group.Group
146    @property
147    def async_group(self) -> aio.Group:
148        """Async group"""
149        return self._srv.async_group

Async group

addresses: list[hat.drivers.tcp.Address]
151    @property
152    def addresses(self) -> list[tcp.Address]:
153        """Listening addresses"""
154        return self._srv.addresses

Listening addresses

class Connection(hat.aio.group.Resource):
class Connection(aio.Resource):
    """COPP connection

    Built on top of a `cosp.Connection`: outgoing data is encoded as
    ``User-data`` PPDUs and incoming COSP data is decoded back into
    `IdentifiedEntity` values.

    For creating new connection see `connect` or `listen`.

    """

    def __init__(self,
                 conn: cosp.Connection,
                 syntax_names: SyntaxNames,
                 cp_ppdu: asn1.Value,
                 cpa_ppdu: asn1.Value,
                 local_psel: int | None,
                 remote_psel: int | None,
                 receive_queue_size: int,
                 send_queue_size: int):
        """Create connection and start its background tasks

        Args:
            conn: underlying COSP connection
            syntax_names: syntax names used for resolving context identifiers
            cp_ppdu: decoded CP PPDU (connect request)
            cpa_ppdu: decoded CPA PPDU (connect response)
            local_psel: local presentation selector stored in `info`
            remote_psel: remote presentation selector stored in `info`
            receive_queue_size: size of the receive queue
            send_queue_size: size of the send queue

        """
        req_user_data = cp_ppdu['normal-mode-parameters']['user-data']
        res_user_data = cpa_ppdu['normal-mode-parameters']['user-data']

        # only the first PDV list entry of each PPDU is used
        req_pdv = req_user_data[1][0]
        req_identified = (
            syntax_names.get_name(req_pdv['presentation-context-identifier']),
            req_pdv['presentation-data-values'][1])

        res_pdv = res_user_data[1][0]
        res_identified = (
            syntax_names.get_name(res_pdv['presentation-context-identifier']),
            res_pdv['presentation-data-values'][1])

        self._conn = conn
        self._syntax_names = syntax_names
        self._conn_req_user_data = req_identified
        self._conn_res_user_data = res_identified
        self._loop = asyncio.get_running_loop()
        # COSP level info extended with presentation selectors
        self._info = ConnectionInfo(local_psel=local_psel,
                                    remote_psel=remote_psel,
                                    **conn.info._asdict())
        # PPDU passed to `_close_cosp` on close; replaced by `_close`
        self._close_ppdu = _arp_ppdu()
        self._receive_queue = aio.Queue(receive_queue_size)
        self._send_queue = aio.Queue(send_queue_size)
        self._async_group = aio.Group()

        group = self.async_group
        group.spawn(aio.call_on_cancel, self._on_close)
        group.spawn(self._receive_loop)
        group.spawn(self._send_loop)
        # follow the COSP connection once it starts closing
        group.spawn(aio.call_on_done, conn.wait_closing(), self.close)

    @property
    def async_group(self) -> aio.Group:
        """Async group controlling this connection's tasks"""
        return self._async_group

    @property
    def info(self) -> ConnectionInfo:
        """Connection info (addresses and selectors)"""
        return self._info

    @property
    def syntax_names(self) -> SyntaxNames:
        """Syntax names associated with this connection"""
        return self._syntax_names

    @property
    def conn_req_user_data(self) -> IdentifiedEntity:
        """User data extracted from the connect request (CP PPDU)"""
        return self._conn_req_user_data

    @property
    def conn_res_user_data(self) -> IdentifiedEntity:
        """User data extracted from the connect response (CPA PPDU)"""
        return self._conn_res_user_data

    def close(self, user_data: IdentifiedEntity | None = None):
        """Close connection, using an ARU PPDU built from `user_data`"""
        abort_ppdu = _aru_ppdu(self._syntax_names, user_data)
        self._close(abort_ppdu)

    async def async_close(self, user_data: IdentifiedEntity | None = None):
        """Close connection and wait until it is closed"""
        self.close(user_data)
        await self.wait_closed()

    async def receive(self) -> IdentifiedEntity:
        """Receive data

        Raises:
            ConnectionError: receive queue is closed

        """
        try:
            received = await self._receive_queue.get()

        except aio.QueueClosedError:
            raise ConnectionError()

        return received

    async def send(self, data: IdentifiedEntity):
        """Send data

        Data is queued and encoded/sent by the send loop.

        Raises:
            ConnectionError: send queue is closed

        """
        entry = (data, None)
        try:
            await self._send_queue.put(entry)

        except aio.QueueClosedError:
            raise ConnectionError()

    async def drain(self):
        """Drain output buffer

        Queues a marker entry (no data, only a future) and waits until the
        send loop resolves that future.

        Raises:
            ConnectionError: send queue is closed

        """
        drained = self._loop.create_future()
        try:
            await self._send_queue.put((None, drained))
            await drained

        except aio.QueueClosedError:
            raise ConnectionError()

    async def _on_close(self):
        # close the COSP connection with the currently selected close PPDU
        await _close_cosp(self._conn, self._close_ppdu)

    def _close(self, ppdu):
        # only the first close request selects the PPDU
        if self.is_open:
            self._close_ppdu = ppdu
            self._async_group.close()

    async def _receive_loop(self):
        # decode incoming COSP data and forward it to the receive queue
        try:
            while True:
                raw = await self._conn.receive()

                pdv = _decode('User-data', raw)[1][0]
                name = self._syntax_names.get_name(
                    pdv['presentation-context-identifier'])
                entity = pdv['presentation-data-values'][1]

                await self._receive_queue.put((name, entity))

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("receive loop error: %s", e, exc_info=e)

        finally:
            self._close(_arp_ppdu())
            self._receive_queue.close()

    async def _send_loop(self):
        # entries are (data, future): data is None for drain requests
        pending = None
        try:
            while True:
                data, pending = await self._send_queue.get()

                if data is not None:
                    encoded = _encode('User-data',
                                      _user_data(self._syntax_names, data))
                    await self._conn.send(encoded)

                else:
                    await self._conn.drain()

                if pending and not pending.done():
                    pending.set_result(None)

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("send loop error: %s", e, exc_info=e)

        finally:
            self._close(_arp_ppdu())
            self._send_queue.close()

            # resolve the in-flight future and every future still queued
            while True:
                if pending and not pending.done():
                    pending.set_result(None)
                if self._send_queue.empty():
                    break
                _, pending = self._send_queue.get_nowait()

COPP connection

To create a new connection, see connect or listen.

Connection( conn: Connection, syntax_names: SyntaxNames, cp_ppdu: Union[bool, int, list[bool], bytes, bytearray, memoryview, NoneType, tuple[int, ...], str, hat.asn1.common.External, float, hat.asn1.common.EmbeddedPDV, Tuple[str, ForwardRef('Value')], Dict[str, ForwardRef('Value')], Iterable[ForwardRef('Value')], List[ForwardRef('Value')], hat.asn1.common.Entity], cpa_ppdu: Union[bool, int, list[bool], bytes, bytearray, memoryview, NoneType, tuple[int, ...], str, hat.asn1.common.External, float, hat.asn1.common.EmbeddedPDV, Tuple[str, ForwardRef('Value')], Dict[str, ForwardRef('Value')], Iterable[ForwardRef('Value')], List[ForwardRef('Value')], hat.asn1.common.Entity], local_psel: int | None, remote_psel: int | None, receive_queue_size: int, send_queue_size: int)
224    def __init__(self,
225                 conn: cosp.Connection,
226                 syntax_names: SyntaxNames,
227                 cp_ppdu: asn1.Value,
228                 cpa_ppdu: asn1.Value,
229                 local_psel: int | None,
230                 remote_psel: int | None,
231                 receive_queue_size: int,
232                 send_queue_size: int):
233        cp_user_data = cp_ppdu['normal-mode-parameters']['user-data']
234        cpa_user_data = cpa_ppdu['normal-mode-parameters']['user-data']
235
236        conn_req_user_data = (
237            syntax_names.get_name(
238                cp_user_data[1][0]['presentation-context-identifier']),
239            cp_user_data[1][0]['presentation-data-values'][1])
240        conn_res_user_data = (
241            syntax_names.get_name(
242                cpa_user_data[1][0]['presentation-context-identifier']),
243            cpa_user_data[1][0]['presentation-data-values'][1])
244
245        self._conn = conn
246        self._syntax_names = syntax_names
247        self._conn_req_user_data = conn_req_user_data
248        self._conn_res_user_data = conn_res_user_data
249        self._loop = asyncio.get_running_loop()
250        self._info = ConnectionInfo(local_psel=local_psel,
251                                    remote_psel=remote_psel,
252                                    **conn.info._asdict())
253        self._close_ppdu = _arp_ppdu()
254        self._receive_queue = aio.Queue(receive_queue_size)
255        self._send_queue = aio.Queue(send_queue_size)
256        self._async_group = aio.Group()
257
258        self.async_group.spawn(aio.call_on_cancel, self._on_close)
259        self.async_group.spawn(self._receive_loop)
260        self.async_group.spawn(self._send_loop)
261        self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
262                               self.close)
async_group: hat.aio.group.Group
264    @property
265    def async_group(self) -> aio.Group:
266        """Async group"""
267        return self._async_group

Async group

info: ConnectionInfo
269    @property
270    def info(self) -> ConnectionInfo:
271        """Connection info"""
272        return self._info

Connection info

syntax_names: SyntaxNames
274    @property
275    def syntax_names(self) -> SyntaxNames:
276        """Syntax names"""
277        return self._syntax_names

Syntax names

conn_req_user_data: tuple[tuple[int, ...], hat.asn1.common.Entity]
279    @property
280    def conn_req_user_data(self) -> IdentifiedEntity:
281        """Connect request's user data"""
282        return self._conn_req_user_data

Connect request's user data

conn_res_user_data: tuple[tuple[int, ...], hat.asn1.common.Entity]
284    @property
285    def conn_res_user_data(self) -> IdentifiedEntity:
286        """Connect response's user data"""
287        return self._conn_res_user_data

Connect response's user data

def close( self, user_data: tuple[tuple[int, ...], hat.asn1.common.Entity] | None = None):
289    def close(self, user_data: IdentifiedEntity | None = None):
290        """Close connection"""
291        self._close(_aru_ppdu(self._syntax_names, user_data))

Close connection

async def async_close( self, user_data: tuple[tuple[int, ...], hat.asn1.common.Entity] | None = None):
293    async def async_close(self, user_data: IdentifiedEntity | None = None):
294        """Async close"""
295        self.close(user_data)
296        await self.wait_closed()

Async close

async def receive(self) -> tuple[tuple[int, ...], hat.asn1.common.Entity]:
298    async def receive(self) -> IdentifiedEntity:
299        """Receive data"""
300        try:
301            return await self._receive_queue.get()
302
303        except aio.QueueClosedError:
304            raise ConnectionError()

Receive data

async def send(self, data: tuple[tuple[int, ...], hat.asn1.common.Entity]):
306    async def send(self, data: IdentifiedEntity):
307        """Send data"""
308        try:
309            await self._send_queue.put((data, None))
310
311        except aio.QueueClosedError:
312            raise ConnectionError()

Send data

async def drain(self):
314    async def drain(self):
315        """Drain output buffer"""
316        try:
317            future = self._loop.create_future()
318            await self._send_queue.put((None, future))
319            await future
320
321        except aio.QueueClosedError:
322            raise ConnectionError()

Drain output buffer