hat.drivers.icmp
async def create_endpoint(local_host: str = '0.0.0.0') -> 'Endpoint':
    """Create an ICMP endpoint and start its receive loop.

    Args:
        local_host: local host name or address; it is resolved with
            `_get_host_addr` and the result is handed to `_create_socket`

    Returns:
        Endpoint whose receive loop has been spawned in its async group.

    """
    running_loop = asyncio.get_running_loop()
    bind_addr = await _get_host_addr(running_loop, local_host)

    # Endpoint instances are populated attribute by attribute; the socket
    # is created last, after all bookkeeping state is in place
    ep = Endpoint()
    ep._async_group = aio.Group()
    ep._loop = running_loop
    ep._echo_data = _echo_data_iter()
    ep._echo_futures = {}
    ep._socket = _create_socket(bind_addr)

    ep.async_group.spawn(ep._receive_loop)
    return ep
class
Endpoint(hat.aio.group.Resource):
class Endpoint(aio.Resource):
    """ICMP echo endpoint.

    Instances are populated by `create_endpoint`, which assigns the private
    attributes used here (`_async_group`, `_loop`, `_echo_data`,
    `_echo_futures`, `_socket`) and spawns `_receive_loop`.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group associated with this endpoint."""
        return self._async_group

    async def ping(self, remote_host: str):
        """Send an echo request to `remote_host` and wait for its reply.

        The reply is matched to the request by the echoed data payload.
        No timeout is applied here.

        Args:
            remote_host: remote host name or address, resolved with
                `_get_host_addr`

        Raises:
            ConnectionError: endpoint is not open (checked before and after
                address resolution) or the receive loop terminated while
                the reply was pending

        """
        if not self.is_open:
            raise ConnectionError()

        remote_addr = await _get_host_addr(self._loop, remote_host)

        # endpoint could have been closed while the address was resolved
        if not self.is_open:
            raise ConnectionError()

        payload = next(self._echo_data)

        # on linux, echo message identifier is changed to
        # `self._socket.getsockname()[1]`
        request = common.EchoMsg(is_reply=False,
                                 identifier=1,
                                 sequence_number=1,
                                 data=payload)
        packet = encoder.encode_msg(request)

        reply_future = self._loop.create_future()

        try:
            # registered under the payload so that `_process_echo_msg` can
            # find this future when the reply arrives
            self._echo_futures[payload] = reply_future

            if sys.version_info < (3, 11):
                self._socket.sendto(packet, remote_addr)

            else:
                await self._loop.sock_sendto(self._socket, packet,
                                             remote_addr)

            await reply_future

        finally:
            del self._echo_futures[payload]

    async def _receive_loop(self):
        """Receive messages until failure or cancellation.

        Messages that fail to decode are logged and skipped. On exit, the
        endpoint is closed, every pending echo future is failed with
        `ConnectionError` and the socket is closed.

        """
        try:
            while True:
                received = await self._loop.sock_recv(self._socket, 1024)

                try:
                    msg = encoder.decode_msg(memoryview(received))

                except Exception as e:
                    mlog.warning("error decoding message: %s", e, exc_info=e)

                else:
                    if isinstance(msg, common.EchoMsg):
                        self._process_echo_msg(msg)

        except Exception as e:
            mlog.error("receive loop error: %s", e, exc_info=e)

        finally:
            self.close()

            # wake up every ping still waiting for a reply
            for pending in self._echo_futures.values():
                if pending.done():
                    continue
                pending.set_exception(ConnectionError())

            self._socket.close()

    def _process_echo_msg(self, msg):
        """Resolve the pending ping future matching an echo reply.

        Echo requests, replies with unknown data and replies for already
        completed futures are ignored.

        """
        if msg.is_reply:
            # TODO check identifier and sequence number

            waiter = self._echo_futures.get(bytes(msg.data))
            if waiter and not waiter.done():
                waiter.set_result(None)
Resource with lifetime control based on Group.
async def
ping(self, remote_host: str):
async def ping(self, remote_host: str):
    """Send an echo request to `remote_host` and wait for its reply.

    The reply is matched to the request by the echoed data payload.
    No timeout is applied here.

    Args:
        remote_host: remote host name or address, resolved with
            `_get_host_addr`

    Raises:
        ConnectionError: endpoint is not open (checked before and after
            address resolution) or the receive loop terminated while
            the reply was pending

    """
    if not self.is_open:
        raise ConnectionError()

    remote_addr = await _get_host_addr(self._loop, remote_host)

    # endpoint could have been closed while the address was resolved
    if not self.is_open:
        raise ConnectionError()

    payload = next(self._echo_data)

    # on linux, echo message identifier is changed to
    # `self._socket.getsockname()[1]`
    request = common.EchoMsg(is_reply=False,
                             identifier=1,
                             sequence_number=1,
                             data=payload)
    packet = encoder.encode_msg(request)

    reply_future = self._loop.create_future()

    try:
        # registered under the payload so that `_process_echo_msg` can
        # find this future when the reply arrives
        self._echo_futures[payload] = reply_future

        if sys.version_info < (3, 11):
            self._socket.sendto(packet, remote_addr)

        else:
            await self._loop.sock_sendto(self._socket, packet,
                                         remote_addr)

        await reply_future

    finally:
        del self._echo_futures[payload]