hat.drivers.copp

Connection oriented presentation protocol

  1"""Connection oriented presentation protocol"""
  2
  3import asyncio
  4import importlib.resources
  5import logging
  6import typing
  7
  8from hat import aio
  9from hat import asn1
 10from hat import json
 11
 12from hat.drivers import cosp
 13from hat.drivers import tcp
 14
 15
 16mlog = logging.getLogger(__name__)
 17
 18with importlib.resources.open_text(__package__, 'asn1_repo.json') as _f:
 19    _encoder = asn1.ber.BerEncoder(
 20        asn1.repository_from_json(
 21            json.decode_stream(_f)))
 22
 23
 24class ConnectionInfo(typing.NamedTuple):
 25    local_addr: tcp.Address
 26    local_tsel: int | None
 27    local_ssel: int | None
 28    local_psel: int | None
 29    remote_addr: tcp.Address
 30    remote_tsel: int | None
 31    remote_ssel: int | None
 32    remote_psel: int | None
 33
 34
 35IdentifiedEntity: typing.TypeAlias = tuple[asn1.ObjectIdentifier, asn1.Entity]
 36"""Identified entity"""
 37
 38ValidateCb: typing.TypeAlias = aio.AsyncCallable[['SyntaxNames',
 39                                                  IdentifiedEntity],
 40                                                 IdentifiedEntity | None]
 41"""Validate callback"""
 42
 43ConnectionCb: typing.TypeAlias = aio.AsyncCallable[['Connection'], None]
 44"""Connection callback"""
 45
 46
 47class SyntaxNames:
 48    """Syntax name registry
 49
 50    Args:
 51        syntax_names: list of ASN.1 ObjectIdentifiers representing syntax names
 52
 53    """
 54
 55    def __init__(self, syntax_names: list[asn1.ObjectIdentifier]):
 56        self._syntax_id_names = {(i * 2 + 1): name
 57                                 for i, name in enumerate(syntax_names)}
 58        self._syntax_name_ids = {v: k
 59                                 for k, v in self._syntax_id_names.items()}
 60
 61    def get_name(self, syntax_id: int) -> asn1.ObjectIdentifier:
 62        """Get syntax name associated with id"""
 63        return self._syntax_id_names[syntax_id]
 64
 65    def get_id(self, syntax_name: asn1.ObjectIdentifier) -> int:
 66        """Get syntax id associated with name"""
 67        return self._syntax_name_ids[syntax_name]
 68
 69
 70async def connect(addr: tcp.Address,
 71                  syntax_names: SyntaxNames,
 72                  user_data: IdentifiedEntity | None = None,
 73                  *,
 74                  local_psel: int | None = None,
 75                  remote_psel: int | None = None,
 76                  copp_receive_queue_size: int = 1024,
 77                  copp_send_queue_size: int = 1024,
 78                  **kwargs
 79                  ) -> 'Connection':
 80    """Connect to COPP server
 81
 82    Additional arguments are passed directly to `hat.drivers.cosp.connect`.
 83
 84    """
 85    cp_ppdu = _cp_ppdu(syntax_names, local_psel, remote_psel, user_data)
 86    cp_ppdu_data = _encode('CP-type', cp_ppdu)
 87    conn = await cosp.connect(addr, cp_ppdu_data, **kwargs)
 88
 89    try:
 90        cpa_ppdu = _decode('CPA-PPDU', conn.conn_res_user_data)
 91        _validate_connect_response(cp_ppdu, cpa_ppdu)
 92
 93        calling_psel, called_psel = _get_psels(cp_ppdu)
 94        return Connection(conn, syntax_names, cp_ppdu, cpa_ppdu,
 95                          calling_psel, called_psel,
 96                          copp_receive_queue_size, copp_send_queue_size)
 97
 98    except Exception:
 99        await aio.uncancellable(_close_cosp(conn, _arp_ppdu()))
100        raise
101
102
103async def listen(validate_cb: ValidateCb,
104                 connection_cb: ConnectionCb,
105                 addr: tcp.Address = tcp.Address('0.0.0.0', 102),
106                 *,
107                 bind_connections: bool = False,
108                 copp_receive_queue_size: int = 1024,
109                 copp_send_queue_size: int = 1024,
110                 **kwargs
111                 ) -> 'Server':
112    """Create COPP listening server
113
114    Additional arguments are passed directly to `hat.drivers.cosp.listen`.
115
116    Args:
117        validate_cb: callback function or coroutine called on new
118            incomming connection request prior to creating connection object
119        connection_cb: new connection callback
120        addr: local listening address
121
122    """
123    server = Server()
124    server._validate_cb = validate_cb
125    server._connection_cb = connection_cb
126    server._bind_connections = bind_connections
127    server._receive_queue_size = copp_receive_queue_size
128    server._send_queue_size = copp_send_queue_size
129
130    server._srv = await cosp.listen(server._on_validate,
131                                    server._on_connection,
132                                    addr,
133                                    bind_connections=False,
134                                    **kwargs)
135
136    return server
137
138
139class Server(aio.Resource):
140    """COPP listening server
141
142    For creating new server see `listen`.
143
144    """
145
146    @property
147    def async_group(self) -> aio.Group:
148        """Async group"""
149        return self._srv.async_group
150
151    @property
152    def addresses(self) -> list[tcp.Address]:
153        """Listening addresses"""
154        return self._srv.addresses
155
156    async def _on_validate(self, user_data):
157        cp_ppdu = _decode('CP-type', user_data)
158        cp_params = cp_ppdu['normal-mode-parameters']
159        called_psel_data = cp_params.get('called-presentation-selector')
160        called_psel = (int.from_bytes(called_psel_data, 'big')
161                       if called_psel_data else None)
162        cp_pdv_list = cp_params['user-data'][1][0]
163        syntax_names = _sytax_names_from_cp_ppdu(cp_ppdu)
164        cp_user_data = (
165            syntax_names.get_name(
166                cp_pdv_list['presentation-context-identifier']),
167            cp_pdv_list['presentation-data-values'][1])
168
169        cpa_user_data = await aio.call(self._validate_cb, syntax_names,
170                                       cp_user_data)
171
172        cpa_ppdu = _cpa_ppdu(syntax_names, called_psel, cpa_user_data)
173        cpa_ppdu_data = _encode('CPA-PPDU', cpa_ppdu)
174        return cpa_ppdu_data
175
176    async def _on_connection(self, cosp_conn):
177        try:
178            try:
179                cp_ppdu = _decode('CP-type', cosp_conn.conn_req_user_data)
180                cpa_ppdu = _decode('CPA-PPDU', cosp_conn.conn_res_user_data)
181
182                syntax_names = _sytax_names_from_cp_ppdu(cp_ppdu)
183                calling_psel, called_psel = _get_psels(cp_ppdu)
184
185                conn = Connection(cosp_conn, syntax_names, cp_ppdu, cpa_ppdu,
186                                  calling_psel, called_psel,
187                                  self._receive_queue_size,
188                                  self._send_queue_size)
189
190            except Exception:
191                await aio.uncancellable(_close_cosp(cosp_conn, _arp_ppdu()))
192                raise
193
194            try:
195                await aio.call(self._connection_cb, conn)
196
197            except BaseException:
198                await aio.uncancellable(conn.async_close())
199                raise
200
201        except Exception as e:
202            mlog.error("error creating new incomming connection: %s", e,
203                       exc_info=e)
204            return
205
206        if not self._bind_connections:
207            return
208
209        try:
210            await conn.wait_closed()
211
212        except BaseException:
213            await aio.uncancellable(conn.async_close())
214            raise
215
216
217class Connection(aio.Resource):
218    """COPP connection
219
220    For creating new connection see `connect` or `listen`.
221
222    """
223
224    def __init__(self,
225                 conn: cosp.Connection,
226                 syntax_names: SyntaxNames,
227                 cp_ppdu: asn1.Value,
228                 cpa_ppdu: asn1.Value,
229                 local_psel: int | None,
230                 remote_psel: int | None,
231                 receive_queue_size: int,
232                 send_queue_size: int):
233        cp_user_data = cp_ppdu['normal-mode-parameters']['user-data']
234        cpa_user_data = cpa_ppdu['normal-mode-parameters']['user-data']
235
236        conn_req_user_data = (
237            syntax_names.get_name(
238                cp_user_data[1][0]['presentation-context-identifier']),
239            cp_user_data[1][0]['presentation-data-values'][1])
240        conn_res_user_data = (
241            syntax_names.get_name(
242                cpa_user_data[1][0]['presentation-context-identifier']),
243            cpa_user_data[1][0]['presentation-data-values'][1])
244
245        self._conn = conn
246        self._syntax_names = syntax_names
247        self._conn_req_user_data = conn_req_user_data
248        self._conn_res_user_data = conn_res_user_data
249        self._loop = asyncio.get_running_loop()
250        self._info = ConnectionInfo(local_psel=local_psel,
251                                    remote_psel=remote_psel,
252                                    **conn.info._asdict())
253        self._close_ppdu = _arp_ppdu()
254        self._receive_queue = aio.Queue(receive_queue_size)
255        self._send_queue = aio.Queue(send_queue_size)
256        self._async_group = aio.Group()
257
258        self.async_group.spawn(aio.call_on_cancel, self._on_close)
259        self.async_group.spawn(self._receive_loop)
260        self.async_group.spawn(self._send_loop)
261        self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
262                               self.close)
263
264    @property
265    def async_group(self) -> aio.Group:
266        """Async group"""
267        return self._async_group
268
269    @property
270    def info(self) -> ConnectionInfo:
271        """Connection info"""
272        return self._info
273
274    @property
275    def syntax_names(self) -> SyntaxNames:
276        """Syntax names"""
277        return self._syntax_names
278
279    @property
280    def conn_req_user_data(self) -> IdentifiedEntity:
281        """Connect request's user data"""
282        return self._conn_req_user_data
283
284    @property
285    def conn_res_user_data(self) -> IdentifiedEntity:
286        """Connect response's user data"""
287        return self._conn_res_user_data
288
289    def close(self, user_data: IdentifiedEntity | None = None):
290        """Close connection"""
291        self._close(_aru_ppdu(self._syntax_names, user_data))
292
293    async def async_close(self, user_data: IdentifiedEntity | None = None):
294        """Async close"""
295        self.close(user_data)
296        await self.wait_closed()
297
298    async def receive(self) -> IdentifiedEntity:
299        """Receive data"""
300        try:
301            return await self._receive_queue.get()
302
303        except aio.QueueClosedError:
304            raise ConnectionError()
305
306    async def send(self, data: IdentifiedEntity):
307        """Send data"""
308        try:
309            await self._send_queue.put((data, None))
310
311        except aio.QueueClosedError:
312            raise ConnectionError()
313
314    async def drain(self):
315        """Drain output buffer"""
316        try:
317            future = self._loop.create_future()
318            await self._send_queue.put((None, future))
319            await future
320
321        except aio.QueueClosedError:
322            raise ConnectionError()
323
324    async def _on_close(self):
325        await _close_cosp(self._conn, self._close_ppdu)
326
327    def _close(self, ppdu):
328        if not self.is_open:
329            return
330
331        self._close_ppdu = ppdu
332        self._async_group.close()
333
334    async def _receive_loop(self):
335        try:
336            while True:
337                cosp_data = await self._conn.receive()
338
339                user_data = _decode('User-data', cosp_data)
340                pdv_list = user_data[1][0]
341                syntax_name = self._syntax_names.get_name(
342                    pdv_list['presentation-context-identifier'])
343                data = pdv_list['presentation-data-values'][1]
344
345                await self._receive_queue.put((syntax_name, data))
346
347        except ConnectionError:
348            pass
349
350        except Exception as e:
351            mlog.error("receive loop error: %s", e, exc_info=e)
352
353        finally:
354            self._close(_arp_ppdu())
355            self._receive_queue.close()
356
357    async def _send_loop(self):
358        future = None
359        try:
360            while True:
361                data, future = await self._send_queue.get()
362
363                if data is None:
364                    await self._conn.drain()
365
366                else:
367                    ppdu_data = _encode('User-data',
368                                        _user_data(self._syntax_names, data))
369                    await self._conn.send(ppdu_data)
370
371                if future and not future.done():
372                    future.set_result(None)
373
374        except ConnectionError:
375            pass
376
377        except Exception as e:
378            mlog.error("send loop error: %s", e, exc_info=e)
379
380        finally:
381            self._close(_arp_ppdu())
382            self._send_queue.close()
383
384            while True:
385                if future and not future.done():
386                    future.set_result(None)
387                if self._send_queue.empty():
388                    break
389                _, future = self._send_queue.get_nowait()
390
391
392async def _close_cosp(cosp_conn, ppdu):
393    try:
394        data = _encode('Abort-type', ppdu)
395
396    except Exception as e:
397        mlog.error("error encoding abort ppdu: %s", e, exc_info=e)
398        data = None
399
400    finally:
401        await cosp_conn.async_close(data)
402
403
404def _get_psels(cp_ppdu):
405    cp_params = cp_ppdu['normal-mode-parameters']
406    calling_psel_data = cp_params.get('calling-presentation-selector')
407    calling_psel = (int.from_bytes(calling_psel_data, 'big')
408                    if calling_psel_data else None)
409    called_psel_data = cp_params.get('called-presentation-selector')
410    called_psel = (int.from_bytes(called_psel_data, 'big')
411                   if called_psel_data else None)
412    return calling_psel, called_psel
413
414
415def _validate_connect_response(cp_ppdu, cpa_ppdu):
416    cp_params = cp_ppdu['normal-mode-parameters']
417    cpa_params = cpa_ppdu['normal-mode-parameters']
418    called_psel_data = cp_params.get('called-presentation-selector')
419    responding_psel_data = cpa_params.get('responding-presentation-selector')
420
421    if called_psel_data and responding_psel_data:
422        called_psel = int.from_bytes(called_psel_data, 'big')
423        responding_psel = int.from_bytes(responding_psel_data, 'big')
424
425        if called_psel != responding_psel:
426            raise Exception('presentation selectors not matching')
427
428    result_list = cpa_params['presentation-context-definition-result-list']
429    if any(i['result'] != 0 for i in result_list):
430        raise Exception('presentation context not accepted')
431
432
433def _cp_ppdu(syntax_names, calling_psel, called_psel, user_data):
434    cp_params = {
435        'presentation-context-definition-list': [
436            {'presentation-context-identifier': i,
437             'abstract-syntax-name': name,
438             'transfer-syntax-name-list': [_encoder.syntax_name]}
439            for i, name in syntax_names._syntax_id_names.items()]}
440
441    if calling_psel is not None:
442        cp_params['calling-presentation-selector'] = \
443            calling_psel.to_bytes(4, 'big')
444
445    if called_psel is not None:
446        cp_params['called-presentation-selector'] = \
447            called_psel.to_bytes(4, 'big')
448
449    if user_data:
450        cp_params['user-data'] = _user_data(syntax_names, user_data)
451
452    return {
453        'mode-selector': {
454            'mode-value': 1},
455        'normal-mode-parameters': cp_params}
456
457
458def _cpa_ppdu(syntax_names, responding_psel, user_data):
459    cpa_params = {
460        'presentation-context-definition-result-list': [
461            {'result': 0,
462             'transfer-syntax-name': _encoder.syntax_name}
463            for _ in syntax_names._syntax_id_names.keys()]}
464
465    if responding_psel is not None:
466        cpa_params['responding-presentation-selector'] = \
467            responding_psel.to_bytes(4, 'big')
468
469    if user_data:
470        cpa_params['user-data'] = _user_data(syntax_names, user_data)
471
472    return {
473        'mode-selector': {
474            'mode-value': 1},
475        'normal-mode-parameters': cpa_params}
476
477
478def _aru_ppdu(syntax_names, user_data):
479    aru_params = {}
480
481    if user_data:
482        aru_params['user-data'] = _user_data(syntax_names, user_data)
483
484    return 'aru-ppdu', ('normal-mode-parameters', aru_params)
485
486
487def _arp_ppdu():
488    return 'arp-ppdu', {}
489
490
491def _user_data(syntax_names, user_data):
492    return 'fully-encoded-data', [{
493        'presentation-context-identifier': syntax_names.get_id(user_data[0]),
494        'presentation-data-values': (
495            'single-ASN1-type', user_data[1])}]
496
497
498def _sytax_names_from_cp_ppdu(cp_ppdu):
499    cp_params = cp_ppdu['normal-mode-parameters']
500    syntax_names = SyntaxNames([])
501    syntax_names._syntax_id_names = {
502        i['presentation-context-identifier']: i['abstract-syntax-name']
503        for i in cp_params['presentation-context-definition-list']}
504    syntax_names._syntax_name_ids = {
505        v: k for k, v in syntax_names._syntax_id_names.items()}
506    return syntax_names
507
508
509def _encode(name, value):
510    return _encoder.encode(asn1.TypeRef('ISO8823-PRESENTATION', name), value)
511
512
513def _decode(name, data):
514    res, _ = _encoder.decode(asn1.TypeRef('ISO8823-PRESENTATION', name),
515                             memoryview(data))
516    return res
mlog = <Logger hat.drivers.copp (WARNING)>
class ConnectionInfo(typing.NamedTuple):
25class ConnectionInfo(typing.NamedTuple):
26    local_addr: tcp.Address
27    local_tsel: int | None
28    local_ssel: int | None
29    local_psel: int | None
30    remote_addr: tcp.Address
31    remote_tsel: int | None
32    remote_ssel: int | None
33    remote_psel: int | None

ConnectionInfo(local_addr, local_tsel, local_ssel, local_psel, remote_addr, remote_tsel, remote_ssel, remote_psel)

ConnectionInfo( local_addr: hat.drivers.tcp.Address, local_tsel: int | None, local_ssel: int | None, local_psel: int | None, remote_addr: hat.drivers.tcp.Address, remote_tsel: int | None, remote_ssel: int | None, remote_psel: int | None)

Create new instance of ConnectionInfo(local_addr, local_tsel, local_ssel, local_psel, remote_addr, remote_tsel, remote_ssel, remote_psel)

Alias for field number 0

local_tsel: int | None

Alias for field number 1

local_ssel: int | None

Alias for field number 2

local_psel: int | None

Alias for field number 3

Alias for field number 4

remote_tsel: int | None

Alias for field number 5

remote_ssel: int | None

Alias for field number 6

remote_psel: int | None

Alias for field number 7

IdentifiedEntity: TypeAlias = tuple[tuple[int, ...], hat.asn1.common.Entity]

Identified entity

ValidateCb: TypeAlias = Callable[[ForwardRef('SyntaxNames'), tuple[tuple[int, ...], hat.asn1.common.Entity]], tuple[tuple[int, ...], hat.asn1.common.Entity] | None | Awaitable[tuple[tuple[int, ...], hat.asn1.common.Entity] | None]]

Validate callback

ConnectionCb: TypeAlias = Callable[[ForwardRef('Connection')], None | Awaitable[None]]

Connection callback

class SyntaxNames:
48class SyntaxNames:
49    """Syntax name registry
50
51    Args:
52        syntax_names: list of ASN.1 ObjectIdentifiers representing syntax names
53
54    """
55
56    def __init__(self, syntax_names: list[asn1.ObjectIdentifier]):
57        self._syntax_id_names = {(i * 2 + 1): name
58                                 for i, name in enumerate(syntax_names)}
59        self._syntax_name_ids = {v: k
60                                 for k, v in self._syntax_id_names.items()}
61
62    def get_name(self, syntax_id: int) -> asn1.ObjectIdentifier:
63        """Get syntax name associated with id"""
64        return self._syntax_id_names[syntax_id]
65
66    def get_id(self, syntax_name: asn1.ObjectIdentifier) -> int:
67        """Get syntax id associated with name"""
68        return self._syntax_name_ids[syntax_name]

Syntax name registry

Arguments:
  • syntax_names: list of ASN.1 ObjectIdentifiers representing syntax names
SyntaxNames(syntax_names: list[tuple[int, ...]])
56    def __init__(self, syntax_names: list[asn1.ObjectIdentifier]):
57        self._syntax_id_names = {(i * 2 + 1): name
58                                 for i, name in enumerate(syntax_names)}
59        self._syntax_name_ids = {v: k
60                                 for k, v in self._syntax_id_names.items()}
def get_name(self, syntax_id: int) -> tuple[int, ...]:
62    def get_name(self, syntax_id: int) -> asn1.ObjectIdentifier:
63        """Get syntax name associated with id"""
64        return self._syntax_id_names[syntax_id]

Get syntax name associated with id

def get_id(self, syntax_name: tuple[int, ...]) -> int:
66    def get_id(self, syntax_name: asn1.ObjectIdentifier) -> int:
67        """Get syntax id associated with name"""
68        return self._syntax_name_ids[syntax_name]

Get syntax id associated with name

async def connect( addr: hat.drivers.tcp.Address, syntax_names: SyntaxNames, user_data: tuple[tuple[int, ...], hat.asn1.common.Entity] | None = None, *, local_psel: int | None = None, remote_psel: int | None = None, copp_receive_queue_size: int = 1024, copp_send_queue_size: int = 1024, **kwargs) -> Connection:
 71async def connect(addr: tcp.Address,
 72                  syntax_names: SyntaxNames,
 73                  user_data: IdentifiedEntity | None = None,
 74                  *,
 75                  local_psel: int | None = None,
 76                  remote_psel: int | None = None,
 77                  copp_receive_queue_size: int = 1024,
 78                  copp_send_queue_size: int = 1024,
 79                  **kwargs
 80                  ) -> 'Connection':
 81    """Connect to COPP server
 82
 83    Additional arguments are passed directly to `hat.drivers.cosp.connect`.
 84
 85    """
 86    cp_ppdu = _cp_ppdu(syntax_names, local_psel, remote_psel, user_data)
 87    cp_ppdu_data = _encode('CP-type', cp_ppdu)
 88    conn = await cosp.connect(addr, cp_ppdu_data, **kwargs)
 89
 90    try:
 91        cpa_ppdu = _decode('CPA-PPDU', conn.conn_res_user_data)
 92        _validate_connect_response(cp_ppdu, cpa_ppdu)
 93
 94        calling_psel, called_psel = _get_psels(cp_ppdu)
 95        return Connection(conn, syntax_names, cp_ppdu, cpa_ppdu,
 96                          calling_psel, called_psel,
 97                          copp_receive_queue_size, copp_send_queue_size)
 98
 99    except Exception:
100        await aio.uncancellable(_close_cosp(conn, _arp_ppdu()))
101        raise

Connect to COPP server

Additional arguments are passed directly to hat.drivers.cosp.connect.

async def listen( validate_cb: Callable[[SyntaxNames, tuple[tuple[int, ...], hat.asn1.common.Entity]], tuple[tuple[int, ...], hat.asn1.common.Entity] | None | Awaitable[tuple[tuple[int, ...], hat.asn1.common.Entity] | None]], connection_cb: Callable[[hat.drivers.acse.Connection], None | Awaitable[None]], addr: hat.drivers.tcp.Address = Address(host='0.0.0.0', port=102), *, bind_connections: bool = False, copp_receive_queue_size: int = 1024, copp_send_queue_size: int = 1024, **kwargs) -> Server:
104async def listen(validate_cb: ValidateCb,
105                 connection_cb: ConnectionCb,
106                 addr: tcp.Address = tcp.Address('0.0.0.0', 102),
107                 *,
108                 bind_connections: bool = False,
109                 copp_receive_queue_size: int = 1024,
110                 copp_send_queue_size: int = 1024,
111                 **kwargs
112                 ) -> 'Server':
113    """Create COPP listening server
114
115    Additional arguments are passed directly to `hat.drivers.cosp.listen`.
116
117    Args:
118        validate_cb: callback function or coroutine called on new
119            incomming connection request prior to creating connection object
120        connection_cb: new connection callback
121        addr: local listening address
122
123    """
124    server = Server()
125    server._validate_cb = validate_cb
126    server._connection_cb = connection_cb
127    server._bind_connections = bind_connections
128    server._receive_queue_size = copp_receive_queue_size
129    server._send_queue_size = copp_send_queue_size
130
131    server._srv = await cosp.listen(server._on_validate,
132                                    server._on_connection,
133                                    addr,
134                                    bind_connections=False,
135                                    **kwargs)
136
137    return server

Create COPP listening server

Additional arguments are passed directly to hat.drivers.cosp.listen.

Arguments:
  • validate_cb: callback function or coroutine called on new incomming connection request prior to creating connection object
  • connection_cb: new connection callback
  • addr: local listening address
class Server(hat.aio.group.Resource):
140class Server(aio.Resource):
141    """COPP listening server
142
143    For creating new server see `listen`.
144
145    """
146
147    @property
148    def async_group(self) -> aio.Group:
149        """Async group"""
150        return self._srv.async_group
151
152    @property
153    def addresses(self) -> list[tcp.Address]:
154        """Listening addresses"""
155        return self._srv.addresses
156
157    async def _on_validate(self, user_data):
158        cp_ppdu = _decode('CP-type', user_data)
159        cp_params = cp_ppdu['normal-mode-parameters']
160        called_psel_data = cp_params.get('called-presentation-selector')
161        called_psel = (int.from_bytes(called_psel_data, 'big')
162                       if called_psel_data else None)
163        cp_pdv_list = cp_params['user-data'][1][0]
164        syntax_names = _sytax_names_from_cp_ppdu(cp_ppdu)
165        cp_user_data = (
166            syntax_names.get_name(
167                cp_pdv_list['presentation-context-identifier']),
168            cp_pdv_list['presentation-data-values'][1])
169
170        cpa_user_data = await aio.call(self._validate_cb, syntax_names,
171                                       cp_user_data)
172
173        cpa_ppdu = _cpa_ppdu(syntax_names, called_psel, cpa_user_data)
174        cpa_ppdu_data = _encode('CPA-PPDU', cpa_ppdu)
175        return cpa_ppdu_data
176
177    async def _on_connection(self, cosp_conn):
178        try:
179            try:
180                cp_ppdu = _decode('CP-type', cosp_conn.conn_req_user_data)
181                cpa_ppdu = _decode('CPA-PPDU', cosp_conn.conn_res_user_data)
182
183                syntax_names = _sytax_names_from_cp_ppdu(cp_ppdu)
184                calling_psel, called_psel = _get_psels(cp_ppdu)
185
186                conn = Connection(cosp_conn, syntax_names, cp_ppdu, cpa_ppdu,
187                                  calling_psel, called_psel,
188                                  self._receive_queue_size,
189                                  self._send_queue_size)
190
191            except Exception:
192                await aio.uncancellable(_close_cosp(cosp_conn, _arp_ppdu()))
193                raise
194
195            try:
196                await aio.call(self._connection_cb, conn)
197
198            except BaseException:
199                await aio.uncancellable(conn.async_close())
200                raise
201
202        except Exception as e:
203            mlog.error("error creating new incomming connection: %s", e,
204                       exc_info=e)
205            return
206
207        if not self._bind_connections:
208            return
209
210        try:
211            await conn.wait_closed()
212
213        except BaseException:
214            await aio.uncancellable(conn.async_close())
215            raise

COPP listening server

For creating new server see listen.

async_group: hat.aio.group.Group
147    @property
148    def async_group(self) -> aio.Group:
149        """Async group"""
150        return self._srv.async_group

Async group

addresses: list[hat.drivers.tcp.Address]
152    @property
153    def addresses(self) -> list[tcp.Address]:
154        """Listening addresses"""
155        return self._srv.addresses

Listening addresses

class Connection(hat.aio.group.Resource):
218class Connection(aio.Resource):
219    """COPP connection
220
221    For creating new connection see `connect` or `listen`.
222
223    """
224
225    def __init__(self,
226                 conn: cosp.Connection,
227                 syntax_names: SyntaxNames,
228                 cp_ppdu: asn1.Value,
229                 cpa_ppdu: asn1.Value,
230                 local_psel: int | None,
231                 remote_psel: int | None,
232                 receive_queue_size: int,
233                 send_queue_size: int):
234        cp_user_data = cp_ppdu['normal-mode-parameters']['user-data']
235        cpa_user_data = cpa_ppdu['normal-mode-parameters']['user-data']
236
237        conn_req_user_data = (
238            syntax_names.get_name(
239                cp_user_data[1][0]['presentation-context-identifier']),
240            cp_user_data[1][0]['presentation-data-values'][1])
241        conn_res_user_data = (
242            syntax_names.get_name(
243                cpa_user_data[1][0]['presentation-context-identifier']),
244            cpa_user_data[1][0]['presentation-data-values'][1])
245
246        self._conn = conn
247        self._syntax_names = syntax_names
248        self._conn_req_user_data = conn_req_user_data
249        self._conn_res_user_data = conn_res_user_data
250        self._loop = asyncio.get_running_loop()
251        self._info = ConnectionInfo(local_psel=local_psel,
252                                    remote_psel=remote_psel,
253                                    **conn.info._asdict())
254        self._close_ppdu = _arp_ppdu()
255        self._receive_queue = aio.Queue(receive_queue_size)
256        self._send_queue = aio.Queue(send_queue_size)
257        self._async_group = aio.Group()
258
259        self.async_group.spawn(aio.call_on_cancel, self._on_close)
260        self.async_group.spawn(self._receive_loop)
261        self.async_group.spawn(self._send_loop)
262        self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
263                               self.close)
264
265    @property
266    def async_group(self) -> aio.Group:
267        """Async group"""
268        return self._async_group
269
270    @property
271    def info(self) -> ConnectionInfo:
272        """Connection info"""
273        return self._info
274
275    @property
276    def syntax_names(self) -> SyntaxNames:
277        """Syntax names"""
278        return self._syntax_names
279
280    @property
281    def conn_req_user_data(self) -> IdentifiedEntity:
282        """Connect request's user data"""
283        return self._conn_req_user_data
284
285    @property
286    def conn_res_user_data(self) -> IdentifiedEntity:
287        """Connect response's user data"""
288        return self._conn_res_user_data
289
290    def close(self, user_data: IdentifiedEntity | None = None):
291        """Close connection"""
292        self._close(_aru_ppdu(self._syntax_names, user_data))
293
294    async def async_close(self, user_data: IdentifiedEntity | None = None):
295        """Async close"""
296        self.close(user_data)
297        await self.wait_closed()
298
299    async def receive(self) -> IdentifiedEntity:
300        """Receive data"""
301        try:
302            return await self._receive_queue.get()
303
304        except aio.QueueClosedError:
305            raise ConnectionError()
306
307    async def send(self, data: IdentifiedEntity):
308        """Send data"""
309        try:
310            await self._send_queue.put((data, None))
311
312        except aio.QueueClosedError:
313            raise ConnectionError()
314
315    async def drain(self):
316        """Drain output buffer"""
317        try:
318            future = self._loop.create_future()
319            await self._send_queue.put((None, future))
320            await future
321
322        except aio.QueueClosedError:
323            raise ConnectionError()
324
325    async def _on_close(self):
326        await _close_cosp(self._conn, self._close_ppdu)
327
328    def _close(self, ppdu):
329        if not self.is_open:
330            return
331
332        self._close_ppdu = ppdu
333        self._async_group.close()
334
335    async def _receive_loop(self):
336        try:
337            while True:
338                cosp_data = await self._conn.receive()
339
340                user_data = _decode('User-data', cosp_data)
341                pdv_list = user_data[1][0]
342                syntax_name = self._syntax_names.get_name(
343                    pdv_list['presentation-context-identifier'])
344                data = pdv_list['presentation-data-values'][1]
345
346                await self._receive_queue.put((syntax_name, data))
347
348        except ConnectionError:
349            pass
350
351        except Exception as e:
352            mlog.error("receive loop error: %s", e, exc_info=e)
353
354        finally:
355            self._close(_arp_ppdu())
356            self._receive_queue.close()
357
358    async def _send_loop(self):
359        future = None
360        try:
361            while True:
362                data, future = await self._send_queue.get()
363
364                if data is None:
365                    await self._conn.drain()
366
367                else:
368                    ppdu_data = _encode('User-data',
369                                        _user_data(self._syntax_names, data))
370                    await self._conn.send(ppdu_data)
371
372                if future and not future.done():
373                    future.set_result(None)
374
375        except ConnectionError:
376            pass
377
378        except Exception as e:
379            mlog.error("send loop error: %s", e, exc_info=e)
380
381        finally:
382            self._close(_arp_ppdu())
383            self._send_queue.close()
384
385            while True:
386                if future and not future.done():
387                    future.set_result(None)
388                if self._send_queue.empty():
389                    break
390                _, future = self._send_queue.get_nowait()

COPP connection

For creating new connection see connect or listen.

Connection( conn: Connection, syntax_names: SyntaxNames, cp_ppdu: Union[bool, int, Collection[bool], bytes, bytearray, memoryview, NoneType, tuple[int, ...], str, float, Tuple[str, ForwardRef('Value')], Dict[str, ForwardRef('Value')], Collection['Value'], hat.asn1.common.Entity, hat.asn1.common.External, hat.asn1.common.EmbeddedPDV], cpa_ppdu: Union[bool, int, Collection[bool], bytes, bytearray, memoryview, NoneType, tuple[int, ...], str, float, Tuple[str, ForwardRef('Value')], Dict[str, ForwardRef('Value')], Collection['Value'], hat.asn1.common.Entity, hat.asn1.common.External, hat.asn1.common.EmbeddedPDV], local_psel: int | None, remote_psel: int | None, receive_queue_size: int, send_queue_size: int)
225    def __init__(self,
226                 conn: cosp.Connection,
227                 syntax_names: SyntaxNames,
228                 cp_ppdu: asn1.Value,
229                 cpa_ppdu: asn1.Value,
230                 local_psel: int | None,
231                 remote_psel: int | None,
232                 receive_queue_size: int,
233                 send_queue_size: int):
234        cp_user_data = cp_ppdu['normal-mode-parameters']['user-data']
235        cpa_user_data = cpa_ppdu['normal-mode-parameters']['user-data']
236
237        conn_req_user_data = (
238            syntax_names.get_name(
239                cp_user_data[1][0]['presentation-context-identifier']),
240            cp_user_data[1][0]['presentation-data-values'][1])
241        conn_res_user_data = (
242            syntax_names.get_name(
243                cpa_user_data[1][0]['presentation-context-identifier']),
244            cpa_user_data[1][0]['presentation-data-values'][1])
245
246        self._conn = conn
247        self._syntax_names = syntax_names
248        self._conn_req_user_data = conn_req_user_data
249        self._conn_res_user_data = conn_res_user_data
250        self._loop = asyncio.get_running_loop()
251        self._info = ConnectionInfo(local_psel=local_psel,
252                                    remote_psel=remote_psel,
253                                    **conn.info._asdict())
254        self._close_ppdu = _arp_ppdu()
255        self._receive_queue = aio.Queue(receive_queue_size)
256        self._send_queue = aio.Queue(send_queue_size)
257        self._async_group = aio.Group()
258
259        self.async_group.spawn(aio.call_on_cancel, self._on_close)
260        self.async_group.spawn(self._receive_loop)
261        self.async_group.spawn(self._send_loop)
262        self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
263                               self.close)
async_group: hat.aio.group.Group
265    @property
266    def async_group(self) -> aio.Group:
267        """Async group"""
268        return self._async_group

Async group

info: ConnectionInfo
270    @property
271    def info(self) -> ConnectionInfo:
272        """Connection info"""
273        return self._info

Connection info

syntax_names: SyntaxNames
275    @property
276    def syntax_names(self) -> SyntaxNames:
277        """Syntax names"""
278        return self._syntax_names

Syntax names

conn_req_user_data: tuple[tuple[int, ...], hat.asn1.common.Entity]
280    @property
281    def conn_req_user_data(self) -> IdentifiedEntity:
282        """Connect request's user data"""
283        return self._conn_req_user_data

Connect request's user data

conn_res_user_data: tuple[tuple[int, ...], hat.asn1.common.Entity]
285    @property
286    def conn_res_user_data(self) -> IdentifiedEntity:
287        """Connect response's user data"""
288        return self._conn_res_user_data

Connect response's user data

def close( self, user_data: tuple[tuple[int, ...], hat.asn1.common.Entity] | None = None):
290    def close(self, user_data: IdentifiedEntity | None = None):
291        """Close connection"""
292        self._close(_aru_ppdu(self._syntax_names, user_data))

Close connection

async def async_close( self, user_data: tuple[tuple[int, ...], hat.asn1.common.Entity] | None = None):
294    async def async_close(self, user_data: IdentifiedEntity | None = None):
295        """Async close"""
296        self.close(user_data)
297        await self.wait_closed()

Async close

async def receive(self) -> tuple[tuple[int, ...], hat.asn1.common.Entity]:
299    async def receive(self) -> IdentifiedEntity:
300        """Receive data"""
301        try:
302            return await self._receive_queue.get()
303
304        except aio.QueueClosedError:
305            raise ConnectionError()

Receive data

async def send(self, data: tuple[tuple[int, ...], hat.asn1.common.Entity]):
307    async def send(self, data: IdentifiedEntity):
308        """Send data"""
309        try:
310            await self._send_queue.put((data, None))
311
312        except aio.QueueClosedError:
313            raise ConnectionError()

Send data

async def drain(self):
315    async def drain(self):
316        """Drain output buffer"""
317        try:
318            future = self._loop.create_future()
319            await self._send_queue.put((None, future))
320            await future
321
322        except aio.QueueClosedError:
323            raise ConnectionError()

Drain output buffer