hat.drivers.chatter
Chatter communication protocol
1"""Chatter communication protocol""" 2 3import asyncio 4import contextlib 5import importlib.resources 6import itertools 7import logging 8import math 9import typing 10 11from hat import aio 12from hat import sbs 13from hat import util 14 15from hat.drivers import tcp 16 17 18mlog: logging.Logger = logging.getLogger(__name__) 19"""Module logger""" 20 21ConnectionCb: typing.TypeAlias = aio.AsyncCallable[['Connection'], None] 22"""Connection callback""" 23 24 25class Data(typing.NamedTuple): 26 type: str 27 data: util.Bytes 28 29 30class Conversation(typing.NamedTuple): 31 owner: bool 32 first_id: int 33 34 35class Msg(typing.NamedTuple): 36 data: Data 37 conv: Conversation 38 first: bool 39 last: bool 40 token: bool 41 42 43async def connect(addr: tcp.Address, 44 *, 45 ping_delay: float = 20, 46 ping_timeout: float = 20, 47 receive_queue_size: int = 1024, 48 send_queue_size: int = 1024, 49 **kwargs 50 ) -> 'Connection': 51 """Connect to remote server 52 53 Argument `addr` specifies remote server listening address. 54 55 If `ping_delay` is ``None`` or 0, ping requests are not sent. 56 Otherwise, it represents ping request delay in seconds. 57 58 Additional arguments are passed directly to `hat.drivers.tcp.connect`. 59 60 """ 61 conn = await tcp.connect(addr, **kwargs) 62 63 try: 64 return Connection(conn=conn, 65 ping_delay=ping_delay, 66 ping_timeout=ping_timeout, 67 receive_queue_size=receive_queue_size, 68 send_queue_size=send_queue_size) 69 70 except Exception: 71 await aio.uncancellable(conn.async_close()) 72 raise 73 74 75async def listen(connection_cb: ConnectionCb, 76 addr: tcp.Address, 77 *, 78 ping_delay: float = 20, 79 ping_timeout: float = 20, 80 receive_queue_size: int = 1024, 81 send_queue_size: int = 1024, 82 bind_connections: bool = True, 83 **kwargs 84 ) -> tcp.Server: 85 """Create listening server. 86 87 Argument `addr` specifies local server listening address. 88 89 If `ping_delay` is ``None`` or 0, ping requests are not sent. 90 Otherwise, it represents ping request delay in seconds. 91 92 Additional arguments are passed directly to `hat.drivers.tcp.listen`. 93 94 """ 95 96 log = _create_server_logger(kwargs.get('name'), None) 97 98 async def on_connection(conn): 99 try: 100 conn = Connection(conn=conn, 101 ping_delay=ping_delay, 102 ping_timeout=ping_timeout, 103 receive_queue_size=receive_queue_size, 104 send_queue_size=send_queue_size) 105 106 await aio.call(connection_cb, conn) 107 108 except Exception as e: 109 log.warning('connection callback error: %s', e, exc_info=e) 110 await aio.uncancellable(conn.async_close()) 111 112 except BaseException: 113 await aio.uncancellable(conn.async_close()) 114 raise 115 116 server = await tcp.listen(on_connection, addr, 117 bind_connections=bind_connections, 118 **kwargs) 119 120 log = _create_server_logger(kwargs.get('name'), server.info) 121 122 return server 123 124 125class Connection(aio.Resource): 126 """Single connection 127 128 For creating new connection see `connect` coroutine. 129 130 """ 131 132 def __init__(self, 133 conn: tcp.Connection, 134 ping_delay: float, 135 ping_timeout: float, 136 receive_queue_size: int, 137 send_queue_size: int): 138 self._conn = conn 139 self._receive_queue = aio.Queue(receive_queue_size) 140 self._send_queue = aio.Queue(receive_queue_size) 141 self._loop = asyncio.get_running_loop() 142 self._next_msg_ids = itertools.count(1) 143 self._ping_event = asyncio.Event() 144 self._log = _create_connection_logger(conn.info) 145 146 self.async_group.spawn(self._read_loop) 147 self.async_group.spawn(self._write_loop) 148 149 if ping_delay: 150 self.async_group.spawn(self._ping_loop, ping_delay, ping_timeout) 151 152 @property 153 def async_group(self) -> aio.Group: 154 """Async group""" 155 return self._conn.async_group 156 157 @property 158 def info(self) -> tcp.ConnectionInfo: 159 """Connection info""" 160 return self._conn.info 161 162 async def receive(self) -> Msg: 163 """Receive message""" 164 try: 165 return await self._receive_queue.get() 166 167 except aio.QueueClosedError: 168 raise ConnectionError() 169 170 async def send(self, 171 data: Data, 172 *, 173 conv: Conversation | None = None, 174 last: bool = True, 175 token: bool = True 176 ) -> Conversation: 177 """Send message 178 179 If `conv` is ``None``, new conversation is created. 180 181 """ 182 if self.is_closing: 183 raise ConnectionError() 184 185 msg_id = next(self._next_msg_ids) 186 187 if not conv: 188 conv = Conversation(owner=True, 189 first_id=msg_id) 190 191 msg = {'id': msg_id, 192 'first': conv.first_id, 193 'owner': conv.owner, 194 'token': token, 195 'last': last, 196 'data': {'type': data.type, 197 'data': data.data}} 198 await self._send_queue.put((msg, None)) 199 200 return conv 201 202 async def drain(self): 203 """Drain output buffer""" 204 future = self._loop.create_future() 205 try: 206 await self._send_queue.put((None, future)) 207 await future 208 209 except aio.QueueClosedError: 210 raise ConnectionError() 211 212 async def _read_loop(self): 213 self._log.debug("connection's read loop started") 214 try: 215 while True: 216 self._log.debug("waiting for incoming message") 217 data = await self._read() 218 msg = Msg( 219 data=Data(type=data['data']['type'], 220 data=data['data']['data']), 221 conv=Conversation(owner=not data['owner'], 222 first_id=data['first']), 223 first=data['owner'] and data['first'] == data['id'], 224 last=data['last'], 225 token=data['token']) 226 227 self._ping_event.set() 228 229 if msg.data.type == 'HatChatter.Ping': 230 self._log.debug("received ping request - " 231 "sending ping response") 232 await self.send(Data('HatChatter.Pong', b''), 233 conv=msg.conv) 234 235 elif msg.data.type == 'HatChatter.Pong': 236 self._log.debug("received ping response") 237 238 else: 239 self._log.debug("received message %s", msg.data.type) 240 await self._receive_queue.put(msg) 241 242 except ConnectionError: 243 self._log.debug("connection error") 244 245 except Exception as e: 246 self._log.error("read loop error: %s", e, exc_info=e) 247 248 finally: 249 self._log.debug("connection's read loop stopping") 250 self.close() 251 self._receive_queue.close() 252 253 async def _write_loop(self): 254 self._log.debug("connection's write loop started") 255 future = None 256 257 try: 258 while True: 259 self._log.debug("waiting for outgoing message") 260 msg, future = await self._send_queue.get() 261 262 if msg is None: 263 self._log.debug("draining output buffer") 264 await self._conn.drain() 265 266 else: 267 self._log.debug("writing message %s", msg['data']['type']) 268 await self._write(msg) 269 270 if future and not future.done(): 271 future.set_result(None) 272 273 except ConnectionError: 274 self._log.debug("connection error") 275 276 except Exception as e: 277 self._log.error("write loop error: %s", e, exc_info=e) 278 279 finally: 280 self._log.debug("connection's write loop stopping") 281 self.close() 282 self._send_queue.close() 283 284 while True: 285 if future and not future.done(): 286 future.set_exception(ConnectionError()) 287 if self._send_queue.empty(): 288 break 289 _, future = self._send_queue.get_nowait() 290 291 async def _ping_loop(self, delay, timeout): 292 self._log.debug("ping loop started") 293 try: 294 while True: 295 self._ping_event.clear() 296 297 with contextlib.suppress(asyncio.TimeoutError): 298 await aio.wait_for(self._ping_event.wait(), delay) 299 continue 300 301 self._log.debug("sending ping request") 302 await self.send(Data('HatChatter.Ping', b''), 303 last=False) 304 305 with contextlib.suppress(asyncio.TimeoutError): 306 await aio.wait_for(self._ping_event.wait(), timeout) 307 continue 308 309 self._log.warning("ping timeout") 310 break 311 312 except ConnectionError: 313 pass 314 315 finally: 316 self._log.debug("ping loop stopped") 317 self.close() 318 319 async def _read(self): 320 msg_len_len_bytes = await self._conn.readexactly(1) 321 msg_len_len = msg_len_len_bytes[0] 322 323 msg_len_bytes = await self._conn.readexactly(msg_len_len) 324 msg_len = _bebytes_to_uint(msg_len_bytes) 325 326 msg_bytes = await self._conn.readexactly(msg_len) 327 msg = _sbs_repo.decode('HatChatter.Msg', msg_bytes) 328 329 return msg 330 331 async def _write(self, msg): 332 msg_bytes = _sbs_repo.encode('HatChatter.Msg', msg) 333 msg_len = len(msg_bytes) 334 msg_len_bytes = _uint_to_bebytes(msg_len) 335 msg_len_len_bytes = [len(msg_len_bytes)] 336 337 await self._conn.write(bytes(itertools.chain(msg_len_len_bytes, 338 msg_len_bytes, 339 msg_bytes))) 340 341 342def _uint_to_bebytes(x): 343 bytes_len = max(math.ceil(x.bit_length() / 8), 1) 344 return x.to_bytes(bytes_len, 'big') 345 346 347def _bebytes_to_uint(b): 348 return int.from_bytes(b, 'big') 349 350 351with importlib.resources.as_file(importlib.resources.files(__package__) / 352 'sbs_repo.json') as _path: 353 _sbs_repo = sbs.Repository.from_json(_path) 354 355 356def _create_server_logger(name, info): 357 extra = {'meta': {'type': 'ChatterServer', 358 'name': name}} 359 360 if info is not None: 361 extra['meta']['addresses'] = [{'host': addr.host, 362 'port': addr.port} 363 for addr in info.addresses] 364 365 return logging.LoggerAdapter(mlog, extra) 366 367 368def _create_connection_logger(info): 369 extra = {'meta': {'type': 'ChatterConnection', 370 'name': info.name, 371 'local_addr': {'host': info.local_addr.host, 372 'port': info.local_addr.port}, 373 'remote_addr': {'host': info.remote_addr.host, 374 'port': info.remote_addr.port}}} 375 376 return logging.LoggerAdapter(mlog, extra)
Module logger
Connection callback
Data(type, data)
Conversation(owner, first_id)
36class Msg(typing.NamedTuple): 37 data: Data 38 conv: Conversation 39 first: bool 40 last: bool 41 token: bool
Msg(data, conv, first, last, token)
Create new instance of Msg(data, conv, first, last, token)
44async def connect(addr: tcp.Address, 45 *, 46 ping_delay: float = 20, 47 ping_timeout: float = 20, 48 receive_queue_size: int = 1024, 49 send_queue_size: int = 1024, 50 **kwargs 51 ) -> 'Connection': 52 """Connect to remote server 53 54 Argument `addr` specifies remote server listening address. 55 56 If `ping_delay` is ``None`` or 0, ping requests are not sent. 57 Otherwise, it represents ping request delay in seconds. 58 59 Additional arguments are passed directly to `hat.drivers.tcp.connect`. 60 61 """ 62 conn = await tcp.connect(addr, **kwargs) 63 64 try: 65 return Connection(conn=conn, 66 ping_delay=ping_delay, 67 ping_timeout=ping_timeout, 68 receive_queue_size=receive_queue_size, 69 send_queue_size=send_queue_size) 70 71 except Exception: 72 await aio.uncancellable(conn.async_close()) 73 raise
Connect to remote server
Argument addr specifies remote server listening address.
If ping_delay is None or 0, ping requests are not sent.
Otherwise, it represents ping request delay in seconds.
Additional arguments are passed directly to hat.drivers.tcp.connect.
76async def listen(connection_cb: ConnectionCb, 77 addr: tcp.Address, 78 *, 79 ping_delay: float = 20, 80 ping_timeout: float = 20, 81 receive_queue_size: int = 1024, 82 send_queue_size: int = 1024, 83 bind_connections: bool = True, 84 **kwargs 85 ) -> tcp.Server: 86 """Create listening server. 87 88 Argument `addr` specifies local server listening address. 89 90 If `ping_delay` is ``None`` or 0, ping requests are not sent. 91 Otherwise, it represents ping request delay in seconds. 92 93 Additional arguments are passed directly to `hat.drivers.tcp.listen`. 94 95 """ 96 97 log = _create_server_logger(kwargs.get('name'), None) 98 99 async def on_connection(conn): 100 try: 101 conn = Connection(conn=conn, 102 ping_delay=ping_delay, 103 ping_timeout=ping_timeout, 104 receive_queue_size=receive_queue_size, 105 send_queue_size=send_queue_size) 106 107 await aio.call(connection_cb, conn) 108 109 except Exception as e: 110 log.warning('connection callback error: %s', e, exc_info=e) 111 await aio.uncancellable(conn.async_close()) 112 113 except BaseException: 114 await aio.uncancellable(conn.async_close()) 115 raise 116 117 server = await tcp.listen(on_connection, addr, 118 bind_connections=bind_connections, 119 **kwargs) 120 121 log = _create_server_logger(kwargs.get('name'), server.info) 122 123 return server
Create listening server.
Argument addr specifies local server listening address.
If ping_delay is None or 0, ping requests are not sent.
Otherwise, it represents ping request delay in seconds.
Additional arguments are passed directly to hat.drivers.tcp.listen.
126class Connection(aio.Resource): 127 """Single connection 128 129 For creating new connection see `connect` coroutine. 130 131 """ 132 133 def __init__(self, 134 conn: tcp.Connection, 135 ping_delay: float, 136 ping_timeout: float, 137 receive_queue_size: int, 138 send_queue_size: int): 139 self._conn = conn 140 self._receive_queue = aio.Queue(receive_queue_size) 141 self._send_queue = aio.Queue(receive_queue_size) 142 self._loop = asyncio.get_running_loop() 143 self._next_msg_ids = itertools.count(1) 144 self._ping_event = asyncio.Event() 145 self._log = _create_connection_logger(conn.info) 146 147 self.async_group.spawn(self._read_loop) 148 self.async_group.spawn(self._write_loop) 149 150 if ping_delay: 151 self.async_group.spawn(self._ping_loop, ping_delay, ping_timeout) 152 153 @property 154 def async_group(self) -> aio.Group: 155 """Async group""" 156 return self._conn.async_group 157 158 @property 159 def info(self) -> tcp.ConnectionInfo: 160 """Connection info""" 161 return self._conn.info 162 163 async def receive(self) -> Msg: 164 """Receive message""" 165 try: 166 return await self._receive_queue.get() 167 168 except aio.QueueClosedError: 169 raise ConnectionError() 170 171 async def send(self, 172 data: Data, 173 *, 174 conv: Conversation | None = None, 175 last: bool = True, 176 token: bool = True 177 ) -> Conversation: 178 """Send message 179 180 If `conv` is ``None``, new conversation is created. 181 182 """ 183 if self.is_closing: 184 raise ConnectionError() 185 186 msg_id = next(self._next_msg_ids) 187 188 if not conv: 189 conv = Conversation(owner=True, 190 first_id=msg_id) 191 192 msg = {'id': msg_id, 193 'first': conv.first_id, 194 'owner': conv.owner, 195 'token': token, 196 'last': last, 197 'data': {'type': data.type, 198 'data': data.data}} 199 await self._send_queue.put((msg, None)) 200 201 return conv 202 203 async def drain(self): 204 """Drain output buffer""" 205 future = self._loop.create_future() 206 try: 207 await self._send_queue.put((None, future)) 208 await future 209 210 except aio.QueueClosedError: 211 raise ConnectionError() 212 213 async def _read_loop(self): 214 self._log.debug("connection's read loop started") 215 try: 216 while True: 217 self._log.debug("waiting for incoming message") 218 data = await self._read() 219 msg = Msg( 220 data=Data(type=data['data']['type'], 221 data=data['data']['data']), 222 conv=Conversation(owner=not data['owner'], 223 first_id=data['first']), 224 first=data['owner'] and data['first'] == data['id'], 225 last=data['last'], 226 token=data['token']) 227 228 self._ping_event.set() 229 230 if msg.data.type == 'HatChatter.Ping': 231 self._log.debug("received ping request - " 232 "sending ping response") 233 await self.send(Data('HatChatter.Pong', b''), 234 conv=msg.conv) 235 236 elif msg.data.type == 'HatChatter.Pong': 237 self._log.debug("received ping response") 238 239 else: 240 self._log.debug("received message %s", msg.data.type) 241 await self._receive_queue.put(msg) 242 243 except ConnectionError: 244 self._log.debug("connection error") 245 246 except Exception as e: 247 self._log.error("read loop error: %s", e, exc_info=e) 248 249 finally: 250 self._log.debug("connection's read loop stopping") 251 self.close() 252 self._receive_queue.close() 253 254 async def _write_loop(self): 255 self._log.debug("connection's write loop started") 256 future = None 257 258 try: 259 while True: 260 self._log.debug("waiting for outgoing message") 261 msg, future = await self._send_queue.get() 262 263 if msg is None: 264 self._log.debug("draining output buffer") 265 await self._conn.drain() 266 267 else: 268 self._log.debug("writing message %s", msg['data']['type']) 269 await self._write(msg) 270 271 if future and not future.done(): 272 future.set_result(None) 273 274 except ConnectionError: 275 self._log.debug("connection error") 276 277 except Exception as e: 278 self._log.error("write loop error: %s", e, exc_info=e) 279 280 finally: 281 self._log.debug("connection's write loop stopping") 282 self.close() 283 self._send_queue.close() 284 285 while True: 286 if future and not future.done(): 287 future.set_exception(ConnectionError()) 288 if self._send_queue.empty(): 289 break 290 _, future = self._send_queue.get_nowait() 291 292 async def _ping_loop(self, delay, timeout): 293 self._log.debug("ping loop started") 294 try: 295 while True: 296 self._ping_event.clear() 297 298 with contextlib.suppress(asyncio.TimeoutError): 299 await aio.wait_for(self._ping_event.wait(), delay) 300 continue 301 302 self._log.debug("sending ping request") 303 await self.send(Data('HatChatter.Ping', b''), 304 last=False) 305 306 with contextlib.suppress(asyncio.TimeoutError): 307 await aio.wait_for(self._ping_event.wait(), timeout) 308 continue 309 310 self._log.warning("ping timeout") 311 break 312 313 except ConnectionError: 314 pass 315 316 finally: 317 self._log.debug("ping loop stopped") 318 self.close() 319 320 async def _read(self): 321 msg_len_len_bytes = await self._conn.readexactly(1) 322 msg_len_len = msg_len_len_bytes[0] 323 324 msg_len_bytes = await self._conn.readexactly(msg_len_len) 325 msg_len = _bebytes_to_uint(msg_len_bytes) 326 327 msg_bytes = await self._conn.readexactly(msg_len) 328 msg = _sbs_repo.decode('HatChatter.Msg', msg_bytes) 329 330 return msg 331 332 async def _write(self, msg): 333 msg_bytes = _sbs_repo.encode('HatChatter.Msg', msg) 334 msg_len = len(msg_bytes) 335 msg_len_bytes = _uint_to_bebytes(msg_len) 336 msg_len_len_bytes = [len(msg_len_bytes)] 337 338 await self._conn.write(bytes(itertools.chain(msg_len_len_bytes, 339 msg_len_bytes, 340 msg_bytes)))
Single connection
For creating new connection see connect coroutine.
133 def __init__(self, 134 conn: tcp.Connection, 135 ping_delay: float, 136 ping_timeout: float, 137 receive_queue_size: int, 138 send_queue_size: int): 139 self._conn = conn 140 self._receive_queue = aio.Queue(receive_queue_size) 141 self._send_queue = aio.Queue(receive_queue_size) 142 self._loop = asyncio.get_running_loop() 143 self._next_msg_ids = itertools.count(1) 144 self._ping_event = asyncio.Event() 145 self._log = _create_connection_logger(conn.info) 146 147 self.async_group.spawn(self._read_loop) 148 self.async_group.spawn(self._write_loop) 149 150 if ping_delay: 151 self.async_group.spawn(self._ping_loop, ping_delay, ping_timeout)
153 @property 154 def async_group(self) -> aio.Group: 155 """Async group""" 156 return self._conn.async_group
Async group
158 @property 159 def info(self) -> tcp.ConnectionInfo: 160 """Connection info""" 161 return self._conn.info
Connection info
163 async def receive(self) -> Msg: 164 """Receive message""" 165 try: 166 return await self._receive_queue.get() 167 168 except aio.QueueClosedError: 169 raise ConnectionError()
Receive message
171 async def send(self, 172 data: Data, 173 *, 174 conv: Conversation | None = None, 175 last: bool = True, 176 token: bool = True 177 ) -> Conversation: 178 """Send message 179 180 If `conv` is ``None``, new conversation is created. 181 182 """ 183 if self.is_closing: 184 raise ConnectionError() 185 186 msg_id = next(self._next_msg_ids) 187 188 if not conv: 189 conv = Conversation(owner=True, 190 first_id=msg_id) 191 192 msg = {'id': msg_id, 193 'first': conv.first_id, 194 'owner': conv.owner, 195 'token': token, 196 'last': last, 197 'data': {'type': data.type, 198 'data': data.data}} 199 await self._send_queue.put((msg, None)) 200 201 return conv
Send message
If conv is None, new conversation is created.