hat.drivers.mqtt
1from hat.drivers.mqtt.client import (Msg, 2 MsgCb, 3 connect, 4 Client) 5from hat.drivers.mqtt.common import (UInt32, 6 String, 7 Binary, 8 QoS, 9 RetainHandling, 10 Reason, 11 MqttError, 12 Subscription, 13 is_error_reason) 14 15 16__all__ = ['Msg', 17 'MsgCb', 18 'connect', 19 'Client', 20 'UInt32', 21 'String', 22 'Binary', 23 'QoS', 24 'RetainHandling', 25 'Reason', 26 'MqttError', 27 'Subscription', 28 'is_error_reason']
class
Msg(typing.NamedTuple):
21class Msg(typing.NamedTuple): 22 topic: common.String 23 payload: str | util.Bytes 24 qos: common.QoS = common.QoS.AT_MOST_ONCE 25 retain: bool = True 26 message_expiry_interval: common.UInt32 | None = None 27 response_topic: common.String | None = None 28 correlation_data: common.Binary | None = None 29 user_properties: Collection[tuple[common.String, common.String]] = [] 30 content_type: common.String | None = None
Msg(topic, payload, qos, retain, message_expiry_interval, response_topic, correlation_data, user_properties, content_type)
Msg( topic: str, payload: str | bytes | bytearray | memoryview, qos: QoS = <QoS.AT_MOST_ONCE: 0>, retain: bool = True, message_expiry_interval: int | None = None, response_topic: str | None = None, correlation_data: bytes | bytearray | memoryview | None = None, user_properties: Collection[tuple[str, str]] = [], content_type: str | None = None)
Create new instance of Msg(topic, payload, qos, retain, message_expiry_interval, response_topic, correlation_data, user_properties, content_type)
MsgCb =
typing.Callable[[ForwardRef('Client'), Msg], None | collections.abc.Awaitable[None]]
async def
connect( addr: hat.drivers.tcp.Address, msg_cb: Optional[Callable[[Client, Msg], None | Awaitable[None]]] = None, will_msg: Msg | None = None, will_delay: int = 0, client_id: str = '', user_name: str | None = None, password: bytes | bytearray | memoryview | None = None, ping_delay: float | None = None, response_timeout: float = 30, **kwargs) -> Client:
36async def connect(addr: tcp.Address, 37 msg_cb: MsgCb | None = None, 38 will_msg: Msg | None = None, 39 will_delay: common.UInt32 = 0, 40 client_id: common.String = '', 41 user_name: common.String | None = None, 42 password: common.Binary | None = None, 43 ping_delay: float | None = None, 44 response_timeout: float = 30, 45 **kwargs 46 ) -> 'Client': 47 conn = await transport.connect(addr, **kwargs) 48 49 try: 50 req = _create_connect_packet(will_msg=will_msg, 51 will_delay=will_delay, 52 ping_delay=ping_delay, 53 client_id=client_id, 54 user_name=user_name, 55 password=password) 56 await conn.send(req) 57 58 res = await aio.wait_for(conn.receive(), response_timeout) 59 60 if isinstance(res, transport.DisconnectPacket): 61 raise common.MqttError(res.reason, res.reason_string) 62 63 if not isinstance(res, transport.ConnAckPacket): 64 raise Exception('unexpected response') 65 66 if res.reason != common.Reason.SUCCESS: 67 raise common.MqttError(res.reason, res.reason_string) 68 69 except BaseException: 70 await aio.uncancellable(conn.async_close()) 71 raise 72 73 client = Client() 74 client._msg_cb = msg_cb 75 client._response_timeout = response_timeout 76 client._loop = asyncio.get_running_loop() 77 client._async_group = aio.Group() 78 client._conn = conn 79 client._sent_event = asyncio.Event() 80 client._ping_future = None 81 client._disconnect_reason = common.Reason.SUCCESS 82 client._disconnect_reason_string = None 83 client._log = _create_logger(conn.info) 84 85 client._maximum_qos = res.maximum_qos 86 client._client_id = (res.assigned_client_identifier 87 if res.assigned_client_identifier is not None 88 else client_id) 89 client._keep_alive = (req.keep_alive if res.server_keep_alive is None 90 else res.server_keep_alive) 91 client._response_information = res.response_information 92 93 try: 94 client.async_group.spawn(aio.call_on_cancel, client._on_close) 95 96 client._identifier_registry = _IdentifierRegistry( 97 client.async_group.create_subgroup()) 98 99 client.async_group.spawn(client._receive_loop) 100 if client._keep_alive > 0: 101 client.async_group.spawn(client._ping_loop) 102 103 except BaseException: 104 await aio.uncancellable(client.async_close()) 105 raise 106 107 return client
class
Client(hat.aio.group.Resource):
110class Client(aio.Resource): 111 112 @property 113 def async_group(self) -> aio.Group: 114 return self._async_group 115 116 @property 117 def info(self) -> tcp.ConnectionInfo: 118 return self._conn.info 119 120 @property 121 def maximum_qos(self) -> common.QoS: 122 return self._maximum_qos 123 124 @property 125 def client_id(self) -> common.String: 126 return self._client_id 127 128 @property 129 def response_information(self) -> common.String | None: 130 return self._response_information 131 132 async def publish(self, msg: Msg): 133 if msg.qos.value > self._maximum_qos.value: 134 raise Exception(f'maximum supported QoS is {self._maximum_qos}') 135 136 if msg.qos == common.QoS.AT_MOST_ONCE: 137 identifier = None 138 139 elif msg.qos in (common.QoS.AT_LEAST_ONCE, 140 common.QoS.EXACLTY_ONCE): 141 identifier = await self._identifier_registry.allocate_identifier() 142 143 else: 144 raise ValueError('unsupported QoS') 145 146 try: 147 req = transport.PublishPacket( 148 duplicate=False, 149 qos=self._maximum_qos, 150 retain=msg.retain, 151 topic_name=msg.topic, 152 packet_identifier=identifier, 153 message_expiry_interval=msg.message_expiry_interval, 154 topic_alias=None, 155 response_topic=msg.response_topic, 156 correlation_data=msg.correlation_data, 157 user_properties=msg.user_properties, 158 subscription_identifiers=[], 159 content_type=msg.content_type, 160 payload=msg.payload) 161 162 if msg.qos == common.QoS.AT_MOST_ONCE: 163 await self._conn.send(req) 164 return 165 166 future = self._identifier_registry.create_future(identifier) 167 await self._conn.send(req) 168 res = await aio.wait_for(future, self._response_timeout) 169 170 if msg.qos == common.QoS.AT_LEAST_ONCE: 171 self._assert_res_type(res, transport.PubAckPacket) 172 if common.is_error_reason(res.reason): 173 raise common.MqttError(res.reason, res.reason_string) 174 175 return 176 177 self._assert_res_type(res, transport.PubRecPacket) 178 if common.is_error_reason(res.reason): 179 raise common.MqttError(res.reason, res.reason_string) 180 181 req = transport.PubRelPacket(packet_identifier=identifier, 182 reason=common.Reason.SUCCESS, 183 reason_string=None, 184 user_properties=[]) 185 186 future = self._identifier_registry.create_future(identifier) 187 await self._conn.send(req) 188 res = await aio.wait_for(future, self._response_timeout) 189 190 self._assert_res_type(res, transport.PubCompPacket) 191 if common.is_error_reason(res.reason): 192 raise common.MqttError(res.reason, res.reason_string) 193 194 except asyncio.TimeoutError: 195 self._log.error("response timeout exceeded") 196 197 self._set_disconnect_reason( 198 common.Reason.IMPLEMENTATION_SPECIFIC_ERROR, 199 "response timeout exceeded") 200 self.close() 201 202 raise ConnectionError() 203 204 finally: 205 if identifier is not None: 206 self._identifier_registry.release_identifier(identifier) 207 208 async def subscribe(self, 209 subscriptions: Collection[common.Subscription] 210 ) -> Collection[common.Reason]: 211 identifier = await self._identifier_registry.allocate_identifier() 212 213 try: 214 req = transport.SubscribePacket(packet_identifier=identifier, 215 subscription_identifier=None, 216 user_properties=[], 217 subscriptions=subscriptions) 218 219 future = self._identifier_registry.create_future(identifier) 220 await self._conn.send(req) 221 res = await aio.wait_for(future, self._response_timeout) 222 223 self._assert_res_type(res, transport.SubAckPacket) 224 return res.reasons 225 226 except asyncio.TimeoutError: 227 self._log.error("response timeout exceeded") 228 229 self._set_disconnect_reason( 230 common.Reason.IMPLEMENTATION_SPECIFIC_ERROR, 231 "response timeout exceeded") 232 self.close() 233 234 raise ConnectionError() 235 236 finally: 237 self._identifier_registry.release_identifier(identifier) 238 239 async def unsubscribe(self, 240 topic_filters: Collection[common.String] 241 ) -> Collection[common.Reason]: 242 identifier = await self._identifier_registry.allocate_identifier() 243 244 try: 245 req = transport.UnsubscribePacket(packet_identifier=identifier, 246 user_properties=[], 247 topic_filters=topic_filters) 248 249 future = self._identifier_registry.create_future(identifier) 250 await self._conn.send(req) 251 res = await aio.wait_for(future, self._response_timeout) 252 253 self._assert_res_type(res, transport.UnsubAckPacket) 254 return res.reasons 255 256 except asyncio.TimeoutError: 257 self._log.error("response timeout exceeded") 258 259 self._set_disconnect_reason( 260 common.Reason.IMPLEMENTATION_SPECIFIC_ERROR, 261 "response timeout exceeded") 262 self.close() 263 264 raise ConnectionError() 265 266 finally: 267 self._identifier_registry.release_identifier(identifier) 268 269 async def _on_close(self): 270 if self._conn.is_open: 271 with contextlib.suppress(Exception): 272 await self._conn.send( 273 transport.DisconnectPacket( 274 reason=self._disconnect_reason, 275 session_expiry_interval=None, 276 reason_string=self._disconnect_reason_string, 277 user_properties=[], 278 server_reference=None)) 279 280 await self._conn.async_close() 281 282 def _set_disconnect_reason(self, reason, reason_string): 283 if common.is_error_reason(self._disconnect_reason): 284 return 285 286 self._disconnect_reason = reason 287 self._disconnect_reason_string = reason_string 288 289 async def _send(self, packet): 290 await self._conn.send(packet) 291 self._sent_event.set() 292 293 def _assert_res_type(self, res, cls): 294 if isinstance(res, cls): 295 return 296 297 self._log.error('received invalid response (expecting %s)', cls) 298 299 self._set_disconnect_reason(common.Reason.PROTOCOL_ERROR, 300 'invalid response type') 301 self.close() 302 303 raise ConnectionError() 304 305 async def _receive_loop(self): 306 try: 307 self._log.debug('starting receive loop') 308 309 while True: 310 packet = await self._conn.receive() 311 312 if isinstance(packet, transport.PublishPacket): 313 await self._process_publish_packet(packet) 314 315 elif isinstance(packet, transport.PubRelPacket): 316 await self._process_publish_release_packet(packet) 317 318 elif isinstance(packet, (transport.PubAckPacket, 319 transport.PubRecPacket, 320 transport.PubCompPacket, 321 transport.SubAckPacket, 322 transport.UnsubAckPacket)): 323 if not self._identifier_registry.register_packet(packet): 324 self._log.warning('packet identifier not expected - ' 325 'dropping packet') 326 327 elif isinstance(packet, transport.PingResPacket): 328 if self._ping_future and not self._ping_future.done(): 329 self._ping_future.set_result(None) 330 331 elif isinstance(packet, transport.DisconnectPacket): 332 self._log.debug('received disconnect packet: %s: %s', 333 packet.reason, packet.reason_string) 334 335 self._conn.close() 336 break 337 338 elif isinstance(packet, transport.AuthPacket): 339 raise common.MqttError( 340 common.Reason.IMPLEMENTATION_SPECIFIC_ERROR, 341 'auth packet not supported') 342 343 else: 344 raise common.MqttError(common.Reason.PROTOCOL_ERROR, 345 'unexpected packet') 346 347 except ConnectionError: 348 pass 349 350 except common.MqttError as e: 351 self._log.error('receive loop mqtt error: %s', e, exc_info=e) 352 353 self._set_disconnect_reason(e.reason, e.description) 354 355 except Exception as e: 356 self._log.error('receive loop error: %s', e, exc_info=e) 357 358 self._set_disconnect_reason(common.Reason.UNSPECIFIED_ERROR, None) 359 360 finally: 361 self._log.debug('stopping receive loop') 362 363 self.close() 364 365 async def _ping_loop(self): 366 try: 367 self._log.debug('starting ping loop') 368 369 while True: 370 self._sent_event.clear() 371 372 with contextlib.suppress(asyncio.TimeoutError): 373 await aio.wait_for(self._sent_event.wait(), 374 self._keep_alive) 375 continue 376 377 self._ping_future = self._loop.create_future() 378 await self._conn.send(transport.PingReqPacket()) 379 380 await aio.wait_for(self._ping_future, self._response_timeout) 381 382 except ConnectionError: 383 pass 384 385 except asyncio.TimeoutError: 386 self._log.error('ping response timeout') 387 388 self._set_disconnect_reason( 389 common.Reason.IMPLEMENTATION_SPECIFIC_ERROR, 390 "response timeout exceeded") 391 392 except Exception as e: 393 self._log.error('ping loop error: %s', e, exc_info=e) 394 395 self._set_disconnect_reason(common.Reason.UNSPECIFIED_ERROR, None) 396 397 finally: 398 self._log.debug('stopping ping loop') 399 400 self.close() 401 402 async def _process_publish_packet(self, packet): 403 if self._msg_cb: 404 msg = Msg( 405 topic=packet.topic_name, 406 payload=packet.payload, 407 qos=packet.qos, 408 retain=packet.retain, 409 message_expiry_interval=packet.message_expiry_interval, 410 response_topic=packet.response_topic, 411 correlation_data=packet.correlation_data, 412 user_properties=packet.user_properties, 413 content_type=packet.content_type) 414 415 await aio.call(self._msg_cb, self, msg) 416 417 if msg.qos == common.QoS.AT_MOST_ONCE: 418 return 419 420 if msg.qos == common.QoS.AT_LEAST_ONCE: 421 res = transport.PubAckPacket( 422 packet_identifier=packet.packet_identifier, 423 reason=common.Reason.SUCCESS, 424 reason_string=None, 425 user_properties=[]) 426 427 await self._send(res) 428 return 429 430 if msg.qos != common.QoS.EXACLTY_ONCE: 431 raise ValueError('unsupported QoS') 432 433 req = transport.PubRecPacket( 434 packet_identifier=packet.packet_identifier, 435 reason=common.Reason.SUCCESS, 436 reason_string=None, 437 user_properties=[]) 438 439 await self._conn.send(req) 440 441 async def _process_publish_release_packet(self, packet): 442 443 # TODO should remember previous publish packet and check response 444 # timeout 445 446 if common.is_error_reason(packet.reason): 447 self._log.warning('publish release error: %s: %s', 448 packet.reason, packet.reason_string) 449 return 450 451 req = transport.PubCompPacket( 452 packet_identifier=packet.packet_identifier, 453 reason=common.Reason.SUCCESS, 454 reason_string=None, 455 user_properties=[]) 456 457 await self._conn.send(req)
Resource with lifetime control based on Group.
maximum_qos: QoS
132 async def publish(self, msg: Msg): 133 if msg.qos.value > self._maximum_qos.value: 134 raise Exception(f'maximum supported QoS is {self._maximum_qos}') 135 136 if msg.qos == common.QoS.AT_MOST_ONCE: 137 identifier = None 138 139 elif msg.qos in (common.QoS.AT_LEAST_ONCE, 140 common.QoS.EXACLTY_ONCE): 141 identifier = await self._identifier_registry.allocate_identifier() 142 143 else: 144 raise ValueError('unsupported QoS') 145 146 try: 147 req = transport.PublishPacket( 148 duplicate=False, 149 qos=self._maximum_qos, 150 retain=msg.retain, 151 topic_name=msg.topic, 152 packet_identifier=identifier, 153 message_expiry_interval=msg.message_expiry_interval, 154 topic_alias=None, 155 response_topic=msg.response_topic, 156 correlation_data=msg.correlation_data, 157 user_properties=msg.user_properties, 158 subscription_identifiers=[], 159 content_type=msg.content_type, 160 payload=msg.payload) 161 162 if msg.qos == common.QoS.AT_MOST_ONCE: 163 await self._conn.send(req) 164 return 165 166 future = self._identifier_registry.create_future(identifier) 167 await self._conn.send(req) 168 res = await aio.wait_for(future, self._response_timeout) 169 170 if msg.qos == common.QoS.AT_LEAST_ONCE: 171 self._assert_res_type(res, transport.PubAckPacket) 172 if common.is_error_reason(res.reason): 173 raise common.MqttError(res.reason, res.reason_string) 174 175 return 176 177 self._assert_res_type(res, transport.PubRecPacket) 178 if common.is_error_reason(res.reason): 179 raise common.MqttError(res.reason, res.reason_string) 180 181 req = transport.PubRelPacket(packet_identifier=identifier, 182 reason=common.Reason.SUCCESS, 183 reason_string=None, 184 user_properties=[]) 185 186 future = self._identifier_registry.create_future(identifier) 187 await self._conn.send(req) 188 res = await aio.wait_for(future, self._response_timeout) 189 190 self._assert_res_type(res, transport.PubCompPacket) 191 if common.is_error_reason(res.reason): 192 raise common.MqttError(res.reason, res.reason_string) 193 194 except asyncio.TimeoutError: 195 self._log.error("response timeout exceeded") 196 197 self._set_disconnect_reason( 198 common.Reason.IMPLEMENTATION_SPECIFIC_ERROR, 199 "response timeout exceeded") 200 self.close() 201 202 raise ConnectionError() 203 204 finally: 205 if identifier is not None: 206 self._identifier_registry.release_identifier(identifier)
208 async def subscribe(self, 209 subscriptions: Collection[common.Subscription] 210 ) -> Collection[common.Reason]: 211 identifier = await self._identifier_registry.allocate_identifier() 212 213 try: 214 req = transport.SubscribePacket(packet_identifier=identifier, 215 subscription_identifier=None, 216 user_properties=[], 217 subscriptions=subscriptions) 218 219 future = self._identifier_registry.create_future(identifier) 220 await self._conn.send(req) 221 res = await aio.wait_for(future, self._response_timeout) 222 223 self._assert_res_type(res, transport.SubAckPacket) 224 return res.reasons 225 226 except asyncio.TimeoutError: 227 self._log.error("response timeout exceeded") 228 229 self._set_disconnect_reason( 230 common.Reason.IMPLEMENTATION_SPECIFIC_ERROR, 231 "response timeout exceeded") 232 self.close() 233 234 raise ConnectionError() 235 236 finally: 237 self._identifier_registry.release_identifier(identifier)
239 async def unsubscribe(self, 240 topic_filters: Collection[common.String] 241 ) -> Collection[common.Reason]: 242 identifier = await self._identifier_registry.allocate_identifier() 243 244 try: 245 req = transport.UnsubscribePacket(packet_identifier=identifier, 246 user_properties=[], 247 topic_filters=topic_filters) 248 249 future = self._identifier_registry.create_future(identifier) 250 await self._conn.send(req) 251 res = await aio.wait_for(future, self._response_timeout) 252 253 self._assert_res_type(res, transport.UnsubAckPacket) 254 return res.reasons 255 256 except asyncio.TimeoutError: 257 self._log.error("response timeout exceeded") 258 259 self._set_disconnect_reason( 260 common.Reason.IMPLEMENTATION_SPECIFIC_ERROR, 261 "response timeout exceeded") 262 self.close() 263 264 raise ConnectionError() 265 266 finally: 267 self._identifier_registry.release_identifier(identifier)
UInt32 =
<class 'int'>
String =
<class 'str'>
Binary =
bytes | bytearray | memoryview
class
QoS(enum.Enum):
AT_MOST_ONCE =
<QoS.AT_MOST_ONCE: 0>
AT_LEAST_ONCE =
<QoS.AT_LEAST_ONCE: 1>
EXACLTY_ONCE =
<QoS.EXACLTY_ONCE: 2>
class
RetainHandling(enum.Enum):
33class RetainHandling(enum.Enum): 34 SEND_ON_SUBSCRIBE = 0 35 SEND_ON_NEW_SUBSCRIBE = 1 36 DONT_SEND = 2
SEND_ON_SUBSCRIBE =
<RetainHandling.SEND_ON_SUBSCRIBE: 0>
SEND_ON_NEW_SUBSCRIBE =
<RetainHandling.SEND_ON_NEW_SUBSCRIBE: 1>
DONT_SEND =
<RetainHandling.DONT_SEND: 2>
class
Reason(enum.Enum):
39class Reason(enum.Enum): 40 SUCCESS = 0x00 41 GRANTED_QOS_1 = 0x01 42 GRANTED_QOS_2 = 0x02 43 DISCONNECT_WITH_WILL_MESSAGE = 0x04 44 NO_MATCHING_SUBSCRIBERS = 0x10 45 NO_SUBSCRIPTION_EXISTED = 0x11 46 CONTINUE_AUTHENTICATION = 0x24 47 RE_AUTHENTICATE = 0x25 48 UNSPECIFIED_ERROR = 0x80 49 MALFORMED_PACKET = 0x81 50 PROTOCOL_ERROR = 0x82 51 IMPLEMENTATION_SPECIFIC_ERROR = 0x83 52 UNSUPPORTED_PROTOCOL_VERSION = 0x84 53 CLIENT_IDENTIFIER_NOT_VALID = 0x85 54 BAD_USER_NAME_OR_PASSWORD = 0x86 55 NOT_AUTHORIZED = 0x87 56 SERVER_UNAVAILABLE = 0x88 57 SERVER_BUSY = 0x89 58 BANNED = 0x8a 59 SERVER_SHUTTING_DOWN = 0x8b 60 BAD_AUTHENTICATION_METHOD = 0x8c 61 KEEP_ALIVE_TIMEOUT = 0x8d 62 SESSION_TAKEN_OVER = 0x8e 63 TOPIC_FILTER_INVALID = 0x8f 64 TOPIC_NAME_INVALID = 0x90 65 PACKET_IDENTIFIER_IN_USE = 0x91 66 PACKET_IDENTIFIER_NOT_FOUND = 0x92 67 RECEIVE_MAXIMUM_EXCEEDED = 0x93 68 TOPIC_ALIAS_INVALID = 0x94 69 PACKET_TOO_LARGE = 0x95 70 MESSAGE_RATE_TOO_HIGH = 0x96 71 QUOTA_EXCEEDED = 0x97 72 ADMINISTRATIVE_ACTION = 0x98 73 PAYLOAD_FORMAT_INVALID = 0x99 74 RETAIN_NOT_SUPPORTED = 0x9a 75 QOS_NOT_SUPPORTED = 0x9b 76 USE_ANOTHER_SERVER = 0x9c 77 SERVER_MOVED = 0x9d 78 SHARED_SUBSCRIPTIONS_NOT_SUPPORTED = 0x9e 79 CONNECTION_RATE_EXCEEDED = 0x9f 80 MAXIMUM_CONNECT_TIME = 0xa0 81 SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED = 0xa1 82 WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED = 0xa2
SUCCESS =
<Reason.SUCCESS: 0>
GRANTED_QOS_1 =
<Reason.GRANTED_QOS_1: 1>
GRANTED_QOS_2 =
<Reason.GRANTED_QOS_2: 2>
DISCONNECT_WITH_WILL_MESSAGE =
<Reason.DISCONNECT_WITH_WILL_MESSAGE: 4>
NO_MATCHING_SUBSCRIBERS =
<Reason.NO_MATCHING_SUBSCRIBERS: 16>
NO_SUBSCRIPTION_EXISTED =
<Reason.NO_SUBSCRIPTION_EXISTED: 17>
CONTINUE_AUTHENTICATION =
<Reason.CONTINUE_AUTHENTICATION: 36>
RE_AUTHENTICATE =
<Reason.RE_AUTHENTICATE: 37>
UNSPECIFIED_ERROR =
<Reason.UNSPECIFIED_ERROR: 128>
MALFORMED_PACKET =
<Reason.MALFORMED_PACKET: 129>
PROTOCOL_ERROR =
<Reason.PROTOCOL_ERROR: 130>
IMPLEMENTATION_SPECIFIC_ERROR =
<Reason.IMPLEMENTATION_SPECIFIC_ERROR: 131>
UNSUPPORTED_PROTOCOL_VERSION =
<Reason.UNSUPPORTED_PROTOCOL_VERSION: 132>
CLIENT_IDENTIFIER_NOT_VALID =
<Reason.CLIENT_IDENTIFIER_NOT_VALID: 133>
BAD_USER_NAME_OR_PASSWORD =
<Reason.BAD_USER_NAME_OR_PASSWORD: 134>
NOT_AUTHORIZED =
<Reason.NOT_AUTHORIZED: 135>
SERVER_UNAVAILABLE =
<Reason.SERVER_UNAVAILABLE: 136>
SERVER_BUSY =
<Reason.SERVER_BUSY: 137>
BANNED =
<Reason.BANNED: 138>
SERVER_SHUTTING_DOWN =
<Reason.SERVER_SHUTTING_DOWN: 139>
BAD_AUTHENTICATION_METHOD =
<Reason.BAD_AUTHENTICATION_METHOD: 140>
KEEP_ALIVE_TIMEOUT =
<Reason.KEEP_ALIVE_TIMEOUT: 141>
SESSION_TAKEN_OVER =
<Reason.SESSION_TAKEN_OVER: 142>
TOPIC_FILTER_INVALID =
<Reason.TOPIC_FILTER_INVALID: 143>
TOPIC_NAME_INVALID =
<Reason.TOPIC_NAME_INVALID: 144>
PACKET_IDENTIFIER_IN_USE =
<Reason.PACKET_IDENTIFIER_IN_USE: 145>
PACKET_IDENTIFIER_NOT_FOUND =
<Reason.PACKET_IDENTIFIER_NOT_FOUND: 146>
RECEIVE_MAXIMUM_EXCEEDED =
<Reason.RECEIVE_MAXIMUM_EXCEEDED: 147>
TOPIC_ALIAS_INVALID =
<Reason.TOPIC_ALIAS_INVALID: 148>
PACKET_TOO_LARGE =
<Reason.PACKET_TOO_LARGE: 149>
MESSAGE_RATE_TOO_HIGH =
<Reason.MESSAGE_RATE_TOO_HIGH: 150>
QUOTA_EXCEEDED =
<Reason.QUOTA_EXCEEDED: 151>
ADMINISTRATIVE_ACTION =
<Reason.ADMINISTRATIVE_ACTION: 152>
PAYLOAD_FORMAT_INVALID =
<Reason.PAYLOAD_FORMAT_INVALID: 153>
RETAIN_NOT_SUPPORTED =
<Reason.RETAIN_NOT_SUPPORTED: 154>
QOS_NOT_SUPPORTED =
<Reason.QOS_NOT_SUPPORTED: 155>
USE_ANOTHER_SERVER =
<Reason.USE_ANOTHER_SERVER: 156>
SERVER_MOVED =
<Reason.SERVER_MOVED: 157>
SHARED_SUBSCRIPTIONS_NOT_SUPPORTED =
<Reason.SHARED_SUBSCRIPTIONS_NOT_SUPPORTED: 158>
CONNECTION_RATE_EXCEEDED =
<Reason.CONNECTION_RATE_EXCEEDED: 159>
MAXIMUM_CONNECT_TIME =
<Reason.MAXIMUM_CONNECT_TIME: 160>
SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED =
<Reason.SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED: 161>
WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED =
<Reason.WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED: 162>
class
MqttError(builtins.Exception):
85class MqttError(Exception): 86 87 def __init__(self, 88 reason: Reason, 89 description: str | None): 90 super().__init__(reason, description) 91 self._reason = reason 92 self._description = description 93 94 @property 95 def reason(self) -> Reason: 96 return self._reason 97 98 @property 99 def description(self) -> str | None: 100 return self._description
Common base class for all non-exit exceptions.
MqttError(reason: Reason, description: str | None)
class
Subscription(typing.NamedTuple):
103class Subscription(typing.NamedTuple): 104 topic_filter: String 105 maximum_qos: QoS 106 no_local: bool 107 retain_as_published: bool 108 retain_handling: RetainHandling
Subscription(topic_filter, maximum_qos, no_local, retain_as_published, retain_handling)
Subscription( topic_filter: str, maximum_qos: QoS, no_local: bool, retain_as_published: bool, retain_handling: RetainHandling)
Create new instance of Subscription(topic_filter, maximum_qos, no_local, retain_as_published, retain_handling)