hat.drivers.mqtt

 1from hat.drivers.mqtt.client import (Msg,
 2                                     MsgCb,
 3                                     connect,
 4                                     Client)
 5from hat.drivers.mqtt.common import (UInt32,
 6                                     String,
 7                                     Binary,
 8                                     QoS,
 9                                     RetainHandling,
10                                     Reason,
11                                     MqttError,
12                                     Subscription,
13                                     is_error_reason)
14
15
16__all__ = ['Msg',
17           'MsgCb',
18           'connect',
19           'Client',
20           'UInt32',
21           'String',
22           'Binary',
23           'QoS',
24           'RetainHandling',
25           'Reason',
26           'MqttError',
27           'Subscription',
28           'is_error_reason']
class Msg(typing.NamedTuple):
21class Msg(typing.NamedTuple):
22    topic: common.String
23    payload: str | util.Bytes
24    qos: common.QoS = common.QoS.AT_MOST_ONCE
25    retain: bool = True
26    message_expiry_interval: common.UInt32 | None = None
27    response_topic: common.String | None = None
28    correlation_data: common.Binary | None = None
29    user_properties: Collection[tuple[common.String, common.String]] = []
30    content_type: common.String | None = None

Msg(topic, payload, qos, retain, message_expiry_interval, response_topic, correlation_data, user_properties, content_type)

Msg( topic: str, payload: str | bytes | bytearray | memoryview, qos: QoS = <QoS.AT_MOST_ONCE: 0>, retain: bool = True, message_expiry_interval: int | None = None, response_topic: str | None = None, correlation_data: bytes | bytearray | memoryview | None = None, user_properties: Collection[tuple[str, str]] = [], content_type: str | None = None)

Create new instance of Msg(topic, payload, qos, retain, message_expiry_interval, response_topic, correlation_data, user_properties, content_type)

topic: str

Alias for field number 0

payload: str | bytes | bytearray | memoryview

Alias for field number 1

qos: QoS

Alias for field number 2

retain: bool

Alias for field number 3

message_expiry_interval: int | None

Alias for field number 4

response_topic: str | None

Alias for field number 5

correlation_data: bytes | bytearray | memoryview | None

Alias for field number 6

user_properties: Collection[tuple[str, str]]

Alias for field number 7

content_type: str | None

Alias for field number 8

MsgCb = typing.Callable[[ForwardRef('Client'), Msg], None | collections.abc.Awaitable[None]]
async def connect( addr: hat.drivers.tcp.Address, msg_cb: Optional[Callable[[Client, Msg], None | Awaitable[None]]] = None, will_msg: Msg | None = None, will_delay: int = 0, client_id: str = '', user_name: str | None = None, password: bytes | bytearray | memoryview | None = None, ping_delay: float | None = None, response_timeout: float = 30, **kwargs) -> Client:
 36async def connect(addr: tcp.Address,
 37                  msg_cb: MsgCb | None = None,
 38                  will_msg: Msg | None = None,
 39                  will_delay: common.UInt32 = 0,
 40                  client_id: common.String = '',
 41                  user_name: common.String | None = None,
 42                  password: common.Binary | None = None,
 43                  ping_delay: float | None = None,
 44                  response_timeout: float = 30,
 45                  **kwargs
 46                  ) -> 'Client':
 47    conn = await transport.connect(addr, **kwargs)
 48
 49    try:
 50        req = _create_connect_packet(will_msg=will_msg,
 51                                     will_delay=will_delay,
 52                                     ping_delay=ping_delay,
 53                                     client_id=client_id,
 54                                     user_name=user_name,
 55                                     password=password)
 56        await conn.send(req)
 57
 58        res = await aio.wait_for(conn.receive(), response_timeout)
 59
 60        if isinstance(res, transport.DisconnectPacket):
 61            raise common.MqttError(res.reason, res.reason_string)
 62
 63        if not isinstance(res, transport.ConnAckPacket):
 64            raise Exception('unexpected response')
 65
 66        if res.reason != common.Reason.SUCCESS:
 67            raise common.MqttError(res.reason, res.reason_string)
 68
 69    except BaseException:
 70        await aio.uncancellable(conn.async_close())
 71        raise
 72
 73    client = Client()
 74    client._msg_cb = msg_cb
 75    client._response_timeout = response_timeout
 76    client._loop = asyncio.get_running_loop()
 77    client._async_group = aio.Group()
 78    client._conn = conn
 79    client._sent_event = asyncio.Event()
 80    client._ping_future = None
 81    client._disconnect_reason = common.Reason.SUCCESS
 82    client._disconnect_reason_string = None
 83    client._log = _create_logger(conn.info)
 84
 85    client._maximum_qos = res.maximum_qos
 86    client._client_id = (res.assigned_client_identifier
 87                         if res.assigned_client_identifier is not None
 88                         else client_id)
 89    client._keep_alive = (req.keep_alive if res.server_keep_alive is None
 90                          else res.server_keep_alive)
 91    client._response_information = res.response_information
 92
 93    try:
 94        client.async_group.spawn(aio.call_on_cancel, client._on_close)
 95
 96        client._identifier_registry = _IdentifierRegistry(
 97            client.async_group.create_subgroup())
 98
 99        client.async_group.spawn(client._receive_loop)
100        if client._keep_alive > 0:
101            client.async_group.spawn(client._ping_loop)
102
103    except BaseException:
104        await aio.uncancellable(client.async_close())
105        raise
106
107    return client
class Client(hat.aio.group.Resource):
110class Client(aio.Resource):
111
112    @property
113    def async_group(self) -> aio.Group:
114        return self._async_group
115
116    @property
117    def info(self) -> tcp.ConnectionInfo:
118        return self._conn.info
119
120    @property
121    def maximum_qos(self) -> common.QoS:
122        return self._maximum_qos
123
124    @property
125    def client_id(self) -> common.String:
126        return self._client_id
127
128    @property
129    def response_information(self) -> common.String | None:
130        return self._response_information
131
132    async def publish(self, msg: Msg):
133        if msg.qos.value > self._maximum_qos.value:
134            raise Exception(f'maximum supported QoS is {self._maximum_qos}')
135
136        if msg.qos == common.QoS.AT_MOST_ONCE:
137            identifier = None
138
139        elif msg.qos in (common.QoS.AT_LEAST_ONCE,
140                         common.QoS.EXACLTY_ONCE):
141            identifier = await self._identifier_registry.allocate_identifier()
142
143        else:
144            raise ValueError('unsupported QoS')
145
146        try:
147            req = transport.PublishPacket(
148                duplicate=False,
149                qos=self._maximum_qos,
150                retain=msg.retain,
151                topic_name=msg.topic,
152                packet_identifier=identifier,
153                message_expiry_interval=msg.message_expiry_interval,
154                topic_alias=None,
155                response_topic=msg.response_topic,
156                correlation_data=msg.correlation_data,
157                user_properties=msg.user_properties,
158                subscription_identifiers=[],
159                content_type=msg.content_type,
160                payload=msg.payload)
161
162            if msg.qos == common.QoS.AT_MOST_ONCE:
163                await self._conn.send(req)
164                return
165
166            future = self._identifier_registry.create_future(identifier)
167            await self._conn.send(req)
168            res = await aio.wait_for(future, self._response_timeout)
169
170            if msg.qos == common.QoS.AT_LEAST_ONCE:
171                self._assert_res_type(res, transport.PubAckPacket)
172                if common.is_error_reason(res.reason):
173                    raise common.MqttError(res.reason, res.reason_string)
174
175                return
176
177            self._assert_res_type(res, transport.PubRecPacket)
178            if common.is_error_reason(res.reason):
179                raise common.MqttError(res.reason, res.reason_string)
180
181            req = transport.PubRelPacket(packet_identifier=identifier,
182                                         reason=common.Reason.SUCCESS,
183                                         reason_string=None,
184                                         user_properties=[])
185
186            future = self._identifier_registry.create_future(identifier)
187            await self._conn.send(req)
188            res = await aio.wait_for(future, self._response_timeout)
189
190            self._assert_res_type(res, transport.PubCompPacket)
191            if common.is_error_reason(res.reason):
192                raise common.MqttError(res.reason, res.reason_string)
193
194        except asyncio.TimeoutError:
195            self._log.error("response timeout exceeded")
196
197            self._set_disconnect_reason(
198                common.Reason.IMPLEMENTATION_SPECIFIC_ERROR,
199                "response timeout exceeded")
200            self.close()
201
202            raise ConnectionError()
203
204        finally:
205            if identifier is not None:
206                self._identifier_registry.release_identifier(identifier)
207
208    async def subscribe(self,
209                        subscriptions: Collection[common.Subscription]
210                        ) -> Collection[common.Reason]:
211        identifier = await self._identifier_registry.allocate_identifier()
212
213        try:
214            req = transport.SubscribePacket(packet_identifier=identifier,
215                                            subscription_identifier=None,
216                                            user_properties=[],
217                                            subscriptions=subscriptions)
218
219            future = self._identifier_registry.create_future(identifier)
220            await self._conn.send(req)
221            res = await aio.wait_for(future, self._response_timeout)
222
223            self._assert_res_type(res, transport.SubAckPacket)
224            return res.reasons
225
226        except asyncio.TimeoutError:
227            self._log.error("response timeout exceeded")
228
229            self._set_disconnect_reason(
230                common.Reason.IMPLEMENTATION_SPECIFIC_ERROR,
231                "response timeout exceeded")
232            self.close()
233
234            raise ConnectionError()
235
236        finally:
237            self._identifier_registry.release_identifier(identifier)
238
239    async def unsubscribe(self,
240                          topic_filters: Collection[common.String]
241                          ) -> Collection[common.Reason]:
242        identifier = await self._identifier_registry.allocate_identifier()
243
244        try:
245            req = transport.UnsubscribePacket(packet_identifier=identifier,
246                                              user_properties=[],
247                                              topic_filters=topic_filters)
248
249            future = self._identifier_registry.create_future(identifier)
250            await self._conn.send(req)
251            res = await aio.wait_for(future, self._response_timeout)
252
253            self._assert_res_type(res, transport.UnsubAckPacket)
254            return res.reasons
255
256        except asyncio.TimeoutError:
257            self._log.error("response timeout exceeded")
258
259            self._set_disconnect_reason(
260                common.Reason.IMPLEMENTATION_SPECIFIC_ERROR,
261                "response timeout exceeded")
262            self.close()
263
264            raise ConnectionError()
265
266        finally:
267            self._identifier_registry.release_identifier(identifier)
268
269    async def _on_close(self):
270        if self._conn.is_open:
271            with contextlib.suppress(Exception):
272                await self._conn.send(
273                    transport.DisconnectPacket(
274                        reason=self._disconnect_reason,
275                        session_expiry_interval=None,
276                        reason_string=self._disconnect_reason_string,
277                        user_properties=[],
278                        server_reference=None))
279
280        await self._conn.async_close()
281
282    def _set_disconnect_reason(self, reason, reason_string):
283        if common.is_error_reason(self._disconnect_reason):
284            return
285
286        self._disconnect_reason = reason
287        self._disconnect_reason_string = reason_string
288
289    async def _send(self, packet):
290        await self._conn.send(packet)
291        self._sent_event.set()
292
293    def _assert_res_type(self, res, cls):
294        if isinstance(res, cls):
295            return
296
297        self._log.error('received invalid response (expecting %s)', cls)
298
299        self._set_disconnect_reason(common.Reason.PROTOCOL_ERROR,
300                                    'invalid response type')
301        self.close()
302
303        raise ConnectionError()
304
305    async def _receive_loop(self):
306        try:
307            self._log.debug('starting receive loop')
308
309            while True:
310                packet = await self._conn.receive()
311
312                if isinstance(packet, transport.PublishPacket):
313                    await self._process_publish_packet(packet)
314
315                elif isinstance(packet, transport.PubRelPacket):
316                    await self._process_publish_release_packet(packet)
317
318                elif isinstance(packet, (transport.PubAckPacket,
319                                         transport.PubRecPacket,
320                                         transport.PubCompPacket,
321                                         transport.SubAckPacket,
322                                         transport.UnsubAckPacket)):
323                    if not self._identifier_registry.register_packet(packet):
324                        self._log.warning('packet identifier not expected - '
325                                          'dropping packet')
326
327                elif isinstance(packet, transport.PingResPacket):
328                    if self._ping_future and not self._ping_future.done():
329                        self._ping_future.set_result(None)
330
331                elif isinstance(packet, transport.DisconnectPacket):
332                    self._log.debug('received disconnect packet: %s: %s',
333                                    packet.reason, packet.reason_string)
334
335                    self._conn.close()
336                    break
337
338                elif isinstance(packet, transport.AuthPacket):
339                    raise common.MqttError(
340                        common.Reason.IMPLEMENTATION_SPECIFIC_ERROR,
341                        'auth packet not supported')
342
343                else:
344                    raise common.MqttError(common.Reason.PROTOCOL_ERROR,
345                                           'unexpected packet')
346
347        except ConnectionError:
348            pass
349
350        except common.MqttError as e:
351            self._log.error('receive loop mqtt error: %s', e, exc_info=e)
352
353            self._set_disconnect_reason(e.reason, e.description)
354
355        except Exception as e:
356            self._log.error('receive loop error: %s', e, exc_info=e)
357
358            self._set_disconnect_reason(common.Reason.UNSPECIFIED_ERROR, None)
359
360        finally:
361            self._log.debug('stopping receive loop')
362
363            self.close()
364
365    async def _ping_loop(self):
366        try:
367            self._log.debug('starting ping loop')
368
369            while True:
370                self._sent_event.clear()
371
372                with contextlib.suppress(asyncio.TimeoutError):
373                    await aio.wait_for(self._sent_event.wait(),
374                                       self._keep_alive)
375                    continue
376
377                self._ping_future = self._loop.create_future()
378                await self._conn.send(transport.PingReqPacket())
379
380                await aio.wait_for(self._ping_future, self._response_timeout)
381
382        except ConnectionError:
383            pass
384
385        except asyncio.TimeoutError:
386            self._log.error('ping response timeout')
387
388            self._set_disconnect_reason(
389                common.Reason.IMPLEMENTATION_SPECIFIC_ERROR,
390                "response timeout exceeded")
391
392        except Exception as e:
393            self._log.error('ping loop error: %s', e, exc_info=e)
394
395            self._set_disconnect_reason(common.Reason.UNSPECIFIED_ERROR, None)
396
397        finally:
398            self._log.debug('stopping ping loop')
399
400            self.close()
401
402    async def _process_publish_packet(self, packet):
403        if self._msg_cb:
404            msg = Msg(
405                topic=packet.topic_name,
406                payload=packet.payload,
407                qos=packet.qos,
408                retain=packet.retain,
409                message_expiry_interval=packet.message_expiry_interval,
410                response_topic=packet.response_topic,
411                correlation_data=packet.correlation_data,
412                user_properties=packet.user_properties,
413                content_type=packet.content_type)
414
415            await aio.call(self._msg_cb, self, msg)
416
417        if msg.qos == common.QoS.AT_MOST_ONCE:
418            return
419
420        if msg.qos == common.QoS.AT_LEAST_ONCE:
421            res = transport.PubAckPacket(
422                packet_identifier=packet.packet_identifier,
423                reason=common.Reason.SUCCESS,
424                reason_string=None,
425                user_properties=[])
426
427            await self._send(res)
428            return
429
430        if msg.qos != common.QoS.EXACLTY_ONCE:
431            raise ValueError('unsupported QoS')
432
433        req = transport.PubRecPacket(
434            packet_identifier=packet.packet_identifier,
435            reason=common.Reason.SUCCESS,
436            reason_string=None,
437            user_properties=[])
438
439        await self._conn.send(req)
440
441    async def _process_publish_release_packet(self, packet):
442
443        # TODO should remember previous publish packet and check response
444        #      timeout
445
446        if common.is_error_reason(packet.reason):
447            self._log.warning('publish release error: %s: %s',
448                              packet.reason, packet.reason_string)
449            return
450
451        req = transport.PubCompPacket(
452            packet_identifier=packet.packet_identifier,
453            reason=common.Reason.SUCCESS,
454            reason_string=None,
455            user_properties=[])
456
457        await self._conn.send(req)

Resource with lifetime control based on Group.

async_group: hat.aio.group.Group
112    @property
113    def async_group(self) -> aio.Group:
114        return self._async_group

Group controlling resource's lifetime.

info: hat.drivers.tcp.ConnectionInfo
116    @property
117    def info(self) -> tcp.ConnectionInfo:
118        return self._conn.info
maximum_qos: QoS
120    @property
121    def maximum_qos(self) -> common.QoS:
122        return self._maximum_qos
client_id: str
124    @property
125    def client_id(self) -> common.String:
126        return self._client_id
response_information: str | None
128    @property
129    def response_information(self) -> common.String | None:
130        return self._response_information
async def publish(self, msg: Msg):
132    async def publish(self, msg: Msg):
133        if msg.qos.value > self._maximum_qos.value:
134            raise Exception(f'maximum supported QoS is {self._maximum_qos}')
135
136        if msg.qos == common.QoS.AT_MOST_ONCE:
137            identifier = None
138
139        elif msg.qos in (common.QoS.AT_LEAST_ONCE,
140                         common.QoS.EXACLTY_ONCE):
141            identifier = await self._identifier_registry.allocate_identifier()
142
143        else:
144            raise ValueError('unsupported QoS')
145
146        try:
147            req = transport.PublishPacket(
148                duplicate=False,
149                qos=self._maximum_qos,
150                retain=msg.retain,
151                topic_name=msg.topic,
152                packet_identifier=identifier,
153                message_expiry_interval=msg.message_expiry_interval,
154                topic_alias=None,
155                response_topic=msg.response_topic,
156                correlation_data=msg.correlation_data,
157                user_properties=msg.user_properties,
158                subscription_identifiers=[],
159                content_type=msg.content_type,
160                payload=msg.payload)
161
162            if msg.qos == common.QoS.AT_MOST_ONCE:
163                await self._conn.send(req)
164                return
165
166            future = self._identifier_registry.create_future(identifier)
167            await self._conn.send(req)
168            res = await aio.wait_for(future, self._response_timeout)
169
170            if msg.qos == common.QoS.AT_LEAST_ONCE:
171                self._assert_res_type(res, transport.PubAckPacket)
172                if common.is_error_reason(res.reason):
173                    raise common.MqttError(res.reason, res.reason_string)
174
175                return
176
177            self._assert_res_type(res, transport.PubRecPacket)
178            if common.is_error_reason(res.reason):
179                raise common.MqttError(res.reason, res.reason_string)
180
181            req = transport.PubRelPacket(packet_identifier=identifier,
182                                         reason=common.Reason.SUCCESS,
183                                         reason_string=None,
184                                         user_properties=[])
185
186            future = self._identifier_registry.create_future(identifier)
187            await self._conn.send(req)
188            res = await aio.wait_for(future, self._response_timeout)
189
190            self._assert_res_type(res, transport.PubCompPacket)
191            if common.is_error_reason(res.reason):
192                raise common.MqttError(res.reason, res.reason_string)
193
194        except asyncio.TimeoutError:
195            self._log.error("response timeout exceeded")
196
197            self._set_disconnect_reason(
198                common.Reason.IMPLEMENTATION_SPECIFIC_ERROR,
199                "response timeout exceeded")
200            self.close()
201
202            raise ConnectionError()
203
204        finally:
205            if identifier is not None:
206                self._identifier_registry.release_identifier(identifier)
async def subscribe( self, subscriptions: Collection[Subscription]) -> Collection[Reason]:
208    async def subscribe(self,
209                        subscriptions: Collection[common.Subscription]
210                        ) -> Collection[common.Reason]:
211        identifier = await self._identifier_registry.allocate_identifier()
212
213        try:
214            req = transport.SubscribePacket(packet_identifier=identifier,
215                                            subscription_identifier=None,
216                                            user_properties=[],
217                                            subscriptions=subscriptions)
218
219            future = self._identifier_registry.create_future(identifier)
220            await self._conn.send(req)
221            res = await aio.wait_for(future, self._response_timeout)
222
223            self._assert_res_type(res, transport.SubAckPacket)
224            return res.reasons
225
226        except asyncio.TimeoutError:
227            self._log.error("response timeout exceeded")
228
229            self._set_disconnect_reason(
230                common.Reason.IMPLEMENTATION_SPECIFIC_ERROR,
231                "response timeout exceeded")
232            self.close()
233
234            raise ConnectionError()
235
236        finally:
237            self._identifier_registry.release_identifier(identifier)
async def unsubscribe( self, topic_filters: Collection[str]) -> Collection[Reason]:
239    async def unsubscribe(self,
240                          topic_filters: Collection[common.String]
241                          ) -> Collection[common.Reason]:
242        identifier = await self._identifier_registry.allocate_identifier()
243
244        try:
245            req = transport.UnsubscribePacket(packet_identifier=identifier,
246                                              user_properties=[],
247                                              topic_filters=topic_filters)
248
249            future = self._identifier_registry.create_future(identifier)
250            await self._conn.send(req)
251            res = await aio.wait_for(future, self._response_timeout)
252
253            self._assert_res_type(res, transport.UnsubAckPacket)
254            return res.reasons
255
256        except asyncio.TimeoutError:
257            self._log.error("response timeout exceeded")
258
259            self._set_disconnect_reason(
260                common.Reason.IMPLEMENTATION_SPECIFIC_ERROR,
261                "response timeout exceeded")
262            self.close()
263
264            raise ConnectionError()
265
266        finally:
267            self._identifier_registry.release_identifier(identifier)
UInt32 = <class 'int'>
String = <class 'str'>
Binary = bytes | bytearray | memoryview
class QoS(enum.Enum):
27class QoS(enum.Enum):
28    AT_MOST_ONCE = 0
29    AT_LEAST_ONCE = 1
30    EXACLTY_ONCE = 2
AT_MOST_ONCE = <QoS.AT_MOST_ONCE: 0>
AT_LEAST_ONCE = <QoS.AT_LEAST_ONCE: 1>
EXACLTY_ONCE = <QoS.EXACLTY_ONCE: 2>
class RetainHandling(enum.Enum):
33class RetainHandling(enum.Enum):
34    SEND_ON_SUBSCRIBE = 0
35    SEND_ON_NEW_SUBSCRIBE = 1
36    DONT_SEND = 2
SEND_ON_SUBSCRIBE = <RetainHandling.SEND_ON_SUBSCRIBE: 0>
SEND_ON_NEW_SUBSCRIBE = <RetainHandling.SEND_ON_NEW_SUBSCRIBE: 1>
DONT_SEND = <RetainHandling.DONT_SEND: 2>
class Reason(enum.Enum):
39class Reason(enum.Enum):
40    SUCCESS = 0x00
41    GRANTED_QOS_1 = 0x01
42    GRANTED_QOS_2 = 0x02
43    DISCONNECT_WITH_WILL_MESSAGE = 0x04
44    NO_MATCHING_SUBSCRIBERS = 0x10
45    NO_SUBSCRIPTION_EXISTED = 0x11
46    CONTINUE_AUTHENTICATION = 0x24
47    RE_AUTHENTICATE = 0x25
48    UNSPECIFIED_ERROR = 0x80
49    MALFORMED_PACKET = 0x81
50    PROTOCOL_ERROR = 0x82
51    IMPLEMENTATION_SPECIFIC_ERROR = 0x83
52    UNSUPPORTED_PROTOCOL_VERSION = 0x84
53    CLIENT_IDENTIFIER_NOT_VALID = 0x85
54    BAD_USER_NAME_OR_PASSWORD = 0x86
55    NOT_AUTHORIZED = 0x87
56    SERVER_UNAVAILABLE = 0x88
57    SERVER_BUSY = 0x89
58    BANNED = 0x8a
59    SERVER_SHUTTING_DOWN = 0x8b
60    BAD_AUTHENTICATION_METHOD = 0x8c
61    KEEP_ALIVE_TIMEOUT = 0x8d
62    SESSION_TAKEN_OVER = 0x8e
63    TOPIC_FILTER_INVALID = 0x8f
64    TOPIC_NAME_INVALID = 0x90
65    PACKET_IDENTIFIER_IN_USE = 0x91
66    PACKET_IDENTIFIER_NOT_FOUND = 0x92
67    RECEIVE_MAXIMUM_EXCEEDED = 0x93
68    TOPIC_ALIAS_INVALID = 0x94
69    PACKET_TOO_LARGE = 0x95
70    MESSAGE_RATE_TOO_HIGH = 0x96
71    QUOTA_EXCEEDED = 0x97
72    ADMINISTRATIVE_ACTION = 0x98
73    PAYLOAD_FORMAT_INVALID = 0x99
74    RETAIN_NOT_SUPPORTED = 0x9a
75    QOS_NOT_SUPPORTED = 0x9b
76    USE_ANOTHER_SERVER = 0x9c
77    SERVER_MOVED = 0x9d
78    SHARED_SUBSCRIPTIONS_NOT_SUPPORTED = 0x9e
79    CONNECTION_RATE_EXCEEDED = 0x9f
80    MAXIMUM_CONNECT_TIME = 0xa0
81    SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED = 0xa1
82    WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED = 0xa2
SUCCESS = <Reason.SUCCESS: 0>
GRANTED_QOS_1 = <Reason.GRANTED_QOS_1: 1>
GRANTED_QOS_2 = <Reason.GRANTED_QOS_2: 2>
DISCONNECT_WITH_WILL_MESSAGE = <Reason.DISCONNECT_WITH_WILL_MESSAGE: 4>
NO_MATCHING_SUBSCRIBERS = <Reason.NO_MATCHING_SUBSCRIBERS: 16>
NO_SUBSCRIPTION_EXISTED = <Reason.NO_SUBSCRIPTION_EXISTED: 17>
CONTINUE_AUTHENTICATION = <Reason.CONTINUE_AUTHENTICATION: 36>
RE_AUTHENTICATE = <Reason.RE_AUTHENTICATE: 37>
UNSPECIFIED_ERROR = <Reason.UNSPECIFIED_ERROR: 128>
MALFORMED_PACKET = <Reason.MALFORMED_PACKET: 129>
PROTOCOL_ERROR = <Reason.PROTOCOL_ERROR: 130>
IMPLEMENTATION_SPECIFIC_ERROR = <Reason.IMPLEMENTATION_SPECIFIC_ERROR: 131>
UNSUPPORTED_PROTOCOL_VERSION = <Reason.UNSUPPORTED_PROTOCOL_VERSION: 132>
CLIENT_IDENTIFIER_NOT_VALID = <Reason.CLIENT_IDENTIFIER_NOT_VALID: 133>
BAD_USER_NAME_OR_PASSWORD = <Reason.BAD_USER_NAME_OR_PASSWORD: 134>
NOT_AUTHORIZED = <Reason.NOT_AUTHORIZED: 135>
SERVER_UNAVAILABLE = <Reason.SERVER_UNAVAILABLE: 136>
SERVER_BUSY = <Reason.SERVER_BUSY: 137>
BANNED = <Reason.BANNED: 138>
SERVER_SHUTTING_DOWN = <Reason.SERVER_SHUTTING_DOWN: 139>
BAD_AUTHENTICATION_METHOD = <Reason.BAD_AUTHENTICATION_METHOD: 140>
KEEP_ALIVE_TIMEOUT = <Reason.KEEP_ALIVE_TIMEOUT: 141>
SESSION_TAKEN_OVER = <Reason.SESSION_TAKEN_OVER: 142>
TOPIC_FILTER_INVALID = <Reason.TOPIC_FILTER_INVALID: 143>
TOPIC_NAME_INVALID = <Reason.TOPIC_NAME_INVALID: 144>
PACKET_IDENTIFIER_IN_USE = <Reason.PACKET_IDENTIFIER_IN_USE: 145>
PACKET_IDENTIFIER_NOT_FOUND = <Reason.PACKET_IDENTIFIER_NOT_FOUND: 146>
RECEIVE_MAXIMUM_EXCEEDED = <Reason.RECEIVE_MAXIMUM_EXCEEDED: 147>
TOPIC_ALIAS_INVALID = <Reason.TOPIC_ALIAS_INVALID: 148>
PACKET_TOO_LARGE = <Reason.PACKET_TOO_LARGE: 149>
MESSAGE_RATE_TOO_HIGH = <Reason.MESSAGE_RATE_TOO_HIGH: 150>
QUOTA_EXCEEDED = <Reason.QUOTA_EXCEEDED: 151>
ADMINISTRATIVE_ACTION = <Reason.ADMINISTRATIVE_ACTION: 152>
PAYLOAD_FORMAT_INVALID = <Reason.PAYLOAD_FORMAT_INVALID: 153>
RETAIN_NOT_SUPPORTED = <Reason.RETAIN_NOT_SUPPORTED: 154>
QOS_NOT_SUPPORTED = <Reason.QOS_NOT_SUPPORTED: 155>
USE_ANOTHER_SERVER = <Reason.USE_ANOTHER_SERVER: 156>
SERVER_MOVED = <Reason.SERVER_MOVED: 157>
SHARED_SUBSCRIPTIONS_NOT_SUPPORTED = <Reason.SHARED_SUBSCRIPTIONS_NOT_SUPPORTED: 158>
CONNECTION_RATE_EXCEEDED = <Reason.CONNECTION_RATE_EXCEEDED: 159>
MAXIMUM_CONNECT_TIME = <Reason.MAXIMUM_CONNECT_TIME: 160>
SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED = <Reason.SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED: 161>
WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED = <Reason.WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED: 162>
class MqttError(builtins.Exception):
 85class MqttError(Exception):
 86
 87    def __init__(self,
 88                 reason: Reason,
 89                 description: str | None):
 90        super().__init__(reason, description)
 91        self._reason = reason
 92        self._description = description
 93
 94    @property
 95    def reason(self) -> Reason:
 96        return self._reason
 97
 98    @property
 99    def description(self) -> str | None:
100        return self._description

Common base class for all non-exit exceptions.

MqttError(reason: Reason, description: str | None)
87    def __init__(self,
88                 reason: Reason,
89                 description: str | None):
90        super().__init__(reason, description)
91        self._reason = reason
92        self._description = description
reason: Reason
94    @property
95    def reason(self) -> Reason:
96        return self._reason
description: str | None
 98    @property
 99    def description(self) -> str | None:
100        return self._description
class Subscription(typing.NamedTuple):
103class Subscription(typing.NamedTuple):
104    topic_filter: String
105    maximum_qos: QoS
106    no_local: bool
107    retain_as_published: bool
108    retain_handling: RetainHandling

Subscription(topic_filter, maximum_qos, no_local, retain_as_published, retain_handling)

Subscription( topic_filter: str, maximum_qos: QoS, no_local: bool, retain_as_published: bool, retain_handling: RetainHandling)

Create new instance of Subscription(topic_filter, maximum_qos, no_local, retain_as_published, retain_handling)

topic_filter: str

Alias for field number 0

maximum_qos: QoS

Alias for field number 1

no_local: bool

Alias for field number 2

retain_as_published: bool

Alias for field number 3

retain_handling: RetainHandling

Alias for field number 4

def is_error_reason(reason: Reason) -> bool:
111def is_error_reason(reason: Reason) -> bool:
112    return reason.value >= 0x80