hat.drivers.cosp

Connection oriented session protocol

 1"""Connection oriented session protocol"""
 2
 3from hat.drivers.cosp.connection import (ConnectionInfo,
 4                                         ValidateCb,
 5                                         ConnectionCb,
 6                                         connect,
 7                                         listen,
 8                                         Server,
 9                                         Connection)
10
11
12__all__ = ['ConnectionInfo',
13           'ValidateCb',
14           'ConnectionCb',
15           'connect',
16           'listen',
17           'Server',
18           'Connection']
class ConnectionInfo(typing.NamedTuple):
class ConnectionInfo(typing.NamedTuple):
    """Addressing information of a single COSP connection

    `Connection` creates this by taking every field of the underlying COTP
    connection's info and adding the local and remote session selectors.

    """
    # connection name as provided by the COTP connection info
    # (presumably an optional user-defined label - confirm in cotp/tcp)
    name: str | None
    # local TCP address
    local_addr: tcp.Address
    # local selector taken over from the COTP connection info
    # (presumably the transport selector - confirm in cotp)
    local_tsel: int | None
    # local session selector
    local_ssel: int | None
    # remote TCP address
    remote_addr: tcp.Address
    # remote selector taken over from the COTP connection info
    # (presumably the transport selector - confirm in cotp)
    remote_tsel: int | None
    # remote session selector
    remote_ssel: int | None

ConnectionInfo(name, local_addr, local_tsel, local_ssel, remote_addr, remote_tsel, remote_ssel)

ConnectionInfo( name: str | None, local_addr: hat.drivers.tcp.Address, local_tsel: int | None, local_ssel: int | None, remote_addr: hat.drivers.tcp.Address, remote_tsel: int | None, remote_ssel: int | None)

Create new instance of ConnectionInfo(name, local_addr, local_tsel, local_ssel, remote_addr, remote_tsel, remote_ssel)

name: str | None

Alias for field number 0

local_addr: hat.drivers.tcp.Address

Alias for field number 1

local_tsel: int | None

Alias for field number 2

local_ssel: int | None

Alias for field number 3

remote_addr: hat.drivers.tcp.Address

Alias for field number 4

remote_tsel: int | None

Alias for field number 5

remote_ssel: int | None

Alias for field number 6

ValidateCb = typing.Callable[[bytes | bytearray | memoryview], bytes | bytearray | memoryview | None | collections.abc.Awaitable[bytes | bytearray | memoryview | None]]
ConnectionCb = typing.Callable[[ForwardRef('Connection')], None | collections.abc.Awaitable[None]]
async def connect( addr: hat.drivers.tcp.Address, user_data: bytes | bytearray | memoryview | None = None, *, local_ssel: int | None = None, remote_ssel: int | None = None, cosp_receive_queue_size: int = 1024, cosp_send_queue_size: int = 1024, **kwargs) -> Connection:
async def connect(addr: tcp.Address,
                  user_data: util.Bytes | None = None,
                  *,
                  local_ssel: int | None = None,
                  remote_ssel: int | None = None,
                  cosp_receive_queue_size: int = 1024,
                  cosp_send_queue_size: int = 1024,
                  **kwargs
                  ) -> 'Connection':
    """Establish a COSP connection to a remote server

    A COTP connection is opened first; every additional keyword argument is
    passed directly to `hat.drivers.cotp.connect`. A CN SPDU is then sent
    and the received response is decoded and validated against it.

    If any step fails (cancellation included), the COTP connection is
    closed through `_close_cotp` before the exception is re-raised.

    Args:
        addr: remote address
        user_data: user data placed in the connect request
        local_ssel: calling session selector placed in the connect request
        remote_ssel: called session selector placed in the connect request
        cosp_receive_queue_size: size of the connection's receive queue
        cosp_send_queue_size: size of the connection's send queue

    """
    cotp_conn = await cotp.connect(addr, **kwargs)

    try:
        request = common.Spdu(type=common.SpduType.CN,
                              extended_spdus=False,
                              version_number=_params_version,
                              requirements=_params_requirements,
                              calling_ssel=local_ssel,
                              called_ssel=remote_ssel,
                              user_data=user_data)
        await cotp_conn.send(encoder.encode(request))

        response = encoder.decode(memoryview(await cotp_conn.receive()))
        _validate_connect_response(request, response)

        # client side: local selector is the calling one
        calling_ssel, called_ssel = _get_ssels(request, response)
        return Connection(cotp_conn, request, response,
                          calling_ssel, called_ssel,
                          cosp_receive_queue_size, cosp_send_queue_size)

    except BaseException:
        # uncancellable so that cleanup completes even during cancellation
        await aio.uncancellable(
            _close_cotp(cotp_conn, _ab_spdu, mlog, mlog))
        raise

Connect to COSP server

Additional arguments are passed directly to hat.drivers.cotp.connect.

async def listen( validate_cb: Callable[[bytes | bytearray | memoryview], bytes | bytearray | memoryview | None | Awaitable[bytes | bytearray | memoryview | None]], connection_cb: Callable[[Connection], None | Awaitable[None]], addr: hat.drivers.tcp.Address = Address(host='0.0.0.0', port=102), *, bind_connections: bool = False, cosp_receive_queue_size: int = 1024, cosp_send_queue_size: int = 1024, **kwargs) -> Server:
 86async def listen(validate_cb: ValidateCb,
 87                 connection_cb: ConnectionCb,
 88                 addr: tcp.Address = tcp.Address('0.0.0.0', 102),
 89                 *,
 90                 bind_connections: bool = False,
 91                 cosp_receive_queue_size: int = 1024,
 92                 cosp_send_queue_size: int = 1024,
 93                 **kwargs
 94                 ) -> 'Server':
 95    """Create COSP listening server
 96
 97    Additional arguments are passed directly to `hat.drivers.cotp.listen`.
 98
 99    Args:
100        validate_cb: callback function or coroutine called on new
101            incomming connection request prior to creating new connection
102        connection_cb: new connection callback
103        addr: local listening address
104
105    """
106    server = Server()
107    server._validate_cb = validate_cb
108    server._connection_cb = connection_cb
109    server._bind_connections = bind_connections
110    server._receive_queue_size = cosp_receive_queue_size
111    server._send_queue_size = cosp_send_queue_size
112    server._log = mlog
113
114    server._srv = await cotp.listen(server._on_connection, addr,
115                                    bind_connections=False,
116                                    **kwargs)
117
118    server._log = _create_server_logger_adapter(server._srv.info)
119
120    return server

Create COSP listening server

Additional arguments are passed directly to hat.drivers.cotp.listen.

Arguments:
  • validate_cb: callback function or coroutine called on new incoming connection request prior to creating new connection
  • connection_cb: new connection callback
  • addr: local listening address
class Server(hat.aio.group.Resource):
class Server(aio.Resource):
    """Listening COSP server

    Instances are created and configured by `listen`.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group of the wrapped server"""
        return self._srv.async_group

    @property
    def info(self) -> tcp.ServerInfo:
        """Server info of the wrapped server"""
        return self._srv.info

    async def _on_connection(self, cotp_conn):
        # Handler for each new COTP connection: performs the connect
        # request/accept exchange, creates the COSP connection and hands it
        # to the connection callback.
        try:
            try:
                request = encoder.decode(
                    memoryview(await cotp_conn.receive()))
                _validate_connect_request(request)

                response_user_data = await aio.call(self._validate_cb,
                                                    request.user_data)

                response = common.Spdu(type=common.SpduType.AC,
                                       extended_spdus=False,
                                       version_number=_params_version,
                                       requirements=_params_requirements,
                                       calling_ssel=request.calling_ssel,
                                       called_ssel=request.called_ssel,
                                       user_data=response_user_data)
                await cotp_conn.send(encoder.encode(response))

                # server is the called side, so the called selector is the
                # local one and the calling selector the remote one
                calling_ssel, called_ssel = _get_ssels(request, response)
                conn = Connection(cotp_conn, request, response,
                                  called_ssel, calling_ssel,
                                  self._receive_queue_size,
                                  self._send_queue_size)

            except BaseException:
                # no COSP connection exists yet - close the COTP connection
                await aio.uncancellable(
                    _close_cotp(cotp_conn, _ab_spdu, self._log, self._log))
                raise

            try:
                await aio.call(self._connection_cb, conn)

            except BaseException:
                await aio.uncancellable(conn.async_close())
                raise

        except Exception as e:
            # regular errors are logged; other BaseException instances
            # (not subclasses of Exception) propagate to the caller
            self._log.error("error creating new incomming connection: %s",
                            e, exc_info=e)
            return

        if self._bind_connections:
            # keep the handler alive for the lifetime of the connection
            try:
                await conn.wait_closed()

            except BaseException:
                await aio.uncancellable(conn.async_close())
                raise

COSP listening server

For creating new server see listen.

async_group: hat.aio.group.Group
    @property
    def async_group(self) -> aio.Group:
        """Async group of the wrapped server (``self._srv``)"""
        return self._srv.async_group

Async group

info: hat.drivers.tcp.ServerInfo
    @property
    def info(self) -> tcp.ServerInfo:
        """Server info of the wrapped server (``self._srv``)"""
        return self._srv.info

Server info

class Connection(hat.aio.group.Resource):
class Connection(aio.Resource):
    """COSP connection

    For creating new connection see `connect` or `listen`.

    Outgoing data and drain requests are passed through a send queue to a
    background send loop; incoming data SPDUs are reassembled by a
    background receive loop and delivered through a receive queue.

    """

    def __init__(self,
                 conn: cotp.Connection,
                 cn_spdu: common.Spdu,
                 ac_spdu: common.Spdu,
                 local_ssel: int | None,
                 remote_ssel: int | None,
                 receive_queue_size: int,
                 send_queue_size: int):
        """Wrap an established COTP connection

        Must be called while an event loop is running.

        Args:
            conn: underlying COTP connection
            cn_spdu: connect request SPDU exchanged during connecting
            ac_spdu: connect response SPDU exchanged during connecting
            local_ssel: local session selector
            remote_ssel: remote session selector
            receive_queue_size: size of the receive queue
            send_queue_size: size of the send queue

        """
        self._conn = conn
        self._conn_req_user_data = cn_spdu.user_data
        self._conn_res_user_data = ac_spdu.user_data
        self._loop = asyncio.get_running_loop()
        # COTP connection info extended with session selectors
        self._info = ConnectionInfo(local_ssel=local_ssel,
                                    remote_ssel=remote_ssel,
                                    **conn.info._asdict())
        # SPDU handed to `_close_cotp` once the connection is closing
        self._close_spdu = None
        self._receive_queue = aio.Queue(receive_queue_size)
        self._send_queue = aio.Queue(send_queue_size)
        self._async_group = aio.Group()
        self._log = _create_connection_logger_adapter(False, self._info)
        self._comm_log = _create_connection_logger_adapter(True, self._info)

        self.async_group.spawn(aio.call_on_cancel, self._on_close)
        self.async_group.spawn(self._receive_loop)
        self.async_group.spawn(self._send_loop)
        # closing of the COTP connection closes this connection as well
        self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
                               self.close)

        self._comm_log.debug('connection established')

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    @property
    def info(self) -> ConnectionInfo:
        """Connection info"""
        return self._info

    @property
    def conn_req_user_data(self) -> util.Bytes:
        """Connect request's user data"""
        return self._conn_req_user_data

    @property
    def conn_res_user_data(self) -> util.Bytes:
        """Connect response's user data"""
        return self._conn_res_user_data

    def close(self, user_data: util.Bytes | None = None):
        """Close connection

        Has no effect if the connection is no longer open. Otherwise a FN
        SPDU (with `transport_disconnect` set) carrying `user_data` is
        registered as the closing SPDU and the async group is closed.

        """
        self._close(common.Spdu(common.SpduType.FN,
                                transport_disconnect=True,
                                user_data=user_data))

    async def async_close(self, user_data: util.Bytes | None = None):
        """Async close

        Calls `close` with `user_data` and waits until the connection is
        closed.

        """
        self.close(user_data)
        await self.wait_closed()

    async def receive(self) -> util.Bytes:
        """Receive data

        Raises:
            ConnectionError: if the receive queue is closed

        """
        try:
            return await self._receive_queue.get()

        except aio.QueueClosedError:
            raise ConnectionError()

    async def send(self, data: util.Bytes):
        """Send data

        Data is put on the send queue; actual sending is done by the send
        loop.

        Raises:
            ConnectionError: if the send queue is closed

        """
        try:
            await self._send_queue.put((data, None))

        except aio.QueueClosedError:
            raise ConnectionError()

    async def drain(self):
        """Drain output buffer

        Queues a drain request behind previously queued data and waits
        until the send loop has processed it.

        Raises:
            ConnectionError: if the send queue is closed

        """
        try:
            future = self._loop.create_future()
            # `None` data marks the queue item as a drain request
            await self._send_queue.put((None, future))
            await future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def _on_close(self):
        # run when the async group is cancelled (see `__init__`)
        await _close_cotp(self._conn, self._close_spdu, self._log,
                          self._comm_log)

        self._comm_log.debug('connection closed')

    def _close(self, spdu):
        # only the first close request determines the closing SPDU
        if not self.is_open:
            return

        self._close_spdu = spdu
        self._async_group.close()

    async def _receive_loop(self):
        try:
            # buffer accumulating data of consecutive DT SPDUs
            data = bytearray()
            while True:
                spdu_bytes = await self._conn.receive()
                spdu = encoder.decode(memoryview(spdu_bytes))

                self._comm_log.debug('received %s', spdu)

                if spdu.type == common.SpduType.DT:
                    data.extend(spdu.data)

                    # missing or set `end` completes the accumulated data
                    if spdu.end is None or spdu.end:
                        await self._receive_queue.put(data)
                        data = bytearray()

                elif spdu.type == common.SpduType.FN:
                    self._close(_dn_spdu)
                    break

                elif spdu.type == common.SpduType.AB:
                    self._close(None)
                    break

                else:
                    # any other SPDU type closes the connection with
                    # `_ab_spdu`
                    self._close(_ab_spdu)
                    break

        except ConnectionError:
            pass

        except Exception as e:
            self._log.error("receive loop error: %s", e, exc_info=e)

        finally:
            self.close()
            self._receive_queue.close()

    async def _send_loop(self):
        future = None
        try:
            while True:
                # queue items are `(data, future)` pairs; `data` is `None`
                # for drain requests
                data, future = await self._send_queue.get()

                if data is None:
                    await self._conn.drain()

                else:
                    spdu = common.Spdu(type=common.SpduType.DT,
                                       data=data)
                    spdu_bytes = encoder.encode(spdu)

                    # each DT SPDU is sent prefixed with the give tokens
                    # SPDU bytes; a single join avoids per-byte iteration
                    # (assumes both operands are bytes-like)
                    msg = b''.join((common.give_tokens_spdu_bytes,
                                    spdu_bytes))

                    self._comm_log.debug('sending %s', spdu)

                    await self._conn.send(msg)

                if future and not future.done():
                    future.set_result(None)

        except ConnectionError:
            pass

        except Exception as e:
            self._log.error("send loop error: %s", e, exc_info=e)

        finally:
            self.close()
            self._send_queue.close()

            # release every pending `drain` call: first the item being
            # processed, then all items still waiting in the queue
            while True:
                if future and not future.done():
                    future.set_result(None)
                if self._send_queue.empty():
                    break
                _, future = self._send_queue.get_nowait()

COSP connection

For creating new connection see connect or listen.

Connection( conn: hat.drivers.cotp.Connection, cn_spdu: hat.drivers.cosp.common.Spdu, ac_spdu: hat.drivers.cosp.common.Spdu, local_ssel: int | None, remote_ssel: int | None, receive_queue_size: int, send_queue_size: int)
201    def __init__(self,
202                 conn: cotp.Connection,
203                 cn_spdu: common.Spdu,
204                 ac_spdu: common.Spdu,
205                 local_ssel: int | None,
206                 remote_ssel: int | None,
207                 receive_queue_size: int,
208                 send_queue_size: int):
209        self._conn = conn
210        self._conn_req_user_data = cn_spdu.user_data
211        self._conn_res_user_data = ac_spdu.user_data
212        self._loop = asyncio.get_running_loop()
213        self._info = ConnectionInfo(local_ssel=local_ssel,
214                                    remote_ssel=remote_ssel,
215                                    **conn.info._asdict())
216        self._close_spdu = None
217        self._receive_queue = aio.Queue(receive_queue_size)
218        self._send_queue = aio.Queue(send_queue_size)
219        self._async_group = aio.Group()
220        self._log = _create_connection_logger_adapter(False, self._info)
221        self._comm_log = _create_connection_logger_adapter(True, self._info)
222
223        self.async_group.spawn(aio.call_on_cancel, self._on_close)
224        self.async_group.spawn(self._receive_loop)
225        self.async_group.spawn(self._send_loop)
226        self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
227                               self.close)
228
229        self._comm_log.debug('connection established')
async_group: hat.aio.group.Group
    @property
    def async_group(self) -> aio.Group:
        """Async group in which this connection's tasks are spawned"""
        return self._async_group

Async group

info: ConnectionInfo
    @property
    def info(self) -> ConnectionInfo:
        """Connection info created when the connection was constructed"""
        return self._info

Connection info

conn_req_user_data: bytes | bytearray | memoryview
    @property
    def conn_req_user_data(self) -> util.Bytes:
        """User data of the connect request SPDU"""
        return self._conn_req_user_data

Connect request's user data

conn_res_user_data: bytes | bytearray | memoryview
    @property
    def conn_res_user_data(self) -> util.Bytes:
        """User data of the connect response SPDU"""
        return self._conn_res_user_data

Connect response's user data

def close(self, user_data: bytes | bytearray | memoryview | None = None):
251    def close(self, user_data: util.Bytes | None = None):
252        """Close connection"""
253        self._close(common.Spdu(common.SpduType.FN,
254                                transport_disconnect=True,
255                                user_data=user_data))

Close connection

async def async_close(self, user_data: bytes | bytearray | memoryview | None = None):
257    async def async_close(self, user_data: util.Bytes | None = None):
258        """Async close"""
259        self.close(user_data)
260        await self.wait_closed()

Async close

async def receive(self) -> bytes | bytearray | memoryview:
262    async def receive(self) -> util.Bytes:
263        """Receive data"""
264        try:
265            return await self._receive_queue.get()
266
267        except aio.QueueClosedError:
268            raise ConnectionError()

Receive data

async def send(self, data: bytes | bytearray | memoryview):
270    async def send(self, data: util.Bytes):
271        """Send data"""
272        try:
273            await self._send_queue.put((data, None))
274
275        except aio.QueueClosedError:
276            raise ConnectionError()

Send data

async def drain(self):
278    async def drain(self):
279        """Drain output buffer"""
280        try:
281            future = self._loop.create_future()
282            await self._send_queue.put((None, future))
283            await future
284
285        except aio.QueueClosedError:
286            raise ConnectionError()

Drain output buffer