hat.drivers.cosp

Connection oriented session protocol

 1"""Connection oriented session protocol"""
 2
 3from hat.drivers.cosp.connection import (ConnectionInfo,
 4                                         ValidateCb,
 5                                         ConnectionCb,
 6                                         connect,
 7                                         listen,
 8                                         Server,
 9                                         Connection)
10
11
12__all__ = ['ConnectionInfo',
13           'ValidateCb',
14           'ConnectionCb',
15           'connect',
16           'listen',
17           'Server',
18           'Connection']
class ConnectionInfo(typing.NamedTuple):
28class ConnectionInfo(typing.NamedTuple):
29    local_addr: tcp.Address
30    local_tsel: int | None
31    local_ssel: int | None
32    remote_addr: tcp.Address
33    remote_tsel: int | None
34    remote_ssel: int | None

ConnectionInfo(local_addr, local_tsel, local_ssel, remote_addr, remote_tsel, remote_ssel)

ConnectionInfo( local_addr: hat.drivers.tcp.Address, local_tsel: int | None, local_ssel: int | None, remote_addr: hat.drivers.tcp.Address, remote_tsel: int | None, remote_ssel: int | None)

Create new instance of ConnectionInfo(local_addr, local_tsel, local_ssel, remote_addr, remote_tsel, remote_ssel)

Alias for field number 0

local_tsel: int | None

Alias for field number 1

local_ssel: int | None

Alias for field number 2

Alias for field number 3

remote_tsel: int | None

Alias for field number 4

remote_ssel: int | None

Alias for field number 5

ValidateCb = typing.Callable[[bytes | bytearray | memoryview], typing.Union[bytes, bytearray, memoryview, NoneType, typing.Awaitable[bytes | bytearray | memoryview | None]]]
ConnectionCb = typing.Callable[[ForwardRef('Connection')], typing.Optional[typing.Awaitable[NoneType]]]
async def connect( addr: hat.drivers.tcp.Address, user_data: bytes | bytearray | memoryview | None = None, *, local_ssel: int | None = None, remote_ssel: int | None = None, cosp_receive_queue_size: int = 1024, cosp_send_queue_size: int = 1024, **kwargs) -> Connection:
45async def connect(addr: tcp.Address,
46                  user_data: util.Bytes | None = None,
47                  *,
48                  local_ssel: int | None = None,
49                  remote_ssel: int | None = None,
50                  cosp_receive_queue_size: int = 1024,
51                  cosp_send_queue_size: int = 1024,
52                  **kwargs
53                  ) -> 'Connection':
54    """Connect to COSP server
55
56    Additional arguments are passed directly to `hat.drivers.cotp.connect`.
57
58    """
59    conn = await cotp.connect(addr, **kwargs)
60
61    try:
62        cn_spdu = common.Spdu(type=common.SpduType.CN,
63                              extended_spdus=False,
64                              version_number=_params_version,
65                              requirements=_params_requirements,
66                              calling_ssel=local_ssel,
67                              called_ssel=remote_ssel,
68                              user_data=user_data)
69        cn_spdu_bytes = encoder.encode(cn_spdu)
70        await conn.send(cn_spdu_bytes)
71
72        ac_spdu_bytes = await conn.receive()
73        ac_spdu = encoder.decode(memoryview(ac_spdu_bytes))
74        _validate_connect_response(cn_spdu, ac_spdu)
75
76        calling_ssel, called_ssel = _get_ssels(cn_spdu, ac_spdu)
77        return Connection(conn, cn_spdu, ac_spdu, calling_ssel, called_ssel,
78                          cosp_receive_queue_size, cosp_send_queue_size)
79
80    except BaseException:
81        await aio.uncancellable(_close_cotp(conn, _ab_spdu))
82        raise

Connect to COSP server

Additional arguments are passed directly to hat.drivers.cotp.connect.

async def listen( validate_cb: Callable[[bytes | bytearray | memoryview], Union[bytes, bytearray, memoryview, NoneType, Awaitable[bytes | bytearray | memoryview | None]]], connection_cb: Callable[[hat.drivers.acse.Connection], Optional[Awaitable[NoneType]]], addr: hat.drivers.tcp.Address = Address(host='0.0.0.0', port=102), *, bind_connections: bool = False, cosp_receive_queue_size: int = 1024, cosp_send_queue_size: int = 1024, **kwargs) -> Server:
 85async def listen(validate_cb: ValidateCb,
 86                 connection_cb: ConnectionCb,
 87                 addr: tcp.Address = tcp.Address('0.0.0.0', 102),
 88                 *,
 89                 bind_connections: bool = False,
 90                 cosp_receive_queue_size: int = 1024,
 91                 cosp_send_queue_size: int = 1024,
 92                 **kwargs
 93                 ) -> 'Server':
 94    """Create COSP listening server
 95
 96    Additional arguments are passed directly to `hat.drivers.cotp.listen`.
 97
 98    Args:
 99        validate_cb: callback function or coroutine called on new
100            incomming connection request prior to creating new connection
101        connection_cb: new connection callback
102        addr: local listening address
103
104    """
105    server = Server()
106    server._validate_cb = validate_cb
107    server._connection_cb = connection_cb
108    server._bind_connections = bind_connections
109    server._receive_queue_size = cosp_receive_queue_size
110    server._send_queue_size = cosp_send_queue_size
111
112    server._srv = await cotp.listen(server._on_connection, addr,
113                                    bind_connections=False,
114                                    **kwargs)
115
116    return server

Create COSP listening server

Additional arguments are passed directly to hat.drivers.cotp.listen.

Arguments:
  • validate_cb: callback function or coroutine called on new incomming connection request prior to creating new connection
  • connection_cb: new connection callback
  • addr: local listening address
class Server(hat.aio.group.Resource):
119class Server(aio.Resource):
120    """COSP listening server
121
122    For creating new server see `listen`.
123
124    """
125
126    @property
127    def async_group(self) -> aio.Group:
128        """Async group"""
129        return self._srv.async_group
130
131    @property
132    def addresses(self) -> list[tcp.Address]:
133        """Listening addresses"""
134        return self._srv.addresses
135
136    async def _on_connection(self, cotp_conn):
137        try:
138            try:
139                cn_spdu_bytes = await cotp_conn.receive()
140                cn_spdu = encoder.decode(memoryview(cn_spdu_bytes))
141                _validate_connect_request(cn_spdu)
142
143                res_user_data = await aio.call(self._validate_cb,
144                                               cn_spdu.user_data)
145
146                ac_spdu = common.Spdu(type=common.SpduType.AC,
147                                      extended_spdus=False,
148                                      version_number=_params_version,
149                                      requirements=_params_requirements,
150                                      calling_ssel=cn_spdu.calling_ssel,
151                                      called_ssel=cn_spdu.called_ssel,
152                                      user_data=res_user_data)
153                ac_spdu_bytes = encoder.encode(ac_spdu)
154                await cotp_conn.send(ac_spdu_bytes)
155
156                calling_ssel, called_ssel = _get_ssels(cn_spdu, ac_spdu)
157                conn = Connection(cotp_conn, cn_spdu, ac_spdu,
158                                  called_ssel, calling_ssel,
159                                  self._receive_queue_size,
160                                  self._send_queue_size)
161
162            except BaseException:
163                await aio.uncancellable(_close_cotp(cotp_conn, _ab_spdu))
164                raise
165
166            try:
167                await aio.call(self._connection_cb, conn)
168
169            except BaseException:
170                await aio.uncancellable(conn.async_close())
171                raise
172
173        except Exception as e:
174            mlog.error("error creating new incomming connection: %s", e,
175                       exc_info=e)
176            return
177
178        if not self._bind_connections:
179            return
180
181        try:
182            await conn.wait_closed()
183
184        except BaseException:
185            await aio.uncancellable(conn.async_close())
186            raise

COSP listening server

For creating new server see listen.

async_group: hat.aio.group.Group
126    @property
127    def async_group(self) -> aio.Group:
128        """Async group"""
129        return self._srv.async_group

Async group

addresses: list[hat.drivers.tcp.Address]
131    @property
132    def addresses(self) -> list[tcp.Address]:
133        """Listening addresses"""
134        return self._srv.addresses

Listening addresses

class Connection(hat.aio.group.Resource):
189class Connection(aio.Resource):
190    """COSP connection
191
192    For creating new connection see `connect` or `listen`.
193
194    """
195
196    def __init__(self,
197                 conn: cotp.Connection,
198                 cn_spdu: common.Spdu,
199                 ac_spdu: common.Spdu,
200                 local_ssel: int | None,
201                 remote_ssel: int | None,
202                 receive_queue_size: int,
203                 send_queue_size: int):
204        self._conn = conn
205        self._conn_req_user_data = cn_spdu.user_data
206        self._conn_res_user_data = ac_spdu.user_data
207        self._loop = asyncio.get_running_loop()
208        self._info = ConnectionInfo(local_ssel=local_ssel,
209                                    remote_ssel=remote_ssel,
210                                    **conn.info._asdict())
211        self._close_spdu = None
212        self._receive_queue = aio.Queue(receive_queue_size)
213        self._send_queue = aio.Queue(send_queue_size)
214        self._async_group = aio.Group()
215
216        self.async_group.spawn(aio.call_on_cancel, self._on_close)
217        self.async_group.spawn(self._receive_loop)
218        self.async_group.spawn(self._send_loop)
219        self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
220                               self.close)
221
222    @property
223    def async_group(self) -> aio.Group:
224        """Async group"""
225        return self._async_group
226
227    @property
228    def info(self) -> ConnectionInfo:
229        """Connection info"""
230        return self._info
231
232    @property
233    def conn_req_user_data(self) -> util.Bytes:
234        """Connect request's user data"""
235        return self._conn_req_user_data
236
237    @property
238    def conn_res_user_data(self) -> util.Bytes:
239        """Connect response's user data"""
240        return self._conn_res_user_data
241
242    def close(self, user_data: util.Bytes | None = None):
243        """Close connection"""
244        self._close(common.Spdu(common.SpduType.FN,
245                                transport_disconnect=True,
246                                user_data=user_data))
247
248    async def async_close(self, user_data: util.Bytes | None = None):
249        """Async close"""
250        self.close(user_data)
251        await self.wait_closed()
252
253    async def receive(self) -> util.Bytes:
254        """Receive data"""
255        try:
256            return await self._receive_queue.get()
257
258        except aio.QueueClosedError:
259            raise ConnectionError()
260
261    async def send(self, data: util.Bytes):
262        """Send data"""
263        try:
264            await self._send_queue.put((data, None))
265
266        except aio.QueueClosedError:
267            raise ConnectionError()
268
269    async def drain(self):
270        """Drain output buffer"""
271        try:
272            future = self._loop.create_future()
273            await self._send_queue.put((None, future))
274            await future
275
276        except aio.QueueClosedError:
277            raise ConnectionError()
278
279    async def _on_close(self):
280        await _close_cotp(self._conn, self._close_spdu)
281
282    def _close(self, spdu):
283        if not self.is_open:
284            return
285
286        self._close_spdu = spdu
287        self._async_group.close()
288
289    async def _receive_loop(self):
290        try:
291            data = bytearray()
292            while True:
293                spdu_bytes = await self._conn.receive()
294                spdu = encoder.decode(memoryview(spdu_bytes))
295
296                if spdu.type == common.SpduType.DT:
297                    data.extend(spdu.data)
298
299                    if spdu.end is None or spdu.end:
300                        await self._receive_queue.put(data)
301                        data = bytearray()
302
303                elif spdu.type == common.SpduType.FN:
304                    self._close(_dn_spdu)
305                    break
306
307                elif spdu.type == common.SpduType.AB:
308                    self._close(None)
309                    break
310
311                else:
312                    self._close(_ab_spdu)
313                    break
314
315        except ConnectionError:
316            pass
317
318        except Exception as e:
319            mlog.error("receive loop error: %s", e, exc_info=e)
320
321        finally:
322            self.close()
323            self._receive_queue.close()
324
325    async def _send_loop(self):
326        future = None
327        try:
328            while True:
329                data, future = await self._send_queue.get()
330
331                if data is None:
332                    await self._conn.drain()
333
334                else:
335                    spdu = common.Spdu(type=common.SpduType.DT,
336                                       data=data)
337                    spdu_bytes = encoder.encode(spdu)
338
339                    msg = bytes(itertools.chain(common.give_tokens_spdu_bytes,
340                                                spdu_bytes))
341
342                    await self._conn.send(msg)
343
344                if future and not future.done():
345                    future.set_result(None)
346
347        except ConnectionError:
348            pass
349
350        except Exception as e:
351            mlog.error("send loop error: %s", e, exc_info=e)
352
353        finally:
354            self.close()
355            self._send_queue.close()
356
357            while True:
358                if future and not future.done():
359                    future.set_result(None)
360                if self._send_queue.empty():
361                    break
362                _, future = self._send_queue.get_nowait()

COSP connection

For creating new connection see connect or listen.

Connection( conn: Connection, cn_spdu: hat.drivers.cosp.common.Spdu, ac_spdu: hat.drivers.cosp.common.Spdu, local_ssel: int | None, remote_ssel: int | None, receive_queue_size: int, send_queue_size: int)
196    def __init__(self,
197                 conn: cotp.Connection,
198                 cn_spdu: common.Spdu,
199                 ac_spdu: common.Spdu,
200                 local_ssel: int | None,
201                 remote_ssel: int | None,
202                 receive_queue_size: int,
203                 send_queue_size: int):
204        self._conn = conn
205        self._conn_req_user_data = cn_spdu.user_data
206        self._conn_res_user_data = ac_spdu.user_data
207        self._loop = asyncio.get_running_loop()
208        self._info = ConnectionInfo(local_ssel=local_ssel,
209                                    remote_ssel=remote_ssel,
210                                    **conn.info._asdict())
211        self._close_spdu = None
212        self._receive_queue = aio.Queue(receive_queue_size)
213        self._send_queue = aio.Queue(send_queue_size)
214        self._async_group = aio.Group()
215
216        self.async_group.spawn(aio.call_on_cancel, self._on_close)
217        self.async_group.spawn(self._receive_loop)
218        self.async_group.spawn(self._send_loop)
219        self.async_group.spawn(aio.call_on_done, conn.wait_closing(),
220                               self.close)
async_group: hat.aio.group.Group
222    @property
223    def async_group(self) -> aio.Group:
224        """Async group"""
225        return self._async_group

Async group

info: ConnectionInfo
227    @property
228    def info(self) -> ConnectionInfo:
229        """Connection info"""
230        return self._info

Connection info

conn_req_user_data: bytes | bytearray | memoryview
232    @property
233    def conn_req_user_data(self) -> util.Bytes:
234        """Connect request's user data"""
235        return self._conn_req_user_data

Connect request's user data

conn_res_user_data: bytes | bytearray | memoryview
237    @property
238    def conn_res_user_data(self) -> util.Bytes:
239        """Connect response's user data"""
240        return self._conn_res_user_data

Connect response's user data

def close(self, user_data: bytes | bytearray | memoryview | None = None):
242    def close(self, user_data: util.Bytes | None = None):
243        """Close connection"""
244        self._close(common.Spdu(common.SpduType.FN,
245                                transport_disconnect=True,
246                                user_data=user_data))

Close connection

async def async_close(self, user_data: bytes | bytearray | memoryview | None = None):
248    async def async_close(self, user_data: util.Bytes | None = None):
249        """Async close"""
250        self.close(user_data)
251        await self.wait_closed()

Async close

async def receive(self) -> bytes | bytearray | memoryview:
253    async def receive(self) -> util.Bytes:
254        """Receive data"""
255        try:
256            return await self._receive_queue.get()
257
258        except aio.QueueClosedError:
259            raise ConnectionError()

Receive data

async def send(self, data: bytes | bytearray | memoryview):
261    async def send(self, data: util.Bytes):
262        """Send data"""
263        try:
264            await self._send_queue.put((data, None))
265
266        except aio.QueueClosedError:
267            raise ConnectionError()

Send data

async def drain(self):
269    async def drain(self):
270        """Drain output buffer"""
271        try:
272            future = self._loop.create_future()
273            await self._send_queue.put((None, future))
274            await future
275
276        except aio.QueueClosedError:
277            raise ConnectionError()

Drain output buffer