hat.drivers.chatter
Chatter communication protocol
"""Chatter communication protocol"""

import asyncio
import contextlib
import importlib.resources
import itertools
import logging
import math
import typing

from hat import aio
from hat import sbs
from hat import util

from hat.drivers import tcp


mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""

ConnectionCb: typing.TypeAlias = aio.AsyncCallable[['Connection'], None]
"""Connection callback"""


class Data(typing.NamedTuple):
    """Message payload"""
    # payload type name (e.g. ``'HatChatter.Ping'``)
    type: str
    # payload bytes
    data: util.Bytes


class Conversation(typing.NamedTuple):
    """Conversation identifier"""
    # ``True`` if the conversation was started by the local side
    owner: bool
    # id of the message which started the conversation
    first_id: int


class Msg(typing.NamedTuple):
    """Received message"""
    data: Data
    conv: Conversation
    # ``True`` if this message started a conversation owned by the sender
    first: bool
    # `last` flag as set by the sender
    last: bool
    # `token` flag as set by the sender
    token: bool


async def connect(addr: tcp.Address,
                  *,
                  ping_delay: float = 20,
                  ping_timeout: float = 20,
                  receive_queue_size: int = 1024,
                  send_queue_size: int = 1024,
                  **kwargs
                  ) -> 'Connection':
    """Connect to remote server

    Argument `addr` specifies remote server listening address.

    If `ping_delay` is ``None`` or 0, ping requests are not sent.
    Otherwise, it represents ping request delay in seconds.

    Additional arguments are passed directly to `hat.drivers.tcp.connect`.

    """
    conn = await tcp.connect(addr, **kwargs)

    try:
        return Connection(conn=conn,
                          ping_delay=ping_delay,
                          ping_timeout=ping_timeout,
                          receive_queue_size=receive_queue_size,
                          send_queue_size=send_queue_size)

    except Exception:
        # don't leak the TCP connection if wrapping it failed
        await aio.uncancellable(conn.async_close())
        raise


async def listen(connection_cb: ConnectionCb,
                 addr: tcp.Address,
                 *,
                 ping_delay: float = 20,
                 ping_timeout: float = 20,
                 receive_queue_size: int = 1024,
                 send_queue_size: int = 1024,
                 bind_connections: bool = True,
                 **kwargs
                 ) -> tcp.Server:
    """Create listening server.

    Argument `addr` specifies local server listening address.

    If `ping_delay` is ``None`` or 0, ping requests are not sent.
    Otherwise, it represents ping request delay in seconds.

    Additional arguments are passed directly to `hat.drivers.tcp.listen`.

    """

    async def on_connection(conn):
        try:
            # `conn` is rebound so that error handlers close whichever
            # object (TCP or chatter connection) exists at that point
            conn = Connection(conn=conn,
                              ping_delay=ping_delay,
                              ping_timeout=ping_timeout,
                              receive_queue_size=receive_queue_size,
                              send_queue_size=send_queue_size)

            await aio.call(connection_cb, conn)

        except Exception as e:
            mlog.warning('connection callback error: %s', e, exc_info=e)
            await aio.uncancellable(conn.async_close())

        except BaseException:
            await aio.uncancellable(conn.async_close())
            raise

    return await tcp.listen(on_connection, addr,
                            bind_connections=bind_connections,
                            **kwargs)


class Connection(aio.Resource):
    """Single connection

    For creating new connection see `connect` coroutine.

    """

    def __init__(self,
                 conn: tcp.Connection,
                 ping_delay: float,
                 ping_timeout: float,
                 receive_queue_size: int,
                 send_queue_size: int):
        self._conn = conn
        self._receive_queue = aio.Queue(receive_queue_size)
        # send queue is bounded by its own limit (previously it was
        # mistakenly sized with `receive_queue_size`)
        self._send_queue = aio.Queue(send_queue_size)
        self._loop = asyncio.get_running_loop()
        self._next_msg_ids = itertools.count(1)
        # set on every received message; observed by the ping loop
        self._ping_event = asyncio.Event()

        self.async_group.spawn(self._read_loop)
        self.async_group.spawn(self._write_loop)

        if ping_delay:
            self.async_group.spawn(self._ping_loop, ping_delay, ping_timeout)

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._conn.async_group

    @property
    def info(self) -> tcp.ConnectionInfo:
        """Connection info"""
        return self._conn.info

    async def receive(self) -> Msg:
        """Receive message

        Raises `ConnectionError` once the receive queue is closed.

        """
        try:
            return await self._receive_queue.get()

        except aio.QueueClosedError:
            raise ConnectionError()

    async def send(self,
                   data: Data,
                   *,
                   conv: Conversation | None = None,
                   last: bool = True,
                   token: bool = True
                   ) -> Conversation:
        """Send message

        If `conv` is ``None``, new conversation is created.

        Message is only queued for sending - use `drain` to wait until
        queued messages are written.

        Raises `ConnectionError` if connection is closing or if the send
        queue is closed while waiting for free space.

        """
        if self.is_closing:
            raise ConnectionError()

        msg_id = next(self._next_msg_ids)

        if not conv:
            conv = Conversation(owner=True,
                                first_id=msg_id)

        msg = {'id': msg_id,
               'first': conv.first_id,
               'owner': conv.owner,
               'token': token,
               'last': last,
               'data': {'type': data.type,
                        'data': data.data}}

        try:
            await self._send_queue.put((msg, None))

        except aio.QueueClosedError:
            # same error translation as in `drain` and `receive`
            raise ConnectionError()

        return conv

    async def drain(self):
        """Drain output buffer

        Raises `ConnectionError` if the send queue is closed or the write
        loop stops before the drain request is processed.

        """
        future = self._loop.create_future()
        try:
            # `None` message marks a drain request for the write loop
            await self._send_queue.put((None, future))
            await future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def _read_loop(self):
        """Read incoming messages until the connection fails or closes

        Ping requests are answered with pong responses, ping/pong messages
        are not forwarded to the receive queue.

        """
        mlog.debug("connection's read loop started")
        try:
            while True:
                mlog.debug("waiting for incoming message")
                data = await self._read()
                msg = Msg(
                    data=Data(type=data['data']['type'],
                              data=data['data']['data']),
                    # ownership is expressed from the sender's point of view
                    conv=Conversation(owner=not data['owner'],
                                      first_id=data['first']),
                    first=data['owner'] and data['first'] == data['id'],
                    last=data['last'],
                    token=data['token'])

                # any received message counts as a sign of life
                self._ping_event.set()

                if msg.data.type == 'HatChatter.Ping':
                    mlog.debug("received ping request - sending ping response")
                    await self.send(Data('HatChatter.Pong', b''),
                                    conv=msg.conv)

                elif msg.data.type == 'HatChatter.Pong':
                    mlog.debug("received ping response")

                else:
                    mlog.debug("received message %s", msg.data.type)
                    await self._receive_queue.put(msg)

        except ConnectionError:
            mlog.debug("connection error")

        except Exception as e:
            mlog.error("read loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("connection's read loop stopping")
            self.close()
            self._receive_queue.close()

    async def _write_loop(self):
        """Write queued messages and process drain requests

        Queue items are ``(msg, future)`` tuples - `msg` set to ``None``
        represents drain request and `future` (if set) is resolved once the
        item is processed.

        """
        mlog.debug("connection's write loop started")
        future = None

        try:
            while True:
                mlog.debug("waiting for outgoing message")
                msg, future = await self._send_queue.get()

                if msg is None:
                    mlog.debug("draining output buffer")
                    await self._conn.drain()

                else:
                    mlog.debug("writing message %s", msg['data']['type'])
                    await self._write(msg)

                if future and not future.done():
                    future.set_result(None)

        except ConnectionError:
            mlog.debug("connection error")

        except Exception as e:
            mlog.error("write loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("connection's write loop stopping")
            self.close()
            self._send_queue.close()

            # fail the in-flight future and all futures still queued so
            # that pending `drain` calls don't wait forever
            while True:
                if future and not future.done():
                    future.set_exception(ConnectionError())
                if self._send_queue.empty():
                    break
                _, future = self._send_queue.get_nowait()

    async def _ping_loop(self, delay, timeout):
        """Close connection if remote side stops responding

        If no message is received for `delay` seconds, ping request is sent.
        If no message is received during following `timeout` seconds,
        connection is closed.

        """
        mlog.debug("ping loop started")
        try:
            while True:
                self._ping_event.clear()

                # `continue` is skipped only if waiting timed out
                with contextlib.suppress(asyncio.TimeoutError):
                    await aio.wait_for(self._ping_event.wait(), delay)
                    continue

                mlog.debug("sending ping request")
                await self.send(Data('HatChatter.Ping', b''),
                                last=False)

                with contextlib.suppress(asyncio.TimeoutError):
                    await aio.wait_for(self._ping_event.wait(), timeout)
                    continue

                mlog.debug("ping timeout")
                break

        except ConnectionError:
            pass

        finally:
            mlog.debug("ping loop stopped")
            self.close()

    async def _read(self):
        """Read and decode single message

        Wire format: one byte with the size of the length field, big-endian
        message length, encoded ``HatChatter.Msg``.

        """
        msg_len_len_bytes = await self._conn.readexactly(1)
        msg_len_len = msg_len_len_bytes[0]

        msg_len_bytes = await self._conn.readexactly(msg_len_len)
        msg_len = _bebytes_to_uint(msg_len_bytes)

        msg_bytes = await self._conn.readexactly(msg_len)
        msg = _sbs_repo.decode('HatChatter.Msg', msg_bytes)

        return msg

    async def _write(self, msg):
        """Encode and write single message (see `_read` for wire format)"""
        msg_bytes = _sbs_repo.encode('HatChatter.Msg', msg)
        msg_len = len(msg_bytes)
        msg_len_bytes = _uint_to_bebytes(msg_len)
        msg_len_len_bytes = [len(msg_len_bytes)]

        await self._conn.write(bytes(itertools.chain(msg_len_len_bytes,
                                                     msg_len_bytes,
                                                     msg_bytes)))


def _uint_to_bebytes(x):
    """Encode unsigned integer as shortest big-endian bytes (min. 1 byte)"""
    bytes_len = max(math.ceil(x.bit_length() / 8), 1)
    return x.to_bytes(bytes_len, 'big')


def _bebytes_to_uint(b):
    """Decode big-endian bytes as unsigned integer"""
    return int.from_bytes(b, 'big')


with importlib.resources.as_file(importlib.resources.files(__package__) /
                                 'sbs_repo.json') as _path:
    _sbs_repo = sbs.Repository.from_json(_path)
Module logger
Connection callback
Data(type, data)
Conversation(owner, first_id)
class Msg(typing.NamedTuple):
    """Received message"""
    # message payload
    data: Data
    # conversation this message belongs to
    conv: Conversation
    # first message flag
    first: bool
    # last message flag
    last: bool
    # token flag
    token: bool
Msg(data, conv, first, last, token)
Create new instance of Msg(data, conv, first, last, token)
async def connect(addr: tcp.Address,
                  *,
                  ping_delay: float = 20,
                  ping_timeout: float = 20,
                  receive_queue_size: int = 1024,
                  send_queue_size: int = 1024,
                  **kwargs
                  ) -> 'Connection':
    """Open chatter connection to remote server

    `addr` is the address on which the remote server is listening.

    Ping requests are sent only if `ping_delay` is set (not ``None`` and
    not 0), in which case it is the ping request delay in seconds.

    All other keyword arguments are forwarded to
    `hat.drivers.tcp.connect`.

    """
    tcp_conn = await tcp.connect(addr, **kwargs)

    try:
        chatter_conn = Connection(conn=tcp_conn,
                                  ping_delay=ping_delay,
                                  ping_timeout=ping_timeout,
                                  receive_queue_size=receive_queue_size,
                                  send_queue_size=send_queue_size)

    except Exception:
        # wrapping failed - release the already opened TCP connection
        await aio.uncancellable(tcp_conn.async_close())
        raise

    return chatter_conn
Connect to remote server
Argument addr
specifies remote server listening address.
If ping_delay
is None
or 0, ping requests are not sent.
Otherwise, it represents ping request delay in seconds.
Additional arguments are passed directly to hat.drivers.tcp.connect
.
async def listen(connection_cb: ConnectionCb,
                 addr: tcp.Address,
                 *,
                 ping_delay: float = 20,
                 ping_timeout: float = 20,
                 receive_queue_size: int = 1024,
                 send_queue_size: int = 1024,
                 bind_connections: bool = True,
                 **kwargs
                 ) -> tcp.Server:
    """Start server accepting chatter connections

    `addr` is the local address on which the server listens and
    `connection_cb` is called with each newly created `Connection`.

    Ping requests are sent only if `ping_delay` is set (not ``None`` and
    not 0), in which case it is the ping request delay in seconds.

    All other keyword arguments are forwarded to `hat.drivers.tcp.listen`.

    """
    conn_kwargs = {'ping_delay': ping_delay,
                   'ping_timeout': ping_timeout,
                   'receive_queue_size': receive_queue_size,
                   'send_queue_size': send_queue_size}

    async def handle_tcp_connection(tcp_conn):
        # resource closed on failure: TCP connection until it is
        # successfully wrapped, chatter connection afterwards
        resource = tcp_conn

        try:
            resource = Connection(conn=tcp_conn, **conn_kwargs)
            await aio.call(connection_cb, resource)

        except Exception as exc:
            mlog.warning('connection callback error: %s', exc, exc_info=exc)
            await aio.uncancellable(resource.async_close())

        except BaseException:
            await aio.uncancellable(resource.async_close())
            raise

    server = await tcp.listen(handle_tcp_connection, addr,
                              bind_connections=bind_connections,
                              **kwargs)
    return server
Create listening server.
Argument addr
specifies local server listening address.
If ping_delay
is None
or 0, ping requests are not sent.
Otherwise, it represents ping request delay in seconds.
Additional arguments are passed directly to hat.drivers.tcp.listen
.
class Connection(aio.Resource):
    """Single connection

    For creating new connection see `connect` coroutine.

    """

    def __init__(self,
                 conn: tcp.Connection,
                 ping_delay: float,
                 ping_timeout: float,
                 receive_queue_size: int,
                 send_queue_size: int):
        self._conn = conn
        self._receive_queue = aio.Queue(receive_queue_size)
        # send queue is bounded by its own limit (previously it was
        # mistakenly sized with `receive_queue_size`)
        self._send_queue = aio.Queue(send_queue_size)
        self._loop = asyncio.get_running_loop()
        self._next_msg_ids = itertools.count(1)
        # set on every received message; observed by the ping loop
        self._ping_event = asyncio.Event()

        self.async_group.spawn(self._read_loop)
        self.async_group.spawn(self._write_loop)

        if ping_delay:
            self.async_group.spawn(self._ping_loop, ping_delay, ping_timeout)

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._conn.async_group

    @property
    def info(self) -> tcp.ConnectionInfo:
        """Connection info"""
        return self._conn.info

    async def receive(self) -> Msg:
        """Receive message

        Raises `ConnectionError` once the receive queue is closed.

        """
        try:
            return await self._receive_queue.get()

        except aio.QueueClosedError:
            raise ConnectionError()

    async def send(self,
                   data: Data,
                   *,
                   conv: Conversation | None = None,
                   last: bool = True,
                   token: bool = True
                   ) -> Conversation:
        """Send message

        If `conv` is ``None``, new conversation is created.

        Message is only queued for sending - use `drain` to wait until
        queued messages are written.

        Raises `ConnectionError` if connection is closing or if the send
        queue is closed while waiting for free space.

        """
        if self.is_closing:
            raise ConnectionError()

        msg_id = next(self._next_msg_ids)

        if not conv:
            conv = Conversation(owner=True,
                                first_id=msg_id)

        msg = {'id': msg_id,
               'first': conv.first_id,
               'owner': conv.owner,
               'token': token,
               'last': last,
               'data': {'type': data.type,
                        'data': data.data}}

        try:
            await self._send_queue.put((msg, None))

        except aio.QueueClosedError:
            # same error translation as in `drain` and `receive`
            raise ConnectionError()

        return conv

    async def drain(self):
        """Drain output buffer

        Raises `ConnectionError` if the send queue is closed or the write
        loop stops before the drain request is processed.

        """
        future = self._loop.create_future()
        try:
            # `None` message marks a drain request for the write loop
            await self._send_queue.put((None, future))
            await future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def _read_loop(self):
        """Read incoming messages until the connection fails or closes

        Ping requests are answered with pong responses, ping/pong messages
        are not forwarded to the receive queue.

        """
        mlog.debug("connection's read loop started")
        try:
            while True:
                mlog.debug("waiting for incoming message")
                data = await self._read()
                msg = Msg(
                    data=Data(type=data['data']['type'],
                              data=data['data']['data']),
                    # ownership is expressed from the sender's point of view
                    conv=Conversation(owner=not data['owner'],
                                      first_id=data['first']),
                    first=data['owner'] and data['first'] == data['id'],
                    last=data['last'],
                    token=data['token'])

                # any received message counts as a sign of life
                self._ping_event.set()

                if msg.data.type == 'HatChatter.Ping':
                    mlog.debug("received ping request - sending ping response")
                    await self.send(Data('HatChatter.Pong', b''),
                                    conv=msg.conv)

                elif msg.data.type == 'HatChatter.Pong':
                    mlog.debug("received ping response")

                else:
                    mlog.debug("received message %s", msg.data.type)
                    await self._receive_queue.put(msg)

        except ConnectionError:
            mlog.debug("connection error")

        except Exception as e:
            mlog.error("read loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("connection's read loop stopping")
            self.close()
            self._receive_queue.close()

    async def _write_loop(self):
        """Write queued messages and process drain requests

        Queue items are ``(msg, future)`` tuples - `msg` set to ``None``
        represents drain request and `future` (if set) is resolved once the
        item is processed.

        """
        mlog.debug("connection's write loop started")
        future = None

        try:
            while True:
                mlog.debug("waiting for outgoing message")
                msg, future = await self._send_queue.get()

                if msg is None:
                    mlog.debug("draining output buffer")
                    await self._conn.drain()

                else:
                    mlog.debug("writing message %s", msg['data']['type'])
                    await self._write(msg)

                if future and not future.done():
                    future.set_result(None)

        except ConnectionError:
            mlog.debug("connection error")

        except Exception as e:
            mlog.error("write loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("connection's write loop stopping")
            self.close()
            self._send_queue.close()

            # fail the in-flight future and all futures still queued so
            # that pending `drain` calls don't wait forever
            while True:
                if future and not future.done():
                    future.set_exception(ConnectionError())
                if self._send_queue.empty():
                    break
                _, future = self._send_queue.get_nowait()

    async def _ping_loop(self, delay, timeout):
        """Close connection if remote side stops responding

        If no message is received for `delay` seconds, ping request is sent.
        If no message is received during following `timeout` seconds,
        connection is closed.

        """
        mlog.debug("ping loop started")
        try:
            while True:
                self._ping_event.clear()

                # `continue` is skipped only if waiting timed out
                with contextlib.suppress(asyncio.TimeoutError):
                    await aio.wait_for(self._ping_event.wait(), delay)
                    continue

                mlog.debug("sending ping request")
                await self.send(Data('HatChatter.Ping', b''),
                                last=False)

                with contextlib.suppress(asyncio.TimeoutError):
                    await aio.wait_for(self._ping_event.wait(), timeout)
                    continue

                mlog.debug("ping timeout")
                break

        except ConnectionError:
            pass

        finally:
            mlog.debug("ping loop stopped")
            self.close()

    async def _read(self):
        """Read and decode single message

        Wire format: one byte with the size of the length field, big-endian
        message length, encoded ``HatChatter.Msg``.

        """
        msg_len_len_bytes = await self._conn.readexactly(1)
        msg_len_len = msg_len_len_bytes[0]

        msg_len_bytes = await self._conn.readexactly(msg_len_len)
        msg_len = _bebytes_to_uint(msg_len_bytes)

        msg_bytes = await self._conn.readexactly(msg_len)
        msg = _sbs_repo.decode('HatChatter.Msg', msg_bytes)

        return msg

    async def _write(self, msg):
        """Encode and write single message (see `_read` for wire format)"""
        msg_bytes = _sbs_repo.encode('HatChatter.Msg', msg)
        msg_len = len(msg_bytes)
        msg_len_bytes = _uint_to_bebytes(msg_len)
        msg_len_len_bytes = [len(msg_len_bytes)]

        await self._conn.write(bytes(itertools.chain(msg_len_len_bytes,
                                                     msg_len_bytes,
                                                     msg_bytes)))
Single connection
To create a new connection, see the connect
coroutine.
def __init__(self,
             conn: tcp.Connection,
             ping_delay: float,
             ping_timeout: float,
             receive_queue_size: int,
             send_queue_size: int):
    """Wrap TCP connection and start background loops

    Must be called while an event loop is running. Read and write loops
    are always spawned in the connection's async group; ping loop is
    spawned only if `ping_delay` is truthy.

    """
    self._conn = conn
    self._receive_queue = aio.Queue(receive_queue_size)
    # send queue is bounded by its own limit (previously it was
    # mistakenly sized with `receive_queue_size`)
    self._send_queue = aio.Queue(send_queue_size)
    self._loop = asyncio.get_running_loop()
    self._next_msg_ids = itertools.count(1)
    # set on every received message; observed by the ping loop
    self._ping_event = asyncio.Event()

    self.async_group.spawn(self._read_loop)
    self.async_group.spawn(self._write_loop)

    if ping_delay:
        self.async_group.spawn(self._ping_loop, ping_delay, ping_timeout)
@property
def async_group(self) -> aio.Group:
    """Async group (shared with the underlying TCP connection)"""
    tcp_conn = self._conn
    return tcp_conn.async_group
Async group
@property
def info(self) -> tcp.ConnectionInfo:
    """Connection info (as reported by the underlying TCP connection)"""
    tcp_conn = self._conn
    return tcp_conn.info
Connection info
async def receive(self) -> Msg:
    """Wait for and return the next received message

    Raises `ConnectionError` once the receive queue is closed.

    """
    queue = self._receive_queue

    try:
        msg = await queue.get()

    except aio.QueueClosedError:
        raise ConnectionError()

    return msg
Receive message
async def send(self,
               data: Data,
               *,
               conv: Conversation | None = None,
               last: bool = True,
               token: bool = True
               ) -> Conversation:
    """Send message

    If `conv` is ``None``, new conversation is created.

    Message is only queued for sending. Returned value is the conversation
    to which the message belongs.

    Raises `ConnectionError` if connection is closing or if the send
    queue is closed while waiting for free space.

    """
    if self.is_closing:
        raise ConnectionError()

    msg_id = next(self._next_msg_ids)

    if not conv:
        # new conversation is owned by the local side and identified by
        # the id of its first message
        conv = Conversation(owner=True,
                            first_id=msg_id)

    msg = {'id': msg_id,
           'first': conv.first_id,
           'owner': conv.owner,
           'token': token,
           'last': last,
           'data': {'type': data.type,
                    'data': data.data}}

    try:
        await self._send_queue.put((msg, None))

    except aio.QueueClosedError:
        # queue can be closed while `put` waits for free space; report
        # it as `ConnectionError`, the error raised for a closing connection
        raise ConnectionError()

    return conv
Send message
If conv
is None
, a new conversation is created.