hat.drivers.icmp

1from hat.drivers.icmp.endpoint import (create_endpoint,
2                                       Endpoint)
3
4
5__all__ = ['create_endpoint',
6           'Endpoint']
async def create_endpoint(local_host: str = '0.0.0.0') -> Endpoint:
19async def create_endpoint(local_host: str = '0.0.0.0') -> 'Endpoint':
20    loop = asyncio.get_running_loop()
21    local_addr = await _get_host_addr(loop, local_host)
22
23    endpoint = Endpoint()
24    endpoint._async_group = aio.Group()
25    endpoint._loop = loop
26    endpoint._echo_data = _echo_data_iter()
27    endpoint._echo_futures = {}
28
29    endpoint._socket = _create_socket(local_addr)
30
31    endpoint.async_group.spawn(endpoint._receive_loop)
32
33    return endpoint
class Endpoint(hat.aio.group.Resource):
 36class Endpoint(aio.Resource):
 37
 38    @property
 39    def async_group(self) -> aio.Group:
 40        return self._async_group
 41
 42    async def ping(self, remote_host: str):
 43        if not self.is_open:
 44            raise ConnectionError()
 45
 46        addr = await _get_host_addr(self._loop, remote_host)
 47
 48        if not self.is_open:
 49            raise ConnectionError()
 50
 51        data = next(self._echo_data)
 52
 53        # on linux, echo message identifier is chaged to
 54        # `self._socket.getsockname()[1]`
 55        req = common.EchoMsg(is_reply=False,
 56                             identifier=1,
 57                             sequence_number=1,
 58                             data=data)
 59        req_bytes = encoder.encode_msg(req)
 60
 61        future = self._loop.create_future()
 62
 63        try:
 64            self._echo_futures[data] = future
 65
 66            if sys.version_info[:2] >= (3, 11):
 67                await self._loop.sock_sendto(self._socket, req_bytes, addr)
 68
 69            else:
 70                self._socket.sendto(req_bytes, addr)
 71
 72            await future
 73
 74        finally:
 75            self._echo_futures.pop(data)
 76
 77    async def _receive_loop(self):
 78        try:
 79            while True:
 80                msg_bytes = await self._loop.sock_recv(self._socket, 1024)
 81
 82                try:
 83                    msg = encoder.decode_msg(memoryview(msg_bytes))
 84
 85                except Exception as e:
 86                    mlog.warning("error decoding message: %s", e, exc_info=e)
 87                    continue
 88
 89                if isinstance(msg, common.EchoMsg):
 90                    self._process_echo_msg(msg)
 91
 92        except Exception as e:
 93            mlog.error("receive loop error: %s", e, exc_info=e)
 94
 95        finally:
 96            self.close()
 97
 98            for future in self._echo_futures.values():
 99                if not future.done():
100                    future.set_exception(ConnectionError())
101
102            self._socket.close()
103
104    def _process_echo_msg(self, msg):
105        if not msg.is_reply:
106            return
107
108        # TODO check identifier and sequence number
109
110        data = bytes(msg.data)
111
112        future = self._echo_futures.get(data)
113        if not future or future.done():
114            return
115
116        future.set_result(None)

Resource with lifetime control based on Group.

async_group: hat.aio.group.Group
38    @property
39    def async_group(self) -> aio.Group:
40        return self._async_group

Group controlling resource's lifetime.

async def ping(self, remote_host: str):
42    async def ping(self, remote_host: str):
43        if not self.is_open:
44            raise ConnectionError()
45
46        addr = await _get_host_addr(self._loop, remote_host)
47
48        if not self.is_open:
49            raise ConnectionError()
50
51        data = next(self._echo_data)
52
53        # on linux, echo message identifier is chaged to
54        # `self._socket.getsockname()[1]`
55        req = common.EchoMsg(is_reply=False,
56                             identifier=1,
57                             sequence_number=1,
58                             data=data)
59        req_bytes = encoder.encode_msg(req)
60
61        future = self._loop.create_future()
62
63        try:
64            self._echo_futures[data] = future
65
66            if sys.version_info[:2] >= (3, 11):
67                await self._loop.sock_sendto(self._socket, req_bytes, addr)
68
69            else:
70                self._socket.sendto(req_bytes, addr)
71
72            await future
73
74        finally:
75            self._echo_futures.pop(data)