hat.drivers.ssdp
Simple Service Discovery Protocol
"""Simple Service Discovery Protocol"""

import logging
import typing

from hat import aio
from hat.drivers import udp


mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""


class DeviceInfo(typing.NamedTuple):
    """Description of a device that answered a search request

    Instances are built from a decoded search response and handed to the
    device info callback.

    """

    # address the response datagram was received from
    addr: udp.Address
    # value of the response's ``LOCATION`` header
    location: str
    # value of the response's ``SERVER`` header
    server: str
    # value of the response's ``USN`` header
    service: str


DeviceInfoCb: typing.TypeAlias = aio.AsyncCallable[[DeviceInfo], None]
"""Device info callback"""


default_multicast_addr = udp.Address('239.255.255.250', 1900)


async def discover(device_info_cb: DeviceInfoCb,
                   multicast_addr: udp.Address = default_multicast_addr,
                   local_name: str = 'hat'
                   ) -> 'DiscoveryServer':
    """Create discovery server

    A UDP endpoint is created for ``0.0.0.0`` and the port of
    `multicast_addr`. The returned server runs a background discovery loop
    which sends one search request to `multicast_addr` (identifying itself
    as `local_name`) and calls `device_info_cb` with a `DeviceInfo` for each
    search response it manages to decode.

    """
    any_addr = udp.Address('0.0.0.0', multicast_addr.port)
    endpoint = await udp.create(any_addr)

    group = aio.Group()

    server = DiscoveryServer()
    server._async_group = group
    server._endpoint = endpoint
    server._multicast_addr = multicast_addr
    server._local_name = local_name
    server._device_info_cb = device_info_cb

    # endpoint cleanup is registered first, then the discovery loop is started
    group.spawn(aio.call_on_cancel, endpoint.async_close)
    group.spawn(server._discovery_loop)
    return server


class DiscoveryServer(aio.Resource):
    """Discovery server

    Instances are not meant to be constructed directly - the `discover`
    coroutine creates the server, assigns its private state and spawns
    the discovery loop.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    async def _discovery_loop(self):
        """Send a search request and dispatch decoded responses

        A single search request is sent to the configured multicast address.
        Afterwards datagrams are received indefinitely: each one that decodes
        as a search response is passed to the device info callback, all
        others are logged at debug level and skipped. The async group is
        closed once the loop exits for any reason.

        """
        try:
            req = _encode_search_req(self._local_name)
            self._endpoint.send(req, self._multicast_addr)

            while True:
                res, addr = await self._endpoint.receive()
                try:
                    info = _decode_search_res(addr, res)
                except Exception as e:
                    # best effort - anything that is not a valid search
                    # response is discarded, but leave a trace for debugging
                    mlog.debug("discarding invalid search response "
                               "from %s: %s", addr, e)
                    continue
                await aio.call(self._device_info_cb, info)

        finally:
            self._async_group.close()


def _encode_search_req(local_name):
    """Encode M-SEARCH request announcing `local_name` as friendly name

    Returns UTF-8 encoded request bytes. The request is terminated by an
    empty line, as required for HTTP-style messages without a body.

    """
    # NOTE(review): HOST is fixed to the default multicast address even when
    # discovery is started with a different `multicast_addr` - confirm
    # whether this is intended
    header_lines = ['M-SEARCH * HTTP/1.1',
                    'HOST: 239.255.255.250:1900',
                    'MAN: "ssdp:discover"',
                    'MX: 1',
                    'ST: ssdp:all',
                    f'CPFN.UPNP.ORG: {local_name}']
    # each header line ends with CRLF; extra CRLF marks end of headers
    request = ''.join(f'{line}\r\n' for line in header_lines) + '\r\n'
    return request.encode('utf-8')


def _decode_search_res(addr, data):
    """Decode search response received from `addr`

    `data` is the raw datagram content. Header names are matched
    case-insensitively. Returns `DeviceInfo` built from the ``LOCATION``,
    ``SERVER`` and ``USN`` headers.

    Raises:
        Exception: data is not valid UTF-8, status line is not
            ``HTTP/1.1 200 OK``, a header line has no ``:`` separator or
            a required header is missing

    """
    lines = str(data, encoding='utf-8').strip().split('\r\n')
    if lines[0].strip() != 'HTTP/1.1 200 OK':
        raise Exception('invalid response')

    entries = {}
    for line in lines[1:]:
        line = line.strip()
        if not line:
            continue
        name, separator, value = line.partition(':')
        if not separator:
            raise Exception('invalid header line')
        entries[name.strip().upper()] = value.strip()

    return DeviceInfo(addr=addr,
                      location=entries['LOCATION'],
                      server=entries['SERVER'],
                      service=entries['USN'])
Module logger
class
DeviceInfo(typing.NamedTuple):
class DeviceInfo(typing.NamedTuple):
    """Description of a device that answered a search request

    Instances are built from a decoded search response and handed to the
    device info callback.

    """

    # address the response datagram was received from
    addr: udp.Address
    # value of the response's ``LOCATION`` header
    location: str
    # value of the response's ``SERVER`` header
    server: str
    # value of the response's ``USN`` header
    service: str
DeviceInfo(addr, location, server, service)
DeviceInfo( addr: hat.drivers.udp.Address, location: str, server: str, service: str)
Create new instance of DeviceInfo(addr, location, server, service)
Device info callback
default_multicast_addr =
Address(host='239.255.255.250', port=1900)
async def
discover( device_info_cb: Callable[[DeviceInfo], Optional[Awaitable[NoneType]]], multicast_addr: hat.drivers.udp.Address = Address(host='239.255.255.250', port=1900), local_name: str = 'hat') -> DiscoveryServer:
async def discover(device_info_cb: DeviceInfoCb,
                   multicast_addr: udp.Address = default_multicast_addr,
                   local_name: str = 'hat'
                   ) -> 'DiscoveryServer':
    """Create discovery server

    A UDP endpoint is created for ``0.0.0.0`` and the port of
    `multicast_addr`. The returned server runs a background discovery loop
    which sends one search request to `multicast_addr` (identifying itself
    as `local_name`) and calls `device_info_cb` with a `DeviceInfo` for each
    search response it manages to decode.

    """
    any_addr = udp.Address('0.0.0.0', multicast_addr.port)
    endpoint = await udp.create(any_addr)

    group = aio.Group()

    server = DiscoveryServer()
    server._async_group = group
    server._endpoint = endpoint
    server._multicast_addr = multicast_addr
    server._local_name = local_name
    server._device_info_cb = device_info_cb

    # endpoint cleanup is registered first, then the discovery loop is started
    group.spawn(aio.call_on_cancel, endpoint.async_close)
    group.spawn(server._discovery_loop)
    return server
Create discovery server
class
DiscoveryServer(hat.aio.group.Resource):
class DiscoveryServer(aio.Resource):
    """Discovery server

    Instances are not meant to be constructed directly - the `discover`
    coroutine creates the server, assigns its private state and spawns
    the discovery loop.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    async def _discovery_loop(self):
        """Send a search request and dispatch decoded responses

        A single search request is sent to the configured multicast address.
        Afterwards datagrams are received indefinitely: each one that decodes
        as a search response is passed to the device info callback, all
        others are logged at debug level and skipped. The async group is
        closed once the loop exits for any reason.

        """
        try:
            req = _encode_search_req(self._local_name)
            self._endpoint.send(req, self._multicast_addr)

            while True:
                res, addr = await self._endpoint.receive()
                try:
                    info = _decode_search_res(addr, res)
                except Exception as e:
                    # best effort - anything that is not a valid search
                    # response is discarded, but leave a trace for debugging
                    mlog.debug("discarding invalid search response "
                               "from %s: %s", addr, e)
                    continue
                await aio.call(self._device_info_cb, info)

        finally:
            self._async_group.close()
Discovery server