hat.drivers.cotp

Connection oriented transport protocol

 1"""Connection oriented transport protocol"""
 2
 3from hat.drivers.cotp.connection import (ConnectionInfo,
 4                                         ConnectionCb,
 5                                         connect,
 6                                         listen,
 7                                         Server,
 8                                         Connection)
 9
10
11__all__ = ['ConnectionInfo',
12           'ConnectionCb',
13           'connect',
14           'listen',
15           'Server',
16           'Connection']
class ConnectionInfo(typing.NamedTuple):
22class ConnectionInfo(typing.NamedTuple):
23    name: str | None
24    local_addr: tcp.Address
25    local_tsel: int | None
26    remote_addr: tcp.Address
27    remote_tsel: int | None

ConnectionInfo(name, local_addr, local_tsel, remote_addr, remote_tsel)

ConnectionInfo( name: str | None, local_addr: hat.drivers.tcp.Address, local_tsel: int | None, remote_addr: hat.drivers.tcp.Address, remote_tsel: int | None)

Create new instance of ConnectionInfo(name, local_addr, local_tsel, remote_addr, remote_tsel)

name: str | None

Alias for field number 0

Alias for field number 1

local_tsel: int | None

Alias for field number 2

Alias for field number 3

remote_tsel: int | None

Alias for field number 4

ConnectionCb = typing.Callable[[ForwardRef('Connection')], None | collections.abc.Awaitable[None]]
async def connect( addr: hat.drivers.tcp.Address, *, local_tsel: int | None = None, remote_tsel: int | None = None, cotp_receive_queue_size: int = 1024, cotp_send_queue_size: int = 1024, **kwargs) -> Connection:
34async def connect(addr: tcp.Address,
35                  *,
36                  local_tsel: int | None = None,
37                  remote_tsel: int | None = None,
38                  cotp_receive_queue_size: int = 1024,
39                  cotp_send_queue_size: int = 1024,
40                  **kwargs
41                  ) -> 'Connection':
42    """Create new COTP connection
43
44    Additional arguments are passed directly to `hat.drivers.tpkt.connect`.
45
46    """
47    conn = await tpkt.connect(addr, **kwargs)
48
49    try:
50        cr_tpdu = common.CR(src=next(_next_srcs),
51                            cls=0,
52                            calling_tsel=local_tsel,
53                            called_tsel=remote_tsel,
54                            max_tpdu=2048,
55                            pref_max_tpdu=None)
56        cr_tpdu_bytes = encoder.encode(cr_tpdu)
57        await conn.send(cr_tpdu_bytes)
58
59        cc_tpdu_bytes = await conn.receive()
60        cc_tpdu = encoder.decode(memoryview(cc_tpdu_bytes))
61        _validate_connect_response(cr_tpdu, cc_tpdu)
62
63        max_tpdu = _calculate_max_tpdu(cr_tpdu, cc_tpdu)
64        calling_tsel, called_tsel = _get_tsels(cr_tpdu, cc_tpdu)
65
66        return Connection(conn, max_tpdu, calling_tsel, called_tsel,
67                          cotp_receive_queue_size, cotp_send_queue_size)
68
69    except BaseException:
70        await aio.uncancellable(conn.async_close())
71        raise

Create new COTP connection

Additional arguments are passed directly to hat.drivers.tpkt.connect.

async def listen( connection_cb: Callable[[Connection], None | Awaitable[None]], addr: hat.drivers.tcp.Address = Address(host='0.0.0.0', port=102), *, cotp_receive_queue_size: int = 1024, cotp_send_queue_size: int = 1024, **kwargs) -> Server:
74async def listen(connection_cb: ConnectionCb,
75                 addr: tcp.Address = tcp.Address('0.0.0.0', 102),
76                 *,
77                 cotp_receive_queue_size: int = 1024,
78                 cotp_send_queue_size: int = 1024,
79                 **kwargs
80                 ) -> 'Server':
81    """Create new COTP listening server
82
83    Additional arguments are passed directly to `hat.drivers.tpkt.listen`.
84
85    """
86    server = Server()
87    server._connection_cb = connection_cb
88    server._receive_queue_size = cotp_receive_queue_size
89    server._send_queue_size = cotp_send_queue_size
90    server._log = mlog
91
92    server._srv = await tpkt.listen(server._on_connection, addr, **kwargs)
93
94    server._log = _create_server_logger_adapter(server._srv.info)
95
96    return server

Create new COTP listening server

Additional arguments are passed directly to hat.drivers.tpkt.listen.

class Server(hat.aio.group.Resource):
 99class Server(aio.Resource):
100    """COTP listening server
101
102    For creation of new instance see `listen` coroutine.
103
104    """
105
106    @property
107    def async_group(self) -> aio.Group:
108        """Async group"""
109        return self._srv.async_group
110
111    @property
112    def info(self) -> tcp.ServerInfo:
113        """Server info"""
114        return self._srv.info
115
116    async def _on_connection(self, tpkt_conn):
117        try:
118            try:
119                cr_tpdu_bytes = await tpkt_conn.receive()
120                cr_tpdu = encoder.decode(memoryview(cr_tpdu_bytes))
121                _validate_connect_request(cr_tpdu)
122
123                cc_tpdu = common.CC(dst=cr_tpdu.src,
124                                    src=next(_next_srcs),
125                                    cls=0,
126                                    calling_tsel=cr_tpdu.calling_tsel,
127                                    called_tsel=cr_tpdu.called_tsel,
128                                    max_tpdu=_calculate_cc_max_tpdu(cr_tpdu),
129                                    pref_max_tpdu=None)
130                cc_tpdu_bytes = encoder.encode(cc_tpdu)
131                await tpkt_conn.send(cc_tpdu_bytes)
132
133                max_tpdu = _calculate_max_tpdu(cr_tpdu, cc_tpdu)
134                calling_tsel, called_tsel = _get_tsels(cr_tpdu, cc_tpdu)
135                conn = Connection(tpkt_conn, max_tpdu,
136                                  called_tsel, calling_tsel,
137                                  self._receive_queue_size,
138                                  self._send_queue_size)
139
140            except BaseException:
141                await aio.uncancellable(tpkt_conn.async_close())
142                raise
143
144            try:
145                await aio.call(self._connection_cb, conn)
146
147            except BaseException:
148                await aio.uncancellable(conn.async_close())
149                raise
150
151        except Exception as e:
152            self._log.error("error creating new incomming connection: %s",
153                            e, exc_info=e)

COTP listening server

For creation of new instance see listen coroutine.

async_group: hat.aio.group.Group
106    @property
107    def async_group(self) -> aio.Group:
108        """Async group"""
109        return self._srv.async_group

Async group

info: hat.drivers.tcp.ServerInfo
111    @property
112    def info(self) -> tcp.ServerInfo:
113        """Server info"""
114        return self._srv.info

Server info

class Connection(hat.aio.group.Resource):
156class Connection(aio.Resource):
157    """COTP connection
158
159    For creation of new instance see `connect` or `listen`.
160
161    """
162
163    def __init__(self,
164                 conn: tpkt.Connection,
165                 max_tpdu: int,
166                 local_tsel: int | None,
167                 remote_tsel: int | None,
168                 receive_queue_size: int,
169                 send_queue_size: int):
170        self._conn = conn
171        self._max_tpdu = max_tpdu
172        self._loop = asyncio.get_running_loop()
173        self._info = ConnectionInfo(local_tsel=local_tsel,
174                                    remote_tsel=remote_tsel,
175                                    **conn.info._asdict())
176        self._receive_queue = aio.Queue(receive_queue_size)
177        self._send_queue = aio.Queue(send_queue_size)
178        self._log = _create_connection_logger_adapter(False, self._info)
179        self._comm_log = _create_connection_logger_adapter(True, self._info)
180
181        self.async_group.spawn(self._receive_loop)
182        self.async_group.spawn(self._send_loop)
183
184        self._comm_log.debug('connection established')
185
186    @property
187    def async_group(self) -> aio.Group:
188        """Async group"""
189        return self._conn.async_group
190
191    @property
192    def info(self) -> ConnectionInfo:
193        """Connection info"""
194        return self._info
195
196    async def receive(self) -> util.Bytes:
197        """Receive data"""
198        try:
199            return await self._receive_queue.get()
200
201        except aio.QueueClosedError:
202            raise ConnectionError()
203
204    async def send(self, data: util.Bytes):
205        """Send data"""
206        try:
207            await self._send_queue.put((data, None))
208
209        except aio.QueueClosedError:
210            raise ConnectionError()
211
212    async def drain(self):
213        """Drain output buffer"""
214        try:
215            future = self._loop.create_future()
216            await self._send_queue.put((None, future))
217            await future
218
219        except aio.QueueClosedError:
220            raise ConnectionError()
221
222    async def _receive_loop(self):
223        try:
224            data_queue = collections.deque()
225            while True:
226                tpdu_bytes = await self._conn.receive()
227                tpdu = encoder.decode(memoryview(tpdu_bytes))
228
229                self._comm_log.debug('received %s', tpdu)
230
231                if isinstance(tpdu, (common.DR, common.ER)):
232                    self._log.info("received disconnect request / error")
233                    break
234
235                if not isinstance(tpdu, common.DT):
236                    continue
237
238                data_queue.append(tpdu.data)
239
240                if not tpdu.eot:
241                    continue
242
243                data = bytes(itertools.chain.from_iterable(data_queue))
244                data_queue.clear()
245
246                await self._receive_queue.put(data)
247
248        except ConnectionError:
249            pass
250
251        except Exception as e:
252            self._log.error("receive loop error: %s", e, exc_info=e)
253
254        finally:
255            self.close()
256            self._receive_queue.close()
257
258            self._comm_log.debug('connection closed')
259
260    async def _send_loop(self):
261        future = None
262        try:
263            while True:
264                data, future = await self._send_queue.get()
265
266                if data is None:
267                    await self._conn.drain()
268
269                else:
270                    data = memoryview(data)
271                    max_size = self._max_tpdu - 3
272
273                    while data:
274                        single_data, data = data[:max_size], data[max_size:]
275
276                        tpdu = common.DT(eot=not data, data=single_data)
277                        tpdu_bytes = encoder.encode(tpdu)
278
279                        self._comm_log.debug('sending %s', tpdu)
280
281                        await self._conn.send(tpdu_bytes)
282
283                if future and not future.done():
284                    future.set_result(None)
285
286        except ConnectionError:
287            pass
288
289        except Exception as e:
290            self._log.error("send loop error: %s", e, exc_info=e)
291
292        finally:
293            self.close()
294            self._send_queue.close()
295
296            while True:
297                if future and not future.done():
298                    future.set_result(None)
299                if self._send_queue.empty():
300                    break
301                _, future = self._send_queue.get_nowait()

COTP connection

For creation of new instance see connect or listen.

Connection( conn: hat.drivers.tpkt.Connection, max_tpdu: int, local_tsel: int | None, remote_tsel: int | None, receive_queue_size: int, send_queue_size: int)
163    def __init__(self,
164                 conn: tpkt.Connection,
165                 max_tpdu: int,
166                 local_tsel: int | None,
167                 remote_tsel: int | None,
168                 receive_queue_size: int,
169                 send_queue_size: int):
170        self._conn = conn
171        self._max_tpdu = max_tpdu
172        self._loop = asyncio.get_running_loop()
173        self._info = ConnectionInfo(local_tsel=local_tsel,
174                                    remote_tsel=remote_tsel,
175                                    **conn.info._asdict())
176        self._receive_queue = aio.Queue(receive_queue_size)
177        self._send_queue = aio.Queue(send_queue_size)
178        self._log = _create_connection_logger_adapter(False, self._info)
179        self._comm_log = _create_connection_logger_adapter(True, self._info)
180
181        self.async_group.spawn(self._receive_loop)
182        self.async_group.spawn(self._send_loop)
183
184        self._comm_log.debug('connection established')
async_group: hat.aio.group.Group
186    @property
187    def async_group(self) -> aio.Group:
188        """Async group"""
189        return self._conn.async_group

Async group

info: ConnectionInfo
191    @property
192    def info(self) -> ConnectionInfo:
193        """Connection info"""
194        return self._info

Connection info

async def receive(self) -> bytes | bytearray | memoryview:
196    async def receive(self) -> util.Bytes:
197        """Receive data"""
198        try:
199            return await self._receive_queue.get()
200
201        except aio.QueueClosedError:
202            raise ConnectionError()

Receive data

async def send(self, data: bytes | bytearray | memoryview):
204    async def send(self, data: util.Bytes):
205        """Send data"""
206        try:
207            await self._send_queue.put((data, None))
208
209        except aio.QueueClosedError:
210            raise ConnectionError()

Send data

async def drain(self):
212    async def drain(self):
213        """Drain output buffer"""
214        try:
215            future = self._loop.create_future()
216            await self._send_queue.put((None, future))
217            await future
218
219        except aio.QueueClosedError:
220            raise ConnectionError()

Drain output buffer