hat.drivers.cotp

Connection oriented transport protocol

 1"""Connection oriented transport protocol"""
 2
 3from hat.drivers.cotp.connection import (ConnectionInfo,
 4                                         ConnectionCb,
 5                                         connect,
 6                                         listen,
 7                                         Server,
 8                                         Connection)
 9
10
11__all__ = ['ConnectionInfo',
12           'ConnectionCb',
13           'connect',
14           'listen',
15           'Server',
16           'Connection']
class ConnectionInfo(typing.NamedTuple):
class ConnectionInfo(typing.NamedTuple):
    # Addressing details of a single COTP connection. `Connection.__init__`
    # builds it from the underlying TPKT connection's info (which supplies
    # the address fields) plus the two selector values.

    # local TCP address
    local_addr: tcp.Address
    # local selector, `None` when not set
    # (presumably "tsel" stands for transport selector - TODO confirm)
    local_tsel: int | None
    # remote TCP address
    remote_addr: tcp.Address
    # remote selector, `None` when not set
    remote_tsel: int | None

ConnectionInfo(local_addr, local_tsel, remote_addr, remote_tsel)

ConnectionInfo( local_addr: hat.drivers.tcp.Address, local_tsel: int | None, remote_addr: hat.drivers.tcp.Address, remote_tsel: int | None)

Create new instance of ConnectionInfo(local_addr, local_tsel, remote_addr, remote_tsel)

local_addr: hat.drivers.tcp.Address

Alias for field number 0

local_tsel: int | None

Alias for field number 1

remote_addr: hat.drivers.tcp.Address

Alias for field number 2

remote_tsel: int | None

Alias for field number 3

ConnectionCb = typing.Callable[[ForwardRef('Connection')], None | collections.abc.Awaitable[None]]
async def connect( addr: hat.drivers.tcp.Address, *, local_tsel: int | None = None, remote_tsel: int | None = None, cotp_receive_queue_size: int = 1024, cotp_send_queue_size: int = 1024, **kwargs) -> Connection:
async def connect(addr: tcp.Address,
                  *,
                  local_tsel: int | None = None,
                  remote_tsel: int | None = None,
                  cotp_receive_queue_size: int = 1024,
                  cotp_send_queue_size: int = 1024,
                  **kwargs
                  ) -> 'Connection':
    """Open a COTP connection to a remote endpoint

    A TPKT connection is established first; all additional keyword
    arguments are forwarded unchanged to `hat.drivers.tpkt.connect`.
    A connection request TPDU is then sent (with `local_tsel` as the
    calling and `remote_tsel` as the called selector) and the peer's reply
    is decoded and validated before the resulting `Connection` is created.

    `cotp_receive_queue_size` and `cotp_send_queue_size` set the sizes of
    the new connection's receive and send queues.

    If anything fails after the TPKT connection is open (cancellation
    included), the TPKT connection is closed and the error is re-raised.

    """
    tpkt_conn = await tpkt.connect(addr, **kwargs)

    try:
        request = common.CR(src=next(_next_srcs),
                            cls=0,
                            calling_tsel=local_tsel,
                            called_tsel=remote_tsel,
                            max_tpdu=2048,
                            pref_max_tpdu=None)
        await tpkt_conn.send(encoder.encode(request))

        response_bytes = await tpkt_conn.receive()
        response = encoder.decode(memoryview(response_bytes))
        _validate_connect_response(request, response)

        negotiated_max_tpdu = _calculate_max_tpdu(request, response)
        local, remote = _get_tsels(request, response)

        return Connection(tpkt_conn,
                          negotiated_max_tpdu,
                          local,
                          remote,
                          cotp_receive_queue_size,
                          cotp_send_queue_size)

    except BaseException:
        # close must complete even if this task is being cancelled
        await aio.uncancellable(tpkt_conn.async_close())
        raise

Create new COTP connection

Additional arguments are passed directly to hat.drivers.tpkt.connect.

async def listen( connection_cb: Callable[[Connection], None | Awaitable[None]], addr: hat.drivers.tcp.Address = Address(host='0.0.0.0', port=102), *, cotp_receive_queue_size: int = 1024, cotp_send_queue_size: int = 1024, **kwargs) -> Server:
async def listen(connection_cb: ConnectionCb,
                 addr: tcp.Address = tcp.Address('0.0.0.0', 102),
                 *,
                 cotp_receive_queue_size: int = 1024,
                 cotp_send_queue_size: int = 1024,
                 **kwargs
                 ) -> 'Server':
    """Start a COTP server listening on `addr`

    `connection_cb` is stored on the server together with the queue sizes
    used for every accepted connection. The underlying TPKT server is
    created with `hat.drivers.tpkt.listen`, which receives the server's
    connection handler, `addr` and all additional keyword arguments.

    """
    srv = Server()

    # configuration has to be in place before the TPKT server can start
    # delivering connections to `srv._on_connection`
    srv._send_queue_size = cotp_send_queue_size
    srv._receive_queue_size = cotp_receive_queue_size
    srv._connection_cb = connection_cb

    srv._srv = await tpkt.listen(srv._on_connection, addr, **kwargs)
    return srv

Create new COTP listening server

Additional arguments are passed directly to hat.drivers.tpkt.listen.

class Server(hat.aio.group.Resource):
 95class Server(aio.Resource):
 96    """COTP listening server
 97
 98    For creation of new instance see `listen` coroutine.
 99
100    """
101
102    @property
103    def async_group(self) -> aio.Group:
104        """Async group"""
105        return self._srv.async_group
106
107    @property
108    def addresses(self) -> list[tcp.Address]:
109        """Listening addresses"""
110        return self._srv.addresses
111
112    async def _on_connection(self, tpkt_conn):
113        try:
114            try:
115                cr_tpdu_bytes = await tpkt_conn.receive()
116                cr_tpdu = encoder.decode(memoryview(cr_tpdu_bytes))
117                _validate_connect_request(cr_tpdu)
118
119                cc_tpdu = common.CC(dst=cr_tpdu.src,
120                                    src=next(_next_srcs),
121                                    cls=0,
122                                    calling_tsel=cr_tpdu.calling_tsel,
123                                    called_tsel=cr_tpdu.called_tsel,
124                                    max_tpdu=_calculate_cc_max_tpdu(cr_tpdu),
125                                    pref_max_tpdu=None)
126                cc_tpdu_bytes = encoder.encode(cc_tpdu)
127                await tpkt_conn.send(cc_tpdu_bytes)
128
129                max_tpdu = _calculate_max_tpdu(cr_tpdu, cc_tpdu)
130                calling_tsel, called_tsel = _get_tsels(cr_tpdu, cc_tpdu)
131                conn = Connection(tpkt_conn, max_tpdu,
132                                  called_tsel, calling_tsel,
133                                  self._receive_queue_size,
134                                  self._send_queue_size)
135
136            except BaseException:
137                await aio.uncancellable(tpkt_conn.async_close())
138                raise
139
140            try:
141                await aio.call(self._connection_cb, conn)
142
143            except BaseException:
144                await aio.uncancellable(conn.async_close())
145                raise
146
147        except Exception as e:
148            mlog.error("error creating new incomming connection: %s", e,
149                       exc_info=e)

COTP listening server

For creation of new instance see listen coroutine.

async_group: hat.aio.group.Group
102    @property
103    def async_group(self) -> aio.Group:
104        """Async group"""
105        return self._srv.async_group

Async group

addresses: list[hat.drivers.tcp.Address]
107    @property
108    def addresses(self) -> list[tcp.Address]:
109        """Listening addresses"""
110        return self._srv.addresses

Listening addresses

class Connection(hat.aio.group.Resource):
class Connection(aio.Resource):
    """COTP connection

    Wraps an already negotiated TPKT connection. Two background tasks are
    spawned in the TPKT connection's async group: one reassembles incoming
    data TPDUs into complete messages, the other splits outgoing messages
    into data TPDUs no larger than the negotiated maximum.

    For creation of new instance see `connect` or `listen`.

    """

    def __init__(self,
                 conn: tpkt.Connection,
                 max_tpdu: int,
                 local_tsel: int | None,
                 remote_tsel: int | None,
                 receive_queue_size: int,
                 send_queue_size: int):
        """Wrap `conn` and start the receive and send loops

        Must be called while an asyncio event loop is running.

        Args:
            conn: established TPKT connection
            max_tpdu: negotiated maximum TPDU size
            local_tsel: local selector reported in `info`
            remote_tsel: remote selector reported in `info`
            receive_queue_size: size of the queue of received messages
            send_queue_size: size of the queue of outgoing messages

        """
        self._conn = conn
        self._max_tpdu = max_tpdu
        self._loop = asyncio.get_running_loop()
        # address fields are taken over from the TPKT connection's info
        self._info = ConnectionInfo(local_tsel=local_tsel,
                                    remote_tsel=remote_tsel,
                                    **conn.info._asdict())
        self._receive_queue = aio.Queue(receive_queue_size)
        self._send_queue = aio.Queue(send_queue_size)

        self.async_group.spawn(self._receive_loop)
        self.async_group.spawn(self._send_loop)

    @property
    def async_group(self) -> aio.Group:
        """Async group (shared with the underlying TPKT connection)"""
        return self._conn.async_group

    @property
    def info(self) -> ConnectionInfo:
        """Connection info"""
        return self._info

    async def receive(self) -> util.Bytes:
        """Receive data

        Returns the next completely reassembled message.

        Raises:
            ConnectionError: if the receive queue is closed

        """
        try:
            return await self._receive_queue.get()

        except aio.QueueClosedError:
            raise ConnectionError()

    async def send(self, data: util.Bytes):
        """Send data

        The data is only queued here; segmentation and transmission are
        done by the send loop.

        Raises:
            ConnectionError: if the send queue is closed

        """
        try:
            await self._send_queue.put((data, None))

        except aio.QueueClosedError:
            raise ConnectionError()

    async def drain(self):
        """Drain output buffer

        Queues a drain marker behind previously queued data and waits until
        the send loop has processed it. The wait also ends (without an
        error) when the send loop terminates.

        Raises:
            ConnectionError: if the send queue is closed

        """
        try:
            future = self._loop.create_future()
            await self._send_queue.put((None, future))
            await future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def _receive_loop(self):
        # Collect DT TPDU payloads until one marked `eot` arrives, then
        # publish the concatenated message. DR/ER TPDUs end the loop; all
        # other TPDU types are ignored.
        try:
            segments = collections.deque()
            while True:
                tpdu_bytes = await self._conn.receive()
                tpdu = encoder.decode(memoryview(tpdu_bytes))

                if isinstance(tpdu, (common.DR, common.ER)):
                    mlog.info("received disconnect request / error")
                    break

                if not isinstance(tpdu, common.DT):
                    continue

                segments.append(tpdu.data)

                if not tpdu.eot:
                    continue

                # single buffer-level concatenation; avoids walking every
                # payload byte through a Python-level iterator
                data = b''.join(segments)
                segments.clear()

                await self._receive_queue.put(data)

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("receive loop error: %s", e, exc_info=e)

        finally:
            self.close()
            self._receive_queue.close()

    async def _send_loop(self):
        # Process queued `(data, future)` items: `data is None` is a drain
        # marker, anything else is split into DT TPDUs and sent.
        future = None
        # largest payload of a single DT TPDU (presumably 3 is the size of
        # the DT header - TODO confirm against the encoder)
        max_size = self._max_tpdu - 3
        try:
            while True:
                data, future = await self._send_queue.get()

                if data is None:
                    await self._conn.drain()

                else:
                    data = memoryview(data)

                    while data:
                        single_data, data = data[:max_size], data[max_size:]

                        # `eot` is set on the segment that exhausts the data
                        tpdu = common.DT(eot=not data, data=single_data)
                        tpdu_bytes = encoder.encode(tpdu)
                        await self._conn.send(tpdu_bytes)

                if future and not future.done():
                    future.set_result(None)

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("send loop error: %s", e, exc_info=e)

        finally:
            self.close()
            self._send_queue.close()

            # release every pending `drain` caller: first the item that was
            # being processed, then all items still left in the queue
            while True:
                if future and not future.done():
                    future.set_result(None)
                if self._send_queue.empty():
                    break
                _, future = self._send_queue.get_nowait()

COTP connection

For creation of new instance see connect or listen.

Connection( conn: hat.drivers.tpkt.Connection, max_tpdu: int, local_tsel: int | None, remote_tsel: int | None, receive_queue_size: int, send_queue_size: int)
159    def __init__(self,
160                 conn: tpkt.Connection,
161                 max_tpdu: int,
162                 local_tsel: int | None,
163                 remote_tsel: int | None,
164                 receive_queue_size: int,
165                 send_queue_size: int):
166        self._conn = conn
167        self._max_tpdu = max_tpdu
168        self._loop = asyncio.get_running_loop()
169        self._info = ConnectionInfo(local_tsel=local_tsel,
170                                    remote_tsel=remote_tsel,
171                                    **conn.info._asdict())
172        self._receive_queue = aio.Queue(receive_queue_size)
173        self._send_queue = aio.Queue(send_queue_size)
174
175        self.async_group.spawn(self._receive_loop)
176        self.async_group.spawn(self._send_loop)
async_group: hat.aio.group.Group
178    @property
179    def async_group(self) -> aio.Group:
180        """Async group"""
181        return self._conn.async_group

Async group

info: ConnectionInfo
183    @property
184    def info(self) -> ConnectionInfo:
185        """Connection info"""
186        return self._info

Connection info

async def receive(self) -> bytes | bytearray | memoryview:
188    async def receive(self) -> util.Bytes:
189        """Receive data"""
190        try:
191            return await self._receive_queue.get()
192
193        except aio.QueueClosedError:
194            raise ConnectionError()

Receive data

async def send(self, data: bytes | bytearray | memoryview):
196    async def send(self, data: util.Bytes):
197        """Send data"""
198        try:
199            await self._send_queue.put((data, None))
200
201        except aio.QueueClosedError:
202            raise ConnectionError()

Send data

async def drain(self):
204    async def drain(self):
205        """Drain output buffer"""
206        try:
207            future = self._loop.create_future()
208            await self._send_queue.put((None, future))
209            await future
210
211        except aio.QueueClosedError:
212            raise ConnectionError()

Drain output buffer