hat.drivers.chatter

Chatter communication protocol

  1"""Chatter communication protocol"""
  2
  3import asyncio
  4import contextlib
  5import importlib.resources
  6import itertools
  7import logging
  8import math
  9import typing
 10
 11from hat import aio
 12from hat import sbs
 13from hat import util
 14
 15from hat.drivers import tcp
 16
 17
 18mlog: logging.Logger = logging.getLogger(__name__)
 19"""Module logger"""
 20
 21ConnectionCb: typing.TypeAlias = aio.AsyncCallable[['Connection'], None]
 22"""Connection callback"""
 23
 24
class Data(typing.NamedTuple):
    """Message payload: a type identifier together with its bytes"""
    # payload type identifier; 'HatChatter.Ping' and 'HatChatter.Pong' are
    # handled inside `Connection` and are not delivered through `receive`
    type: str
    # payload bytes
    data: util.Bytes
 28
 29
class Conversation(typing.NamedTuple):
    """Identifier of a message exchange (conversation)"""
    # ``True`` when the conversation was started by the local side
    # (`Connection.send` sets it for new conversations; for received
    # messages it is the negation of the sender's flag)
    owner: bool
    # id of the message that started the conversation
    first_id: int
 33
 34
class Msg(typing.NamedTuple):
    """Received message, as returned by `Connection.receive`"""
    # message payload
    data: Data
    # conversation this message belongs to; pass it as `conv` to
    # `Connection.send` to reply within the same conversation
    conv: Conversation
    # ``True`` if the remote side started the conversation with this message
    first: bool
    # value of the `last` flag set by the sender
    last: bool
    # value of the `token` flag set by the sender; presumably signals who
    # may send next within the conversation - TODO confirm
    token: bool
 41
 42
async def connect(addr: tcp.Address,
                  *,
                  ping_delay: float = 20,
                  ping_timeout: float = 20,
                  receive_queue_size: int = 1024,
                  send_queue_size: int = 1024,
                  **kwargs
                  ) -> 'Connection':
    """Connect to remote server

    Argument `addr` specifies remote server listening address.

    If `ping_delay` is ``None`` or 0, ping requests are not sent.
    Otherwise, it represents ping request delay in seconds.

    Argument `ping_timeout` is the time in seconds to wait for any incoming
    message after a ping request was sent. If nothing is received in that
    time, the connection is closed.

    Arguments `receive_queue_size` and `send_queue_size` are forwarded to
    the `Connection` constructor.

    Additional arguments are passed directly to `hat.drivers.tcp.connect`.

    """
    conn = await tcp.connect(addr, **kwargs)

    try:
        return Connection(conn=conn,
                          ping_delay=ping_delay,
                          ping_timeout=ping_timeout,
                          receive_queue_size=receive_queue_size,
                          send_queue_size=send_queue_size)

    except Exception:
        # do not leave the tcp connection open if wrapping it failed
        await aio.uncancellable(conn.async_close())
        raise
 73
 74
async def listen(connection_cb: ConnectionCb,
                 addr: tcp.Address,
                 *,
                 ping_delay: float = 20,
                 ping_timeout: float = 20,
                 receive_queue_size: int = 1024,
                 send_queue_size: int = 1024,
                 bind_connections: bool = True,
                 **kwargs
                 ) -> tcp.Server:
    """Create listening server.

    Argument `connection_cb` is called with a new `Connection` for each
    accepted tcp connection. If it raises, the connection is closed.

    Argument `addr` specifies local server listening address.

    If `ping_delay` is ``None`` or 0, ping requests are not sent.
    Otherwise, it represents ping request delay in seconds.

    Argument `ping_timeout` is the time in seconds to wait for any incoming
    message after a ping request was sent. If nothing is received in that
    time, the connection is closed.

    Arguments `receive_queue_size` and `send_queue_size` are forwarded to
    the `Connection` constructor.

    Argument `bind_connections` and additional arguments are passed directly
    to `hat.drivers.tcp.listen`.

    """

    # logger without addresses - server info is not available yet
    log = _create_server_logger(kwargs.get('name'), None)

    async def on_connection(conn):
        try:
            # `conn` is rebound to the chatter connection; if construction
            # fails it still refers to the tcp connection, which gets closed
            conn = Connection(conn=conn,
                              ping_delay=ping_delay,
                              ping_timeout=ping_timeout,
                              receive_queue_size=receive_queue_size,
                              send_queue_size=send_queue_size)

            await aio.call(connection_cb, conn)

        except Exception as e:
            log.warning('connection callback error: %s', e, exc_info=e)
            await aio.uncancellable(conn.async_close())

        except BaseException:
            # e.g. cancellation - close the connection and propagate
            await aio.uncancellable(conn.async_close())
            raise

    server = await tcp.listen(on_connection, addr,
                              bind_connections=bind_connections,
                              **kwargs)

    # `on_connection` looks up `log` each time it runs, so from here on it
    # uses this logger, which also carries the server's addresses
    log = _create_server_logger(kwargs.get('name'), server.info)

    return server
123
124
class Connection(aio.Resource):
    """Single connection

    For creating new connection see `connect` coroutine.

    Ping requests (``HatChatter.Ping``) are answered and ping responses
    (``HatChatter.Pong``) are consumed internally; all other received
    messages are made available through `receive`.

    """

    def __init__(self,
                 conn: tcp.Connection,
                 ping_delay: float,
                 ping_timeout: float,
                 receive_queue_size: int,
                 send_queue_size: int):
        """Wrap `conn` and start the read, write and (optional) ping loops

        Must be called while an event loop is running. If `ping_delay` is
        ``None`` or 0, the ping loop is not started.

        """
        self._conn = conn
        self._receive_queue = aio.Queue(receive_queue_size)
        # sized by `send_queue_size` (`receive_queue_size` was used here by
        # mistake, which left `send_queue_size` without any effect)
        self._send_queue = aio.Queue(send_queue_size)
        self._loop = asyncio.get_running_loop()
        # message ids are allocated per connection, starting with 1
        self._next_msg_ids = itertools.count(1)
        # set by the read loop whenever any message is received
        self._ping_event = asyncio.Event()
        self._log = _create_connection_logger(conn.info)

        self.async_group.spawn(self._read_loop)
        self.async_group.spawn(self._write_loop)

        if ping_delay:
            self.async_group.spawn(self._ping_loop, ping_delay, ping_timeout)

    @property
    def async_group(self) -> aio.Group:
        """Async group (the one of the underlying tcp connection)"""
        return self._conn.async_group

    @property
    def info(self) -> tcp.ConnectionInfo:
        """Connection info (the one of the underlying tcp connection)"""
        return self._conn.info

    async def receive(self) -> Msg:
        """Receive message

        Waits for the next message that is neither a ping request nor a
        ping response.

        Raises `ConnectionError` if the receive queue is closed, which
        happens when the read loop stops.

        """
        try:
            return await self._receive_queue.get()

        except aio.QueueClosedError:
            raise ConnectionError()

    async def send(self,
                   data: Data,
                   *,
                   conv: Conversation | None = None,
                   last: bool = True,
                   token: bool = True
                   ) -> Conversation:
        """Send message

        If `conv` is ``None``, new conversation is created.

        The message is only queued for the write loop; use `drain` to wait
        until queued messages are written.

        Returns the conversation the message belongs to.

        Raises `ConnectionError` if the connection is closing or the send
        queue gets closed while waiting for free space.

        """
        if self.is_closing:
            raise ConnectionError()

        msg_id = next(self._next_msg_ids)

        if not conv:
            # new conversation is identified by the id of its first message
            conv = Conversation(owner=True,
                                first_id=msg_id)

        msg = {'id': msg_id,
               'first': conv.first_id,
               'owner': conv.owner,
               'token': token,
               'last': last,
               'data': {'type': data.type,
                        'data': data.data}}

        try:
            await self._send_queue.put((msg, None))

        except aio.QueueClosedError:
            # report a closed queue the same way `drain` and `receive` do,
            # so callers (including the read and ping loops) only have to
            # handle `ConnectionError`
            raise ConnectionError()

        return conv

    async def drain(self):
        """Drain output buffer

        Queues a drain request behind already queued messages and waits
        until the write loop has processed it.

        Raises `ConnectionError` if the send queue is closed or the write
        loop stops before the request is processed.

        """
        future = self._loop.create_future()
        try:
            # entry without a message marks a drain request
            await self._send_queue.put((None, future))
            await future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def _read_loop(self):
        """Read incoming messages until the connection fails or is closed"""
        self._log.debug("connection's read loop started")
        try:
            while True:
                self._log.debug("waiting for incoming message")
                data = await self._read()
                msg = Msg(
                    data=Data(type=data['data']['type'],
                              data=data['data']['data']),
                    # ownership is stored from the local point of view
                    conv=Conversation(owner=not data['owner'],
                                      first_id=data['first']),
                    first=data['owner'] and data['first'] == data['id'],
                    last=data['last'],
                    token=data['token'])

                # any received message counts as a sign of life
                self._ping_event.set()

                if msg.data.type == 'HatChatter.Ping':
                    self._log.debug("received ping request - "
                                    "sending ping response")
                    await self.send(Data('HatChatter.Pong', b''),
                                    conv=msg.conv)

                elif msg.data.type == 'HatChatter.Pong':
                    self._log.debug("received ping response")

                else:
                    self._log.debug("received message %s", msg.data.type)
                    await self._receive_queue.put(msg)

        except ConnectionError:
            self._log.debug("connection error")

        except Exception as e:
            self._log.error("read loop error: %s", e, exc_info=e)

        finally:
            self._log.debug("connection's read loop stopping")
            self.close()
            # closing the queue makes pending `receive` calls fail
            self._receive_queue.close()

    async def _write_loop(self):
        """Write queued messages and process queued drain requests"""
        self._log.debug("connection's write loop started")
        future = None

        try:
            while True:
                self._log.debug("waiting for outgoing message")
                msg, future = await self._send_queue.get()

                if msg is None:
                    # drain request
                    self._log.debug("draining output buffer")
                    await self._conn.drain()

                else:
                    self._log.debug("writing message %s", msg['data']['type'])
                    await self._write(msg)

                if future and not future.done():
                    future.set_result(None)

        except ConnectionError:
            self._log.debug("connection error")

        except Exception as e:
            self._log.error("write loop error: %s", e, exc_info=e)

        finally:
            self._log.debug("connection's write loop stopping")
            self.close()
            self._send_queue.close()

            # fail the drain request that was being processed (if any) and
            # every drain request still waiting in the queue
            while True:
                if future and not future.done():
                    future.set_exception(ConnectionError())
                if self._send_queue.empty():
                    break
                _, future = self._send_queue.get_nowait()

    async def _ping_loop(self, delay, timeout):
        """Close the connection if the remote side stops sending messages

        After `delay` seconds without any received message a ping request
        is sent. If nothing is received in the following `timeout` seconds,
        the loop stops and the connection is closed.

        """
        self._log.debug("ping loop started")
        try:
            while True:
                self._ping_event.clear()

                # `continue` is skipped only when the wait times out
                with contextlib.suppress(asyncio.TimeoutError):
                    await aio.wait_for(self._ping_event.wait(), delay)
                    continue

                self._log.debug("sending ping request")
                await self.send(Data('HatChatter.Ping', b''),
                                last=False)

                with contextlib.suppress(asyncio.TimeoutError):
                    await aio.wait_for(self._ping_event.wait(), timeout)
                    continue

                self._log.warning("ping timeout")
                break

        except ConnectionError:
            pass

        finally:
            self._log.debug("ping loop stopped")
            self.close()

    async def _read(self):
        """Read and decode a single ``HatChatter.Msg``

        Frame layout: one byte holding the size of the length field, the
        big-endian message length, and the SBS encoded message.

        """
        msg_len_len_bytes = await self._conn.readexactly(1)
        msg_len_len = msg_len_len_bytes[0]

        msg_len_bytes = await self._conn.readexactly(msg_len_len)
        msg_len = _bebytes_to_uint(msg_len_bytes)

        msg_bytes = await self._conn.readexactly(msg_len)
        msg = _sbs_repo.decode('HatChatter.Msg', msg_bytes)

        return msg

    async def _write(self, msg):
        """Encode and write a single ``HatChatter.Msg``

        Uses the frame layout expected by `_read`.

        """
        msg_bytes = _sbs_repo.encode('HatChatter.Msg', msg)
        msg_len_bytes = _uint_to_bebytes(len(msg_bytes))
        msg_len_len_bytes = bytes([len(msg_len_bytes)])

        # join the parts as whole buffers instead of iterating over every
        # single byte of the encoded message
        await self._conn.write(b''.join((msg_len_len_bytes,
                                         msg_len_bytes,
                                         msg_bytes)))
340
341
342def _uint_to_bebytes(x):
343    bytes_len = max(math.ceil(x.bit_length() / 8), 1)
344    return x.to_bytes(bytes_len, 'big')
345
346
347def _bebytes_to_uint(b):
348    return int.from_bytes(b, 'big')
349
350
# load the SBS repository used for encoding and decoding of
# ``HatChatter.Msg``; `as_file` provides a file system path to the packaged
# ``sbs_repo.json`` resource for the duration of the `with` block
with importlib.resources.as_file(importlib.resources.files(__package__) /
                                 'sbs_repo.json') as _path:
    _sbs_repo = sbs.Repository.from_json(_path)
354
355
def _create_server_logger(name, info):
    """Create logger adapter which attaches server metadata to log records

    Listening addresses are included only when `info` is available.

    """
    meta = {'type': 'ChatterServer',
            'name': name}

    if info is not None:
        meta['addresses'] = [{'host': address.host,
                              'port': address.port}
                             for address in info.addresses]

    return logging.LoggerAdapter(mlog, {'meta': meta})
366
367
def _create_connection_logger(info):
    """Create logger adapter which attaches connection metadata to records"""

    def address_meta(address):
        return {'host': address.host,
                'port': address.port}

    meta = {'type': 'ChatterConnection',
            'name': info.name,
            'local_addr': address_meta(info.local_addr),
            'remote_addr': address_meta(info.remote_addr)}

    return logging.LoggerAdapter(mlog, {'meta': meta})
mlog: logging.Logger = <Logger hat.drivers.chatter (WARNING)>

Module logger

ConnectionCb: TypeAlias = Callable[[ForwardRef('Connection')], None | Awaitable[None]]

Connection callback

class Data(typing.NamedTuple):
class Data(typing.NamedTuple):
    """Message payload: a type identifier together with its bytes"""
    # payload type identifier; 'HatChatter.Ping' and 'HatChatter.Pong' are
    # handled inside `Connection` and are not delivered through `receive`
    type: str
    # payload bytes
    data: util.Bytes

Data(type, data)

Data(type: str, data: bytes | bytearray | memoryview)

Create new instance of Data(type, data)

type: str

Alias for field number 0

data: bytes | bytearray | memoryview

Alias for field number 1

class Conversation(typing.NamedTuple):
class Conversation(typing.NamedTuple):
    """Identifier of a message exchange (conversation)"""
    # ``True`` when the conversation was started by the local side
    # (`Connection.send` sets it for new conversations; for received
    # messages it is the negation of the sender's flag)
    owner: bool
    # id of the message that started the conversation
    first_id: int

Conversation(owner, first_id)

Conversation(owner: bool, first_id: int)

Create new instance of Conversation(owner, first_id)

owner: bool

Alias for field number 0

first_id: int

Alias for field number 1

class Msg(typing.NamedTuple):
class Msg(typing.NamedTuple):
    """Received message, as returned by `Connection.receive`"""
    # message payload
    data: Data
    # conversation this message belongs to; pass it as `conv` to
    # `Connection.send` to reply within the same conversation
    conv: Conversation
    # ``True`` if the remote side started the conversation with this message
    first: bool
    # value of the `last` flag set by the sender
    last: bool
    # value of the `token` flag set by the sender; presumably signals who
    # may send next within the conversation - TODO confirm
    token: bool

Msg(data, conv, first, last, token)

Msg( data: Data, conv: Conversation, first: bool, last: bool, token: bool)

Create new instance of Msg(data, conv, first, last, token)

data: Data

Alias for field number 0

conv: Conversation

Alias for field number 1

first: bool

Alias for field number 2

last: bool

Alias for field number 3

token: bool

Alias for field number 4

async def connect( addr: hat.drivers.tcp.Address, *, ping_delay: float = 20, ping_timeout: float = 20, receive_queue_size: int = 1024, send_queue_size: int = 1024, **kwargs) -> Connection:
async def connect(addr: tcp.Address,
                  *,
                  ping_delay: float = 20,
                  ping_timeout: float = 20,
                  receive_queue_size: int = 1024,
                  send_queue_size: int = 1024,
                  **kwargs
                  ) -> 'Connection':
    """Connect to remote server

    Argument `addr` specifies remote server listening address.

    If `ping_delay` is ``None`` or 0, ping requests are not sent.
    Otherwise, it represents ping request delay in seconds.

    Argument `ping_timeout` is the time in seconds to wait for any incoming
    message after a ping request was sent. If nothing is received in that
    time, the connection is closed.

    Arguments `receive_queue_size` and `send_queue_size` are forwarded to
    the `Connection` constructor.

    Additional arguments are passed directly to `hat.drivers.tcp.connect`.

    """
    conn = await tcp.connect(addr, **kwargs)

    try:
        return Connection(conn=conn,
                          ping_delay=ping_delay,
                          ping_timeout=ping_timeout,
                          receive_queue_size=receive_queue_size,
                          send_queue_size=send_queue_size)

    except Exception:
        # do not leave the tcp connection open if wrapping it failed
        await aio.uncancellable(conn.async_close())
        raise

Connect to remote server

Argument addr specifies remote server listening address.

If ping_delay is None or 0, ping requests are not sent. Otherwise, it represents ping request delay in seconds.

Additional arguments are passed directly to hat.drivers.tcp.connect.

async def listen( connection_cb: Callable[[Connection], None | Awaitable[None]], addr: hat.drivers.tcp.Address, *, ping_delay: float = 20, ping_timeout: float = 20, receive_queue_size: int = 1024, send_queue_size: int = 1024, bind_connections: bool = True, **kwargs) -> hat.drivers.tcp.Server:
async def listen(connection_cb: ConnectionCb,
                 addr: tcp.Address,
                 *,
                 ping_delay: float = 20,
                 ping_timeout: float = 20,
                 receive_queue_size: int = 1024,
                 send_queue_size: int = 1024,
                 bind_connections: bool = True,
                 **kwargs
                 ) -> tcp.Server:
    """Create listening server.

    Argument `connection_cb` is called with a new `Connection` for each
    accepted tcp connection. If it raises, the connection is closed.

    Argument `addr` specifies local server listening address.

    If `ping_delay` is ``None`` or 0, ping requests are not sent.
    Otherwise, it represents ping request delay in seconds.

    Argument `ping_timeout` is the time in seconds to wait for any incoming
    message after a ping request was sent. If nothing is received in that
    time, the connection is closed.

    Arguments `receive_queue_size` and `send_queue_size` are forwarded to
    the `Connection` constructor.

    Argument `bind_connections` and additional arguments are passed directly
    to `hat.drivers.tcp.listen`.

    """

    # logger without addresses - server info is not available yet
    log = _create_server_logger(kwargs.get('name'), None)

    async def on_connection(conn):
        try:
            # `conn` is rebound to the chatter connection; if construction
            # fails it still refers to the tcp connection, which gets closed
            conn = Connection(conn=conn,
                              ping_delay=ping_delay,
                              ping_timeout=ping_timeout,
                              receive_queue_size=receive_queue_size,
                              send_queue_size=send_queue_size)

            await aio.call(connection_cb, conn)

        except Exception as e:
            log.warning('connection callback error: %s', e, exc_info=e)
            await aio.uncancellable(conn.async_close())

        except BaseException:
            # e.g. cancellation - close the connection and propagate
            await aio.uncancellable(conn.async_close())
            raise

    server = await tcp.listen(on_connection, addr,
                              bind_connections=bind_connections,
                              **kwargs)

    # `on_connection` looks up `log` each time it runs, so from here on it
    # uses this logger, which also carries the server's addresses
    log = _create_server_logger(kwargs.get('name'), server.info)

    return server

Create listening server.

Argument addr specifies local server listening address.

If ping_delay is None or 0, ping requests are not sent. Otherwise, it represents ping request delay in seconds.

Additional arguments are passed directly to hat.drivers.tcp.listen.

class Connection(hat.aio.group.Resource):
class Connection(aio.Resource):
    """Single connection

    For creating new connection see `connect` coroutine.

    Ping requests (``HatChatter.Ping``) are answered and ping responses
    (``HatChatter.Pong``) are consumed internally; all other received
    messages are made available through `receive`.

    """

    def __init__(self,
                 conn: tcp.Connection,
                 ping_delay: float,
                 ping_timeout: float,
                 receive_queue_size: int,
                 send_queue_size: int):
        """Wrap `conn` and start the read, write and (optional) ping loops

        Must be called while an event loop is running. If `ping_delay` is
        ``None`` or 0, the ping loop is not started.

        """
        self._conn = conn
        self._receive_queue = aio.Queue(receive_queue_size)
        # sized by `send_queue_size` (`receive_queue_size` was used here by
        # mistake, which left `send_queue_size` without any effect)
        self._send_queue = aio.Queue(send_queue_size)
        self._loop = asyncio.get_running_loop()
        # message ids are allocated per connection, starting with 1
        self._next_msg_ids = itertools.count(1)
        # set by the read loop whenever any message is received
        self._ping_event = asyncio.Event()
        self._log = _create_connection_logger(conn.info)

        self.async_group.spawn(self._read_loop)
        self.async_group.spawn(self._write_loop)

        if ping_delay:
            self.async_group.spawn(self._ping_loop, ping_delay, ping_timeout)

    @property
    def async_group(self) -> aio.Group:
        """Async group (the one of the underlying tcp connection)"""
        return self._conn.async_group

    @property
    def info(self) -> tcp.ConnectionInfo:
        """Connection info (the one of the underlying tcp connection)"""
        return self._conn.info

    async def receive(self) -> Msg:
        """Receive message

        Waits for the next message that is neither a ping request nor a
        ping response.

        Raises `ConnectionError` if the receive queue is closed, which
        happens when the read loop stops.

        """
        try:
            return await self._receive_queue.get()

        except aio.QueueClosedError:
            raise ConnectionError()

    async def send(self,
                   data: Data,
                   *,
                   conv: Conversation | None = None,
                   last: bool = True,
                   token: bool = True
                   ) -> Conversation:
        """Send message

        If `conv` is ``None``, new conversation is created.

        The message is only queued for the write loop; use `drain` to wait
        until queued messages are written.

        Returns the conversation the message belongs to.

        Raises `ConnectionError` if the connection is closing or the send
        queue gets closed while waiting for free space.

        """
        if self.is_closing:
            raise ConnectionError()

        msg_id = next(self._next_msg_ids)

        if not conv:
            # new conversation is identified by the id of its first message
            conv = Conversation(owner=True,
                                first_id=msg_id)

        msg = {'id': msg_id,
               'first': conv.first_id,
               'owner': conv.owner,
               'token': token,
               'last': last,
               'data': {'type': data.type,
                        'data': data.data}}

        try:
            await self._send_queue.put((msg, None))

        except aio.QueueClosedError:
            # report a closed queue the same way `drain` and `receive` do,
            # so callers (including the read and ping loops) only have to
            # handle `ConnectionError`
            raise ConnectionError()

        return conv

    async def drain(self):
        """Drain output buffer

        Queues a drain request behind already queued messages and waits
        until the write loop has processed it.

        Raises `ConnectionError` if the send queue is closed or the write
        loop stops before the request is processed.

        """
        future = self._loop.create_future()
        try:
            # entry without a message marks a drain request
            await self._send_queue.put((None, future))
            await future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def _read_loop(self):
        """Read incoming messages until the connection fails or is closed"""
        self._log.debug("connection's read loop started")
        try:
            while True:
                self._log.debug("waiting for incoming message")
                data = await self._read()
                msg = Msg(
                    data=Data(type=data['data']['type'],
                              data=data['data']['data']),
                    # ownership is stored from the local point of view
                    conv=Conversation(owner=not data['owner'],
                                      first_id=data['first']),
                    first=data['owner'] and data['first'] == data['id'],
                    last=data['last'],
                    token=data['token'])

                # any received message counts as a sign of life
                self._ping_event.set()

                if msg.data.type == 'HatChatter.Ping':
                    self._log.debug("received ping request - "
                                    "sending ping response")
                    await self.send(Data('HatChatter.Pong', b''),
                                    conv=msg.conv)

                elif msg.data.type == 'HatChatter.Pong':
                    self._log.debug("received ping response")

                else:
                    self._log.debug("received message %s", msg.data.type)
                    await self._receive_queue.put(msg)

        except ConnectionError:
            self._log.debug("connection error")

        except Exception as e:
            self._log.error("read loop error: %s", e, exc_info=e)

        finally:
            self._log.debug("connection's read loop stopping")
            self.close()
            # closing the queue makes pending `receive` calls fail
            self._receive_queue.close()

    async def _write_loop(self):
        """Write queued messages and process queued drain requests"""
        self._log.debug("connection's write loop started")
        future = None

        try:
            while True:
                self._log.debug("waiting for outgoing message")
                msg, future = await self._send_queue.get()

                if msg is None:
                    # drain request
                    self._log.debug("draining output buffer")
                    await self._conn.drain()

                else:
                    self._log.debug("writing message %s", msg['data']['type'])
                    await self._write(msg)

                if future and not future.done():
                    future.set_result(None)

        except ConnectionError:
            self._log.debug("connection error")

        except Exception as e:
            self._log.error("write loop error: %s", e, exc_info=e)

        finally:
            self._log.debug("connection's write loop stopping")
            self.close()
            self._send_queue.close()

            # fail the drain request that was being processed (if any) and
            # every drain request still waiting in the queue
            while True:
                if future and not future.done():
                    future.set_exception(ConnectionError())
                if self._send_queue.empty():
                    break
                _, future = self._send_queue.get_nowait()

    async def _ping_loop(self, delay, timeout):
        """Close the connection if the remote side stops sending messages

        After `delay` seconds without any received message a ping request
        is sent. If nothing is received in the following `timeout` seconds,
        the loop stops and the connection is closed.

        """
        self._log.debug("ping loop started")
        try:
            while True:
                self._ping_event.clear()

                # `continue` is skipped only when the wait times out
                with contextlib.suppress(asyncio.TimeoutError):
                    await aio.wait_for(self._ping_event.wait(), delay)
                    continue

                self._log.debug("sending ping request")
                await self.send(Data('HatChatter.Ping', b''),
                                last=False)

                with contextlib.suppress(asyncio.TimeoutError):
                    await aio.wait_for(self._ping_event.wait(), timeout)
                    continue

                self._log.warning("ping timeout")
                break

        except ConnectionError:
            pass

        finally:
            self._log.debug("ping loop stopped")
            self.close()

    async def _read(self):
        """Read and decode a single ``HatChatter.Msg``

        Frame layout: one byte holding the size of the length field, the
        big-endian message length, and the SBS encoded message.

        """
        msg_len_len_bytes = await self._conn.readexactly(1)
        msg_len_len = msg_len_len_bytes[0]

        msg_len_bytes = await self._conn.readexactly(msg_len_len)
        msg_len = _bebytes_to_uint(msg_len_bytes)

        msg_bytes = await self._conn.readexactly(msg_len)
        msg = _sbs_repo.decode('HatChatter.Msg', msg_bytes)

        return msg

    async def _write(self, msg):
        """Encode and write a single ``HatChatter.Msg``

        Uses the frame layout expected by `_read`.

        """
        msg_bytes = _sbs_repo.encode('HatChatter.Msg', msg)
        msg_len_bytes = _uint_to_bebytes(len(msg_bytes))
        msg_len_len_bytes = bytes([len(msg_len_bytes)])

        # join the parts as whole buffers instead of iterating over every
        # single byte of the encoded message
        await self._conn.write(b''.join((msg_len_len_bytes,
                                         msg_len_bytes,
                                         msg_bytes)))

Single connection

For creating new connection see connect coroutine.

Connection( conn: hat.drivers.tcp.Connection, ping_delay: float, ping_timeout: float, receive_queue_size: int, send_queue_size: int)
133    def __init__(self,
134                 conn: tcp.Connection,
135                 ping_delay: float,
136                 ping_timeout: float,
137                 receive_queue_size: int,
138                 send_queue_size: int):
139        self._conn = conn
140        self._receive_queue = aio.Queue(receive_queue_size)
141        self._send_queue = aio.Queue(receive_queue_size)
142        self._loop = asyncio.get_running_loop()
143        self._next_msg_ids = itertools.count(1)
144        self._ping_event = asyncio.Event()
145        self._log = _create_connection_logger(conn.info)
146
147        self.async_group.spawn(self._read_loop)
148        self.async_group.spawn(self._write_loop)
149
150        if ping_delay:
151            self.async_group.spawn(self._ping_loop, ping_delay, ping_timeout)
async_group: hat.aio.group.Group
    @property
    def async_group(self) -> aio.Group:
        """Async group (the one of the underlying tcp connection)"""
        return self._conn.async_group

Async group

info: hat.drivers.tcp.ConnectionInfo
    @property
    def info(self) -> tcp.ConnectionInfo:
        """Connection info (the one of the underlying tcp connection)"""
        return self._conn.info

Connection info

async def receive(self) -> Msg:
    async def receive(self) -> Msg:
        """Receive message

        Waits for the next message that is neither a ping request nor a
        ping response.

        Raises `ConnectionError` if the receive queue is closed, which
        happens when the read loop stops.

        """
        try:
            return await self._receive_queue.get()

        except aio.QueueClosedError:
            raise ConnectionError()

Receive message

async def send( self, data: Data, *, conv: Conversation | None = None, last: bool = True, token: bool = True) -> Conversation:
171    async def send(self,
172                   data: Data,
173                   *,
174                   conv: Conversation | None = None,
175                   last: bool = True,
176                   token: bool = True
177                   ) -> Conversation:
178        """Send message
179
180        If `conv` is ``None``, new conversation is created.
181
182        """
183        if self.is_closing:
184            raise ConnectionError()
185
186        msg_id = next(self._next_msg_ids)
187
188        if not conv:
189            conv = Conversation(owner=True,
190                                first_id=msg_id)
191
192        msg = {'id': msg_id,
193               'first': conv.first_id,
194               'owner': conv.owner,
195               'token': token,
196               'last': last,
197               'data': {'type': data.type,
198                        'data': data.data}}
199        await self._send_queue.put((msg, None))
200
201        return conv

Send message

If conv is None, new conversation is created.

async def drain(self):
    async def drain(self):
        """Drain output buffer

        Queues a drain request behind already queued messages and waits
        until the write loop has processed it.

        Raises `ConnectionError` if the send queue is closed or the write
        loop stops before the request is processed.

        """
        future = self._loop.create_future()
        try:
            # entry without a message marks a drain request
            await self._send_queue.put((None, future))
            await future

        except aio.QueueClosedError:
            raise ConnectionError()

Drain output buffer