hat.drivers.udp

Asyncio UDP endpoint wrapper

  1"""Asyncio UDP endpoint wrapper"""
  2
  3import asyncio
  4import logging
  5import typing
  6
  7from hat import aio
  8from hat import util
  9
 10
 11mlog: logging.Logger = logging.getLogger(__name__)
 12"""Module logger"""
 13
 14
 15class Address(typing.NamedTuple):
 16    host: str
 17    port: int
 18
 19
 20class EndpointInfo(typing.NamedTuple):
 21    local_addr: Address
 22    remote_addr: Address | None
 23
 24
 25async def create(local_addr: Address | None = None,
 26                 remote_addr: Address | None = None,
 27                 queue_size: int = 0,
 28                 **kwargs
 29                 ) -> 'Endpoint':
 30    """Create new UDP endpoint
 31
 32    Args:
 33        local_addr: local address
 34        remote_addr: remote address
 35        queue_size: receive queue max size
 36        kwargs: additional arguments passed to
 37            :meth:`asyncio.AbstractEventLoop.create_datagram_endpoint`
 38
 39    """
 40    endpoint = Endpoint()
 41    endpoint._local_addr = local_addr
 42    endpoint._remote_addr = remote_addr
 43    endpoint._async_group = aio.Group()
 44    endpoint._queue = aio.Queue(queue_size)
 45
 46    class Protocol(asyncio.DatagramProtocol):
 47
 48        def connection_lost(self, exc):
 49            endpoint._async_group.close()
 50
 51        def datagram_received(self, data, addr):
 52            try:
 53                endpoint._queue.put_nowait(
 54                    (data, Address(addr[0], addr[1])))
 55
 56            except aio.QueueFullError:
 57                mlog.warning('receive queue full - dropping datagram')
 58
 59    loop = asyncio.get_running_loop()
 60    endpoint._transport, endpoint._protocol = \
 61        await loop.create_datagram_endpoint(Protocol, local_addr, remote_addr,
 62                                            **kwargs)
 63
 64    endpoint._async_group.spawn(aio.call_on_cancel, endpoint._transport.close)
 65    endpoint._async_group.spawn(aio.call_on_cancel, endpoint._queue.close)
 66
 67    sockname = endpoint._transport.get_extra_info('sockname')
 68    peername = endpoint._transport.get_extra_info('peername')
 69    endpoint._info = EndpointInfo(
 70        local_addr=Address(sockname[0], sockname[1]),
 71        remote_addr=Address(peername[0], peername[1]) if peername else None)
 72
 73    return endpoint
 74
 75
 76class Endpoint(aio.Resource):
 77    """UDP endpoint"""
 78
 79    @property
 80    def async_group(self) -> aio.Group:
 81        """Async group"""
 82        return self._async_group
 83
 84    @property
 85    def info(self) -> EndpointInfo:
 86        """Endpoint info"""
 87        return self._info
 88
 89    @property
 90    def empty(self) -> bool:
 91        """Is receive queue empty"""
 92        return self._queue.empty()
 93
 94    def send(self,
 95             data: util.Bytes,
 96             remote_addr: Address | None = None):
 97        """Send datagram
 98
 99        If `remote_addr` is not set, `remote_addr` passed to :func:`create`
100        is used.
101
102        """
103        if not self.is_open:
104            raise ConnectionError()
105
106        self._transport.sendto(data, remote_addr or self._remote_addr)
107
108    async def receive(self) -> tuple[util.Bytes, Address]:
109        """Receive datagram"""
110        try:
111            data, addr = await self._queue.get()
112
113        except aio.QueueClosedError:
114            raise ConnectionError()
115
116        return data, addr
mlog: logging.Logger = <Logger hat.drivers.udp (WARNING)>

Module logger

class Address(typing.NamedTuple):
class Address(typing.NamedTuple):
    """Network address of a UDP endpoint as a ``(host, port)`` pair"""

    # host part of the address (first element of the socket address)
    host: str
    # port part of the address (second element of the socket address)
    port: int

Address(host, port)

Address(host: str, port: int)

Create new instance of Address(host, port)

host: str

Alias for field number 0

port: int

Alias for field number 1

class EndpointInfo(typing.NamedTuple):
class EndpointInfo(typing.NamedTuple):
    """Addresses associated with a UDP endpoint"""

    # local socket address of the endpoint
    local_addr: Address
    # peer address, or ``None`` when the endpoint has no peer address
    remote_addr: Address | None

EndpointInfo(local_addr, remote_addr)

EndpointInfo( local_addr: Address, remote_addr: Address | None)

Create new instance of EndpointInfo(local_addr, remote_addr)

local_addr: Address

Alias for field number 0

remote_addr: Address | None

Alias for field number 1

async def create( local_addr: Address | None = None, remote_addr: Address | None = None, queue_size: int = 0, **kwargs) -> Endpoint:
async def create(local_addr: Address | None = None,
                 remote_addr: Address | None = None,
                 queue_size: int = 0,
                 **kwargs
                 ) -> 'Endpoint':
    """Create new UDP endpoint

    A datagram transport is opened on the running event loop and wrapped
    in :class:`Endpoint`. Received datagrams are stored in the endpoint's
    receive queue; when the queue is full, a newly received datagram is
    dropped and a warning is logged.

    Args:
        local_addr: local address
        remote_addr: remote address
        queue_size: receive queue max size
        kwargs: additional arguments passed to
            :meth:`asyncio.AbstractEventLoop.create_datagram_endpoint`

    """
    async_group = aio.Group()
    queue = aio.Queue(queue_size)

    endpoint = Endpoint()
    endpoint._local_addr = local_addr
    endpoint._remote_addr = remote_addr
    endpoint._async_group = async_group
    endpoint._queue = queue

    class Protocol(asyncio.DatagramProtocol):

        def connection_lost(self, exc):
            # losing the transport ends the endpoint's life cycle
            async_group.close()

        def datagram_received(self, data, addr):
            sender = Address(addr[0], addr[1])
            try:
                queue.put_nowait((data, sender))

            except aio.QueueFullError:
                mlog.warning('receive queue full - dropping datagram')

    loop = asyncio.get_running_loop()
    transport, protocol = await loop.create_datagram_endpoint(
        Protocol, local_addr=local_addr, remote_addr=remote_addr, **kwargs)
    endpoint._transport = transport
    endpoint._protocol = protocol

    # closing the async group closes the transport and the receive queue
    for cleanup in (transport.close, queue.close):
        async_group.spawn(aio.call_on_cancel, cleanup)

    sockname = transport.get_extra_info('sockname')
    peername = transport.get_extra_info('peername')

    local = Address(sockname[0], sockname[1])
    if peername:
        remote = Address(peername[0], peername[1])
    else:
        remote = None
    endpoint._info = EndpointInfo(local_addr=local, remote_addr=remote)

    return endpoint

Create new UDP endpoint

Arguments:
  • local_addr: local address
  • remote_addr: remote address
  • queue_size: receive queue max size
  • kwargs: additional arguments passed to asyncio.AbstractEventLoop.create_datagram_endpoint()
class Endpoint(hat.aio.group.Resource):
 77class Endpoint(aio.Resource):
 78    """UDP endpoint"""
 79
 80    @property
 81    def async_group(self) -> aio.Group:
 82        """Async group"""
 83        return self._async_group
 84
 85    @property
 86    def info(self) -> EndpointInfo:
 87        """Endpoint info"""
 88        return self._info
 89
 90    @property
 91    def empty(self) -> bool:
 92        """Is receive queue empty"""
 93        return self._queue.empty()
 94
 95    def send(self,
 96             data: util.Bytes,
 97             remote_addr: Address | None = None):
 98        """Send datagram
 99
100        If `remote_addr` is not set, `remote_addr` passed to :func:`create`
101        is used.
102
103        """
104        if not self.is_open:
105            raise ConnectionError()
106
107        self._transport.sendto(data, remote_addr or self._remote_addr)
108
109    async def receive(self) -> tuple[util.Bytes, Address]:
110        """Receive datagram"""
111        try:
112            data, addr = await self._queue.get()
113
114        except aio.QueueClosedError:
115            raise ConnectionError()
116
117        return data, addr

UDP endpoint

async_group: hat.aio.group.Group
80    @property
81    def async_group(self) -> aio.Group:
82        """Async group"""
83        return self._async_group

Async group

info: EndpointInfo
85    @property
86    def info(self) -> EndpointInfo:
87        """Endpoint info"""
88        return self._info

Endpoint info

empty: bool
90    @property
91    def empty(self) -> bool:
92        """Is receive queue empty"""
93        return self._queue.empty()

Is receive queue empty

def send( self, data: bytes | bytearray | memoryview, remote_addr: Address | None = None):
 95    def send(self,
 96             data: util.Bytes,
 97             remote_addr: Address | None = None):
 98        """Send datagram
 99
100        If `remote_addr` is not set, `remote_addr` passed to :func:`create`
101        is used.
102
103        """
104        if not self.is_open:
105            raise ConnectionError()
106
107        self._transport.sendto(data, remote_addr or self._remote_addr)

Send datagram

If remote_addr is not set, remote_addr passed to create() is used.

async def receive(self) -> tuple[bytes | bytearray | memoryview, Address]:
109    async def receive(self) -> tuple[util.Bytes, Address]:
110        """Receive datagram"""
111        try:
112            data, addr = await self._queue.get()
113
114        except aio.QueueClosedError:
115            raise ConnectionError()
116
117        return data, addr

Receive datagram