hat.drivers.icmp
class
Address(typing.NamedTuple):
Address(host, port)
class
EndpointInfo(typing.NamedTuple):
EndpointInfo(name, local_addr)
EndpointInfo(name: str | None, local_addr: Address)
Create new instance of EndpointInfo(name, local_addr)
30async def create_endpoint(local_host: str = '0.0.0.0', 31 *, 32 name: str | None = None 33 ) -> 'Endpoint': 34 loop = asyncio.get_running_loop() 35 local_addr = await _get_host_addr(loop, local_host) 36 37 endpoint = Endpoint() 38 endpoint._async_group = aio.Group() 39 endpoint._loop = loop 40 endpoint._echo_data = _echo_data_iter() 41 endpoint._echo_futures = {} 42 endpoint._info = EndpointInfo(name=name, 43 local_addr=local_addr) 44 endpoint._log = _create_logger_adapter(endpoint._info) 45 46 endpoint._socket = _create_socket(local_addr) 47 48 endpoint.async_group.spawn(endpoint._receive_loop) 49 50 return endpoint
class
Endpoint(hat.aio.group.Resource):
53class Endpoint(aio.Resource): 54 55 @property 56 def async_group(self) -> aio.Group: 57 return self._async_group 58 59 @property 60 def info(self) -> EndpointInfo: 61 return self._info 62 63 async def ping(self, remote_host: str): 64 if not self.is_open: 65 raise ConnectionError() 66 67 addr = await _get_host_addr(self._loop, remote_host) 68 69 if not self.is_open: 70 raise ConnectionError() 71 72 data = next(self._echo_data) 73 74 # on linux, echo message identifier is chaged to 75 # `self._socket.getsockname()[1]` 76 req = common.EchoMsg(is_reply=False, 77 identifier=1, 78 sequence_number=1, 79 data=data) 80 req_bytes = encoder.encode_msg(req) 81 82 future = self._loop.create_future() 83 84 try: 85 self._echo_futures[data] = future 86 87 if sys.version_info[:2] >= (3, 11): 88 await self._loop.sock_sendto(self._socket, req_bytes, 89 tuple(addr)) 90 91 else: 92 self._socket.sendto(req_bytes, tuple(addr)) 93 94 await future 95 96 finally: 97 self._echo_futures.pop(data) 98 99 async def _receive_loop(self): 100 try: 101 while True: 102 msg_bytes = await self._loop.sock_recv(self._socket, 1024) 103 104 try: 105 msg = encoder.decode_msg(memoryview(msg_bytes)) 106 107 except Exception as e: 108 self._log.warning("error decoding message: %s", 109 e, exc_info=e) 110 continue 111 112 if isinstance(msg, common.EchoMsg): 113 self._process_echo_msg(msg) 114 115 except Exception as e: 116 self._log.error("receive loop error: %s", e, exc_info=e) 117 118 finally: 119 self.close() 120 121 for future in self._echo_futures.values(): 122 if not future.done(): 123 future.set_exception(ConnectionError()) 124 125 self._socket.close() 126 127 def _process_echo_msg(self, msg): 128 if not msg.is_reply: 129 return 130 131 # TODO check identifier and sequence number 132 133 data = bytes(msg.data) 134 135 future = self._echo_futures.get(data) 136 if not future or future.done(): 137 return 138 139 future.set_result(None)
Resource with lifetime control based on Group.
async def
ping(self, remote_host: str):
63 async def ping(self, remote_host: str): 64 if not self.is_open: 65 raise ConnectionError() 66 67 addr = await _get_host_addr(self._loop, remote_host) 68 69 if not self.is_open: 70 raise ConnectionError() 71 72 data = next(self._echo_data) 73 74 # on linux, echo message identifier is chaged to 75 # `self._socket.getsockname()[1]` 76 req = common.EchoMsg(is_reply=False, 77 identifier=1, 78 sequence_number=1, 79 data=data) 80 req_bytes = encoder.encode_msg(req) 81 82 future = self._loop.create_future() 83 84 try: 85 self._echo_futures[data] = future 86 87 if sys.version_info[:2] >= (3, 11): 88 await self._loop.sock_sendto(self._socket, req_bytes, 89 tuple(addr)) 90 91 else: 92 self._socket.sendto(req_bytes, tuple(addr)) 93 94 await future 95 96 finally: 97 self._echo_futures.pop(data)