hat.drivers.cotp

Connection oriented transport protocol

 1"""Connection oriented transport protocol"""
 2
 3from hat.drivers.cotp.connection import (ConnectionInfo,
 4                                         ConnectionCb,
 5                                         connect,
 6                                         listen,
 7                                         Server,
 8                                         Connection)
 9
10
11__all__ = ['ConnectionInfo',
12           'ConnectionCb',
13           'connect',
14           'listen',
15           'Server',
16           'Connection']
class ConnectionInfo(typing.NamedTuple):
22class ConnectionInfo(typing.NamedTuple):
23    name: str | None
24    local_addr: tcp.Address
25    local_tsel: int | None
26    remote_addr: tcp.Address
27    remote_tsel: int | None

ConnectionInfo(name, local_addr, local_tsel, remote_addr, remote_tsel)

ConnectionInfo( name: str | None, local_addr: hat.drivers.tcp.Address, local_tsel: int | None, remote_addr: hat.drivers.tcp.Address, remote_tsel: int | None)

Create new instance of ConnectionInfo(name, local_addr, local_tsel, remote_addr, remote_tsel)

name: str | None

Alias for field number 0

Alias for field number 1

local_tsel: int | None

Alias for field number 2

Alias for field number 3

remote_tsel: int | None

Alias for field number 4

ConnectionCb = typing.Callable[[ForwardRef('Connection')], None | collections.abc.Awaitable[None]]
async def connect( addr: hat.drivers.tcp.Address, *, local_tsel: int | None = None, remote_tsel: int | None = None, cotp_receive_queue_size: int = 1024, cotp_send_queue_size: int = 1024, **kwargs) -> Connection:
34async def connect(addr: tcp.Address,
35                  *,
36                  local_tsel: int | None = None,
37                  remote_tsel: int | None = None,
38                  cotp_receive_queue_size: int = 1024,
39                  cotp_send_queue_size: int = 1024,
40                  **kwargs
41                  ) -> 'Connection':
42    """Create new COTP connection
43
44    Additional arguments are passed directly to `hat.drivers.tpkt.connect`.
45
46    """
47    conn = await tpkt.connect(addr, **kwargs)
48
49    try:
50        cr_tpdu = common.CR(src=next(_next_srcs),
51                            cls=0,
52                            calling_tsel=local_tsel,
53                            called_tsel=remote_tsel,
54                            max_tpdu=2048,
55                            pref_max_tpdu=None)
56        cr_tpdu_bytes = encoder.encode(cr_tpdu)
57        await conn.send(cr_tpdu_bytes)
58
59        cc_tpdu_bytes = await conn.receive()
60        cc_tpdu = encoder.decode(memoryview(cc_tpdu_bytes))
61        _validate_connect_response(cr_tpdu, cc_tpdu)
62
63        max_tpdu = _calculate_max_tpdu(cr_tpdu, cc_tpdu)
64        calling_tsel, called_tsel = _get_tsels(cr_tpdu, cc_tpdu)
65
66        return Connection(conn, max_tpdu, calling_tsel, called_tsel,
67                          cotp_receive_queue_size, cotp_send_queue_size)
68
69    except BaseException:
70        await aio.uncancellable(conn.async_close())
71        raise

Create new COTP connection

Additional arguments are passed directly to hat.drivers.tpkt.connect.

async def listen( connection_cb: Callable[[Connection], None | Awaitable[None]], addr: hat.drivers.tcp.Address = Address(host='0.0.0.0', port=102), *, cotp_receive_queue_size: int = 1024, cotp_send_queue_size: int = 1024, **kwargs) -> Server:
74async def listen(connection_cb: ConnectionCb,
75                 addr: tcp.Address = tcp.Address('0.0.0.0', 102),
76                 *,
77                 cotp_receive_queue_size: int = 1024,
78                 cotp_send_queue_size: int = 1024,
79                 **kwargs
80                 ) -> 'Server':
81    """Create new COTP listening server
82
83    Additional arguments are passed directly to `hat.drivers.tpkt.listen`.
84
85    """
86    server = Server()
87    server._connection_cb = connection_cb
88    server._receive_queue_size = cotp_receive_queue_size
89    server._send_queue_size = cotp_send_queue_size
90    server._log = _create_server_logger(kwargs.get('name'), None)
91
92    server._srv = await tpkt.listen(server._on_connection, addr, **kwargs)
93
94    server._log = _create_server_logger(kwargs.get('name'), server._srv.info)
95
96    return server

Create new COTP listening server

Additional arguments are passed directly to hat.drivers.tpkt.listen.

class Server(hat.aio.group.Resource):
 99class Server(aio.Resource):
100    """COTP listening server
101
102    For creation of new instance see `listen` coroutine.
103
104    """
105
106    @property
107    def async_group(self) -> aio.Group:
108        """Async group"""
109        return self._srv.async_group
110
111    @property
112    def info(self) -> tcp.ServerInfo:
113        """Server info"""
114        return self._srv.info
115
116    async def _on_connection(self, tpkt_conn):
117        try:
118            try:
119                cr_tpdu_bytes = await tpkt_conn.receive()
120                cr_tpdu = encoder.decode(memoryview(cr_tpdu_bytes))
121                _validate_connect_request(cr_tpdu)
122
123                cc_tpdu = common.CC(dst=cr_tpdu.src,
124                                    src=next(_next_srcs),
125                                    cls=0,
126                                    calling_tsel=cr_tpdu.calling_tsel,
127                                    called_tsel=cr_tpdu.called_tsel,
128                                    max_tpdu=_calculate_cc_max_tpdu(cr_tpdu),
129                                    pref_max_tpdu=None)
130                cc_tpdu_bytes = encoder.encode(cc_tpdu)
131                await tpkt_conn.send(cc_tpdu_bytes)
132
133                max_tpdu = _calculate_max_tpdu(cr_tpdu, cc_tpdu)
134                calling_tsel, called_tsel = _get_tsels(cr_tpdu, cc_tpdu)
135                conn = Connection(tpkt_conn, max_tpdu,
136                                  called_tsel, calling_tsel,
137                                  self._receive_queue_size,
138                                  self._send_queue_size)
139
140            except BaseException:
141                await aio.uncancellable(tpkt_conn.async_close())
142                raise
143
144            try:
145                await aio.call(self._connection_cb, conn)
146
147            except BaseException:
148                await aio.uncancellable(conn.async_close())
149                raise
150
151        except Exception as e:
152            self._log.error("error creating new incomming connection: %s",
153                            e, exc_info=e)

COTP listening server

For creation of new instance see listen coroutine.

async_group: hat.aio.group.Group
106    @property
107    def async_group(self) -> aio.Group:
108        """Async group"""
109        return self._srv.async_group

Async group

info: hat.drivers.tcp.ServerInfo
111    @property
112    def info(self) -> tcp.ServerInfo:
113        """Server info"""
114        return self._srv.info

Server info

class Connection(hat.aio.group.Resource):
156class Connection(aio.Resource):
157    """COTP connection
158
159    For creation of new instance see `connect` or `listen`.
160
161    """
162
163    def __init__(self,
164                 conn: tpkt.Connection,
165                 max_tpdu: int,
166                 local_tsel: int | None,
167                 remote_tsel: int | None,
168                 receive_queue_size: int,
169                 send_queue_size: int):
170        self._conn = conn
171        self._max_tpdu = max_tpdu
172        self._loop = asyncio.get_running_loop()
173        self._info = ConnectionInfo(local_tsel=local_tsel,
174                                    remote_tsel=remote_tsel,
175                                    **conn.info._asdict())
176        self._receive_queue = aio.Queue(receive_queue_size)
177        self._send_queue = aio.Queue(send_queue_size)
178        self._log = _create_connection_logger(self._info)
179
180        self.async_group.spawn(self._receive_loop)
181        self.async_group.spawn(self._send_loop)
182
183    @property
184    def async_group(self) -> aio.Group:
185        """Async group"""
186        return self._conn.async_group
187
188    @property
189    def info(self) -> ConnectionInfo:
190        """Connection info"""
191        return self._info
192
193    async def receive(self) -> util.Bytes:
194        """Receive data"""
195        try:
196            return await self._receive_queue.get()
197
198        except aio.QueueClosedError:
199            raise ConnectionError()
200
201    async def send(self, data: util.Bytes):
202        """Send data"""
203        try:
204            await self._send_queue.put((data, None))
205
206        except aio.QueueClosedError:
207            raise ConnectionError()
208
209    async def drain(self):
210        """Drain output buffer"""
211        try:
212            future = self._loop.create_future()
213            await self._send_queue.put((None, future))
214            await future
215
216        except aio.QueueClosedError:
217            raise ConnectionError()
218
219    async def _receive_loop(self):
220        try:
221            data_queue = collections.deque()
222            while True:
223                tpdu_bytes = await self._conn.receive()
224                tpdu = encoder.decode(memoryview(tpdu_bytes))
225
226                if isinstance(tpdu, (common.DR, common.ER)):
227                    self._log.info("received disconnect request / error")
228                    break
229
230                if not isinstance(tpdu, common.DT):
231                    continue
232
233                data_queue.append(tpdu.data)
234
235                if not tpdu.eot:
236                    continue
237
238                data = bytes(itertools.chain.from_iterable(data_queue))
239                data_queue.clear()
240
241                await self._receive_queue.put(data)
242
243        except ConnectionError:
244            pass
245
246        except Exception as e:
247            self._log.error("receive loop error: %s", e, exc_info=e)
248
249        finally:
250            self.close()
251            self._receive_queue.close()
252
253    async def _send_loop(self):
254        future = None
255        try:
256            while True:
257                data, future = await self._send_queue.get()
258
259                if data is None:
260                    await self._conn.drain()
261
262                else:
263                    data = memoryview(data)
264                    max_size = self._max_tpdu - 3
265
266                    while data:
267                        single_data, data = data[:max_size], data[max_size:]
268
269                        tpdu = common.DT(eot=not data, data=single_data)
270                        tpdu_bytes = encoder.encode(tpdu)
271
272                        await self._conn.send(tpdu_bytes)
273
274                if future and not future.done():
275                    future.set_result(None)
276
277        except ConnectionError:
278            pass
279
280        except Exception as e:
281            self._log.error("send loop error: %s", e, exc_info=e)
282
283        finally:
284            self.close()
285            self._send_queue.close()
286
287            while True:
288                if future and not future.done():
289                    future.set_result(None)
290                if self._send_queue.empty():
291                    break
292                _, future = self._send_queue.get_nowait()

COTP connection

For creation of new instance see connect or listen.

Connection( conn: hat.drivers.tpkt.Connection, max_tpdu: int, local_tsel: int | None, remote_tsel: int | None, receive_queue_size: int, send_queue_size: int)
163    def __init__(self,
164                 conn: tpkt.Connection,
165                 max_tpdu: int,
166                 local_tsel: int | None,
167                 remote_tsel: int | None,
168                 receive_queue_size: int,
169                 send_queue_size: int):
170        self._conn = conn
171        self._max_tpdu = max_tpdu
172        self._loop = asyncio.get_running_loop()
173        self._info = ConnectionInfo(local_tsel=local_tsel,
174                                    remote_tsel=remote_tsel,
175                                    **conn.info._asdict())
176        self._receive_queue = aio.Queue(receive_queue_size)
177        self._send_queue = aio.Queue(send_queue_size)
178        self._log = _create_connection_logger(self._info)
179
180        self.async_group.spawn(self._receive_loop)
181        self.async_group.spawn(self._send_loop)
async_group: hat.aio.group.Group
183    @property
184    def async_group(self) -> aio.Group:
185        """Async group"""
186        return self._conn.async_group

Async group

info: ConnectionInfo
188    @property
189    def info(self) -> ConnectionInfo:
190        """Connection info"""
191        return self._info

Connection info

async def receive(self) -> bytes | bytearray | memoryview:
193    async def receive(self) -> util.Bytes:
194        """Receive data"""
195        try:
196            return await self._receive_queue.get()
197
198        except aio.QueueClosedError:
199            raise ConnectionError()

Receive data

async def send(self, data: bytes | bytearray | memoryview):
201    async def send(self, data: util.Bytes):
202        """Send data"""
203        try:
204            await self._send_queue.put((data, None))
205
206        except aio.QueueClosedError:
207            raise ConnectionError()

Send data

async def drain(self):
209    async def drain(self):
210        """Drain output buffer"""
211        try:
212            future = self._loop.create_future()
213            await self._send_queue.put((None, future))
214            await future
215
216        except aio.QueueClosedError:
217            raise ConnectionError()

Drain output buffer