hat.drivers.cosp

Connection oriented session protocol

 1"""Connection oriented session protocol"""
 2
 3from hat.drivers.cosp.connection import (ConnectionInfo,
 4                                         ValidateCb,
 5                                         ConnectionCb,
 6                                         connect,
 7                                         listen,
 8                                         Server,
 9                                         Connection)
10
11
12__all__ = ['ConnectionInfo',
13           'ValidateCb',
14           'ConnectionCb',
15           'connect',
16           'listen',
17           'Server',
18           'Connection']
class ConnectionInfo(typing.NamedTuple):
class ConnectionInfo(typing.NamedTuple):
    """Immutable description of a COSP connection

    `Connection.__init__` creates it from the underlying COTP connection's
    info (``conn.info._asdict()``) extended with the two session selectors.

    """
    # optional connection name (also handed to the connection logger)
    name: str | None
    # local TCP address
    local_addr: tcp.Address
    # local selector taken over from the COTP connection info
    # (presumably the transport selector - TODO confirm)
    local_tsel: int | None
    # local session selector supplied by the COSP layer
    local_ssel: int | None
    # remote TCP address
    remote_addr: tcp.Address
    # remote selector taken over from the COTP connection info
    # (presumably the transport selector - TODO confirm)
    remote_tsel: int | None
    # remote session selector supplied by the COSP layer
    remote_ssel: int | None

ConnectionInfo(name, local_addr, local_tsel, local_ssel, remote_addr, remote_tsel, remote_ssel)

ConnectionInfo( name: str | None, local_addr: hat.drivers.tcp.Address, local_tsel: int | None, local_ssel: int | None, remote_addr: hat.drivers.tcp.Address, remote_tsel: int | None, remote_ssel: int | None)

Create new instance of ConnectionInfo(name, local_addr, local_tsel, local_ssel, remote_addr, remote_tsel, remote_ssel)

name: str | None

Alias for field number 0

local_addr: hat.drivers.tcp.Address

Alias for field number 1

local_tsel: int | None

Alias for field number 2

local_ssel: int | None

Alias for field number 3

remote_addr: hat.drivers.tcp.Address

Alias for field number 4

remote_tsel: int | None

Alias for field number 5

remote_ssel: int | None

Alias for field number 6

ValidateCb = typing.Callable[[bytes | bytearray | memoryview], bytes | bytearray | memoryview | None | collections.abc.Awaitable[bytes | bytearray | memoryview | None]]
ConnectionCb = typing.Callable[[ForwardRef('Connection')], None | collections.abc.Awaitable[None]]
async def connect( addr: hat.drivers.tcp.Address, user_data: bytes | bytearray | memoryview | None = None, *, local_ssel: int | None = None, remote_ssel: int | None = None, cosp_receive_queue_size: int = 1024, cosp_send_queue_size: int = 1024, **kwargs) -> Connection:
async def connect(addr: tcp.Address,
                  user_data: util.Bytes | None = None,
                  *,
                  local_ssel: int | None = None,
                  remote_ssel: int | None = None,
                  cosp_receive_queue_size: int = 1024,
                  cosp_send_queue_size: int = 1024,
                  **kwargs
                  ) -> 'Connection':
    """Connect to COSP server

    A COTP connection is opened first; `addr` and all additional arguments
    are passed directly to `hat.drivers.cotp.connect`. A CN SPDU is then
    sent and the peer's reply is checked with `_validate_connect_response`.

    Args:
        addr: remote address
        user_data: optional user data carried in the CN SPDU
        local_ssel: value sent as calling session selector
        remote_ssel: value sent as called session selector
        cosp_receive_queue_size: receive queue size of the new connection
        cosp_send_queue_size: send queue size of the new connection

    If anything fails after the COTP connection exists (cancellation
    included), the COTP connection is handed to `_close_cotp` together with
    `_ab_spdu` and the original exception is re-raised.

    """
    log = _create_connection_logger(kwargs.get('name'), None)
    cotp_conn = await cotp.connect(addr, **kwargs)

    try:
        request = common.Spdu(type=common.SpduType.CN,
                              extended_spdus=False,
                              version_number=_params_version,
                              requirements=_params_requirements,
                              calling_ssel=local_ssel,
                              called_ssel=remote_ssel,
                              user_data=user_data)
        await cotp_conn.send(encoder.encode(request))

        response = encoder.decode(memoryview(await cotp_conn.receive()))
        _validate_connect_response(request, response)

        # on the connecting side "calling" is local and "called" is remote
        local, remote = _get_ssels(request, response)
        return Connection(cotp_conn, request, response, local, remote,
                          cosp_receive_queue_size, cosp_send_queue_size)

    except BaseException:
        # uncancellable so that cleanup also completes on cancellation
        await aio.uncancellable(_close_cotp(cotp_conn, _ab_spdu, log))
        raise

Connect to COSP server

Additional arguments are passed directly to hat.drivers.cotp.connect.

async def listen( validate_cb: Callable[[bytes | bytearray | memoryview], bytes | bytearray | memoryview | None | Awaitable[bytes | bytearray | memoryview | None]], connection_cb: Callable[[Connection], None | Awaitable[None]], addr: hat.drivers.tcp.Address = Address(host='0.0.0.0', port=102), *, bind_connections: bool = False, cosp_receive_queue_size: int = 1024, cosp_send_queue_size: int = 1024, **kwargs) -> Server:
 87async def listen(validate_cb: ValidateCb,
 88                 connection_cb: ConnectionCb,
 89                 addr: tcp.Address = tcp.Address('0.0.0.0', 102),
 90                 *,
 91                 bind_connections: bool = False,
 92                 cosp_receive_queue_size: int = 1024,
 93                 cosp_send_queue_size: int = 1024,
 94                 **kwargs
 95                 ) -> 'Server':
 96    """Create COSP listening server
 97
 98    Additional arguments are passed directly to `hat.drivers.cotp.listen`.
 99
100    Args:
101        validate_cb: callback function or coroutine called on new
102            incomming connection request prior to creating new connection
103        connection_cb: new connection callback
104        addr: local listening address
105
106    """
107    server = Server()
108    server._validate_cb = validate_cb
109    server._connection_cb = connection_cb
110    server._bind_connections = bind_connections
111    server._receive_queue_size = cosp_receive_queue_size
112    server._send_queue_size = cosp_send_queue_size
113    server._log = _create_server_logger(kwargs.get('name'), None)
114
115    server._srv = await cotp.listen(server._on_connection, addr,
116                                    bind_connections=False,
117                                    **kwargs)
118
119    server._log = _create_server_logger(kwargs.get('name'), server._srv.info)
120
121    return server

Create COSP listening server

Additional arguments are passed directly to hat.drivers.cotp.listen.

Arguments:
  • validate_cb: callback function or coroutine called on new incoming connection request prior to creating new connection
  • connection_cb: new connection callback
  • addr: local listening address
class Server(hat.aio.group.Resource):
class Server(aio.Resource):
    """COSP listening server

    For creating new server see `listen`.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._srv.async_group

    @property
    def info(self) -> tcp.ServerInfo:
        """Server info"""
        return self._srv.info

    async def _on_connection(self, cotp_conn):
        """Handle a newly accepted COTP connection

        Receives and validates the CN SPDU, calls the validate callback
        with the request's user data, replies with an AC SPDU carrying the
        callback's result and creates a `Connection` which is passed to the
        connection callback.

        Errors derived from `Exception` are logged and swallowed; other
        exceptions (e.g. cancellation) propagate after cleanup.

        """
        try:
            try:
                cn_spdu_bytes = await cotp_conn.receive()
                cn_spdu = encoder.decode(memoryview(cn_spdu_bytes))
                _validate_connect_request(cn_spdu)

                res_user_data = await aio.call(self._validate_cb,
                                               cn_spdu.user_data)

                ac_spdu = common.Spdu(type=common.SpduType.AC,
                                      extended_spdus=False,
                                      version_number=_params_version,
                                      requirements=_params_requirements,
                                      calling_ssel=cn_spdu.calling_ssel,
                                      called_ssel=cn_spdu.called_ssel,
                                      user_data=res_user_data)
                ac_spdu_bytes = encoder.encode(ac_spdu)
                await cotp_conn.send(ac_spdu_bytes)

                # on the accepting side "called" is local, "calling" is remote
                calling_ssel, called_ssel = _get_ssels(cn_spdu, ac_spdu)
                conn = Connection(cotp_conn, cn_spdu, ac_spdu,
                                  called_ssel, calling_ssel,
                                  self._receive_queue_size,
                                  self._send_queue_size)

            except BaseException:
                # `_close_cotp` takes (connection, SPDU, logger) - the same
                # arguments used by `connect` and `Connection._on_close`
                await aio.uncancellable(
                    _close_cotp(cotp_conn, _ab_spdu, self._log))
                raise

            try:
                await aio.call(self._connection_cb, conn)

            except BaseException:
                await aio.uncancellable(conn.async_close())
                raise

        except Exception as e:
            self._log.error("error creating new incomming connection: %s",
                            e, exc_info=e)
            return

        if not self._bind_connections:
            return

        # bound connections keep this handler alive until they are closed;
        # if the handler is interrupted, the connection is closed with it
        try:
            await conn.wait_closed()

        except BaseException:
            await aio.uncancellable(conn.async_close())
            raise

COSP listening server

For creating new server see listen.

async_group: hat.aio.group.Group
131    @property
132    def async_group(self) -> aio.Group:
133        """Async group"""
134        return self._srv.async_group

Async group

info: hat.drivers.tcp.ServerInfo
136    @property
137    def info(self) -> tcp.ServerInfo:
138        """Server info"""
139        return self._srv.info

Server info

class Connection(hat.aio.group.Resource):
class Connection(aio.Resource):
    """COSP connection

    For creating new connection see `connect` or `listen`.

    Received DT SPDUs are reassembled by a background receive loop and
    handed out by `receive`; data passed to `send` is queued and written to
    the underlying COTP connection by a background send loop.

    """

    def __init__(self,
                 conn: cotp.Connection,
                 cn_spdu: common.Spdu,
                 ac_spdu: common.Spdu,
                 local_ssel: int | None,
                 remote_ssel: int | None,
                 receive_queue_size: int,
                 send_queue_size: int):
        """Wrap a COTP connection on which CN/AC SPDUs were exchanged

        Must be called while an event loop is running.

        Args:
            conn: underlying COTP connection
            cn_spdu: connect request SPDU (source of `conn_req_user_data`)
            ac_spdu: connect response SPDU (source of `conn_res_user_data`)
            local_ssel: local session selector
            remote_ssel: remote session selector
            receive_queue_size: size of the receive queue
            send_queue_size: size of the send queue

        """
        self._conn = conn
        self._conn_req_user_data = cn_spdu.user_data
        self._conn_res_user_data = ac_spdu.user_data
        self._loop = asyncio.get_running_loop()
        self._info = ConnectionInfo(local_ssel=local_ssel,
                                    remote_ssel=remote_ssel,
                                    **conn.info._asdict())
        # SPDU handed to `_close_cotp` when the connection is closed
        self._close_spdu = None
        self._receive_queue = aio.Queue(receive_queue_size)
        self._send_queue = aio.Queue(send_queue_size)
        self._async_group = aio.Group()
        self._log = _create_connection_logger(self._info.name, self._info)

        self.async_group.spawn(aio.call_on_cancel, self._on_close)
        self.async_group.spawn(self._receive_loop)
        self.async_group.spawn(self._send_loop)
        # closing of the COTP connection closes this connection as well
        self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
                               self.close)

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    @property
    def info(self) -> ConnectionInfo:
        """Connection info"""
        return self._info

    @property
    def conn_req_user_data(self) -> util.Bytes:
        """Connect request's user data

        NOTE(review): `connect` allows ``user_data=None``, so this is
        presumably ``None`` when no user data was supplied - confirm.

        """
        return self._conn_req_user_data

    @property
    def conn_res_user_data(self) -> util.Bytes:
        """Connect response's user data

        NOTE(review): the validate callback may return ``None``, so this is
        presumably ``None`` when no user data was supplied - confirm.

        """
        return self._conn_res_user_data

    def close(self, user_data: util.Bytes | None = None):
        """Close connection

        Registers an FN SPDU (with `transport_disconnect` set and optional
        `user_data`) as closing SPDU. Has no effect if the connection is not
        open.

        """
        self._close(common.Spdu(common.SpduType.FN,
                                transport_disconnect=True,
                                user_data=user_data))

    async def async_close(self, user_data: util.Bytes | None = None):
        """Async close

        Same as `close`, followed by waiting until the connection is closed.

        """
        self.close(user_data)
        await self.wait_closed()

    async def receive(self) -> util.Bytes:
        """Receive data

        Raises:
            ConnectionError: receive queue is closed

        """
        try:
            return await self._receive_queue.get()

        except aio.QueueClosedError:
            raise ConnectionError()

    async def send(self, data: util.Bytes):
        """Send data

        Data is only queued here; it is written by the send loop.

        Raises:
            ConnectionError: send queue is closed

        """
        try:
            await self._send_queue.put((data, None))

        except aio.QueueClosedError:
            raise ConnectionError()

    async def drain(self):
        """Drain output buffer

        Queues a marker behind previously queued data and waits until the
        send loop has processed it (the send loop drains the COTP connection
        for such markers). The wait also ends once the send loop terminates.

        Raises:
            ConnectionError: send queue is closed

        """
        try:
            future = self._loop.create_future()
            await self._send_queue.put((None, future))
            await future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def _on_close(self):
        """Pass the COTP connection and closing SPDU to `_close_cotp`"""
        await _close_cotp(self._conn, self._close_spdu, self._log)

    def _close(self, spdu):
        """Remember `spdu` as closing SPDU and close the async group

        Only the first call on an open connection has any effect.

        """
        if not self.is_open:
            return

        self._close_spdu = spdu
        self._async_group.close()

    async def _receive_loop(self):
        """Read SPDUs from the COTP connection until closed"""
        try:
            data = bytearray()
            while True:
                spdu_bytes = await self._conn.receive()
                spdu = encoder.decode(memoryview(spdu_bytes))

                if spdu.type == common.SpduType.DT:
                    data.extend(spdu.data)

                    # segments are accumulated until `end` is unset or true
                    if spdu.end is None or spdu.end:
                        await self._receive_queue.put(data)
                        data = bytearray()

                elif spdu.type == common.SpduType.FN:
                    # peer finished the session - close with `_dn_spdu`
                    self._close(_dn_spdu)
                    break

                elif spdu.type == common.SpduType.AB:
                    # peer aborted - close without a closing SPDU
                    self._close(None)
                    break

                else:
                    # any other SPDU type - close with `_ab_spdu`
                    self._close(_ab_spdu)
                    break

        except ConnectionError:
            pass

        except Exception as e:
            self._log.error("receive loop error: %s", e, exc_info=e)

        finally:
            # no-op if one of the branches above already closed
            self.close()
            self._receive_queue.close()

    async def _send_loop(self):
        """Write queued data to the COTP connection until closed"""
        future = None
        try:
            while True:
                data, future = await self._send_queue.get()

                if data is None:
                    # drain marker queued by `drain`
                    await self._conn.drain()

                else:
                    spdu = common.Spdu(type=common.SpduType.DT,
                                       data=data)
                    spdu_bytes = encoder.encode(spdu)

                    # every DT SPDU is prefixed with the give-tokens SPDU
                    # bytes; concatenation avoids a per-byte Python copy
                    msg = (bytes(common.give_tokens_spdu_bytes) +
                           bytes(spdu_bytes))

                    await self._conn.send(msg)

                if future and not future.done():
                    future.set_result(None)

        except ConnectionError:
            pass

        except Exception as e:
            self._log.error("send loop error: %s", e, exc_info=e)

        finally:
            self.close()
            self._send_queue.close()

            # resolve the drain future being processed and all drain futures
            # still queued, so that `drain` callers are not left waiting
            while True:
                if future and not future.done():
                    future.set_result(None)
                if self._send_queue.empty():
                    break
                _, future = self._send_queue.get_nowait()

COSP connection

For creating new connection see connect or listen.

Connection( conn: hat.drivers.cotp.Connection, cn_spdu: hat.drivers.cosp.common.Spdu, ac_spdu: hat.drivers.cosp.common.Spdu, local_ssel: int | None, remote_ssel: int | None, receive_queue_size: int, send_queue_size: int)
202    def __init__(self,
203                 conn: cotp.Connection,
204                 cn_spdu: common.Spdu,
205                 ac_spdu: common.Spdu,
206                 local_ssel: int | None,
207                 remote_ssel: int | None,
208                 receive_queue_size: int,
209                 send_queue_size: int):
210        self._conn = conn
211        self._conn_req_user_data = cn_spdu.user_data
212        self._conn_res_user_data = ac_spdu.user_data
213        self._loop = asyncio.get_running_loop()
214        self._info = ConnectionInfo(local_ssel=local_ssel,
215                                    remote_ssel=remote_ssel,
216                                    **conn.info._asdict())
217        self._close_spdu = None
218        self._receive_queue = aio.Queue(receive_queue_size)
219        self._send_queue = aio.Queue(send_queue_size)
220        self._async_group = aio.Group()
221        self._log = _create_connection_logger(self._info.name, self._info)
222
223        self.async_group.spawn(aio.call_on_cancel, self._on_close)
224        self.async_group.spawn(self._receive_loop)
225        self.async_group.spawn(self._send_loop)
226        self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
227                               self.close)
async_group: hat.aio.group.Group
229    @property
230    def async_group(self) -> aio.Group:
231        """Async group"""
232        return self._async_group

Async group

info: ConnectionInfo
234    @property
235    def info(self) -> ConnectionInfo:
236        """Connection info"""
237        return self._info

Connection info

conn_req_user_data: bytes | bytearray | memoryview
239    @property
240    def conn_req_user_data(self) -> util.Bytes:
241        """Connect request's user data"""
242        return self._conn_req_user_data

Connect request's user data

conn_res_user_data: bytes | bytearray | memoryview
244    @property
245    def conn_res_user_data(self) -> util.Bytes:
246        """Connect response's user data"""
247        return self._conn_res_user_data

Connect response's user data

def close(self, user_data: bytes | bytearray | memoryview | None = None):
249    def close(self, user_data: util.Bytes | None = None):
250        """Close connection"""
251        self._close(common.Spdu(common.SpduType.FN,
252                                transport_disconnect=True,
253                                user_data=user_data))

Close connection

async def async_close(self, user_data: bytes | bytearray | memoryview | None = None):
255    async def async_close(self, user_data: util.Bytes | None = None):
256        """Async close"""
257        self.close(user_data)
258        await self.wait_closed()

Async close

async def receive(self) -> bytes | bytearray | memoryview:
260    async def receive(self) -> util.Bytes:
261        """Receive data"""
262        try:
263            return await self._receive_queue.get()
264
265        except aio.QueueClosedError:
266            raise ConnectionError()

Receive data

async def send(self, data: bytes | bytearray | memoryview):
268    async def send(self, data: util.Bytes):
269        """Send data"""
270        try:
271            await self._send_queue.put((data, None))
272
273        except aio.QueueClosedError:
274            raise ConnectionError()

Send data

async def drain(self):
276    async def drain(self):
277        """Drain output buffer"""
278        try:
279            future = self._loop.create_future()
280            await self._send_queue.put((None, future))
281            await future
282
283        except aio.QueueClosedError:
284            raise ConnectionError()

Drain output buffer