hat.drivers.udp
Asyncio UDP endpoint wrapper
"""Asyncio UDP endpoint wrapper"""

import asyncio
import logging
import typing

from hat import aio
from hat import util


mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""


class Address(typing.NamedTuple):
    # host part of the address, as passed to / reported by asyncio
    host: str
    # UDP port number
    port: int


class EndpointInfo(typing.NamedTuple):
    # address reported by the transport's ``sockname`` extra info
    local_addr: Address
    # address reported by the transport's ``peername`` extra info, or
    # ``None`` when the transport reports no peer
    remote_addr: Address | None


async def create(local_addr: Address | None = None,
                 remote_addr: Address | None = None,
                 queue_size: int = 0,
                 **kwargs
                 ) -> 'Endpoint':
    """Create new UDP endpoint

    A datagram transport is created on the running event loop. Received
    datagrams are pushed into the endpoint's receive queue; when the queue
    reports that it is full, the datagram is dropped and a warning is logged.

    Args:
        local_addr: local address
        remote_addr: remote address
        queue_size: receive queue max size
        kwargs: additional arguments passed to
            :meth:`asyncio.AbstractEventLoop.create_datagram_endpoint`

    Returns:
        endpoint whose ``info`` is built from the transport's ``sockname``
        and ``peername`` extra info

    """
    endpoint = Endpoint()
    endpoint._local_addr = local_addr
    endpoint._remote_addr = remote_addr
    group = endpoint._async_group = aio.Group()
    queue = endpoint._queue = aio.Queue(queue_size)

    class Protocol(asyncio.DatagramProtocol):

        def connection_lost(self, exc):
            # losing the transport ends the endpoint's life cycle
            group.close()

        def datagram_received(self, data, addr):
            # only the first two address items (host, port) are kept
            item = data, Address(addr[0], addr[1])
            try:
                queue.put_nowait(item)

            except aio.QueueFullError:
                mlog.warning('receive queue full - dropping datagram')

    loop = asyncio.get_running_loop()
    transport, protocol = await loop.create_datagram_endpoint(
        Protocol, local_addr, remote_addr, **kwargs)
    endpoint._transport = transport
    endpoint._protocol = protocol

    # cleanup hooks handed to aio.call_on_cancel - presumably invoked once
    # the group cancels its tasks (confirm against hat.aio documentation);
    # transport is registered before the queue
    for cleanup in (transport.close, queue.close):
        group.spawn(aio.call_on_cancel, cleanup)

    sockname = transport.get_extra_info('sockname')
    peername = transport.get_extra_info('peername')

    local_info = Address(sockname[0], sockname[1])
    if peername:
        remote_info = Address(peername[0], peername[1])
    else:
        remote_info = None

    endpoint._info = EndpointInfo(local_addr=local_info,
                                  remote_addr=remote_info)
    return endpoint


class Endpoint(aio.Resource):
    """UDP endpoint

    Private attributes (`_async_group`, `_queue`, `_transport`, `_info`, ...)
    are assigned by :func:`create`; this class defines no initializer.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    @property
    def info(self) -> EndpointInfo:
        """Endpoint info"""
        return self._info

    @property
    def empty(self) -> bool:
        """Is receive queue empty"""
        return self._queue.empty()

    def send(self,
             data: util.Bytes,
             remote_addr: Address | None = None):
        """Send datagram

        If `remote_addr` is not set, `remote_addr` passed to :func:`create`
        is used.

        Args:
            data: datagram payload
            remote_addr: destination address; ``None`` selects the address
                the transport was created with

        Raises:
            ConnectionError: endpoint is not open

        """
        if not self.is_open:
            raise ConnectionError()

        # Forward ``None`` instead of substituting the address given to
        # ``create``. asyncio documents that ``sendto(data, None)`` targets
        # the address supplied at transport creation, while an explicit
        # address is compared against the transport's own (resolved)
        # address - so echoing back an unresolved host name or IPv6 address
        # made the default path fail with ValueError.
        self._transport.sendto(data, remote_addr)

    async def receive(self) -> tuple[util.Bytes, Address]:
        """Receive datagram

        Returns:
            datagram payload and the address it was received from

        Raises:
            ConnectionError: receive queue is closed

        """
        try:
            data, addr = await self._queue.get()

        except aio.QueueClosedError:
            raise ConnectionError()

        return data, addr
Module logger
class
Address(typing.NamedTuple):
Address(host, port)
class
EndpointInfo(typing.NamedTuple):
EndpointInfo(local_addr, remote_addr)
async def
create( local_addr: Address | None = None, remote_addr: Address | None = None, queue_size: int = 0, **kwargs) -> Endpoint:
async def create(local_addr: Address | None = None,
                 remote_addr: Address | None = None,
                 queue_size: int = 0,
                 **kwargs
                 ) -> 'Endpoint':
    """Create new UDP endpoint

    A datagram transport is created on the running event loop. Received
    datagrams are pushed into the endpoint's receive queue; when the queue
    reports that it is full, the datagram is dropped and a warning is logged.

    Args:
        local_addr: local address
        remote_addr: remote address
        queue_size: receive queue max size
        kwargs: additional arguments passed to
            :meth:`asyncio.AbstractEventLoop.create_datagram_endpoint`

    Returns:
        endpoint whose ``info`` is built from the transport's ``sockname``
        and ``peername`` extra info

    """
    endpoint = Endpoint()
    endpoint._local_addr = local_addr
    endpoint._remote_addr = remote_addr
    group = endpoint._async_group = aio.Group()
    queue = endpoint._queue = aio.Queue(queue_size)

    class Protocol(asyncio.DatagramProtocol):

        def connection_lost(self, exc):
            # losing the transport ends the endpoint's life cycle
            group.close()

        def datagram_received(self, data, addr):
            # only the first two address items (host, port) are kept
            item = data, Address(addr[0], addr[1])
            try:
                queue.put_nowait(item)

            except aio.QueueFullError:
                mlog.warning('receive queue full - dropping datagram')

    loop = asyncio.get_running_loop()
    transport, protocol = await loop.create_datagram_endpoint(
        Protocol, local_addr, remote_addr, **kwargs)
    endpoint._transport = transport
    endpoint._protocol = protocol

    # cleanup hooks handed to aio.call_on_cancel - presumably invoked once
    # the group cancels its tasks (confirm against hat.aio documentation);
    # transport is registered before the queue
    for cleanup in (transport.close, queue.close):
        group.spawn(aio.call_on_cancel, cleanup)

    sockname = transport.get_extra_info('sockname')
    peername = transport.get_extra_info('peername')

    local_info = Address(sockname[0], sockname[1])
    if peername:
        remote_info = Address(peername[0], peername[1])
    else:
        remote_info = None

    endpoint._info = EndpointInfo(local_addr=local_info,
                                  remote_addr=remote_info)
    return endpoint
Create new UDP endpoint
Arguments:
- local_addr: local address
- remote_addr: remote address
- queue_size: receive queue max size
- kwargs: additional arguments passed to `asyncio.AbstractEventLoop.create_datagram_endpoint()`
class
Endpoint(hat.aio.group.Resource):
class Endpoint(aio.Resource):
    """UDP endpoint

    Private attributes (`_async_group`, `_queue`, `_transport`, `_info`, ...)
    are assigned by :func:`create`; this class defines no initializer.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    @property
    def info(self) -> EndpointInfo:
        """Endpoint info"""
        return self._info

    @property
    def empty(self) -> bool:
        """Is receive queue empty"""
        return self._queue.empty()

    def send(self,
             data: util.Bytes,
             remote_addr: Address | None = None):
        """Send datagram

        If `remote_addr` is not set, `remote_addr` passed to :func:`create`
        is used.

        Args:
            data: datagram payload
            remote_addr: destination address; ``None`` selects the address
                the transport was created with

        Raises:
            ConnectionError: endpoint is not open

        """
        if not self.is_open:
            raise ConnectionError()

        # Forward ``None`` instead of substituting the address given to
        # ``create``. asyncio documents that ``sendto(data, None)`` targets
        # the address supplied at transport creation, while an explicit
        # address is compared against the transport's own (resolved)
        # address - so echoing back an unresolved host name or IPv6 address
        # made the default path fail with ValueError.
        self._transport.sendto(data, remote_addr)

    async def receive(self) -> tuple[util.Bytes, Address]:
        """Receive datagram

        Returns:
            datagram payload and the address it was received from

        Raises:
            ConnectionError: receive queue is closed

        """
        try:
            data, addr = await self._queue.get()

        except aio.QueueClosedError:
            raise ConnectionError()

        return data, addr
UDP endpoint
async_group: hat.aio.group.Group
@property
def async_group(self) -> aio.Group:
    """Async group assigned to this endpoint by :func:`create`"""
    group = self._async_group
    return group
Async group
empty: bool
@property
def empty(self) -> bool:
    """``True`` when the receive queue reports itself empty"""
    receive_queue = self._queue
    return receive_queue.empty()
Is receive queue empty
def send(self,
         data: util.Bytes,
         remote_addr: Address | None = None):
    """Send datagram

    If `remote_addr` is not set, `remote_addr` passed to :func:`create`
    is used.

    Args:
        data: datagram payload
        remote_addr: destination address; ``None`` selects the address
            the transport was created with

    Raises:
        ConnectionError: endpoint is not open

    """
    if not self.is_open:
        raise ConnectionError()

    # Forward ``None`` instead of substituting the address given to
    # ``create``. asyncio documents that ``sendto(data, None)`` targets the
    # address supplied at transport creation, while an explicit address is
    # compared against the transport's own (resolved) address - so echoing
    # back an unresolved host name or IPv6 address made the default path
    # fail with ValueError.
    self._transport.sendto(data, remote_addr)
Send datagram
If `remote_addr` is not set, the `remote_addr` passed to `create()` is used.