hat.drivers.chatter

Chatter communication protocol

  1"""Chatter communication protocol"""
  2
  3import asyncio
  4import contextlib
  5import importlib.resources
  6import itertools
  7import logging
  8import math
  9import typing
 10
 11from hat import aio
 12from hat import sbs
 13from hat import util
 14
 15from hat.drivers import tcp
 16
 17
 18mlog: logging.Logger = logging.getLogger(__name__)
 19"""Module logger"""
 20
 21ConnectionCb: typing.TypeAlias = aio.AsyncCallable[['Connection'], None]
 22"""Connection callback"""
 23
 24
 25class Data(typing.NamedTuple):
 26    type: str
 27    data: util.Bytes
 28
 29
 30class Conversation(typing.NamedTuple):
 31    owner: bool
 32    first_id: int
 33
 34
 35class Msg(typing.NamedTuple):
 36    data: Data
 37    conv: Conversation
 38    first: bool
 39    last: bool
 40    token: bool
 41
 42
 43async def connect(addr: tcp.Address,
 44                  *,
 45                  ping_delay: float = 20,
 46                  ping_timeout: float = 20,
 47                  receive_queue_size: int = 1024,
 48                  send_queue_size: int = 1024,
 49                  **kwargs
 50                  ) -> 'Connection':
 51    """Connect to remote server
 52
 53    Argument `addr` specifies remote server listening address.
 54
 55    If `ping_delay` is ``None`` or 0, ping requests are not sent.
 56    Otherwise, it represents ping request delay in seconds.
 57
 58    Additional arguments are passed directly to `hat.drivers.tcp.connect`.
 59
 60    """
 61    conn = await tcp.connect(addr, **kwargs)
 62
 63    try:
 64        return Connection(conn=conn,
 65                          ping_delay=ping_delay,
 66                          ping_timeout=ping_timeout,
 67                          receive_queue_size=receive_queue_size,
 68                          send_queue_size=send_queue_size)
 69
 70    except Exception:
 71        await aio.uncancellable(conn.async_close())
 72        raise
 73
 74
 75async def listen(connection_cb: ConnectionCb,
 76                 addr: tcp.Address,
 77                 *,
 78                 ping_delay: float = 20,
 79                 ping_timeout: float = 20,
 80                 receive_queue_size: int = 1024,
 81                 send_queue_size: int = 1024,
 82                 bind_connections: bool = True,
 83                 **kwargs
 84                 ) -> tcp.Server:
 85    """Create listening server.
 86
 87    Argument `addr` specifies local server listening address.
 88
 89    If `ping_delay` is ``None`` or 0, ping requests are not sent.
 90    Otherwise, it represents ping request delay in seconds.
 91
 92    Additional arguments are passed directly to `hat.drivers.tcp.listen`.
 93
 94    """
 95
 96    async def on_connection(conn):
 97        try:
 98            conn = Connection(conn=conn,
 99                              ping_delay=ping_delay,
100                              ping_timeout=ping_timeout,
101                              receive_queue_size=receive_queue_size,
102                              send_queue_size=send_queue_size)
103
104            await aio.call(connection_cb, conn)
105
106        except Exception as e:
107            mlog.warning('connection callback error: %s', e, exc_info=e)
108            await aio.uncancellable(conn.async_close())
109
110        except BaseException:
111            await aio.uncancellable(conn.async_close())
112            raise
113
114    return await tcp.listen(on_connection, addr,
115                            bind_connections=bind_connections,
116                            **kwargs)
117
118
119class Connection(aio.Resource):
120    """Single connection
121
122    For creating new connection see `connect` coroutine.
123
124    """
125
126    def __init__(self,
127                 conn: tcp.Connection,
128                 ping_delay: float,
129                 ping_timeout: float,
130                 receive_queue_size: int,
131                 send_queue_size: int):
132        self._conn = conn
133        self._receive_queue = aio.Queue(receive_queue_size)
134        self._send_queue = aio.Queue(receive_queue_size)
135        self._loop = asyncio.get_running_loop()
136        self._next_msg_ids = itertools.count(1)
137        self._ping_event = asyncio.Event()
138
139        self.async_group.spawn(self._read_loop)
140        self.async_group.spawn(self._write_loop)
141
142        if ping_delay:
143            self.async_group.spawn(self._ping_loop, ping_delay, ping_timeout)
144
145    @property
146    def async_group(self) -> aio.Group:
147        """Async group"""
148        return self._conn.async_group
149
150    @property
151    def info(self) -> tcp.ConnectionInfo:
152        """Connection info"""
153        return self._conn.info
154
155    async def receive(self) -> Msg:
156        """Receive message"""
157        try:
158            return await self._receive_queue.get()
159
160        except aio.QueueClosedError:
161            raise ConnectionError()
162
163    async def send(self,
164                   data: Data,
165                   *,
166                   conv: Conversation | None = None,
167                   last: bool = True,
168                   token: bool = True
169                   ) -> Conversation:
170        """Send message
171
172        If `conv` is ``None``, new conversation is created.
173
174        """
175        if self.is_closing:
176            raise ConnectionError()
177
178        msg_id = next(self._next_msg_ids)
179
180        if not conv:
181            conv = Conversation(owner=True,
182                                first_id=msg_id)
183
184        msg = {'id': msg_id,
185               'first': conv.first_id,
186               'owner': conv.owner,
187               'token': token,
188               'last': last,
189               'data': {'type': data.type,
190                        'data': data.data}}
191        await self._send_queue.put((msg, None))
192
193        return conv
194
195    async def drain(self):
196        """Drain output buffer"""
197        future = self._loop.create_future()
198        try:
199            await self._send_queue.put((None, future))
200            await future
201
202        except aio.QueueClosedError:
203            raise ConnectionError()
204
205    async def _read_loop(self):
206        mlog.debug("connection's read loop started")
207        try:
208            while True:
209                mlog.debug("waiting for incoming message")
210                data = await self._read()
211                msg = Msg(
212                    data=Data(type=data['data']['type'],
213                              data=data['data']['data']),
214                    conv=Conversation(owner=not data['owner'],
215                                      first_id=data['first']),
216                    first=data['owner'] and data['first'] == data['id'],
217                    last=data['last'],
218                    token=data['token'])
219
220                self._ping_event.set()
221
222                if msg.data.type == 'HatChatter.Ping':
223                    mlog.debug("received ping request - sending ping response")
224                    await self.send(Data('HatChatter.Pong', b''),
225                                    conv=msg.conv)
226
227                elif msg.data.type == 'HatChatter.Pong':
228                    mlog.debug("received ping response")
229
230                else:
231                    mlog.debug("received message %s", msg.data.type)
232                    await self._receive_queue.put(msg)
233
234        except ConnectionError:
235            mlog.debug("connection error")
236
237        except Exception as e:
238            mlog.error("read loop error: %s", e, exc_info=e)
239
240        finally:
241            mlog.debug("connection's read loop stopping")
242            self.close()
243            self._receive_queue.close()
244
245    async def _write_loop(self):
246        mlog.debug("connection's write loop started")
247        future = None
248
249        try:
250            while True:
251                mlog.debug("waiting for outgoing message")
252                msg, future = await self._send_queue.get()
253
254                if msg is None:
255                    mlog.debug("draining output buffer")
256                    await self._conn.drain()
257
258                else:
259                    mlog.debug("writing message %s", msg['data']['type'])
260                    await self._write(msg)
261
262                if future and not future.done():
263                    future.set_result(None)
264
265        except ConnectionError:
266            mlog.debug("connection error")
267
268        except Exception as e:
269            mlog.error("write loop error: %s", e, exc_info=e)
270
271        finally:
272            mlog.debug("connection's write loop stopping")
273            self.close()
274            self._send_queue.close()
275
276            while True:
277                if future and not future.done():
278                    future.set_exception(ConnectionError())
279                if self._send_queue.empty():
280                    break
281                _, future = self._send_queue.get_nowait()
282
283    async def _ping_loop(self, delay, timeout):
284        mlog.debug("ping loop started")
285        try:
286            while True:
287                self._ping_event.clear()
288
289                with contextlib.suppress(asyncio.TimeoutError):
290                    await aio.wait_for(self._ping_event.wait(), delay)
291                    continue
292
293                mlog.debug("sending ping request")
294                await self.send(Data('HatChatter.Ping', b''),
295                                last=False)
296
297                with contextlib.suppress(asyncio.TimeoutError):
298                    await aio.wait_for(self._ping_event.wait(), timeout)
299                    continue
300
301                mlog.debug("ping timeout")
302                break
303
304        except ConnectionError:
305            pass
306
307        finally:
308            mlog.debug("ping loop stopped")
309            self.close()
310
311    async def _read(self):
312        msg_len_len_bytes = await self._conn.readexactly(1)
313        msg_len_len = msg_len_len_bytes[0]
314
315        msg_len_bytes = await self._conn.readexactly(msg_len_len)
316        msg_len = _bebytes_to_uint(msg_len_bytes)
317
318        msg_bytes = await self._conn.readexactly(msg_len)
319        msg = _sbs_repo.decode('HatChatter.Msg', msg_bytes)
320
321        return msg
322
323    async def _write(self, msg):
324        msg_bytes = _sbs_repo.encode('HatChatter.Msg', msg)
325        msg_len = len(msg_bytes)
326        msg_len_bytes = _uint_to_bebytes(msg_len)
327        msg_len_len_bytes = [len(msg_len_bytes)]
328
329        await self._conn.write(bytes(itertools.chain(msg_len_len_bytes,
330                                                     msg_len_bytes,
331                                                     msg_bytes)))
332
333
334def _uint_to_bebytes(x):
335    bytes_len = max(math.ceil(x.bit_length() / 8), 1)
336    return x.to_bytes(bytes_len, 'big')
337
338
339def _bebytes_to_uint(b):
340    return int.from_bytes(b, 'big')
341
342
343with importlib.resources.as_file(importlib.resources.files(__package__) /
344                                 'sbs_repo.json') as _path:
345    _sbs_repo = sbs.Repository.from_json(_path)
mlog: logging.Logger = <Logger hat.drivers.chatter (WARNING)>

Module logger

ConnectionCb: TypeAlias = Callable[[ForwardRef('Connection')], None | Awaitable[None]]

Connection callback

class Data(typing.NamedTuple):
26class Data(typing.NamedTuple):
27    type: str
28    data: util.Bytes

Data(type, data)

Data(type: str, data: bytes | bytearray | memoryview)

Create new instance of Data(type, data)

type: str

Alias for field number 0

data: bytes | bytearray | memoryview

Alias for field number 1

class Conversation(typing.NamedTuple):
31class Conversation(typing.NamedTuple):
32    owner: bool
33    first_id: int

Conversation(owner, first_id)

Conversation(owner: bool, first_id: int)

Create new instance of Conversation(owner, first_id)

owner: bool

Alias for field number 0

first_id: int

Alias for field number 1

class Msg(typing.NamedTuple):
36class Msg(typing.NamedTuple):
37    data: Data
38    conv: Conversation
39    first: bool
40    last: bool
41    token: bool

Msg(data, conv, first, last, token)

Msg( data: Data, conv: Conversation, first: bool, last: bool, token: bool)

Create new instance of Msg(data, conv, first, last, token)

data: Data

Alias for field number 0

conv: Conversation

Alias for field number 1

first: bool

Alias for field number 2

last: bool

Alias for field number 3

token: bool

Alias for field number 4

async def connect( addr: hat.drivers.tcp.Address, *, ping_delay: float = 20, ping_timeout: float = 20, receive_queue_size: int = 1024, send_queue_size: int = 1024, **kwargs) -> Connection:
44async def connect(addr: tcp.Address,
45                  *,
46                  ping_delay: float = 20,
47                  ping_timeout: float = 20,
48                  receive_queue_size: int = 1024,
49                  send_queue_size: int = 1024,
50                  **kwargs
51                  ) -> 'Connection':
52    """Connect to remote server
53
54    Argument `addr` specifies remote server listening address.
55
56    If `ping_delay` is ``None`` or 0, ping requests are not sent.
57    Otherwise, it represents ping request delay in seconds.
58
59    Additional arguments are passed directly to `hat.drivers.tcp.connect`.
60
61    """
62    conn = await tcp.connect(addr, **kwargs)
63
64    try:
65        return Connection(conn=conn,
66                          ping_delay=ping_delay,
67                          ping_timeout=ping_timeout,
68                          receive_queue_size=receive_queue_size,
69                          send_queue_size=send_queue_size)
70
71    except Exception:
72        await aio.uncancellable(conn.async_close())
73        raise

Connect to remote server

Argument addr specifies remote server listening address.

If ping_delay is None or 0, ping requests are not sent. Otherwise, it represents ping request delay in seconds.

Additional arguments are passed directly to hat.drivers.tcp.connect.

async def listen( connection_cb: Callable[[hat.drivers.acse.Connection], None | Awaitable[None]], addr: hat.drivers.tcp.Address, *, ping_delay: float = 20, ping_timeout: float = 20, receive_queue_size: int = 1024, send_queue_size: int = 1024, bind_connections: bool = True, **kwargs) -> hat.drivers.tcp.Server:
 76async def listen(connection_cb: ConnectionCb,
 77                 addr: tcp.Address,
 78                 *,
 79                 ping_delay: float = 20,
 80                 ping_timeout: float = 20,
 81                 receive_queue_size: int = 1024,
 82                 send_queue_size: int = 1024,
 83                 bind_connections: bool = True,
 84                 **kwargs
 85                 ) -> tcp.Server:
 86    """Create listening server.
 87
 88    Argument `addr` specifies local server listening address.
 89
 90    If `ping_delay` is ``None`` or 0, ping requests are not sent.
 91    Otherwise, it represents ping request delay in seconds.
 92
 93    Additional arguments are passed directly to `hat.drivers.tcp.listen`.
 94
 95    """
 96
 97    async def on_connection(conn):
 98        try:
 99            conn = Connection(conn=conn,
100                              ping_delay=ping_delay,
101                              ping_timeout=ping_timeout,
102                              receive_queue_size=receive_queue_size,
103                              send_queue_size=send_queue_size)
104
105            await aio.call(connection_cb, conn)
106
107        except Exception as e:
108            mlog.warning('connection callback error: %s', e, exc_info=e)
109            await aio.uncancellable(conn.async_close())
110
111        except BaseException:
112            await aio.uncancellable(conn.async_close())
113            raise
114
115    return await tcp.listen(on_connection, addr,
116                            bind_connections=bind_connections,
117                            **kwargs)

Create listening server.

Argument addr specifies local server listening address.

If ping_delay is None or 0, ping requests are not sent. Otherwise, it represents ping request delay in seconds.

Additional arguments are passed directly to hat.drivers.tcp.listen.

class Connection(hat.aio.group.Resource):
120class Connection(aio.Resource):
121    """Single connection
122
123    For creating new connection see `connect` coroutine.
124
125    """
126
127    def __init__(self,
128                 conn: tcp.Connection,
129                 ping_delay: float,
130                 ping_timeout: float,
131                 receive_queue_size: int,
132                 send_queue_size: int):
133        self._conn = conn
134        self._receive_queue = aio.Queue(receive_queue_size)
135        self._send_queue = aio.Queue(receive_queue_size)
136        self._loop = asyncio.get_running_loop()
137        self._next_msg_ids = itertools.count(1)
138        self._ping_event = asyncio.Event()
139
140        self.async_group.spawn(self._read_loop)
141        self.async_group.spawn(self._write_loop)
142
143        if ping_delay:
144            self.async_group.spawn(self._ping_loop, ping_delay, ping_timeout)
145
146    @property
147    def async_group(self) -> aio.Group:
148        """Async group"""
149        return self._conn.async_group
150
151    @property
152    def info(self) -> tcp.ConnectionInfo:
153        """Connection info"""
154        return self._conn.info
155
156    async def receive(self) -> Msg:
157        """Receive message"""
158        try:
159            return await self._receive_queue.get()
160
161        except aio.QueueClosedError:
162            raise ConnectionError()
163
164    async def send(self,
165                   data: Data,
166                   *,
167                   conv: Conversation | None = None,
168                   last: bool = True,
169                   token: bool = True
170                   ) -> Conversation:
171        """Send message
172
173        If `conv` is ``None``, new conversation is created.
174
175        """
176        if self.is_closing:
177            raise ConnectionError()
178
179        msg_id = next(self._next_msg_ids)
180
181        if not conv:
182            conv = Conversation(owner=True,
183                                first_id=msg_id)
184
185        msg = {'id': msg_id,
186               'first': conv.first_id,
187               'owner': conv.owner,
188               'token': token,
189               'last': last,
190               'data': {'type': data.type,
191                        'data': data.data}}
192        await self._send_queue.put((msg, None))
193
194        return conv
195
196    async def drain(self):
197        """Drain output buffer"""
198        future = self._loop.create_future()
199        try:
200            await self._send_queue.put((None, future))
201            await future
202
203        except aio.QueueClosedError:
204            raise ConnectionError()
205
206    async def _read_loop(self):
207        mlog.debug("connection's read loop started")
208        try:
209            while True:
210                mlog.debug("waiting for incoming message")
211                data = await self._read()
212                msg = Msg(
213                    data=Data(type=data['data']['type'],
214                              data=data['data']['data']),
215                    conv=Conversation(owner=not data['owner'],
216                                      first_id=data['first']),
217                    first=data['owner'] and data['first'] == data['id'],
218                    last=data['last'],
219                    token=data['token'])
220
221                self._ping_event.set()
222
223                if msg.data.type == 'HatChatter.Ping':
224                    mlog.debug("received ping request - sending ping response")
225                    await self.send(Data('HatChatter.Pong', b''),
226                                    conv=msg.conv)
227
228                elif msg.data.type == 'HatChatter.Pong':
229                    mlog.debug("received ping response")
230
231                else:
232                    mlog.debug("received message %s", msg.data.type)
233                    await self._receive_queue.put(msg)
234
235        except ConnectionError:
236            mlog.debug("connection error")
237
238        except Exception as e:
239            mlog.error("read loop error: %s", e, exc_info=e)
240
241        finally:
242            mlog.debug("connection's read loop stopping")
243            self.close()
244            self._receive_queue.close()
245
246    async def _write_loop(self):
247        mlog.debug("connection's write loop started")
248        future = None
249
250        try:
251            while True:
252                mlog.debug("waiting for outgoing message")
253                msg, future = await self._send_queue.get()
254
255                if msg is None:
256                    mlog.debug("draining output buffer")
257                    await self._conn.drain()
258
259                else:
260                    mlog.debug("writing message %s", msg['data']['type'])
261                    await self._write(msg)
262
263                if future and not future.done():
264                    future.set_result(None)
265
266        except ConnectionError:
267            mlog.debug("connection error")
268
269        except Exception as e:
270            mlog.error("write loop error: %s", e, exc_info=e)
271
272        finally:
273            mlog.debug("connection's write loop stopping")
274            self.close()
275            self._send_queue.close()
276
277            while True:
278                if future and not future.done():
279                    future.set_exception(ConnectionError())
280                if self._send_queue.empty():
281                    break
282                _, future = self._send_queue.get_nowait()
283
284    async def _ping_loop(self, delay, timeout):
285        mlog.debug("ping loop started")
286        try:
287            while True:
288                self._ping_event.clear()
289
290                with contextlib.suppress(asyncio.TimeoutError):
291                    await aio.wait_for(self._ping_event.wait(), delay)
292                    continue
293
294                mlog.debug("sending ping request")
295                await self.send(Data('HatChatter.Ping', b''),
296                                last=False)
297
298                with contextlib.suppress(asyncio.TimeoutError):
299                    await aio.wait_for(self._ping_event.wait(), timeout)
300                    continue
301
302                mlog.debug("ping timeout")
303                break
304
305        except ConnectionError:
306            pass
307
308        finally:
309            mlog.debug("ping loop stopped")
310            self.close()
311
312    async def _read(self):
313        msg_len_len_bytes = await self._conn.readexactly(1)
314        msg_len_len = msg_len_len_bytes[0]
315
316        msg_len_bytes = await self._conn.readexactly(msg_len_len)
317        msg_len = _bebytes_to_uint(msg_len_bytes)
318
319        msg_bytes = await self._conn.readexactly(msg_len)
320        msg = _sbs_repo.decode('HatChatter.Msg', msg_bytes)
321
322        return msg
323
324    async def _write(self, msg):
325        msg_bytes = _sbs_repo.encode('HatChatter.Msg', msg)
326        msg_len = len(msg_bytes)
327        msg_len_bytes = _uint_to_bebytes(msg_len)
328        msg_len_len_bytes = [len(msg_len_bytes)]
329
330        await self._conn.write(bytes(itertools.chain(msg_len_len_bytes,
331                                                     msg_len_bytes,
332                                                     msg_bytes)))

Single connection

For creating new connection see connect coroutine.

Connection( conn: hat.drivers.tcp.Connection, ping_delay: float, ping_timeout: float, receive_queue_size: int, send_queue_size: int)
127    def __init__(self,
128                 conn: tcp.Connection,
129                 ping_delay: float,
130                 ping_timeout: float,
131                 receive_queue_size: int,
132                 send_queue_size: int):
133        self._conn = conn
134        self._receive_queue = aio.Queue(receive_queue_size)
135        self._send_queue = aio.Queue(receive_queue_size)
136        self._loop = asyncio.get_running_loop()
137        self._next_msg_ids = itertools.count(1)
138        self._ping_event = asyncio.Event()
139
140        self.async_group.spawn(self._read_loop)
141        self.async_group.spawn(self._write_loop)
142
143        if ping_delay:
144            self.async_group.spawn(self._ping_loop, ping_delay, ping_timeout)
async_group: hat.aio.group.Group
146    @property
147    def async_group(self) -> aio.Group:
148        """Async group"""
149        return self._conn.async_group

Async group

info: hat.drivers.tcp.ConnectionInfo
151    @property
152    def info(self) -> tcp.ConnectionInfo:
153        """Connection info"""
154        return self._conn.info

Connection info

async def receive(self) -> Msg:
156    async def receive(self) -> Msg:
157        """Receive message"""
158        try:
159            return await self._receive_queue.get()
160
161        except aio.QueueClosedError:
162            raise ConnectionError()

Receive message

async def send( self, data: Data, *, conv: Conversation | None = None, last: bool = True, token: bool = True) -> Conversation:
164    async def send(self,
165                   data: Data,
166                   *,
167                   conv: Conversation | None = None,
168                   last: bool = True,
169                   token: bool = True
170                   ) -> Conversation:
171        """Send message
172
173        If `conv` is ``None``, new conversation is created.
174
175        """
176        if self.is_closing:
177            raise ConnectionError()
178
179        msg_id = next(self._next_msg_ids)
180
181        if not conv:
182            conv = Conversation(owner=True,
183                                first_id=msg_id)
184
185        msg = {'id': msg_id,
186               'first': conv.first_id,
187               'owner': conv.owner,
188               'token': token,
189               'last': last,
190               'data': {'type': data.type,
191                        'data': data.data}}
192        await self._send_queue.put((msg, None))
193
194        return conv

Send message

If conv is None, new conversation is created.

async def drain(self):
196    async def drain(self):
197        """Drain output buffer"""
198        future = self._loop.create_future()
199        try:
200            await self._send_queue.put((None, future))
201            await future
202
203        except aio.QueueClosedError:
204            raise ConnectionError()

Drain output buffer