hat.drivers.ssdp

Simple Service Discovery Protocol

 1"""Simple Service Discovery Protocol"""
 2
 3import logging
 4import typing
 5
 6from hat import aio
 7from hat.drivers import udp
 8
 9
10mlog: logging.Logger = logging.getLogger(__name__)
11"""Module logger"""
12
13
14class DeviceInfo(typing.NamedTuple):
15    addr: udp.Address
16    location: str
17    server: str
18    service: str
19
20
21DeviceInfoCb: typing.TypeAlias = aio.AsyncCallable[[DeviceInfo], None]
22"""Device info callback"""
23
24
25default_multicast_addr = udp.Address('239.255.255.250', 1900)
26
27
28async def discover(device_info_cb: DeviceInfoCb,
29                   multicast_addr: udp.Address = default_multicast_addr,
30                   local_name: str = 'hat'
31                   ) -> 'DiscoveryServer':
32    """Create discovery server"""
33    endpoint = await udp.create(udp.Address('0.0.0.0', multicast_addr.port))
34
35    srv = DiscoveryServer()
36    srv._endpoint = endpoint
37    srv._device_info_cb = device_info_cb
38    srv._multicast_addr = multicast_addr
39    srv._local_name = local_name
40    srv._async_group = aio.Group()
41    srv._async_group.spawn(aio.call_on_cancel, endpoint.async_close)
42    srv._async_group.spawn(srv._discovery_loop)
43    return srv
44
45
46class DiscoveryServer(aio.Resource):
47    """Discovery server"""
48
49    @property
50    def async_group(self) -> aio.Group:
51        """Async group"""
52        return self._async_group
53
54    async def _discovery_loop(self):
55        try:
56            req = _encode_search_req(self._local_name)
57            self._endpoint.send(req, self._multicast_addr)
58
59            while True:
60                res, addr = await self._endpoint.receive()
61                try:
62                    info = _decode_search_res(addr, res)
63                except Exception:
64                    continue
65                await aio.call(self._device_info_cb, info)
66
67        finally:
68            self._async_group.close()
69
70
71def _encode_search_req(local_name):
72    return (f'M-SEARCH * HTTP/1.1\r\n'
73            f'HOST: 239.255.255.250:1900\r\n'
74            f'MAN: "ssdp:discover"\r\n'
75            f'MX: 1\r\n'
76            f'ST: ssdp:all\r\n'
77            f'CPFN.UPNP.ORG: {local_name}\r\n').encode('utf-8')
78
79
80def _decode_search_res(addr, data):
81    lines = str(data, encoding='utf-8').strip().split('\r\n')
82    if lines[0].strip() != 'HTTP/1.1 200 OK':
83        raise Exception('invalid response')
84    entries = {}
85    for line in lines[1:]:
86        line = line.strip()
87        if not line:
88            continue
89        segments = [i.strip() for i in line.split(':', 1)]
90        entries[segments[0].upper()] = segments[1]
91    return DeviceInfo(addr=addr,
92                      location=entries['LOCATION'],
93                      server=entries['SERVER'],
94                      service=entries['USN'])
mlog: logging.Logger = <Logger hat.drivers.ssdp (WARNING)>

Module logger

class DeviceInfo(typing.NamedTuple):
15class DeviceInfo(typing.NamedTuple):
16    addr: udp.Address
17    location: str
18    server: str
19    service: str

DeviceInfo(addr, location, server, service)

DeviceInfo( addr: hat.drivers.udp.Address, location: str, server: str, service: str)

Create new instance of DeviceInfo(addr, location, server, service)

Alias for field number 0

location: str

Alias for field number 1

server: str

Alias for field number 2

service: str

Alias for field number 3

DeviceInfoCb: TypeAlias = Callable[[DeviceInfo], Optional[Awaitable[NoneType]]]

Device info callback

default_multicast_addr = Address(host='239.255.255.250', port=1900)
async def discover( device_info_cb: Callable[[DeviceInfo], Optional[Awaitable[NoneType]]], multicast_addr: hat.drivers.udp.Address = Address(host='239.255.255.250', port=1900), local_name: str = 'hat') -> DiscoveryServer:
29async def discover(device_info_cb: DeviceInfoCb,
30                   multicast_addr: udp.Address = default_multicast_addr,
31                   local_name: str = 'hat'
32                   ) -> 'DiscoveryServer':
33    """Create discovery server"""
34    endpoint = await udp.create(udp.Address('0.0.0.0', multicast_addr.port))
35
36    srv = DiscoveryServer()
37    srv._endpoint = endpoint
38    srv._device_info_cb = device_info_cb
39    srv._multicast_addr = multicast_addr
40    srv._local_name = local_name
41    srv._async_group = aio.Group()
42    srv._async_group.spawn(aio.call_on_cancel, endpoint.async_close)
43    srv._async_group.spawn(srv._discovery_loop)
44    return srv

Create discovery server

class DiscoveryServer(hat.aio.group.Resource):
47class DiscoveryServer(aio.Resource):
48    """Discovery server"""
49
50    @property
51    def async_group(self) -> aio.Group:
52        """Async group"""
53        return self._async_group
54
55    async def _discovery_loop(self):
56        try:
57            req = _encode_search_req(self._local_name)
58            self._endpoint.send(req, self._multicast_addr)
59
60            while True:
61                res, addr = await self._endpoint.receive()
62                try:
63                    info = _decode_search_res(addr, res)
64                except Exception:
65                    continue
66                await aio.call(self._device_info_cb, info)
67
68        finally:
69            self._async_group.close()

Discovery server

async_group: hat.aio.group.Group
50    @property
51    def async_group(self) -> aio.Group:
52        """Async group"""
53        return self._async_group

Async group