hat.drivers.mqtt
1from hat.drivers.mqtt.client import (Msg, 2 MsgCb, 3 connect, 4 Client) 5from hat.drivers.mqtt.common import (UInt32, 6 String, 7 Binary, 8 QoS, 9 RetainHandling, 10 Reason, 11 MqttError, 12 Subscription, 13 is_error_reason) 14 15 16__all__ = ['Msg', 17 'MsgCb', 18 'connect', 19 'Client', 20 'UInt32', 21 'String', 22 'Binary', 23 'QoS', 24 'RetainHandling', 25 'Reason', 26 'MqttError', 27 'Subscription', 28 'is_error_reason']
class
Msg(typing.NamedTuple):
21class Msg(typing.NamedTuple): 22 topic: common.String 23 payload: str | util.Bytes 24 qos: common.QoS = common.QoS.AT_MOST_ONCE 25 retain: bool = True 26 message_expiry_interval: common.UInt32 | None = None 27 response_topic: common.String | None = None 28 correlation_data: common.Binary | None = None 29 user_properties: Collection[tuple[common.String, common.String]] = [] 30 content_type: common.String | None = None
Msg(topic, payload, qos, retain, message_expiry_interval, response_topic, correlation_data, user_properties, content_type)
Msg( topic: str, payload: str | bytes | bytearray | memoryview, qos: QoS = <QoS.AT_MOST_ONCE: 0>, retain: bool = True, message_expiry_interval: int | None = None, response_topic: str | None = None, correlation_data: bytes | bytearray | memoryview | None = None, user_properties: Collection[tuple[str, str]] = [], content_type: str | None = None)
Create new instance of Msg(topic, payload, qos, retain, message_expiry_interval, response_topic, correlation_data, user_properties, content_type)
MsgCb =
typing.Callable[[ForwardRef('Client'), Msg], typing.Optional[typing.Awaitable[NoneType]]]
async def
connect( addr: hat.drivers.tcp.Address, msg_cb: Optional[Callable[[Client, Msg], Optional[Awaitable[NoneType]]]] = None, will_msg: Msg | None = None, will_delay: int = 0, client_id: str = '', user_name: str | None = None, password: bytes | bytearray | memoryview | None = None, ping_delay: float | None = None, response_timeout: float = 30, **kwargs) -> Client:
36async def connect(addr: tcp.Address, 37 msg_cb: MsgCb | None = None, 38 will_msg: Msg | None = None, 39 will_delay: common.UInt32 = 0, 40 client_id: common.String = '', 41 user_name: common.String | None = None, 42 password: common.Binary | None = None, 43 ping_delay: float | None = None, 44 response_timeout: float = 30, 45 **kwargs 46 ) -> 'Client': 47 conn = await transport.connect(addr, **kwargs) 48 49 try: 50 req = _create_connect_packet(will_msg=will_msg, 51 will_delay=will_delay, 52 ping_delay=ping_delay, 53 client_id=client_id, 54 user_name=user_name, 55 password=password) 56 await conn.send(req) 57 58 res = await aio.wait_for(conn.receive(), response_timeout) 59 60 if isinstance(res, transport.DisconnectPacket): 61 raise common.MqttError(res.reason, res.reason_string) 62 63 if not isinstance(res, transport.ConnAckPacket): 64 raise Exception('unexpected response') 65 66 if res.reason != common.Reason.SUCCESS: 67 raise common.MqttError(res.reason, res.reason_string) 68 69 except BaseException: 70 await aio.uncancellable(conn.async_close()) 71 raise 72 73 client = Client() 74 client._msg_cb = msg_cb 75 client._response_timeout = response_timeout 76 client._loop = asyncio.get_running_loop() 77 client._async_group = aio.Group() 78 client._conn = conn 79 client._sent_event = asyncio.Event() 80 client._ping_future = None 81 client._disconnect_reason = common.Reason.SUCCESS 82 client._disconnect_reason_string = None 83 84 client._maximum_qos = res.maximum_qos 85 client._client_id = (res.assigned_client_identifier 86 if res.assigned_client_identifier is not None 87 else client_id) 88 client._keep_alive = (req.keep_alive if res.server_keep_alive is None 89 else res.server_keep_alive) 90 client._response_information = res.response_information 91 92 try: 93 client.async_group.spawn(aio.call_on_cancel, client._on_close) 94 95 client._identifier_registry = _IdentifierRegistry( 96 client.async_group.create_subgroup()) 97 98 client.async_group.spawn(client._receive_loop) 99 if client._keep_alive > 0: 100 client.async_group.spawn(client._ping_loop) 101 102 except BaseException: 103 await aio.uncancellable(client.async_close()) 104 raise 105 106 return client
class
Client(hat.aio.group.Resource):
109class Client(aio.Resource): 110 111 @property 112 def async_group(self) -> aio.Group: 113 return self._async_group 114 115 @property 116 def info(self) -> tcp.ConnectionInfo: 117 return self._conn.info 118 119 @property 120 def maximum_qos(self) -> common.QoS: 121 return self._maximum_qos 122 123 @property 124 def client_id(self) -> common.String: 125 return self._client_id 126 127 @property 128 def response_information(self) -> common.String | None: 129 return self._response_information 130 131 async def publish(self, msg: Msg): 132 if msg.qos.value > self._maximum_qos.value: 133 raise Exception(f'maximum supported QoS is {self._maximum_qos}') 134 135 if msg.qos == common.QoS.AT_MOST_ONCE: 136 identifier = None 137 138 elif msg.qos in (common.QoS.AT_LEAST_ONCE, 139 common.QoS.EXACLTY_ONCE): 140 identifier = await self._identifier_registry.allocate_identifier() 141 142 else: 143 raise ValueError('unsupported QoS') 144 145 try: 146 req = transport.PublishPacket( 147 duplicate=False, 148 qos=self._maximum_qos, 149 retain=msg.retain, 150 topic_name=msg.topic, 151 packet_identifier=identifier, 152 message_expiry_interval=msg.message_expiry_interval, 153 topic_alias=None, 154 response_topic=msg.response_topic, 155 correlation_data=msg.correlation_data, 156 user_properties=msg.user_properties, 157 subscription_identifiers=[], 158 content_type=msg.content_type, 159 payload=msg.payload) 160 161 if msg.qos == common.QoS.AT_MOST_ONCE: 162 await self._conn.send(req) 163 return 164 165 future = self._identifier_registry.create_future(identifier) 166 await self._conn.send(req) 167 res = await aio.wait_for(future, self._response_timeout) 168 169 if msg.qos == common.QoS.AT_LEAST_ONCE: 170 self._assert_res_type(res, transport.PubAckPacket) 171 if common.is_error_reason(res.reason): 172 raise common.MqttError(res.reason, res.reason_string) 173 174 return 175 176 self._assert_res_type(res, transport.PubRecPacket) 177 if common.is_error_reason(res.reason): 178 raise common.MqttError(res.reason, res.reason_string) 179 180 req = transport.PubRelPacket(packet_identifier=identifier, 181 reason=common.Reason.SUCCESS, 182 reason_string=None, 183 user_properties=[]) 184 185 future = self._identifier_registry.create_future(identifier) 186 await self._conn.send(req) 187 res = await aio.wait_for(future, self._response_timeout) 188 189 self._assert_res_type(res, transport.PubCompPacket) 190 if common.is_error_reason(res.reason): 191 raise common.MqttError(res.reason, res.reason_string) 192 193 except asyncio.TimeoutError: 194 mlog.error("response timeout exceeded") 195 196 self._set_disconnect_reason( 197 common.Reason.IMPLEMENTATION_SPECIFIC_ERROR, 198 "response timeout exceeded") 199 self.close() 200 201 raise ConnectionError() 202 203 finally: 204 if identifier is not None: 205 self._identifier_registry.release_identifier(identifier) 206 207 async def subscribe(self, 208 subscriptions: Collection[common.Subscription] 209 ) -> Collection[common.Reason]: 210 identifier = await self._identifier_registry.allocate_identifier() 211 212 try: 213 req = transport.SubscribePacket(packet_identifier=identifier, 214 subscription_identifier=None, 215 user_properties=[], 216 subscriptions=subscriptions) 217 218 future = self._identifier_registry.create_future(identifier) 219 await self._conn.send(req) 220 res = await aio.wait_for(future, self._response_timeout) 221 222 self._assert_res_type(res, transport.SubAckPacket) 223 return res.reasons 224 225 except asyncio.TimeoutError: 226 mlog.error("response timeout exceeded") 227 228 self._set_disconnect_reason( 229 common.Reason.IMPLEMENTATION_SPECIFIC_ERROR, 230 "response timeout exceeded") 231 self.close() 232 233 raise ConnectionError() 234 235 finally: 236 self._identifier_registry.release_identifier(identifier) 237 238 async def unsubscribe(self, 239 topic_filters: Collection[common.String] 240 ) -> Collection[common.Reason]: 241 identifier = await self._identifier_registry.allocate_identifier() 242 243 try: 244 req = transport.UnsubscribePacket(packet_identifier=identifier, 245 user_properties=[], 246 topic_filters=topic_filters) 247 248 future = self._identifier_registry.create_future(identifier) 249 await self._conn.send(req) 250 res = await aio.wait_for(future, self._response_timeout) 251 252 self._assert_res_type(res, transport.UnsubAckPacket) 253 return res.reasons 254 255 except asyncio.TimeoutError: 256 mlog.error("response timeout exceeded") 257 258 self._set_disconnect_reason( 259 common.Reason.IMPLEMENTATION_SPECIFIC_ERROR, 260 "response timeout exceeded") 261 self.close() 262 263 raise ConnectionError() 264 265 finally: 266 self._identifier_registry.release_identifier(identifier) 267 268 async def _on_close(self): 269 if self._conn.is_open: 270 with contextlib.suppress(Exception): 271 await self._conn.send( 272 transport.DisconnectPacket( 273 reason=self._disconnect_reason, 274 session_expiry_interval=None, 275 reason_string=self._disconnect_reason_string, 276 user_properties=[], 277 server_reference=None)) 278 279 await self._conn.async_close() 280 281 def _set_disconnect_reason(self, reason, reason_string): 282 if common.is_error_reason(self._disconnect_reason): 283 return 284 285 self._disconnect_reason = reason 286 self._disconnect_reason_string = reason_string 287 288 async def _send(self, packet): 289 await self._conn.send(packet) 290 self._sent_event.set() 291 292 def _assert_res_type(self, res, cls): 293 if isinstance(res, cls): 294 return 295 296 mlog.error('received invalid response (expecting %s)', cls) 297 298 self._set_disconnect_reason(common.Reason.PROTOCOL_ERROR, 299 'invalid response type') 300 self.close() 301 302 raise ConnectionError() 303 304 async def _receive_loop(self): 305 try: 306 mlog.debug('starting receive loop') 307 308 while True: 309 packet = await self._conn.receive() 310 311 if isinstance(packet, transport.PublishPacket): 312 await self._process_publish_packet(packet) 313 314 elif isinstance(packet, transport.PubRelPacket): 315 await self._process_publish_release_packet(packet) 316 317 elif isinstance(packet, (transport.PubAckPacket, 318 transport.PubRecPacket, 319 transport.PubCompPacket, 320 transport.SubAckPacket, 321 transport.UnsubAckPacket)): 322 if not self._identifier_registry.register_packet(packet): 323 mlog.warning('packet identifier not expected - ' 324 'dropping packet') 325 326 elif isinstance(packet, transport.PingResPacket): 327 if self._ping_future and not self._ping_future.done(): 328 self._ping_future.set_result(None) 329 330 elif isinstance(packet, transport.DisconnectPacket): 331 mlog.debug('received disconnect packet: %s: %s', 332 packet.reason, packet.reason_string) 333 334 self._conn.close() 335 break 336 337 elif isinstance(packet, transport.AuthPacket): 338 raise common.MqttError( 339 common.Reason.IMPLEMENTATION_SPECIFIC_ERROR, 340 'auth packet not supported') 341 342 else: 343 raise common.MqttError(common.Reason.PROTOCOL_ERROR, 344 'unexpected packet') 345 346 except ConnectionError: 347 pass 348 349 except common.MqttError as e: 350 mlog.error('receive loop mqtt error: %s', e, exc_info=e) 351 352 self._set_disconnect_reason(e.reason, e.description) 353 354 except Exception as e: 355 mlog.error('receive loop error: %s', e, exc_info=e) 356 357 self._set_disconnect_reason(common.Reason.UNSPECIFIED_ERROR, None) 358 359 finally: 360 mlog.debug('stopping receive loop') 361 362 self.close() 363 364 async def _ping_loop(self): 365 try: 366 mlog.debug('starting ping loop') 367 368 while True: 369 self._sent_event.clear() 370 371 with contextlib.suppress(asyncio.TimeoutError): 372 await aio.wait_for(self._sent_event.wait(), 373 self._keep_alive) 374 continue 375 376 self._ping_future = self._loop.create_future() 377 await self._conn.send(transport.PingReqPacket()) 378 379 await aio.wait_for(self._ping_future, self._response_timeout) 380 381 except ConnectionError: 382 pass 383 384 except asyncio.TimeoutError: 385 mlog.error('ping response timeout') 386 387 self._set_disconnect_reason( 388 common.Reason.IMPLEMENTATION_SPECIFIC_ERROR, 389 "response timeout exceeded") 390 391 except Exception as e: 392 mlog.error('ping loop error: %s', e, exc_info=e) 393 394 self._set_disconnect_reason(common.Reason.UNSPECIFIED_ERROR, None) 395 396 finally: 397 mlog.debug('stopping ping loop') 398 399 self.close() 400 401 async def _process_publish_packet(self, packet): 402 if self._msg_cb: 403 msg = Msg( 404 topic=packet.topic_name, 405 payload=packet.payload, 406 qos=packet.qos, 407 retain=packet.retain, 408 message_expiry_interval=packet.message_expiry_interval, 409 response_topic=packet.response_topic, 410 correlation_data=packet.correlation_data, 411 user_properties=packet.user_properties, 412 content_type=packet.content_type) 413 414 await aio.call(self._msg_cb, self, msg) 415 416 if msg.qos == common.QoS.AT_MOST_ONCE: 417 return 418 419 if msg.qos == common.QoS.AT_LEAST_ONCE: 420 res = transport.PubAckPacket( 421 packet_identifier=packet.packet_identifier, 422 reason=common.Reason.SUCCESS, 423 reason_string=None, 424 user_properties=[]) 425 426 await self._send(res) 427 return 428 429 if msg.qos != common.QoS.EXACLTY_ONCE: 430 raise ValueError('unsupported QoS') 431 432 req = transport.PubRecPacket( 433 packet_identifier=packet.packet_identifier, 434 reason=common.Reason.SUCCESS, 435 reason_string=None, 436 user_properties=[]) 437 438 await self._conn.send(req) 439 440 async def _process_publish_release_packet(self, packet): 441 442 # TODO should remember previous publish packet and check response 443 # timeout 444 445 if common.is_error_reason(packet.reason): 446 mlog.warning('publish release error: %s: %s', 447 packet.reason, packet.reason_string) 448 return 449 450 req = transport.PubCompPacket( 451 packet_identifier=packet.packet_identifier, 452 reason=common.Reason.SUCCESS, 453 reason_string=None, 454 user_properties=[]) 455 456 await self._conn.send(req)
Resource with lifetime control based on Group
.
maximum_qos: QoS
131 async def publish(self, msg: Msg): 132 if msg.qos.value > self._maximum_qos.value: 133 raise Exception(f'maximum supported QoS is {self._maximum_qos}') 134 135 if msg.qos == common.QoS.AT_MOST_ONCE: 136 identifier = None 137 138 elif msg.qos in (common.QoS.AT_LEAST_ONCE, 139 common.QoS.EXACLTY_ONCE): 140 identifier = await self._identifier_registry.allocate_identifier() 141 142 else: 143 raise ValueError('unsupported QoS') 144 145 try: 146 req = transport.PublishPacket( 147 duplicate=False, 148 qos=self._maximum_qos, 149 retain=msg.retain, 150 topic_name=msg.topic, 151 packet_identifier=identifier, 152 message_expiry_interval=msg.message_expiry_interval, 153 topic_alias=None, 154 response_topic=msg.response_topic, 155 correlation_data=msg.correlation_data, 156 user_properties=msg.user_properties, 157 subscription_identifiers=[], 158 content_type=msg.content_type, 159 payload=msg.payload) 160 161 if msg.qos == common.QoS.AT_MOST_ONCE: 162 await self._conn.send(req) 163 return 164 165 future = self._identifier_registry.create_future(identifier) 166 await self._conn.send(req) 167 res = await aio.wait_for(future, self._response_timeout) 168 169 if msg.qos == common.QoS.AT_LEAST_ONCE: 170 self._assert_res_type(res, transport.PubAckPacket) 171 if common.is_error_reason(res.reason): 172 raise common.MqttError(res.reason, res.reason_string) 173 174 return 175 176 self._assert_res_type(res, transport.PubRecPacket) 177 if common.is_error_reason(res.reason): 178 raise common.MqttError(res.reason, res.reason_string) 179 180 req = transport.PubRelPacket(packet_identifier=identifier, 181 reason=common.Reason.SUCCESS, 182 reason_string=None, 183 user_properties=[]) 184 185 future = self._identifier_registry.create_future(identifier) 186 await self._conn.send(req) 187 res = await aio.wait_for(future, self._response_timeout) 188 189 self._assert_res_type(res, transport.PubCompPacket) 190 if common.is_error_reason(res.reason): 191 raise common.MqttError(res.reason, res.reason_string) 192 193 except asyncio.TimeoutError: 194 mlog.error("response timeout exceeded") 195 196 self._set_disconnect_reason( 197 common.Reason.IMPLEMENTATION_SPECIFIC_ERROR, 198 "response timeout exceeded") 199 self.close() 200 201 raise ConnectionError() 202 203 finally: 204 if identifier is not None: 205 self._identifier_registry.release_identifier(identifier)
207 async def subscribe(self, 208 subscriptions: Collection[common.Subscription] 209 ) -> Collection[common.Reason]: 210 identifier = await self._identifier_registry.allocate_identifier() 211 212 try: 213 req = transport.SubscribePacket(packet_identifier=identifier, 214 subscription_identifier=None, 215 user_properties=[], 216 subscriptions=subscriptions) 217 218 future = self._identifier_registry.create_future(identifier) 219 await self._conn.send(req) 220 res = await aio.wait_for(future, self._response_timeout) 221 222 self._assert_res_type(res, transport.SubAckPacket) 223 return res.reasons 224 225 except asyncio.TimeoutError: 226 mlog.error("response timeout exceeded") 227 228 self._set_disconnect_reason( 229 common.Reason.IMPLEMENTATION_SPECIFIC_ERROR, 230 "response timeout exceeded") 231 self.close() 232 233 raise ConnectionError() 234 235 finally: 236 self._identifier_registry.release_identifier(identifier)
238 async def unsubscribe(self, 239 topic_filters: Collection[common.String] 240 ) -> Collection[common.Reason]: 241 identifier = await self._identifier_registry.allocate_identifier() 242 243 try: 244 req = transport.UnsubscribePacket(packet_identifier=identifier, 245 user_properties=[], 246 topic_filters=topic_filters) 247 248 future = self._identifier_registry.create_future(identifier) 249 await self._conn.send(req) 250 res = await aio.wait_for(future, self._response_timeout) 251 252 self._assert_res_type(res, transport.UnsubAckPacket) 253 return res.reasons 254 255 except asyncio.TimeoutError: 256 mlog.error("response timeout exceeded") 257 258 self._set_disconnect_reason( 259 common.Reason.IMPLEMENTATION_SPECIFIC_ERROR, 260 "response timeout exceeded") 261 self.close() 262 263 raise ConnectionError() 264 265 finally: 266 self._identifier_registry.release_identifier(identifier)
UInt32 =
<class 'int'>
String =
<class 'str'>
Binary =
bytes | bytearray | memoryview
class
QoS(enum.Enum):
An enumeration.
AT_MOST_ONCE =
<QoS.AT_MOST_ONCE: 0>
AT_LEAST_ONCE =
<QoS.AT_LEAST_ONCE: 1>
EXACLTY_ONCE =
<QoS.EXACLTY_ONCE: 2>
class
RetainHandling(enum.Enum):
33class RetainHandling(enum.Enum): 34 SEND_ON_SUBSCRIBE = 0 35 SEND_ON_NEW_SUBSCRIBE = 1 36 DONT_SEND = 2
An enumeration.
SEND_ON_SUBSCRIBE =
<RetainHandling.SEND_ON_SUBSCRIBE: 0>
SEND_ON_NEW_SUBSCRIBE =
<RetainHandling.SEND_ON_NEW_SUBSCRIBE: 1>
DONT_SEND =
<RetainHandling.DONT_SEND: 2>
class
Reason(enum.Enum):
39class Reason(enum.Enum): 40 SUCCESS = 0x00 41 GRANTED_QOS_1 = 0x01 42 GRANTED_QOS_2 = 0x02 43 DISCONNECT_WITH_WILL_MESSAGE = 0x04 44 NO_MATCHING_SUBSCRIBERS = 0x10 45 NO_SUBSCRIPTION_EXISTED = 0x11 46 CONTINUE_AUTHENTICATION = 0x24 47 RE_AUTHENTICATE = 0x25 48 UNSPECIFIED_ERROR = 0x80 49 MALFORMED_PACKET = 0x81 50 PROTOCOL_ERROR = 0x82 51 IMPLEMENTATION_SPECIFIC_ERROR = 0x83 52 UNSUPPORTED_PROTOCOL_VERSION = 0x84 53 CLIENT_IDENTIFIER_NOT_VALID = 0x85 54 BAD_USER_NAME_OR_PASSWORD = 0x86 55 NOT_AUTHORIZED = 0x87 56 SERVER_UNAVAILABLE = 0x88 57 SERVER_BUSY = 0x89 58 BANNED = 0x8a 59 SERVER_SHUTTING_DOWN = 0x8b 60 BAD_AUTHENTICATION_METHOD = 0x8c 61 KEEP_ALIVE_TIMEOUT = 0x8d 62 SESSION_TAKEN_OVER = 0x8e 63 TOPIC_FILTER_INVALID = 0x8f 64 TOPIC_NAME_INVALID = 0x90 65 PACKET_IDENTIFIER_IN_USE = 0x91 66 PACKET_IDENTIFIER_NOT_FOUND = 0x92 67 RECEIVE_MAXIMUM_EXCEEDED = 0x93 68 TOPIC_ALIAS_INVALID = 0x94 69 PACKET_TOO_LARGE = 0x95 70 MESSAGE_RATE_TOO_HIGH = 0x96 71 QUOTA_EXCEEDED = 0x97 72 ADMINISTRATIVE_ACTION = 0x98 73 PAYLOAD_FORMAT_INVALID = 0x99 74 RETAIN_NOT_SUPPORTED = 0x9a 75 QOS_NOT_SUPPORTED = 0x9b 76 USE_ANOTHER_SERVER = 0x9c 77 SERVER_MOVED = 0x9d 78 SHARED_SUBSCRIPTIONS_NOT_SUPPORTED = 0x9e 79 CONNECTION_RATE_EXCEEDED = 0x9f 80 MAXIMUM_CONNECT_TIME = 0xa0 81 SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED = 0xa1 82 WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED = 0xa2
An enumeration.
SUCCESS =
<Reason.SUCCESS: 0>
GRANTED_QOS_1 =
<Reason.GRANTED_QOS_1: 1>
GRANTED_QOS_2 =
<Reason.GRANTED_QOS_2: 2>
DISCONNECT_WITH_WILL_MESSAGE =
<Reason.DISCONNECT_WITH_WILL_MESSAGE: 4>
NO_MATCHING_SUBSCRIBERS =
<Reason.NO_MATCHING_SUBSCRIBERS: 16>
NO_SUBSCRIPTION_EXISTED =
<Reason.NO_SUBSCRIPTION_EXISTED: 17>
CONTINUE_AUTHENTICATION =
<Reason.CONTINUE_AUTHENTICATION: 36>
RE_AUTHENTICATE =
<Reason.RE_AUTHENTICATE: 37>
UNSPECIFIED_ERROR =
<Reason.UNSPECIFIED_ERROR: 128>
MALFORMED_PACKET =
<Reason.MALFORMED_PACKET: 129>
PROTOCOL_ERROR =
<Reason.PROTOCOL_ERROR: 130>
IMPLEMENTATION_SPECIFIC_ERROR =
<Reason.IMPLEMENTATION_SPECIFIC_ERROR: 131>
UNSUPPORTED_PROTOCOL_VERSION =
<Reason.UNSUPPORTED_PROTOCOL_VERSION: 132>
CLIENT_IDENTIFIER_NOT_VALID =
<Reason.CLIENT_IDENTIFIER_NOT_VALID: 133>
BAD_USER_NAME_OR_PASSWORD =
<Reason.BAD_USER_NAME_OR_PASSWORD: 134>
NOT_AUTHORIZED =
<Reason.NOT_AUTHORIZED: 135>
SERVER_UNAVAILABLE =
<Reason.SERVER_UNAVAILABLE: 136>
SERVER_BUSY =
<Reason.SERVER_BUSY: 137>
BANNED =
<Reason.BANNED: 138>
SERVER_SHUTTING_DOWN =
<Reason.SERVER_SHUTTING_DOWN: 139>
BAD_AUTHENTICATION_METHOD =
<Reason.BAD_AUTHENTICATION_METHOD: 140>
KEEP_ALIVE_TIMEOUT =
<Reason.KEEP_ALIVE_TIMEOUT: 141>
SESSION_TAKEN_OVER =
<Reason.SESSION_TAKEN_OVER: 142>
TOPIC_FILTER_INVALID =
<Reason.TOPIC_FILTER_INVALID: 143>
TOPIC_NAME_INVALID =
<Reason.TOPIC_NAME_INVALID: 144>
PACKET_IDENTIFIER_IN_USE =
<Reason.PACKET_IDENTIFIER_IN_USE: 145>
PACKET_IDENTIFIER_NOT_FOUND =
<Reason.PACKET_IDENTIFIER_NOT_FOUND: 146>
RECEIVE_MAXIMUM_EXCEEDED =
<Reason.RECEIVE_MAXIMUM_EXCEEDED: 147>
TOPIC_ALIAS_INVALID =
<Reason.TOPIC_ALIAS_INVALID: 148>
PACKET_TOO_LARGE =
<Reason.PACKET_TOO_LARGE: 149>
MESSAGE_RATE_TOO_HIGH =
<Reason.MESSAGE_RATE_TOO_HIGH: 150>
QUOTA_EXCEEDED =
<Reason.QUOTA_EXCEEDED: 151>
ADMINISTRATIVE_ACTION =
<Reason.ADMINISTRATIVE_ACTION: 152>
PAYLOAD_FORMAT_INVALID =
<Reason.PAYLOAD_FORMAT_INVALID: 153>
RETAIN_NOT_SUPPORTED =
<Reason.RETAIN_NOT_SUPPORTED: 154>
QOS_NOT_SUPPORTED =
<Reason.QOS_NOT_SUPPORTED: 155>
USE_ANOTHER_SERVER =
<Reason.USE_ANOTHER_SERVER: 156>
SERVER_MOVED =
<Reason.SERVER_MOVED: 157>
SHARED_SUBSCRIPTIONS_NOT_SUPPORTED =
<Reason.SHARED_SUBSCRIPTIONS_NOT_SUPPORTED: 158>
CONNECTION_RATE_EXCEEDED =
<Reason.CONNECTION_RATE_EXCEEDED: 159>
MAXIMUM_CONNECT_TIME =
<Reason.MAXIMUM_CONNECT_TIME: 160>
SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED =
<Reason.SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED: 161>
WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED =
<Reason.WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED: 162>
class
MqttError(builtins.Exception):
85class MqttError(Exception): 86 87 def __init__(self, 88 reason: Reason, 89 description: str | None): 90 super().__init__(reason, description) 91 self._reason = reason 92 self._description = description 93 94 @property 95 def reason(self) -> Reason: 96 return self._reason 97 98 @property 99 def description(self) -> str | None: 100 return self._description
Common base class for all non-exit exceptions.
MqttError(reason: Reason, description: str | None)
class
Subscription(typing.NamedTuple):
103class Subscription(typing.NamedTuple): 104 topic_filter: String 105 maximum_qos: QoS 106 no_local: bool 107 retain_as_published: bool 108 retain_handling: RetainHandling
Subscription(topic_filter, maximum_qos, no_local, retain_as_published, retain_handling)
Subscription( topic_filter: str, maximum_qos: QoS, no_local: bool, retain_as_published: bool, retain_handling: RetainHandling)
Create new instance of Subscription(topic_filter, maximum_qos, no_local, retain_as_published, retain_handling)