hat.drivers.mqtt

 1from hat.drivers.mqtt.client import (Msg,
 2                                     MsgCb,
 3                                     connect,
 4                                     Client)
 5from hat.drivers.mqtt.common import (UInt32,
 6                                     String,
 7                                     Binary,
 8                                     QoS,
 9                                     RetainHandling,
10                                     Reason,
11                                     MqttError,
12                                     Subscription,
13                                     is_error_reason)
14
15
16__all__ = ['Msg',
17           'MsgCb',
18           'connect',
19           'Client',
20           'UInt32',
21           'String',
22           'Binary',
23           'QoS',
24           'RetainHandling',
25           'Reason',
26           'MqttError',
27           'Subscription',
28           'is_error_reason']
class Msg(typing.NamedTuple):
21class Msg(typing.NamedTuple):
22    topic: common.String
23    payload: str | util.Bytes
24    qos: common.QoS = common.QoS.AT_MOST_ONCE
25    retain: bool = True
26    message_expiry_interval: common.UInt32 | None = None
27    response_topic: common.String | None = None
28    correlation_data: common.Binary | None = None
29    user_properties: Collection[tuple[common.String, common.String]] = []
30    content_type: common.String | None = None

Msg(topic, payload, qos, retain, message_expiry_interval, response_topic, correlation_data, user_properties, content_type)

Msg( topic: str, payload: str | bytes | bytearray | memoryview, qos: QoS = <QoS.AT_MOST_ONCE: 0>, retain: bool = True, message_expiry_interval: int | None = None, response_topic: str | None = None, correlation_data: bytes | bytearray | memoryview | None = None, user_properties: Collection[tuple[str, str]] = [], content_type: str | None = None)

Create new instance of Msg(topic, payload, qos, retain, message_expiry_interval, response_topic, correlation_data, user_properties, content_type)

topic: str

Alias for field number 0

payload: str | bytes | bytearray | memoryview

Alias for field number 1

qos: QoS

Alias for field number 2

retain: bool

Alias for field number 3

message_expiry_interval: int | None

Alias for field number 4

response_topic: str | None

Alias for field number 5

correlation_data: bytes | bytearray | memoryview | None

Alias for field number 6

user_properties: Collection[tuple[str, str]]

Alias for field number 7

content_type: str | None

Alias for field number 8

MsgCb = typing.Callable[[ForwardRef('Client'), Msg], None | collections.abc.Awaitable[None]]
async def connect( addr: hat.drivers.tcp.Address, msg_cb: Optional[Callable[[Client, Msg], None | Awaitable[None]]] = None, will_msg: Msg | None = None, will_delay: int = 0, client_id: str = '', user_name: str | None = None, password: bytes | bytearray | memoryview | None = None, ping_delay: float | None = None, response_timeout: float = 30, **kwargs) -> Client:
 36async def connect(addr: tcp.Address,
 37                  msg_cb: MsgCb | None = None,
 38                  will_msg: Msg | None = None,
 39                  will_delay: common.UInt32 = 0,
 40                  client_id: common.String = '',
 41                  user_name: common.String | None = None,
 42                  password: common.Binary | None = None,
 43                  ping_delay: float | None = None,
 44                  response_timeout: float = 30,
 45                  **kwargs
 46                  ) -> 'Client':
 47    conn = await transport.connect(addr, **kwargs)
 48
 49    try:
 50        req = _create_connect_packet(will_msg=will_msg,
 51                                     will_delay=will_delay,
 52                                     ping_delay=ping_delay,
 53                                     client_id=client_id,
 54                                     user_name=user_name,
 55                                     password=password)
 56        await conn.send(req)
 57
 58        res = await aio.wait_for(conn.receive(), response_timeout)
 59
 60        if isinstance(res, transport.DisconnectPacket):
 61            raise common.MqttError(res.reason, res.reason_string)
 62
 63        if not isinstance(res, transport.ConnAckPacket):
 64            raise Exception('unexpected response')
 65
 66        if res.reason != common.Reason.SUCCESS:
 67            raise common.MqttError(res.reason, res.reason_string)
 68
 69    except BaseException:
 70        await aio.uncancellable(conn.async_close())
 71        raise
 72
 73    client = Client()
 74    client._msg_cb = msg_cb
 75    client._response_timeout = response_timeout
 76    client._loop = asyncio.get_running_loop()
 77    client._async_group = aio.Group()
 78    client._conn = conn
 79    client._sent_event = asyncio.Event()
 80    client._ping_future = None
 81    client._disconnect_reason = common.Reason.SUCCESS
 82    client._disconnect_reason_string = None
 83
 84    client._maximum_qos = res.maximum_qos
 85    client._client_id = (res.assigned_client_identifier
 86                         if res.assigned_client_identifier is not None
 87                         else client_id)
 88    client._keep_alive = (req.keep_alive if res.server_keep_alive is None
 89                          else res.server_keep_alive)
 90    client._response_information = res.response_information
 91
 92    try:
 93        client.async_group.spawn(aio.call_on_cancel, client._on_close)
 94
 95        client._identifier_registry = _IdentifierRegistry(
 96            client.async_group.create_subgroup())
 97
 98        client.async_group.spawn(client._receive_loop)
 99        if client._keep_alive > 0:
100            client.async_group.spawn(client._ping_loop)
101
102    except BaseException:
103        await aio.uncancellable(client.async_close())
104        raise
105
106    return client
class Client(hat.aio.group.Resource):
109class Client(aio.Resource):
110
111    @property
112    def async_group(self) -> aio.Group:
113        return self._async_group
114
115    @property
116    def info(self) -> tcp.ConnectionInfo:
117        return self._conn.info
118
119    @property
120    def maximum_qos(self) -> common.QoS:
121        return self._maximum_qos
122
123    @property
124    def client_id(self) -> common.String:
125        return self._client_id
126
127    @property
128    def response_information(self) -> common.String | None:
129        return self._response_information
130
131    async def publish(self, msg: Msg):
132        if msg.qos.value > self._maximum_qos.value:
133            raise Exception(f'maximum supported QoS is {self._maximum_qos}')
134
135        if msg.qos == common.QoS.AT_MOST_ONCE:
136            identifier = None
137
138        elif msg.qos in (common.QoS.AT_LEAST_ONCE,
139                         common.QoS.EXACLTY_ONCE):
140            identifier = await self._identifier_registry.allocate_identifier()
141
142        else:
143            raise ValueError('unsupported QoS')
144
145        try:
146            req = transport.PublishPacket(
147                duplicate=False,
148                qos=self._maximum_qos,
149                retain=msg.retain,
150                topic_name=msg.topic,
151                packet_identifier=identifier,
152                message_expiry_interval=msg.message_expiry_interval,
153                topic_alias=None,
154                response_topic=msg.response_topic,
155                correlation_data=msg.correlation_data,
156                user_properties=msg.user_properties,
157                subscription_identifiers=[],
158                content_type=msg.content_type,
159                payload=msg.payload)
160
161            if msg.qos == common.QoS.AT_MOST_ONCE:
162                await self._conn.send(req)
163                return
164
165            future = self._identifier_registry.create_future(identifier)
166            await self._conn.send(req)
167            res = await aio.wait_for(future, self._response_timeout)
168
169            if msg.qos == common.QoS.AT_LEAST_ONCE:
170                self._assert_res_type(res, transport.PubAckPacket)
171                if common.is_error_reason(res.reason):
172                    raise common.MqttError(res.reason, res.reason_string)
173
174                return
175
176            self._assert_res_type(res, transport.PubRecPacket)
177            if common.is_error_reason(res.reason):
178                raise common.MqttError(res.reason, res.reason_string)
179
180            req = transport.PubRelPacket(packet_identifier=identifier,
181                                         reason=common.Reason.SUCCESS,
182                                         reason_string=None,
183                                         user_properties=[])
184
185            future = self._identifier_registry.create_future(identifier)
186            await self._conn.send(req)
187            res = await aio.wait_for(future, self._response_timeout)
188
189            self._assert_res_type(res, transport.PubCompPacket)
190            if common.is_error_reason(res.reason):
191                raise common.MqttError(res.reason, res.reason_string)
192
193        except asyncio.TimeoutError:
194            mlog.error("response timeout exceeded")
195
196            self._set_disconnect_reason(
197                common.Reason.IMPLEMENTATION_SPECIFIC_ERROR,
198                "response timeout exceeded")
199            self.close()
200
201            raise ConnectionError()
202
203        finally:
204            if identifier is not None:
205                self._identifier_registry.release_identifier(identifier)
206
207    async def subscribe(self,
208                        subscriptions: Collection[common.Subscription]
209                        ) -> Collection[common.Reason]:
210        identifier = await self._identifier_registry.allocate_identifier()
211
212        try:
213            req = transport.SubscribePacket(packet_identifier=identifier,
214                                            subscription_identifier=None,
215                                            user_properties=[],
216                                            subscriptions=subscriptions)
217
218            future = self._identifier_registry.create_future(identifier)
219            await self._conn.send(req)
220            res = await aio.wait_for(future, self._response_timeout)
221
222            self._assert_res_type(res, transport.SubAckPacket)
223            return res.reasons
224
225        except asyncio.TimeoutError:
226            mlog.error("response timeout exceeded")
227
228            self._set_disconnect_reason(
229                common.Reason.IMPLEMENTATION_SPECIFIC_ERROR,
230                "response timeout exceeded")
231            self.close()
232
233            raise ConnectionError()
234
235        finally:
236            self._identifier_registry.release_identifier(identifier)
237
238    async def unsubscribe(self,
239                          topic_filters: Collection[common.String]
240                          ) -> Collection[common.Reason]:
241        identifier = await self._identifier_registry.allocate_identifier()
242
243        try:
244            req = transport.UnsubscribePacket(packet_identifier=identifier,
245                                              user_properties=[],
246                                              topic_filters=topic_filters)
247
248            future = self._identifier_registry.create_future(identifier)
249            await self._conn.send(req)
250            res = await aio.wait_for(future, self._response_timeout)
251
252            self._assert_res_type(res, transport.UnsubAckPacket)
253            return res.reasons
254
255        except asyncio.TimeoutError:
256            mlog.error("response timeout exceeded")
257
258            self._set_disconnect_reason(
259                common.Reason.IMPLEMENTATION_SPECIFIC_ERROR,
260                "response timeout exceeded")
261            self.close()
262
263            raise ConnectionError()
264
265        finally:
266            self._identifier_registry.release_identifier(identifier)
267
268    async def _on_close(self):
269        if self._conn.is_open:
270            with contextlib.suppress(Exception):
271                await self._conn.send(
272                    transport.DisconnectPacket(
273                        reason=self._disconnect_reason,
274                        session_expiry_interval=None,
275                        reason_string=self._disconnect_reason_string,
276                        user_properties=[],
277                        server_reference=None))
278
279        await self._conn.async_close()
280
281    def _set_disconnect_reason(self, reason, reason_string):
282        if common.is_error_reason(self._disconnect_reason):
283            return
284
285        self._disconnect_reason = reason
286        self._disconnect_reason_string = reason_string
287
288    async def _send(self, packet):
289        await self._conn.send(packet)
290        self._sent_event.set()
291
292    def _assert_res_type(self, res, cls):
293        if isinstance(res, cls):
294            return
295
296        mlog.error('received invalid response (expecting %s)', cls)
297
298        self._set_disconnect_reason(common.Reason.PROTOCOL_ERROR,
299                                    'invalid response type')
300        self.close()
301
302        raise ConnectionError()
303
304    async def _receive_loop(self):
305        try:
306            mlog.debug('starting receive loop')
307
308            while True:
309                packet = await self._conn.receive()
310
311                if isinstance(packet, transport.PublishPacket):
312                    await self._process_publish_packet(packet)
313
314                elif isinstance(packet, transport.PubRelPacket):
315                    await self._process_publish_release_packet(packet)
316
317                elif isinstance(packet, (transport.PubAckPacket,
318                                         transport.PubRecPacket,
319                                         transport.PubCompPacket,
320                                         transport.SubAckPacket,
321                                         transport.UnsubAckPacket)):
322                    if not self._identifier_registry.register_packet(packet):
323                        mlog.warning('packet identifier not expected - '
324                                     'dropping packet')
325
326                elif isinstance(packet, transport.PingResPacket):
327                    if self._ping_future and not self._ping_future.done():
328                        self._ping_future.set_result(None)
329
330                elif isinstance(packet, transport.DisconnectPacket):
331                    mlog.debug('received disconnect packet: %s: %s',
332                               packet.reason, packet.reason_string)
333
334                    self._conn.close()
335                    break
336
337                elif isinstance(packet, transport.AuthPacket):
338                    raise common.MqttError(
339                        common.Reason.IMPLEMENTATION_SPECIFIC_ERROR,
340                        'auth packet not supported')
341
342                else:
343                    raise common.MqttError(common.Reason.PROTOCOL_ERROR,
344                                           'unexpected packet')
345
346        except ConnectionError:
347            pass
348
349        except common.MqttError as e:
350            mlog.error('receive loop mqtt error: %s', e, exc_info=e)
351
352            self._set_disconnect_reason(e.reason, e.description)
353
354        except Exception as e:
355            mlog.error('receive loop error: %s', e, exc_info=e)
356
357            self._set_disconnect_reason(common.Reason.UNSPECIFIED_ERROR, None)
358
359        finally:
360            mlog.debug('stopping receive loop')
361
362            self.close()
363
364    async def _ping_loop(self):
365        try:
366            mlog.debug('starting ping loop')
367
368            while True:
369                self._sent_event.clear()
370
371                with contextlib.suppress(asyncio.TimeoutError):
372                    await aio.wait_for(self._sent_event.wait(),
373                                       self._keep_alive)
374                    continue
375
376                self._ping_future = self._loop.create_future()
377                await self._conn.send(transport.PingReqPacket())
378
379                await aio.wait_for(self._ping_future, self._response_timeout)
380
381        except ConnectionError:
382            pass
383
384        except asyncio.TimeoutError:
385            mlog.error('ping response timeout')
386
387            self._set_disconnect_reason(
388                common.Reason.IMPLEMENTATION_SPECIFIC_ERROR,
389                "response timeout exceeded")
390
391        except Exception as e:
392            mlog.error('ping loop error: %s', e, exc_info=e)
393
394            self._set_disconnect_reason(common.Reason.UNSPECIFIED_ERROR, None)
395
396        finally:
397            mlog.debug('stopping ping loop')
398
399            self.close()
400
401    async def _process_publish_packet(self, packet):
402        if self._msg_cb:
403            msg = Msg(
404                topic=packet.topic_name,
405                payload=packet.payload,
406                qos=packet.qos,
407                retain=packet.retain,
408                message_expiry_interval=packet.message_expiry_interval,
409                response_topic=packet.response_topic,
410                correlation_data=packet.correlation_data,
411                user_properties=packet.user_properties,
412                content_type=packet.content_type)
413
414            await aio.call(self._msg_cb, self, msg)
415
416        if msg.qos == common.QoS.AT_MOST_ONCE:
417            return
418
419        if msg.qos == common.QoS.AT_LEAST_ONCE:
420            res = transport.PubAckPacket(
421                packet_identifier=packet.packet_identifier,
422                reason=common.Reason.SUCCESS,
423                reason_string=None,
424                user_properties=[])
425
426            await self._send(res)
427            return
428
429        if msg.qos != common.QoS.EXACLTY_ONCE:
430            raise ValueError('unsupported QoS')
431
432        req = transport.PubRecPacket(
433            packet_identifier=packet.packet_identifier,
434            reason=common.Reason.SUCCESS,
435            reason_string=None,
436            user_properties=[])
437
438        await self._conn.send(req)
439
440    async def _process_publish_release_packet(self, packet):
441
442        # TODO should remember previous publish packet and check response
443        #      timeout
444
445        if common.is_error_reason(packet.reason):
446            mlog.warning('publish release error: %s: %s',
447                         packet.reason, packet.reason_string)
448            return
449
450        req = transport.PubCompPacket(
451            packet_identifier=packet.packet_identifier,
452            reason=common.Reason.SUCCESS,
453            reason_string=None,
454            user_properties=[])
455
456        await self._conn.send(req)

Resource with lifetime control based on Group.

async_group: hat.aio.group.Group
111    @property
112    def async_group(self) -> aio.Group:
113        return self._async_group

Group controlling resource's lifetime.

info: hat.drivers.tcp.ConnectionInfo
115    @property
116    def info(self) -> tcp.ConnectionInfo:
117        return self._conn.info
maximum_qos: QoS
119    @property
120    def maximum_qos(self) -> common.QoS:
121        return self._maximum_qos
client_id: str
123    @property
124    def client_id(self) -> common.String:
125        return self._client_id
response_information: str | None
127    @property
128    def response_information(self) -> common.String | None:
129        return self._response_information
async def publish(self, msg: Msg):
131    async def publish(self, msg: Msg):
132        if msg.qos.value > self._maximum_qos.value:
133            raise Exception(f'maximum supported QoS is {self._maximum_qos}')
134
135        if msg.qos == common.QoS.AT_MOST_ONCE:
136            identifier = None
137
138        elif msg.qos in (common.QoS.AT_LEAST_ONCE,
139                         common.QoS.EXACLTY_ONCE):
140            identifier = await self._identifier_registry.allocate_identifier()
141
142        else:
143            raise ValueError('unsupported QoS')
144
145        try:
146            req = transport.PublishPacket(
147                duplicate=False,
148                qos=self._maximum_qos,
149                retain=msg.retain,
150                topic_name=msg.topic,
151                packet_identifier=identifier,
152                message_expiry_interval=msg.message_expiry_interval,
153                topic_alias=None,
154                response_topic=msg.response_topic,
155                correlation_data=msg.correlation_data,
156                user_properties=msg.user_properties,
157                subscription_identifiers=[],
158                content_type=msg.content_type,
159                payload=msg.payload)
160
161            if msg.qos == common.QoS.AT_MOST_ONCE:
162                await self._conn.send(req)
163                return
164
165            future = self._identifier_registry.create_future(identifier)
166            await self._conn.send(req)
167            res = await aio.wait_for(future, self._response_timeout)
168
169            if msg.qos == common.QoS.AT_LEAST_ONCE:
170                self._assert_res_type(res, transport.PubAckPacket)
171                if common.is_error_reason(res.reason):
172                    raise common.MqttError(res.reason, res.reason_string)
173
174                return
175
176            self._assert_res_type(res, transport.PubRecPacket)
177            if common.is_error_reason(res.reason):
178                raise common.MqttError(res.reason, res.reason_string)
179
180            req = transport.PubRelPacket(packet_identifier=identifier,
181                                         reason=common.Reason.SUCCESS,
182                                         reason_string=None,
183                                         user_properties=[])
184
185            future = self._identifier_registry.create_future(identifier)
186            await self._conn.send(req)
187            res = await aio.wait_for(future, self._response_timeout)
188
189            self._assert_res_type(res, transport.PubCompPacket)
190            if common.is_error_reason(res.reason):
191                raise common.MqttError(res.reason, res.reason_string)
192
193        except asyncio.TimeoutError:
194            mlog.error("response timeout exceeded")
195
196            self._set_disconnect_reason(
197                common.Reason.IMPLEMENTATION_SPECIFIC_ERROR,
198                "response timeout exceeded")
199            self.close()
200
201            raise ConnectionError()
202
203        finally:
204            if identifier is not None:
205                self._identifier_registry.release_identifier(identifier)
async def subscribe( self, subscriptions: Collection[Subscription]) -> Collection[Reason]:
207    async def subscribe(self,
208                        subscriptions: Collection[common.Subscription]
209                        ) -> Collection[common.Reason]:
210        identifier = await self._identifier_registry.allocate_identifier()
211
212        try:
213            req = transport.SubscribePacket(packet_identifier=identifier,
214                                            subscription_identifier=None,
215                                            user_properties=[],
216                                            subscriptions=subscriptions)
217
218            future = self._identifier_registry.create_future(identifier)
219            await self._conn.send(req)
220            res = await aio.wait_for(future, self._response_timeout)
221
222            self._assert_res_type(res, transport.SubAckPacket)
223            return res.reasons
224
225        except asyncio.TimeoutError:
226            mlog.error("response timeout exceeded")
227
228            self._set_disconnect_reason(
229                common.Reason.IMPLEMENTATION_SPECIFIC_ERROR,
230                "response timeout exceeded")
231            self.close()
232
233            raise ConnectionError()
234
235        finally:
236            self._identifier_registry.release_identifier(identifier)
async def unsubscribe( self, topic_filters: Collection[str]) -> Collection[Reason]:
238    async def unsubscribe(self,
239                          topic_filters: Collection[common.String]
240                          ) -> Collection[common.Reason]:
241        identifier = await self._identifier_registry.allocate_identifier()
242
243        try:
244            req = transport.UnsubscribePacket(packet_identifier=identifier,
245                                              user_properties=[],
246                                              topic_filters=topic_filters)
247
248            future = self._identifier_registry.create_future(identifier)
249            await self._conn.send(req)
250            res = await aio.wait_for(future, self._response_timeout)
251
252            self._assert_res_type(res, transport.UnsubAckPacket)
253            return res.reasons
254
255        except asyncio.TimeoutError:
256            mlog.error("response timeout exceeded")
257
258            self._set_disconnect_reason(
259                common.Reason.IMPLEMENTATION_SPECIFIC_ERROR,
260                "response timeout exceeded")
261            self.close()
262
263            raise ConnectionError()
264
265        finally:
266            self._identifier_registry.release_identifier(identifier)
UInt32 = <class 'int'>
String = <class 'str'>
Binary = bytes | bytearray | memoryview
class QoS(enum.Enum):
27class QoS(enum.Enum):
28    AT_MOST_ONCE = 0
29    AT_LEAST_ONCE = 1
30    EXACLTY_ONCE = 2

An enumeration.

AT_MOST_ONCE = <QoS.AT_MOST_ONCE: 0>
AT_LEAST_ONCE = <QoS.AT_LEAST_ONCE: 1>
EXACLTY_ONCE = <QoS.EXACLTY_ONCE: 2>
class RetainHandling(enum.Enum):
33class RetainHandling(enum.Enum):
34    SEND_ON_SUBSCRIBE = 0
35    SEND_ON_NEW_SUBSCRIBE = 1
36    DONT_SEND = 2

An enumeration.

SEND_ON_SUBSCRIBE = <RetainHandling.SEND_ON_SUBSCRIBE: 0>
SEND_ON_NEW_SUBSCRIBE = <RetainHandling.SEND_ON_NEW_SUBSCRIBE: 1>
DONT_SEND = <RetainHandling.DONT_SEND: 2>
class Reason(enum.Enum):
39class Reason(enum.Enum):
40    SUCCESS = 0x00
41    GRANTED_QOS_1 = 0x01
42    GRANTED_QOS_2 = 0x02
43    DISCONNECT_WITH_WILL_MESSAGE = 0x04
44    NO_MATCHING_SUBSCRIBERS = 0x10
45    NO_SUBSCRIPTION_EXISTED = 0x11
46    CONTINUE_AUTHENTICATION = 0x24
47    RE_AUTHENTICATE = 0x25
48    UNSPECIFIED_ERROR = 0x80
49    MALFORMED_PACKET = 0x81
50    PROTOCOL_ERROR = 0x82
51    IMPLEMENTATION_SPECIFIC_ERROR = 0x83
52    UNSUPPORTED_PROTOCOL_VERSION = 0x84
53    CLIENT_IDENTIFIER_NOT_VALID = 0x85
54    BAD_USER_NAME_OR_PASSWORD = 0x86
55    NOT_AUTHORIZED = 0x87
56    SERVER_UNAVAILABLE = 0x88
57    SERVER_BUSY = 0x89
58    BANNED = 0x8a
59    SERVER_SHUTTING_DOWN = 0x8b
60    BAD_AUTHENTICATION_METHOD = 0x8c
61    KEEP_ALIVE_TIMEOUT = 0x8d
62    SESSION_TAKEN_OVER = 0x8e
63    TOPIC_FILTER_INVALID = 0x8f
64    TOPIC_NAME_INVALID = 0x90
65    PACKET_IDENTIFIER_IN_USE = 0x91
66    PACKET_IDENTIFIER_NOT_FOUND = 0x92
67    RECEIVE_MAXIMUM_EXCEEDED = 0x93
68    TOPIC_ALIAS_INVALID = 0x94
69    PACKET_TOO_LARGE = 0x95
70    MESSAGE_RATE_TOO_HIGH = 0x96
71    QUOTA_EXCEEDED = 0x97
72    ADMINISTRATIVE_ACTION = 0x98
73    PAYLOAD_FORMAT_INVALID = 0x99
74    RETAIN_NOT_SUPPORTED = 0x9a
75    QOS_NOT_SUPPORTED = 0x9b
76    USE_ANOTHER_SERVER = 0x9c
77    SERVER_MOVED = 0x9d
78    SHARED_SUBSCRIPTIONS_NOT_SUPPORTED = 0x9e
79    CONNECTION_RATE_EXCEEDED = 0x9f
80    MAXIMUM_CONNECT_TIME = 0xa0
81    SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED = 0xa1
82    WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED = 0xa2

An enumeration.

SUCCESS = <Reason.SUCCESS: 0>
GRANTED_QOS_1 = <Reason.GRANTED_QOS_1: 1>
GRANTED_QOS_2 = <Reason.GRANTED_QOS_2: 2>
DISCONNECT_WITH_WILL_MESSAGE = <Reason.DISCONNECT_WITH_WILL_MESSAGE: 4>
NO_MATCHING_SUBSCRIBERS = <Reason.NO_MATCHING_SUBSCRIBERS: 16>
NO_SUBSCRIPTION_EXISTED = <Reason.NO_SUBSCRIPTION_EXISTED: 17>
CONTINUE_AUTHENTICATION = <Reason.CONTINUE_AUTHENTICATION: 36>
RE_AUTHENTICATE = <Reason.RE_AUTHENTICATE: 37>
UNSPECIFIED_ERROR = <Reason.UNSPECIFIED_ERROR: 128>
MALFORMED_PACKET = <Reason.MALFORMED_PACKET: 129>
PROTOCOL_ERROR = <Reason.PROTOCOL_ERROR: 130>
IMPLEMENTATION_SPECIFIC_ERROR = <Reason.IMPLEMENTATION_SPECIFIC_ERROR: 131>
UNSUPPORTED_PROTOCOL_VERSION = <Reason.UNSUPPORTED_PROTOCOL_VERSION: 132>
CLIENT_IDENTIFIER_NOT_VALID = <Reason.CLIENT_IDENTIFIER_NOT_VALID: 133>
BAD_USER_NAME_OR_PASSWORD = <Reason.BAD_USER_NAME_OR_PASSWORD: 134>
NOT_AUTHORIZED = <Reason.NOT_AUTHORIZED: 135>
SERVER_UNAVAILABLE = <Reason.SERVER_UNAVAILABLE: 136>
SERVER_BUSY = <Reason.SERVER_BUSY: 137>
BANNED = <Reason.BANNED: 138>
SERVER_SHUTTING_DOWN = <Reason.SERVER_SHUTTING_DOWN: 139>
BAD_AUTHENTICATION_METHOD = <Reason.BAD_AUTHENTICATION_METHOD: 140>
KEEP_ALIVE_TIMEOUT = <Reason.KEEP_ALIVE_TIMEOUT: 141>
SESSION_TAKEN_OVER = <Reason.SESSION_TAKEN_OVER: 142>
TOPIC_FILTER_INVALID = <Reason.TOPIC_FILTER_INVALID: 143>
TOPIC_NAME_INVALID = <Reason.TOPIC_NAME_INVALID: 144>
PACKET_IDENTIFIER_IN_USE = <Reason.PACKET_IDENTIFIER_IN_USE: 145>
PACKET_IDENTIFIER_NOT_FOUND = <Reason.PACKET_IDENTIFIER_NOT_FOUND: 146>
RECEIVE_MAXIMUM_EXCEEDED = <Reason.RECEIVE_MAXIMUM_EXCEEDED: 147>
TOPIC_ALIAS_INVALID = <Reason.TOPIC_ALIAS_INVALID: 148>
PACKET_TOO_LARGE = <Reason.PACKET_TOO_LARGE: 149>
MESSAGE_RATE_TOO_HIGH = <Reason.MESSAGE_RATE_TOO_HIGH: 150>
QUOTA_EXCEEDED = <Reason.QUOTA_EXCEEDED: 151>
ADMINISTRATIVE_ACTION = <Reason.ADMINISTRATIVE_ACTION: 152>
PAYLOAD_FORMAT_INVALID = <Reason.PAYLOAD_FORMAT_INVALID: 153>
RETAIN_NOT_SUPPORTED = <Reason.RETAIN_NOT_SUPPORTED: 154>
QOS_NOT_SUPPORTED = <Reason.QOS_NOT_SUPPORTED: 155>
USE_ANOTHER_SERVER = <Reason.USE_ANOTHER_SERVER: 156>
SERVER_MOVED = <Reason.SERVER_MOVED: 157>
SHARED_SUBSCRIPTIONS_NOT_SUPPORTED = <Reason.SHARED_SUBSCRIPTIONS_NOT_SUPPORTED: 158>
CONNECTION_RATE_EXCEEDED = <Reason.CONNECTION_RATE_EXCEEDED: 159>
MAXIMUM_CONNECT_TIME = <Reason.MAXIMUM_CONNECT_TIME: 160>
SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED = <Reason.SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED: 161>
WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED = <Reason.WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED: 162>
class MqttError(builtins.Exception):
 85class MqttError(Exception):
 86
 87    def __init__(self,
 88                 reason: Reason,
 89                 description: str | None):
 90        super().__init__(reason, description)
 91        self._reason = reason
 92        self._description = description
 93
 94    @property
 95    def reason(self) -> Reason:
 96        return self._reason
 97
 98    @property
 99    def description(self) -> str | None:
100        return self._description

Common base class for all non-exit exceptions.

MqttError(reason: Reason, description: str | None)
87    def __init__(self,
88                 reason: Reason,
89                 description: str | None):
90        super().__init__(reason, description)
91        self._reason = reason
92        self._description = description
reason: Reason
94    @property
95    def reason(self) -> Reason:
96        return self._reason
description: str | None
 98    @property
 99    def description(self) -> str | None:
100        return self._description
class Subscription(typing.NamedTuple):
103class Subscription(typing.NamedTuple):
104    topic_filter: String
105    maximum_qos: QoS
106    no_local: bool
107    retain_as_published: bool
108    retain_handling: RetainHandling

Subscription(topic_filter, maximum_qos, no_local, retain_as_published, retain_handling)

Subscription( topic_filter: str, maximum_qos: QoS, no_local: bool, retain_as_published: bool, retain_handling: RetainHandling)

Create new instance of Subscription(topic_filter, maximum_qos, no_local, retain_as_published, retain_handling)

topic_filter: str

Alias for field number 0

maximum_qos: QoS

Alias for field number 1

no_local: bool

Alias for field number 2

retain_as_published: bool

Alias for field number 3

retain_handling: RetainHandling

Alias for field number 4

def is_error_reason(reason: Reason) -> bool:
111def is_error_reason(reason: Reason) -> bool:
112    return reason.value >= 0x80