hat.drivers.udp

Asyncio UDP endpoint wrapper

  1"""Asyncio UDP endpoint wrapper"""
  2
  3import asyncio
  4import functools
  5import logging
  6import typing
  7
  8from hat import aio
  9from hat import util
 10
 11from hat.drivers import common
 12
 13
 14mlog: logging.Logger = logging.getLogger(__name__)
 15"""Module logger"""
 16
 17
 18class Address(typing.NamedTuple):
 19    host: str
 20    port: int
 21
 22
 23class EndpointInfo(typing.NamedTuple):
 24    name: str | None
 25    local_addr: Address
 26    remote_addr: Address | None
 27
 28
 29async def create(local_addr: Address | None = None,
 30                 remote_addr: Address | None = None,
 31                 *,
 32                 name: str | None = None,
 33                 queue_size: int = 0,
 34                 **kwargs
 35                 ) -> 'Endpoint':
 36    """Create new UDP endpoint
 37
 38    Args:
 39        local_addr: local address
 40        remote_addr: remote address
 41        name: endpoint name
 42        queue_size: receive queue max size
 43        kwargs: additional arguments passed to
 44            :meth:`asyncio.AbstractEventLoop.create_datagram_endpoint`
 45
 46    """
 47    endpoint = Endpoint()
 48    endpoint._local_addr = local_addr
 49    endpoint._remote_addr = remote_addr
 50    endpoint._async_group = aio.Group()
 51    endpoint._queue = aio.Queue(queue_size)
 52    endpoint._transport = None
 53    endpoint._protocol = None
 54    endpoint._log = _create_logger(name, None)
 55    endpoint._comm_log = _CommunicationLogger(name, None)
 56
 57    loop = asyncio.get_running_loop()
 58    create_protocol = functools.partial(Protocol, endpoint)
 59    endpoint._transport, endpoint._protocol = \
 60        await loop.create_datagram_endpoint(create_protocol, local_addr,
 61                                            remote_addr, **kwargs)
 62
 63    try:
 64        endpoint._async_group.spawn(aio.call_on_cancel, endpoint._on_close)
 65
 66        sockname = endpoint._transport.get_extra_info('sockname')
 67        peername = endpoint._transport.get_extra_info('peername')
 68        endpoint._info = EndpointInfo(
 69            name=name,
 70            local_addr=Address(sockname[0], sockname[1]),
 71            remote_addr=(Address(peername[0], peername[1])
 72                         if peername else None))
 73
 74        endpoint._log = _create_logger(name, endpoint._info)
 75        endpoint._comm_log = _CommunicationLogger(name, endpoint._info)
 76
 77    except BaseException:
 78        await aio.uncancellable(endpoint.async_close())
 79        raise
 80
 81    endpoint._comm_log.log(common.CommLogAction.OPEN)
 82
 83    return endpoint
 84
 85
 86class Endpoint(aio.Resource):
 87    """UDP endpoint"""
 88
 89    @property
 90    def async_group(self) -> aio.Group:
 91        """Async group"""
 92        return self._async_group
 93
 94    @property
 95    def info(self) -> EndpointInfo:
 96        """Endpoint info"""
 97        return self._info
 98
 99    @property
100    def empty(self) -> bool:
101        """Is receive queue empty"""
102        return self._queue.empty()
103
104    def send(self,
105             data: util.Bytes,
106             remote_addr: Address | None = None):
107        """Send datagram
108
109        If `remote_addr` is not set, `remote_addr` passed to `create` is used.
110
111        """
112        if not self.is_open or not self._transport:
113            raise ConnectionError()
114
115        msg = data, (remote_addr or self._remote_addr)
116        self._comm_log.log(common.CommLogAction.SEND, msg)
117
118        self._transport.sendto(msg[0], msg[1])
119
120    async def receive(self) -> tuple[util.Bytes, Address]:
121        """Receive datagram"""
122        try:
123            data, addr = await self._queue.get()
124
125        except aio.QueueClosedError:
126            raise ConnectionError()
127
128        return data, addr
129
130    def _on_close(self):
131        self._queue.close()
132
133        if self._transport:
134            self._transport.close()
135
136        self._transport = None
137        self._protocol = None
138
139        self._comm_log.log(common.CommLogAction.CLOSE)
140
141
class Protocol(asyncio.DatagramProtocol):
    """Datagram protocol forwarding transport events to its `Endpoint`"""

    def __init__(self, endpoint: Endpoint):
        self._endpoint = endpoint

    def connection_lost(self, exc: Exception | None):
        """Close endpoint when transport reports lost connection"""
        endpoint = self._endpoint
        endpoint._log.debug('connection lost')

        endpoint.close()

    def datagram_received(self, data: util.Bytes, addr: tuple):
        """Log received datagram and add it to endpoint's receive queue

        Datagram is dropped, with a logged warning, if the queue is full.

        """
        endpoint = self._endpoint
        sender = Address(addr[0], addr[1])
        msg = data, sender
        endpoint._comm_log.log(common.CommLogAction.RECEIVE, msg)

        try:
            endpoint._queue.put_nowait(msg)

        except aio.QueueFullError:
            endpoint._log.warning('receive queue full - '
                                  'dropping datagram')
162
163
def _create_logger(name, info):
    """Create logger adapter carrying endpoint metadata

    Args:
        name: endpoint name
        info: endpoint info or ``None`` if addresses are not yet known

    Returns:
        adapter of module logger with metadata stored under ``meta`` key

    """
    meta = {'type': 'UdpEndpoint',
            'name': name}

    if info is not None:
        local = info.local_addr
        meta['local_addr'] = {'host': local.host,
                              'port': local.port}

        remote = info.remote_addr
        if remote is not None:
            meta['remote_addr'] = {'host': remote.host,
                                   'port': remote.port}

    return logging.LoggerAdapter(mlog, {'meta': meta})
177
178
class _CommunicationLogger:
    """Debug logger for endpoint communication events

    Log records are emitted through an adapter of the module logger with
    endpoint metadata (marked with ``communication`` flag) stored under
    ``meta`` key.

    """

    def __init__(self,
                 name: str | None,
                 info: EndpointInfo | None):
        """Initialize logger

        Args:
            name: endpoint name
            info: endpoint info or ``None`` if addresses are not yet known

        """
        extra = {'meta': {'type': 'UdpEndpoint',
                          'communication': True,
                          'name': name}}

        if info is not None:
            extra['meta']['local_addr'] = {'host': info.local_addr.host,
                                           'port': info.local_addr.port}

            if info.remote_addr is not None:
                extra['meta']['remote_addr'] = {'host': info.remote_addr.host,
                                                'port': info.remote_addr.port}

        self._log = logging.LoggerAdapter(mlog, extra)

    def log(self,
            action: common.CommLogAction,
            msg: tuple[util.Bytes, Address | None] | None = None):
        """Log communication action on debug level

        Args:
            action: communication action
            msg: optional datagram data and remote address; the address
                can be ``None`` when destination is not known

        """
        # skip message formatting when debug records are not emitted
        if not self._log.isEnabledFor(logging.DEBUG):
            return

        if msg is None:
            self._log.debug(action.value, stacklevel=2)
            return

        # `Endpoint.send` can provide message without known destination -
        # logging must not fail in that case
        addr = msg[1]
        remote = tuple(addr) if addr is not None else None

        self._log.debug('%s (data=(%s) remote=%s)',
                        action.value, msg[0].hex(' '), remote,
                        stacklevel=2)
mlog: logging.Logger = <Logger hat.drivers.udp (WARNING)>

Module logger

class Address(typing.NamedTuple):
19class Address(typing.NamedTuple):
20    host: str
21    port: int

Address(host, port)

Address(host: str, port: int)

Create new instance of Address(host, port)

host: str

Alias for field number 0

port: int

Alias for field number 1

class EndpointInfo(typing.NamedTuple):
24class EndpointInfo(typing.NamedTuple):
25    name: str | None
26    local_addr: Address
27    remote_addr: Address | None

EndpointInfo(name, local_addr, remote_addr)

EndpointInfo( name: str | None, local_addr: Address, remote_addr: Address | None)

Create new instance of EndpointInfo(name, local_addr, remote_addr)

name: str | None

Alias for field number 0

local_addr: Address

Alias for field number 1

remote_addr: Address | None

Alias for field number 2

async def create( local_addr: Address | None = None, remote_addr: Address | None = None, *, name: str | None = None, queue_size: int = 0, **kwargs) -> Endpoint:
30async def create(local_addr: Address | None = None,
31                 remote_addr: Address | None = None,
32                 *,
33                 name: str | None = None,
34                 queue_size: int = 0,
35                 **kwargs
36                 ) -> 'Endpoint':
37    """Create new UDP endpoint
38
39    Args:
40        local_addr: local address
41        remote_addr: remote address
42        name: endpoint name
43        queue_size: receive queue max size
44        kwargs: additional arguments passed to
45            :meth:`asyncio.AbstractEventLoop.create_datagram_endpoint`
46
47    """
48    endpoint = Endpoint()
49    endpoint._local_addr = local_addr
50    endpoint._remote_addr = remote_addr
51    endpoint._async_group = aio.Group()
52    endpoint._queue = aio.Queue(queue_size)
53    endpoint._transport = None
54    endpoint._protocol = None
55    endpoint._log = _create_logger(name, None)
56    endpoint._comm_log = _CommunicationLogger(name, None)
57
58    loop = asyncio.get_running_loop()
59    create_protocol = functools.partial(Protocol, endpoint)
60    endpoint._transport, endpoint._protocol = \
61        await loop.create_datagram_endpoint(create_protocol, local_addr,
62                                            remote_addr, **kwargs)
63
64    try:
65        endpoint._async_group.spawn(aio.call_on_cancel, endpoint._on_close)
66
67        sockname = endpoint._transport.get_extra_info('sockname')
68        peername = endpoint._transport.get_extra_info('peername')
69        endpoint._info = EndpointInfo(
70            name=name,
71            local_addr=Address(sockname[0], sockname[1]),
72            remote_addr=(Address(peername[0], peername[1])
73                         if peername else None))
74
75        endpoint._log = _create_logger(name, endpoint._info)
76        endpoint._comm_log = _CommunicationLogger(name, endpoint._info)
77
78    except BaseException:
79        await aio.uncancellable(endpoint.async_close())
80        raise
81
82    endpoint._comm_log.log(common.CommLogAction.OPEN)
83
84    return endpoint

Create new UDP endpoint

Arguments:
  • local_addr: local address
  • remote_addr: remote address
  • name: endpoint name
  • queue_size: receive queue max size
  • kwargs: additional arguments passed to asyncio.AbstractEventLoop.create_datagram_endpoint()
class Endpoint(hat.aio.group.Resource):
 87class Endpoint(aio.Resource):
 88    """UDP endpoint"""
 89
 90    @property
 91    def async_group(self) -> aio.Group:
 92        """Async group"""
 93        return self._async_group
 94
 95    @property
 96    def info(self) -> EndpointInfo:
 97        """Endpoint info"""
 98        return self._info
 99
100    @property
101    def empty(self) -> bool:
102        """Is receive queue empty"""
103        return self._queue.empty()
104
105    def send(self,
106             data: util.Bytes,
107             remote_addr: Address | None = None):
108        """Send datagram
109
110        If `remote_addr` is not set, `remote_addr` passed to `create` is used.
111
112        """
113        if not self.is_open or not self._transport:
114            raise ConnectionError()
115
116        msg = data, (remote_addr or self._remote_addr)
117        self._comm_log.log(common.CommLogAction.SEND, msg)
118
119        self._transport.sendto(msg[0], msg[1])
120
121    async def receive(self) -> tuple[util.Bytes, Address]:
122        """Receive datagram"""
123        try:
124            data, addr = await self._queue.get()
125
126        except aio.QueueClosedError:
127            raise ConnectionError()
128
129        return data, addr
130
131    def _on_close(self):
132        self._queue.close()
133
134        if self._transport:
135            self._transport.close()
136
137        self._transport = None
138        self._protocol = None
139
140        self._comm_log.log(common.CommLogAction.CLOSE)

UDP endpoint

async_group: hat.aio.group.Group
90    @property
91    def async_group(self) -> aio.Group:
92        """Async group"""
93        return self._async_group

Async group

info: EndpointInfo
95    @property
96    def info(self) -> EndpointInfo:
97        """Endpoint info"""
98        return self._info

Endpoint info

empty: bool
100    @property
101    def empty(self) -> bool:
102        """Is receive queue empty"""
103        return self._queue.empty()

Is receive queue empty

def send( self, data: bytes | bytearray | memoryview, remote_addr: Address | None = None):
105    def send(self,
106             data: util.Bytes,
107             remote_addr: Address | None = None):
108        """Send datagram
109
110        If `remote_addr` is not set, `remote_addr` passed to `create` is used.
111
112        """
113        if not self.is_open or not self._transport:
114            raise ConnectionError()
115
116        msg = data, (remote_addr or self._remote_addr)
117        self._comm_log.log(common.CommLogAction.SEND, msg)
118
119        self._transport.sendto(msg[0], msg[1])

Send datagram

If remote_addr is not set, remote_addr passed to create is used.

async def receive(self) -> tuple[bytes | bytearray | memoryview, Address]:
121    async def receive(self) -> tuple[util.Bytes, Address]:
122        """Receive datagram"""
123        try:
124            data, addr = await self._queue.get()
125
126        except aio.QueueClosedError:
127            raise ConnectionError()
128
129        return data, addr

Receive datagram

class Protocol(asyncio.protocols.DatagramProtocol):
143class Protocol(asyncio.DatagramProtocol):
144
145    def __init__(self, endpoint: Endpoint):
146        self._endpoint = endpoint
147
148    def connection_lost(self, exc: Exception | None):
149        self._endpoint._log.debug('connection lost')
150
151        self._endpoint.close()
152
153    def datagram_received(self, data: util.Bytes, addr: tuple):
154        msg = data, Address(addr[0], addr[1])
155        self._endpoint._comm_log.log(common.CommLogAction.RECEIVE, msg)
156
157        try:
158            self._endpoint._queue.put_nowait(msg)
159
160        except aio.QueueFullError:
161            self._endpoint._log.warning('receive queue full - '
162                                        'dropping datagram')

Interface for datagram protocol.

Protocol(endpoint: Endpoint)
145    def __init__(self, endpoint: Endpoint):
146        self._endpoint = endpoint
def connection_lost(self, exc: Exception | None):
148    def connection_lost(self, exc: Exception | None):
149        self._endpoint._log.debug('connection lost')
150
151        self._endpoint.close()

Called when the connection is lost or closed.

The argument is an exception object or None (the latter meaning a regular EOF is received or the connection was aborted or closed).

def datagram_received(self, data: bytes | bytearray | memoryview, addr: tuple):
153    def datagram_received(self, data: util.Bytes, addr: tuple):
154        msg = data, Address(addr[0], addr[1])
155        self._endpoint._comm_log.log(common.CommLogAction.RECEIVE, msg)
156
157        try:
158            self._endpoint._queue.put_nowait(msg)
159
160        except aio.QueueFullError:
161            self._endpoint._log.warning('receive queue full - '
162                                        'dropping datagram')

Called when some datagram is received.