hat.drivers.icmp

 1from hat.drivers.icmp.endpoint import (Address,
 2                                       EndpointInfo,
 3                                       create_endpoint,
 4                                       Endpoint)
 5
 6
 7__all__ = ['Address',
 8           'EndpointInfo',
 9           'create_endpoint',
10           'Endpoint']
class Address(typing.NamedTuple):
20class Address(typing.NamedTuple):
21    host: str
22    port: int

Address(host, port)

Address(host: str, port: int)

Create new instance of Address(host, port)

host: str

Alias for field number 0

port: int

Alias for field number 1

class EndpointInfo(typing.NamedTuple):
25class EndpointInfo(typing.NamedTuple):
26    name: str | None
27    local_addr: Address

EndpointInfo(name, local_addr)

EndpointInfo(name: str | None, local_addr: Address)

Create new instance of EndpointInfo(name, local_addr)

name: str | None

Alias for field number 0

local_addr: Address

Alias for field number 1

async def create_endpoint( local_host: str = '0.0.0.0', *, name: str | None = None) -> Endpoint:
30async def create_endpoint(local_host: str = '0.0.0.0',
31                          *,
32                          name: str | None = None
33                          ) -> 'Endpoint':
34    loop = asyncio.get_running_loop()
35    local_addr = await _get_host_addr(loop, local_host)
36
37    endpoint = Endpoint()
38    endpoint._async_group = aio.Group()
39    endpoint._loop = loop
40    endpoint._echo_data = _echo_data_iter()
41    endpoint._echo_futures = {}
42    endpoint._info = EndpointInfo(name=name,
43                                  local_addr=local_addr)
44    endpoint._log = _create_logger_adapter(endpoint._info)
45
46    endpoint._socket = _create_socket(local_addr)
47
48    endpoint.async_group.spawn(endpoint._receive_loop)
49
50    return endpoint
class Endpoint(hat.aio.group.Resource):
 53class Endpoint(aio.Resource):
 54
 55    @property
 56    def async_group(self) -> aio.Group:
 57        return self._async_group
 58
 59    @property
 60    def info(self) -> EndpointInfo:
 61        return self._info
 62
 63    async def ping(self, remote_host: str):
 64        if not self.is_open:
 65            raise ConnectionError()
 66
 67        addr = await _get_host_addr(self._loop, remote_host)
 68
 69        if not self.is_open:
 70            raise ConnectionError()
 71
 72        data = next(self._echo_data)
 73
 74        # on linux, echo message identifier is chaged to
 75        # `self._socket.getsockname()[1]`
 76        req = common.EchoMsg(is_reply=False,
 77                             identifier=1,
 78                             sequence_number=1,
 79                             data=data)
 80        req_bytes = encoder.encode_msg(req)
 81
 82        future = self._loop.create_future()
 83
 84        try:
 85            self._echo_futures[data] = future
 86
 87            if sys.version_info[:2] >= (3, 11):
 88                await self._loop.sock_sendto(self._socket, req_bytes,
 89                                             tuple(addr))
 90
 91            else:
 92                self._socket.sendto(req_bytes, tuple(addr))
 93
 94            await future
 95
 96        finally:
 97            self._echo_futures.pop(data)
 98
 99    async def _receive_loop(self):
100        try:
101            while True:
102                msg_bytes = await self._loop.sock_recv(self._socket, 1024)
103
104                try:
105                    msg = encoder.decode_msg(memoryview(msg_bytes))
106
107                except Exception as e:
108                    self._log.warning("error decoding message: %s",
109                                      e, exc_info=e)
110                    continue
111
112                if isinstance(msg, common.EchoMsg):
113                    self._process_echo_msg(msg)
114
115        except Exception as e:
116            self._log.error("receive loop error: %s", e, exc_info=e)
117
118        finally:
119            self.close()
120
121            for future in self._echo_futures.values():
122                if not future.done():
123                    future.set_exception(ConnectionError())
124
125            self._socket.close()
126
127    def _process_echo_msg(self, msg):
128        if not msg.is_reply:
129            return
130
131        # TODO check identifier and sequence number
132
133        data = bytes(msg.data)
134
135        future = self._echo_futures.get(data)
136        if not future or future.done():
137            return
138
139        future.set_result(None)

Resource with lifetime control based on Group.

async_group: hat.aio.group.Group
55    @property
56    def async_group(self) -> aio.Group:
57        return self._async_group

Group controlling resource's lifetime.

info: EndpointInfo
59    @property
60    def info(self) -> EndpointInfo:
61        return self._info
async def ping(self, remote_host: str):
63    async def ping(self, remote_host: str):
64        if not self.is_open:
65            raise ConnectionError()
66
67        addr = await _get_host_addr(self._loop, remote_host)
68
69        if not self.is_open:
70            raise ConnectionError()
71
72        data = next(self._echo_data)
73
74        # on linux, echo message identifier is chaged to
75        # `self._socket.getsockname()[1]`
76        req = common.EchoMsg(is_reply=False,
77                             identifier=1,
78                             sequence_number=1,
79                             data=data)
80        req_bytes = encoder.encode_msg(req)
81
82        future = self._loop.create_future()
83
84        try:
85            self._echo_futures[data] = future
86
87            if sys.version_info[:2] >= (3, 11):
88                await self._loop.sock_sendto(self._socket, req_bytes,
89                                             tuple(addr))
90
91            else:
92                self._socket.sendto(req_bytes, tuple(addr))
93
94            await future
95
96        finally:
97            self._echo_futures.pop(data)