hat.drivers.udp
Asyncio UDP endpoint wrapper
1"""Asyncio UDP endpoint wrapper""" 2 3import asyncio 4import functools 5import logging 6import typing 7 8from hat import aio 9from hat import util 10 11from hat.drivers import common 12 13 14mlog: logging.Logger = logging.getLogger(__name__) 15"""Module logger""" 16 17 18class Address(typing.NamedTuple): 19 host: str 20 port: int 21 22 23class EndpointInfo(typing.NamedTuple): 24 name: str | None 25 local_addr: Address 26 remote_addr: Address | None 27 28 29async def create(local_addr: Address | None = None, 30 remote_addr: Address | None = None, 31 *, 32 name: str | None = None, 33 queue_size: int = 0, 34 **kwargs 35 ) -> 'Endpoint': 36 """Create new UDP endpoint 37 38 Args: 39 local_addr: local address 40 remote_addr: remote address 41 name: endpoint name 42 queue_size: receive queue max size 43 kwargs: additional arguments passed to 44 :meth:`asyncio.AbstractEventLoop.create_datagram_endpoint` 45 46 """ 47 endpoint = Endpoint() 48 endpoint._local_addr = local_addr 49 endpoint._remote_addr = remote_addr 50 endpoint._async_group = aio.Group() 51 endpoint._queue = aio.Queue(queue_size) 52 endpoint._transport = None 53 endpoint._protocol = None 54 endpoint._log = _create_logger(name, None) 55 endpoint._comm_log = _CommunicationLogger(name, None) 56 57 loop = asyncio.get_running_loop() 58 create_protocol = functools.partial(Protocol, endpoint) 59 endpoint._transport, endpoint._protocol = \ 60 await loop.create_datagram_endpoint(create_protocol, local_addr, 61 remote_addr, **kwargs) 62 63 try: 64 endpoint._async_group.spawn(aio.call_on_cancel, endpoint._on_close) 65 66 sockname = endpoint._transport.get_extra_info('sockname') 67 peername = endpoint._transport.get_extra_info('peername') 68 endpoint._info = EndpointInfo( 69 name=name, 70 local_addr=Address(sockname[0], sockname[1]), 71 remote_addr=(Address(peername[0], peername[1]) 72 if peername else None)) 73 74 endpoint._log = _create_logger(name, endpoint._info) 75 endpoint._comm_log = _CommunicationLogger(name, endpoint._info) 76 77 except BaseException: 78 await aio.uncancellable(endpoint.async_close()) 79 raise 80 81 endpoint._comm_log.log(common.CommLogAction.OPEN) 82 83 return endpoint 84 85 86class Endpoint(aio.Resource): 87 """UDP endpoint""" 88 89 @property 90 def async_group(self) -> aio.Group: 91 """Async group""" 92 return self._async_group 93 94 @property 95 def info(self) -> EndpointInfo: 96 """Endpoint info""" 97 return self._info 98 99 @property 100 def empty(self) -> bool: 101 """Is receive queue empty""" 102 return self._queue.empty() 103 104 def send(self, 105 data: util.Bytes, 106 remote_addr: Address | None = None): 107 """Send datagram 108 109 If `remote_addr` is not set, `remote_addr` passed to `create` is used. 110 111 """ 112 if not self.is_open or not self._transport: 113 raise ConnectionError() 114 115 msg = data, (remote_addr or self._remote_addr) 116 self._comm_log.log(common.CommLogAction.SEND, msg) 117 118 self._transport.sendto(msg[0], msg[1]) 119 120 async def receive(self) -> tuple[util.Bytes, Address]: 121 """Receive datagram""" 122 try: 123 data, addr = await self._queue.get() 124 125 except aio.QueueClosedError: 126 raise ConnectionError() 127 128 return data, addr 129 130 def _on_close(self): 131 self._queue.close() 132 133 if self._transport: 134 self._transport.close() 135 136 self._transport = None 137 self._protocol = None 138 139 self._comm_log.log(common.CommLogAction.CLOSE) 140 141 142class Protocol(asyncio.DatagramProtocol): 143 144 def __init__(self, endpoint: Endpoint): 145 self._endpoint = endpoint 146 147 def connection_lost(self, exc: Exception | None): 148 self._endpoint._log.debug('connection lost') 149 150 self._endpoint.close() 151 152 def datagram_received(self, data: util.Bytes, addr: tuple): 153 msg = data, Address(addr[0], addr[1]) 154 self._endpoint._comm_log.log(common.CommLogAction.RECEIVE, msg) 155 156 try: 157 self._endpoint._queue.put_nowait(msg) 158 159 except aio.QueueFullError: 160 self._endpoint._log.warning('receive queue full - ' 161 'dropping datagram') 162 163 164def _create_logger(name, info): 165 extra = {'meta': {'type': 'UdpEndpoint', 166 'name': name}} 167 168 if info is not None: 169 extra['meta']['local_addr'] = {'host': info.local_addr.host, 170 'port': info.local_addr.port} 171 172 if info.remote_addr is not None: 173 extra['meta']['remote_addr'] = {'host': info.remote_addr.host, 174 'port': info.remote_addr.port} 175 176 return logging.LoggerAdapter(mlog, extra) 177 178 179class _CommunicationLogger: 180 181 def __init__(self, 182 name: str | None, 183 info: EndpointInfo | None): 184 extra = {'meta': {'type': 'UdpEndpoint', 185 'communication': True, 186 'name': name}} 187 188 if info is not None: 189 extra['meta']['local_addr'] = {'host': info.local_addr.host, 190 'port': info.local_addr.port} 191 192 if info.remote_addr is not None: 193 extra['meta']['remote_addr'] = {'host': info.remote_addr.host, 194 'port': info.remote_addr.port} 195 196 self._log = logging.LoggerAdapter(mlog, extra) 197 198 def log(self, 199 action: common.CommLogAction, 200 msg: tuple[util.Bytes, Address] | None = None): 201 if not self._log.isEnabledFor(logging.DEBUG): 202 return 203 204 if msg is None: 205 self._log.debug(action.value, stacklevel=2) 206 207 else: 208 self._log.debug('%s (data=(%s) remote=%s)', 209 action.value, msg[0].hex(' '), tuple(msg[1]), 210 stacklevel=2)
Module logger
class
Address(typing.NamedTuple):
Address(host, port)
class
EndpointInfo(typing.NamedTuple):
24class EndpointInfo(typing.NamedTuple): 25 name: str | None 26 local_addr: Address 27 remote_addr: Address | None
EndpointInfo(name, local_addr, remote_addr)
async def
create( local_addr: Address | None = None, remote_addr: Address | None = None, *, name: str | None = None, queue_size: int = 0, **kwargs) -> Endpoint:
30async def create(local_addr: Address | None = None, 31 remote_addr: Address | None = None, 32 *, 33 name: str | None = None, 34 queue_size: int = 0, 35 **kwargs 36 ) -> 'Endpoint': 37 """Create new UDP endpoint 38 39 Args: 40 local_addr: local address 41 remote_addr: remote address 42 name: endpoint name 43 queue_size: receive queue max size 44 kwargs: additional arguments passed to 45 :meth:`asyncio.AbstractEventLoop.create_datagram_endpoint` 46 47 """ 48 endpoint = Endpoint() 49 endpoint._local_addr = local_addr 50 endpoint._remote_addr = remote_addr 51 endpoint._async_group = aio.Group() 52 endpoint._queue = aio.Queue(queue_size) 53 endpoint._transport = None 54 endpoint._protocol = None 55 endpoint._log = _create_logger(name, None) 56 endpoint._comm_log = _CommunicationLogger(name, None) 57 58 loop = asyncio.get_running_loop() 59 create_protocol = functools.partial(Protocol, endpoint) 60 endpoint._transport, endpoint._protocol = \ 61 await loop.create_datagram_endpoint(create_protocol, local_addr, 62 remote_addr, **kwargs) 63 64 try: 65 endpoint._async_group.spawn(aio.call_on_cancel, endpoint._on_close) 66 67 sockname = endpoint._transport.get_extra_info('sockname') 68 peername = endpoint._transport.get_extra_info('peername') 69 endpoint._info = EndpointInfo( 70 name=name, 71 local_addr=Address(sockname[0], sockname[1]), 72 remote_addr=(Address(peername[0], peername[1]) 73 if peername else None)) 74 75 endpoint._log = _create_logger(name, endpoint._info) 76 endpoint._comm_log = _CommunicationLogger(name, endpoint._info) 77 78 except BaseException: 79 await aio.uncancellable(endpoint.async_close()) 80 raise 81 82 endpoint._comm_log.log(common.CommLogAction.OPEN) 83 84 return endpoint
Create new UDP endpoint
Arguments:
- local_addr: local address
- remote_addr: remote address
- name: endpoint name
- queue_size: receive queue max size
- kwargs: additional arguments passed to
asyncio.AbstractEventLoop.create_datagram_endpoint()
class
Endpoint(hat.aio.group.Resource):
87class Endpoint(aio.Resource): 88 """UDP endpoint""" 89 90 @property 91 def async_group(self) -> aio.Group: 92 """Async group""" 93 return self._async_group 94 95 @property 96 def info(self) -> EndpointInfo: 97 """Endpoint info""" 98 return self._info 99 100 @property 101 def empty(self) -> bool: 102 """Is receive queue empty""" 103 return self._queue.empty() 104 105 def send(self, 106 data: util.Bytes, 107 remote_addr: Address | None = None): 108 """Send datagram 109 110 If `remote_addr` is not set, `remote_addr` passed to `create` is used. 111 112 """ 113 if not self.is_open or not self._transport: 114 raise ConnectionError() 115 116 msg = data, (remote_addr or self._remote_addr) 117 self._comm_log.log(common.CommLogAction.SEND, msg) 118 119 self._transport.sendto(msg[0], msg[1]) 120 121 async def receive(self) -> tuple[util.Bytes, Address]: 122 """Receive datagram""" 123 try: 124 data, addr = await self._queue.get() 125 126 except aio.QueueClosedError: 127 raise ConnectionError() 128 129 return data, addr 130 131 def _on_close(self): 132 self._queue.close() 133 134 if self._transport: 135 self._transport.close() 136 137 self._transport = None 138 self._protocol = None 139 140 self._comm_log.log(common.CommLogAction.CLOSE)
UDP endpoint
async_group: hat.aio.group.Group
90 @property 91 def async_group(self) -> aio.Group: 92 """Async group""" 93 return self._async_group
Async group
empty: bool
100 @property 101 def empty(self) -> bool: 102 """Is receive queue empty""" 103 return self._queue.empty()
Is receive queue empty
105 def send(self, 106 data: util.Bytes, 107 remote_addr: Address | None = None): 108 """Send datagram 109 110 If `remote_addr` is not set, `remote_addr` passed to `create` is used. 111 112 """ 113 if not self.is_open or not self._transport: 114 raise ConnectionError() 115 116 msg = data, (remote_addr or self._remote_addr) 117 self._comm_log.log(common.CommLogAction.SEND, msg) 118 119 self._transport.sendto(msg[0], msg[1])
Send datagram
If remote_addr is not set, remote_addr passed to create is used.
class
Protocol(asyncio.protocols.DatagramProtocol):
143class Protocol(asyncio.DatagramProtocol): 144 145 def __init__(self, endpoint: Endpoint): 146 self._endpoint = endpoint 147 148 def connection_lost(self, exc: Exception | None): 149 self._endpoint._log.debug('connection lost') 150 151 self._endpoint.close() 152 153 def datagram_received(self, data: util.Bytes, addr: tuple): 154 msg = data, Address(addr[0], addr[1]) 155 self._endpoint._comm_log.log(common.CommLogAction.RECEIVE, msg) 156 157 try: 158 self._endpoint._queue.put_nowait(msg) 159 160 except aio.QueueFullError: 161 self._endpoint._log.warning('receive queue full - ' 162 'dropping datagram')
Interface for datagram protocol.
Protocol(endpoint: Endpoint)
def
connection_lost(self, exc: Exception | None):
148 def connection_lost(self, exc: Exception | None): 149 self._endpoint._log.debug('connection lost') 150 151 self._endpoint.close()
Called when the connection is lost or closed.
The argument is an exception object or None (the latter meaning a regular EOF is received or the connection was aborted or closed).
def
datagram_received(self, data: bytes | bytearray | memoryview, addr: tuple):
153 def datagram_received(self, data: util.Bytes, addr: tuple): 154 msg = data, Address(addr[0], addr[1]) 155 self._endpoint._comm_log.log(common.CommLogAction.RECEIVE, msg) 156 157 try: 158 self._endpoint._queue.put_nowait(msg) 159 160 except aio.QueueFullError: 161 self._endpoint._log.warning('receive queue full - ' 162 'dropping datagram')
Called when some datagram is received.