最新要闻
- 焦点热讯:“爸爸和继母离婚了,我还能跟继母一起生活吗?”法院判决:可以
- 金马游乐:控股股东、实际控制人邓志毅累计质押约815万股,占其所持股份32.89% 世界百事通
- 环球今热点:西安安居乐筑科技路店什么时候装修好?
- 大S将起诉汪小菲,反对公开孩子容貌,汪小菲以后见孩子难上加难|天天时快讯
- 当前热讯:逾期48年还不起会影响征信吗
- 普者黑旅游景区交通指南-世界热点
- 短讯!山东 | 济南市歌舞剧院:策划执行大型演出,展现铁军风采
- 被子晒错等于白晒!被子的晾晒收纳指南来了!很多人做错~ 天天新要闻
- 520兰州城关区结婚登记要预约吗
- qq怎么批量删除说说的历史_qq怎么批量删除说说 世界观天下
- 催产素的副作用有哪些_催产素的副作用
- 速讯:无线鼠标为啥不好使_为什么无线鼠标没反应
- 涉及145所大学,英罢工教师期末不阅卷,学生遭殃_天天报道
- 土壤分层剖面图高清图_土壤分层
- 中育贝拉国际高中艺术课程都有哪些专业设置? -->_消息
- 喝中药能吃牛肉吗?
5G
首例5g乳腺手术价格是多少钱?首例5g乳腺手术成功率是多少?
电信光纤多少钱一年?电信光纤价格表
- 首例5g乳腺手术价格是多少钱?首例5g乳腺手术成功率是多少?
- 电信光纤多少钱一年?电信光纤价格表
- 5g流量消耗会不会很快?手机打开5g好还是关闭5G好?
- 5g怎么变成4g信号?5g手机排名前十名一览
- 5g和4g的区别在哪里?5g首批城市名单
- 5g是谁最先研发出来的?5g是什么意思?
科技
Python MQTT客户端 paho-mqtt_全球信息
Python有许多优秀的MQTT客户端,比较有代表性的有paho-mqtt、hbmqtt、gmqtt等,各有特色
paho-mqtt 有着最优秀的文档,代码风格易于理解,同时有着强大的基金会支持,目前新版本支持 MQTT 5.0
(相关资料图)
hbmqtt 使用 asyncio 库实现,可以优化网络 I/O 带来的延迟,但是代码风格不友好,文档较少,不支持 MQTT 5.0,且不再维护,被原作者弃用,有一个分支amqtt正在由不同的人积极开发
gmqtt 同样通过 asyncio 库实现,相比 HBMQTT ,代码风格友好,最重要的是支持 MQTT 5.0
paho-mqtt安装paho-mqtt 可以说是 Python MQTT 开源客户端库中的佼佼者, 支持5.0、3.1.1 和 3.1 的MQTT 协议,由 Eclipse 基金会主导开发.在基金会的支持下,以每年一个版本的速度更新,稳定、持续,本文使用新版本paho-mqtt v1.6.1
pypi地址
开源地址
参考文章
http://www.steves-internet-guide.com/ (墙外)
使用pip安装
pip3 install paho-mqtt# virtualenv安装和完整代码参考官方文档
paho-mqtt已知的一些限制
paho-mqtt使用截止1.6.1版本,当 clean_session 为 False 时,session 只存储在内存中,不持久化。这意味着当客户端重新启动时(不仅仅是重新连接,通常是因为程序重新启动而重新创建对象)会话丢失。这可能导致消息丢失。
客户端会话的以下部分丢失:
已从服务器收到但尚未完全确认的 QoS 2 消息。
由于客户端将盲目地确认任何 PUBCOMP(QoS 2 事务的最后一条消息),它不会挂起但会丢失此 QoS 2 消息。
已发送到服务器但尚未完全确认的 QoS 1 和 QoS 2 消息。
这意味着传递给 publish() 的消息可能会丢失。这可以通过注意传递给 publish() 的所有消息都有相应的 on_publish() 调用来缓解。
这也意味着代理可能在会话中有 Qos2 消息。由于客户端从一个空会话开始,它不知道它并将重新使用中间。这还没有确定。
此外,当 clean_session 为 True 时,该库将在网络重新连接时重新发布 QoS > 0 消息。这意味着 QoS > 0 消息不会丢失。但是标准规定我们是否应该丢弃发送了发布数据包的任何消息。我们的选择意味着我们不符合标准,并且 QoS 2 有可能被接收两次。如果只需要一次交付的 QoS 2 保证,应该 clean_session = False
使用paho-mqtt实现客户端相关功能简单步骤如下:
构造Client客户端实例使用connect相关方法将创建的客户端连接到代理使用loop相关方法维护和broker的通信使用subscribe()方法订阅主题、接收消息使用publish()方法发送消息使用disconnect()断开连接Client客户端使用客户端连接代理、订阅等,首先我们需要先创建一个客户端,paho-mqtt使用Client()创建客户端实例
Client类的构造参数# Client 源码 参数如下def __init__(self, client_id="", clean_session=None, userdata=None, protocol=MQTTv311, transport="tcp", reconnect_on_failure=True):
Client类构造参数讲解# 参数示意client_id = "客户端ID,str类型,可自定义""""如果长度为零或者为空,则会随机生成一个,这种情况下,clean_session参数必须为True清除会话,因为持久会话需要client_id为唯一标识来恢复会话"""clean_session = "会话清除状态,bool类型,设为True,broker在断开连接的时候删除该client的信息,为False,则相当于是持久会话""""clean_session在官方文档示例默认值为True,查看1.6.1版本源码默认值为None,看逻辑应该是支持mqttv5之后做的的变化源码中判断如果protocol是MQTT V5版本 clean_session不是None的话则抛出ValueError,就是mqtt5的clean_session必须持久会话,如果不是mqtt5的话如果clean_session is None 则将clean_session设置为True,这块就与官方文档的默认逻辑对应上了"""userdata = "client用户数据,传递给回调函数,可以是任意类型,可以使用Clinet的 user_data_set()函数进行更新数据"protocol = "客户端协议的版本,默认是MQTTv311就是3.1.1版本,也可以是MQTTv31、MQTTv5版本""""protocol的参数在源码中是以下对应关系,理论上直接传入对应int值或者导入MQTTv** 字段传入都可MQTTv31 = 3MQTTv311 = 4MQTTv5 = 5"""transport = "底层传输协议,默认是使用原始tcp,值可以设置为websockets通过ws发送mqtt"reconnect_on_failure = "bool类型, 连接失败后是否自动重新connect,默认为True""""官方文档没有reconnect_on_failure相关示例和解释,不确定老版本有没有该字段"""
创建客户端# -*- coding: utf-8 -*-# @Time: 2023/5/10 16:09# @Author: LiQi# @Describe:import paho.mqtt.client as mqtt # 导入clinet 别名 mqtt# 创建一个客户端实例赋值client,client_id自定义,其他参数根据需要设定client = mqtt.Client(client_id="muziqi")
重置客户端"""paho-mqtt提供reinitialise方法重新初始化已经构造好的客户端"""# 上面创建的客户端clinet,直接调用reinitialiseclient.reinitialise()"""reinitialise方法拥有下面三个参数client_id="", clean_session=True, userdata=None"""
Clinet选项函数选项函数的作用是给构造好的客户端添加附加选项,一般情况下在使用content连接broker前完成,比如设置Client的证书、回调、账号密码等,根据具体业务使用,设置完成后使用conten连接到broker
max_inflight_messages_set当QoS> 0的时候,可以存在的部分完成(已经发送,还没有确认成功的消息)的消息的最大数量,这些消息可以同时通过网络流,默认为20.增加此值将消耗更多内存,但可以增加吞吐量
# inflight参数为要修改的数量,最小为1,小于1则抛出ValueErrorclient.max_inflight_messages_set(inflight)
max_queued_messages_set设置传出消息队列中等待处理的QoS> 0的传出消息的最大数量,默认为0表示无限制,当队列已满时,其他传出的消息都将被丢弃,实际使用中0实际上并不是无限,目前实现最大可为65555,包括队列中的65535条消息+inflight的默认20条消息,如果inflight默认值被消息,队列中的实际消息数量被减少
# queue_size是要修改的最大数量client.max_queued_messages_set(queue_size)
message_retry_set-废弃QoS>0 的消息,如果发送之后超过一定时间broker没有响应,重发消息的时间,单位是s,默认5s
""" 源码注释该方法不再使用,在2.0版本删除 """
ws_set_options设置WebSocket的连接选项,构造Client时transport="websockets"才会使用,connect相关方法之前调用
import paho.mqtt.client as mqtt# 传输协议设置为wsclient = mqtt.Client(client_id="muziqi",transport="websockets")"""path:broker的mqtt 路径,以/开头的字符串headers:头部信息可以是字典或者是可调用对象。如果是字典的话字典中额外的项被添加到websocket头中。如果是一个可调用的对象,默认的websocket的头部信息被传递到这个函数,将函数结果当做新的头不新鲜"""client.ws_set_options(path="/mqtt", headers=None)
tls_set配置网络加密和身份验证选项,启用SSL/TLS支持,connect相关方法之前调用
# 以下所有参数默认值均为None,根据业务自身选用client.tls_set(ca_certs, certfile, keyfile, cert_reqs, tls_version, ciphers, keyfile_password)"""ca_certs: CA 根证书的路径,如果这是给定的唯一选项,则客户端将以与 Web 浏览器类似的方式运行,要求代理拥有由ca_certs中的证书颁发机构签名的证书,并将使用 TLS v1.2 进行通信,但不会尝试任何形式的身份验证。这提供了基本的网络加密,具体取决于代理的配置方式。默认情况下,在 Python 2.7.9+ 或 3.4+ 上,使用系统的默认证书颁发机构。在较旧的 Python 版本上,此参数是必需的certfile和keyfile: 客户端私钥和证书的路径,分别指向 PEM 编码的客户端证书和私钥的字符串,如果这些参数不是None那么它们将被用作基于 TLS 的身份验证的客户端信息,对此功能的支持取决于代理cert_reqs: 设置客户端对 broker 证书的需求,默认是 ssl.CERT_REQUIRED ,表示 broker 必须提供一个证书tls_version:使用的SSL/TLS协议版本,默认是TLS v1.2 一般不做修改ciphers:字符串,设置允许使用哪些加密方法,默认是Nonekeyfile_password:如果certfile和keyfile加密了需要密码解密,可以使用该参数传递,如果不提供的话需要在终端输入,目前无法定义回调以提供密码"""
tls_set_context配置网络加密和身份验证上下文,启用SSL/TLS支持,connect相关方法之前调用
client.tls_set_context(context)"""参数释义ssl.SSLContext对象,在python3.4之后,默认情况下由ssl.create_default_context()提供,一般无需自定义设置,业务需要不确定是否使用该方法的话,用tls_set or 默认上下文即可"""
tls_insecure_set配置对服务器证书中的服务器主机名进行验证,在connect相关方法之前和 tls_set 或 tls_set_context之后调用
client.tls_insecure_set(value)"""value是bool类型,设置为True代币不使用加密,Flase使用加密值设置为True,可能让恶意第三方通过 DNS 欺骗等方式冒充你的服务器测试程序可以使用该方法来不进行验证"""# 源码根据tls_set选项的cert_reqs字段来设置默认值if cert_reqs != ssl.CERT_NONE: self.tls_insecure_set(False)else: self.tls_insecure_set(True)
enable_logger使用 python 日志包 logging启用日志记录,可以跟后面的on_log日志回调方法同时使用
client.enable_logger(logger)"""logger参数用来指定记录器如果指定了记录器,那么将使用该logging.Logger对象,否则将自动创建一个"""
Paho日志记录级别和标准日志级别对应关系如下
disable_logger禁用python 日志包 记录日志,对on_log回调没有影响
client.disable_logger()"""disable_logger相当于是把enable_logger记录器清除两个方法的源码也很简单 ↓↓↓"""def enable_logger(self, logger=None): if logger is None: if self._logger is not None: return logger = logging.getLogger(__name__) self._logger = loggerdef disable_logger(self): self._logger = None
username_pw_set设置用户名和可选的代理身份验证密码,密码根据broker验证进行设定,可为空,在connect相关方法前调用
client.username_pw_set(username,password=None)
user_data_set将在生成事件时传递给回调方法的私有用户数据,可以使用这个方法更新构造Client时候的userdata字段值
client.user_data_set(userdata)
will_set设置遗嘱消息,client没有使用disconnect方法以外断开连接,broker推送遗嘱消息
client.will_set(topic,payload,qos,retain,properties)"""topic:遗嘱发布主题payload:默认值为None,遗嘱消息内容qos:默认值为0,消息质量级别retain:bool,默认False,是否将遗嘱消息设置为保留消息properties:默认值为None,支持MQTTv5后新增的字段,用来设置遗嘱属性"""
遗嘱消息逻辑、报文、上述字段、属性可参考https://www.cnblogs.com/Mickey-7/p/17385599.html>
reconnect_delay_set客户端断开重新连接延迟的设置
client.reconnect_delay_set(min_delay,max_delay)"""min_delay:默认1max_delay:默认值120连接丢失的时候,默认等待min_delay,每次尝试重连后下次min_delay都将翻倍,上限时间是max_delay连接成功将重置min_delay,Client收到broker的connack报文视为完全连接成功"""
Client连接/重新连接/断开连接连接方法-connectconnect方法的作用是将构造好、设置好选项的的client连接到broker,这是一个阻塞函数
client.connect(host, port, keepalive, bind_address, bind_port,clean_start,properties)"""host:远程代理的主机名或 IP 地址prot:要连接的服务器主机的端口,默认为1883,如果是基于SSL/TLS的MQTT的默认端口为8883,如果选项函数使用tls_set或者tls_set_context,可能需要手动提供端口keepalive:心跳间隔时间,默认为60,单位是秒bind_address: 默认值是空字符串,如果client的本地有多个地址,可以使用该参数绑定一个地址,表示来源bind_port:默认值是0, 与bind_address一样,本地有多个地址,可以使用该参数绑定一个端口,表示来源clean_start:会话设置,默认值 MQTT_CLEAN_START_FIRST_ONLY = 3, mqtt v5版本仅在第一次成功连接时使用干净启动标志,mqttv5如果不是3则ValueErrorproperties:设置会话属性"""
clean_start和properties参考:https://www.cnblogs.com/Mickey-7/p/17361919.html
连接方法-connect_async异步连接方法,与loop_start结合使用以及非阻塞的方式连接,如果一直没有调用loop_start,不会完成连接
client.connect_async(host, port, keepalive, bind_address, bind_port,clean_start,properties)"""参数参考connect方法"""
连接方法-connect_srvDNS连接,使用 SRV DNS查找连接到代理以获取代理地址
client.connect_srv(domain, keepalive, bind_address,clean_start, properties)"""domain:用于搜索SRV记录的DNS域;如果没有则尝试本地域名其余参数参考connect方法"""
重新连接-reconnect在使用connect相关方法连接过之后,想要重新连接可以使用reconnect方法,完全复用之前connect相关的参数进行连接
client.reconnect()
断开连接-disconnect主动彻底断开和broker的连接,调用该方法不会等待排队的消息被发送
client.disconnect()
网络循环网络循环的函数有四种,运行在后台,处理收发的消息,如果不使用的话,传入的数据不会被处理,传出的数据不会发送
loop定期调用用以处理消息,通过 select() 函数阻塞,直到有消息需要收发,并根据消息类型触发对应的回调函数
run_status = True:while run_status:clinet.loop(timeout,max_packets) """timeout:默认值1.0,阻塞超时时间,单位S,不能超过心跳时间keepalive,否则client会定时从broker 断开max_packets:默认值是1,不用设置,已经过时不使用"""
loop_start / loop_stop实现loop的调用和停止,在connect相关方法之前或之后调用loop_start,会在后台运行一个线程字段调用loop,无需编码实现loop循环,
每个客户端必须有一个loop循环
# 在connect相关方法之前或之后调用,会在后台运行一个线程调用loop,连接中断的时候会自动尝试重新连接,无需编码实现loop循环client.loop_start()# 停止loop循环的后台线程client.loop_stop()
loop_forever网络循环的阻塞形式,loop_forever 调用会阻塞主线程,永远不会停止,直到客户端调用disconnect,会自动重新连接
client.loop_forever(timeout,max_packets,retry_first_connection)"""retry_first_connection:bool,默认False,在第一次连接尝试失败后是否应该进行重试,与reconnect_on_failure不同,它只影响第一次连接,如果设置为True,当第一次连接失败时,paho-mqtt会抛出OSError/WebsocketConnectionError异常"""
loop和loop_forever的区别loop方法:默认调用一下只循环一次,然后返回。这意味着它只会执行一次通信循环,不使用start方法的话需要无限while 并且在完成后将返回到调用程序。
loop_forever方法:它是一个无限循环,如果没有意外情况,将一直保持运行。 它在连接丢失或出现其他错误时会自动重新连接。因此,它可以用于长期运行的应用程序,以保持MQTT客户端连接
loop_start和loop_forever的区别loop_start函数是一个非阻塞的函数,可以启动一个线程来处理MQTT客户端的网络通信和事件处理。该函数返回后,MQTT客户端将会在后台线程中运行,不会阻塞当前线程。在调用loop_start函数后,可以分别调用connect、publish、subscribe等方法来进行MQTT通信。但是在使用完MQTT客户端后,需要调用loop_stop函数来停止后台线程
loop_forever函数是一个阻塞的函数,可以启动一个线程来处理MQTT客户端的网络通信和事件处理。该函数会一直阻塞当前线程,直到MQTT客户端接收到disconnect消息或者调用了disconnect函数。在调用loop_forever函数后,不能再使用connect、publish、subscribe等方法来进行MQTT通信,因为当前线程已经被阻塞。只有在调用了disconnect函数后,才会解除阻塞
因此,loop_start和loop_forever的区别在于是否阻塞当前线程。如果需要在MQTT客户端后台运行并且继续使用当前线程进行其他操作,可以使用loop_start函数;如果需要一直阻塞当前线程直到MQTT客户端退出,可以使用loop_forever函数
发布消息publish客户端发送消息到broker,broker将消息转发到订阅匹配主题的客户端
client.publish(topic, payload, qos, retain, properties)"""topic:消息发布的主题payload:默认None,要发送的实际消息,如果没有或者为None,将发送长度为0的消息,payload传递int或者float会被默认转换为str 发送真正类型的int、float需要使用struct.pack()创建 qps:消息之类默认0retain:是否设置保留消息properties:设置保留消息的属性"""
发布消息的返回消息发送之后会返回一个MQTTMessageInfo对象,有以下属性和方法
rc:发布的结果。可以是MQTT_ERR_SUCCESS表示成功,MQTT_ERR_NO_CONN表示客户端当前未连接,或者当使用max_queued_messages_set时,MQTT_ERR_QUEUE_SIZE表示消息既没有排队也没有发送mid:发布请求的消息ID。 mid值可以通过检查on_publish()回调中的mid参数来跟踪发布请求,如果已定义,更容易使用wait_for_publishwait_for_publish() :将阻塞直到消息被发布。如果消息未排队(rc == MQTT_ERR_QUEUE_SIZE),将引发 ValueError,如果发布时出现错误,则引发 RuntimeError,很可能是由于客户端未连接is_published:如果消息已发布返回True。如果消息未排队(rc == MQTT_ERR_QUEUE_SIZE),它将引发ValueError,如果发布时出现错误,则可能是由于客户端未连接而引发RuntimeErro订阅、退订主题subscribe如果主题为None,长度为零或无效(包含通配符),如果qos不是0、1 或 2 之一,或者有效负载的长度大于 268435455 字节,则会引发ValueError
客户端订阅主题方法,可以通过3种方式订阅,mqttv5还有3种方式
client.subscribe(topic, qos, options, properties)topic = "订阅的主题"qos = "服务质量默认为0"options:"订阅选项,mqtt v5 使用""""订阅的选项必须是SubscribeOptions对象,位于 from paho.mqtt.subscribeoptions import SubscribeOptionsSubscribeOptions的构造参数如下qos:与MQTT v3.1.1中相同。noLocal:True或False。如果设置为True,则订阅者不会收到自己的发布。retainAsPublished:True或False。如果设置为True,则收到的发布的retain标志将按发布者设置。retainHandling:RETAIN_SEND_ON_SUBSCRIBE,RETAIN_SEND_IF_NEW_SUB或RETAIN_DO_NOT_SEND 控制代理何时发送保留消息: RETAIN_SEND_ON_SUBSCRIBE:在任何成功的订阅请求上 RETAIN_SEND_IF_NEW_SUB:仅在订阅请求是新的情况下 RETAIN_DO_NOT_SEND:从不发送保留消息"""properties = "设置MQTT v5.0属性的properties实例,可选-如果不设置,则不发送任何属性"
字符串和整数订阅
client.subscribe("my/topic",2)"""topic:指定要订阅的订阅主题的字符串。qos:订阅所需的服务质量级别,默认为0。选项和属性:未使用"""
字符串和整数的元组
client.subscribe(("my/topic",1))"""topic: (topic, qos)的元组qos、选项和属性:未使用"""
字符串和整数元组的列表
client.subscribe([("my/topcic",0),("my/topic2",1)])"""这允许在单个 SUBSCRIPTION 命令中进行多个主题订阅,这比使用多次调用subscribe()更有效(topic, qos)的元组列表。topic 和 qos 都必须存在于所有元组中qos、选项和属性:未使用"""
字符串和订阅选项(仅MQTT v5.0)
from paho.mqtt.subscribeoptions import SubscribeOptionsclient.subscribe("my/topic",options=SubscribeOptions(qos=2))"""topic:订阅主题。qos:未使用。选项:MQTT v5.0订阅选项。properties:属性未设置"""
字符串和订阅选项元组(仅MQTT v5.0)
from paho.mqtt.subscribeoptions import SubscribeOptionsclient.subscribe(("my/topic", SubscribeOptions(qos=1)))"""topic: (topic, SubscribeOptions)的元组。主题和订阅选项必须出现在元组中qos、options、roperties未使用"""
字符串和订阅选项元组列表(仅MQTT v5.0)
from paho.mqtt.subscribeoptions import SubscribeOptionsclient.subscribe([("my/topic", SubscribeOptions(qos=1)),("my/topic2", SubscribeOptions(qos=2))])"""允许在单个SUBSCRIPTION中进行多个主题订阅命令,比使用多个调用更有效topic:格式元组(topic, SubscribeOptions)的列表。主题和订阅选项必须出现在所有元组中。qos和options:未使用。properties:未设置"""
订阅函数的返回unsubscribe返回一个元组(result, mid),其中result为MQTT_ERR_SUCCESS表示成功或(MQTT_ERR_NO_CONN, None)客户端当前未连接。
mid是订阅请求的消息 ID。mid 值可用于通过检查on_subscribe()回调中的 mid 参数(如果已定义)来跟踪订阅请求。
如果qos不是 0、1 或 2,或者主题为None或字符串长度为零,或者主题不是字符串、元组或列表,则引发ValueError
取消订阅一个或多个主题
client.unsubscribe(topic,properties)"""topic:一个主题字符串或主题字符串列表,取消订阅的订阅主题properties:默认值为None,mqttv5的订阅属性"""
取消订阅函数的返回回调方法返回一个元组(result, mid),其中result是MQTT_ERR_SUCCESS表示成功,或者(MQTT_ERR_NO_CONN, None)如果客户端当前未连接。mid是取消订阅请求的消息 ID。mid 值可用于通过检查on_unsubscribe()回调中的 mid 参数(如果已定义)来跟踪取消订阅请求。
如果主题为None或字符串长度为零,或者不是字符串或列表,则引发ValueError 。
回调是为响应事件而调用的函数,使用回调需要做两件事情,1.创建回调函数,2.将函数分配给回调
on_connect客户端连接、重新连接broker后,连接的回调方法,当客户端收到broker的CONNACK响应的时候,会触发on_connect()回调
on_connect(client, userdata, flags, rc)"""client:触发回调的客户端实例userdata:在Client()或者user_data_set()中设置的用户数据flags:一个包含来自代理的响应标志的字典,仅使用 clean session 设置为 0。如果 clean session=0 的客户端重新连接到它之前连接过 的代理,则此标志指示代理是否仍然具有客户端的会话信息。如果为 1,会话仍然存在rc:连接结果,0:连接成功 1:连接被拒绝 - 协议版本不正确 2:连接被拒绝 - 客户端标识符无效 3:连接被拒绝 - 服务器不可用 4:连接被拒绝 - 用户名或密码错误 5:连接被拒绝 - 未授权 6-255:当前未使用"""
使用示例
# 定义回调的方法 def on_connect(client, userdata, flags, rc): print("Connected with result code: " + str(rc))# 设置客户端的回调on_connect属性为定义的on_connect方法client.on_connect = on_connect
on_disconnect当客户端与代理断开时触发
on_disconnect(client, userdata, rc)"""client:触发的客户端实例userdata:用户数据rc:断开连接结果,如果是0,则回调是为了响应disconnect()调用而调用的。如果有任何其他值,则断开连接是意外的,例如可能由网络错误引起的"""
使用示例
# 定义回调函数def on_disconnect(client, userdata, rc): print("Unexpected disconnection %s" % rc) # 设置客户端的回调on_disconnect属性为定义的on_disconnect方法client.on_disconnect = on_disconnect
on_publish当Clinet发送消息到broker,会触发on_publish()回调
on_publish(client, userdata, mid)"""mid:消息ID"""
使用示例
def on_publish(client, userdata, mid): print(mid,"message publish")"""触发该回调对于 QoS 级别 1 和 2 的消息,意味着适当的握手已经完成对于 QoS 0,这仅表示消息已离开客户端。mid变量匹配从相应的publish()调用返回的 mid 变量,以允许跟踪传出消息。这个回调很重要,因为即使 publish() 调用返回成功,也并不总是意味着消息已发送"""
on_message 和 message_callback_addon_message,当 client 接收到已订阅的话题的消息并且与message_callback_add自定义主题筛选不匹配的时候,会调用 on_message() 回调函数
on_message(client, userdata, message)"""message:MQTTMessage类的实例。有topic, payload, qos, retain属性"""
使用示例
def on_message(client, userdata, message): print("主题:" + msg.topic + " 消息:"+ str(message.payload.decode("utf-8"))) client.on_message = on_message
message_callback_add,用于处理特定订阅过滤器的传入消息,包括使用通配符,message_callback_add自定义是自定义的一个主题筛选过滤器,比如我用message_callback_add 筛选主题 A,有A的消息触发message_callback_add回调,否则触发on_message
message_callback_add(sub, callback)"""sub:此回调进行匹配的订阅筛选器callback:要使用的回调"""
使用示例
# 自定义处理action主题的回调def action_message(client,userdata,message):print("action topic:"+message.topic)client.message_callback_add("topic",action_message)
如果想要删除使用message_callback_add注册的特定回调
client.message_callback_remove("topic")
on_subscribe当代理确认订阅时,将生成on_subscribe()回调
on_subscribe(client, userdata, mid, granted_qos)"""granted_qos:一个整数列表,给出代理为每个不同订阅请求授予的 QoS 级别"""
使用示例
def on_subscribe(client, userdata, mid, granted_qos):print(f"On Subscribed: qos ={granted_qos}") client.on_subscribe = on_subscribe
on_unsubscribe当代理确认取消订阅时,生成on_unsubscribe()回调
def on_unsubscribe(client, userdata, mid): print("un unsubscribe " + userdata) client.on_unsubscribe = on_unsubscribe
on_log当客户端有日志信息时调用
def on_log(client, userdata, level, buf): """ level:消息级别,MQTT_LOG_INFO、MQTT_LOG_NOTICE、MQTT_LOG_WARNING、MQTT_LOG_ERR和MQTT_LOG_DEBUG之一 buf:日志消息 """ # 可以与标准 Python 日志记录同时使用,可以通过enable_logger方法启用 print(buf) client.on_log = on_log
on_socket_open当套接字打开时调用。使用它来向外部事件循环注册套接字以进行读取
def on_socket_open(client, userdata, sock):print(f"{socket} open")client.on_socket_open = on_socket_open
on_socket_close当套接字即将关闭时调用。使用它从外部事件循环中注销套接字以进行读取
def on_socket_close(client, userdata, sock):print(f"{socket} close")client.on_socket_close = on_socket_close
on_socket_register_write当对套接字的写操作失败时调用,因为它会阻塞,例如输出缓冲区已满。使用它来向外部事件循环注册套接字以进行写入
def on_socket_register_write(client, userdata, sock): print(f"{socket} on_socket_register_write") client.on_socket_register_write = on_socket_register_write
on_socket_unregister_write当对套接字的写操作在先前失败后成功时调用。使用它从外部事件循环中注销套接字以进行写入。
def on_socket_unregister_write(client, userdata, sock): print(f"{sock} on_socket_register_write")client.on_socket_unregister_write = on_socket_unregister_write
外包事件循环支持# 循环读取-当套接字准备好读取时调用loop_read()# 循环写入-当套接字准备好写入时调用loop_write()# 每隔几秒调用一次以处理消息重试和 pingloop_misc()# 返回客户端中使用的套接字对象,以允许与其他事件循环进行交互,对于基于选择的循环比较有用socket()# 如果有数据等待写入,则返回 true,以允许将客户端与其他事件循环连接起来,对于基于选择的循环比较有用want_write()"""回调按以下顺序调优1. on_socket_open 2. on_socket_register_write - 零次或多次3. on_socket_unregister_write - 零次或多次4. on_socket_close"""
全局辅助函数客户端模块提供了一些全局辅助函数,对客户端行为提供一些行为操作
检查辅助函数# 导入import paho.mqtt.client as mqtt
topic_matches_subimport paho.mqtt.client as mqtt#检查主题与订阅是否匹配mqtt.topic_matches_sub(sub, topic)
connack_string# 返回与 CONNACK 结果关联的错误字符串,例如在连接回调中传入rc字段输出对应信息mqtt.connack_string(connack_code)
error_string# 返回与 Paho MQTT 错误号关联的错误字符串mqtt.error_string(mqtt_errno)
发布辅助函数允许以一次性方式直接发布消息。在有一条/多条消息要发布到代理的情况下很有用,然后断开连接而不需要其他任何东西,都对mqttv5协议支持,但是目前不允许在连接或者发布消息的时候设置属性
single向代理发布一条消息,然后干净地断开连接
# 导包from paho.mqtt.publish import single# 参数def single(topic, payload=None, qos=0, retain=False, hostname="localhost", port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None, protocol=paho.MQTTv311, transport="tcp", proxy_args=None): """topci:主题payload:内容qos:服务质量retain:是否设置保留消息hostname:/port:/client_id:要使用的 MQTT 客户端 ID。如果为“”或 None,Paho 库将自动生成一个客户端 IDkeepalive:心跳间隔will:包含客户端遗嘱参数的字典,{"topic": "topic", "qos":1},主题是必须的,其他参数都是可选的auth:包含客户端身份验证参数的字典,{"username":"liqi","password":"123456"} 默认为无,表示不使用身份验证,如果设置的话用户名是必需的,密码是可选的 tls:包含客户端 TLS 配置参数的字典,{"ca_certs":"ca_certs"} ca_certs 是必需的,所有其他参数都是可选的,与tls_set方法参数一致 protocol:mqtt协议版本transport:默认tcp,设置为“websockets”以通过 WebSockets 发送 MQTTproxy_args:配置MQTT连接代理。启用对SOCKS或HTTP代理参数是字典类型 {"proxy_type":"代理类型,必选,socks.HTTP, socks.SOCKS4, or socks.SOCKS5", "proxy_addr":"代理服务器的IP地址或DNS名称,必选", "proxy_rdns":"bool,可选,是否应该执行代理查找,远程(True,默认)或本地(False)", "proxy_username":"可选,SOCKS5代理的用户名,SOCKS4代理的用户id", "proxy_password":"可选,SOCKS5代理密码" } 该参数对应的使用方法表示在connect() or connect_async()之前使用 """
multiple向代理发布多条消息,然后完全断开连接
# 导包from paho.mqtt.publish import multiple# 参数def multiple(msgs, hostname="localhost", port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None, protocol=paho.MQTTv311, transport="tcp", proxy_args=None):"""msgs:要发布的消息列表。每个消息要么是一个字典,要么是一个元组,如果是字典,则只有主题必须存在。默认值将用于任何缺少的参数 如果是元组,参数按字段顺序对应其他参数与single一致"""
订阅辅助函数允许直接订阅和处理消息,都包括对 MQTT v5.0 的支持,但目前不允许在连接或订阅时设置任何属性
simple订阅一组主题并返回收到的消息。这是一个阻塞函数
# 导包from paho.mqtt.subscribe import simple# 参数def simple(topics, qos=0, msg_count=1, retained=True, hostname="localhost", port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None, protocol=paho.MQTTv311, transport="tcp", clean_session=True, proxy_args=None):"""topics:是客户端将订阅的主题字符串。如果订阅多个主题,可以是字符串或字符串列表msg_count:消息计数,从代理检索的消息数。默认为 1。如果为 1,将返回单个 MQTTMessage 对象。如果 >1,将返回 MQTTMessages 列表retained:设置为 True 以保留消息,设置为 False 以忽略设置了保留标志的消息clean_session:会话设置其他参数参考single"""
使用示例
msg = simple("my/test")rint("%s %s" % (msg.topic, msg.payload))
callback订阅一组主题并使用用户提供的回调处理收到的消息。
# 导包from paho.mqtt.subscribe import callback# 参数def callback(callback, topics, qos=0, userdata=None, hostname="localhost", port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None, protocol=paho.MQTTv311, transport="tcp", clean_session=True, proxy_args=None): """callback:回调,定义一个回调,将用于收到的每条消息,形式与on_message类似userdata:用户数据,将在收到消息时传递给 on_message 回调其他参数参考simple"""
使用示例
# 定义一个回调def on_message_print(client, userdata, message) print("%s %s" % (message.topic, message.payload))callback(callback=on_message_print, topics="my/test")
关键词:
-
Python MQTT客户端 paho-mqtt_全球信息
Python中MQTTPython有许多优秀的MQTT客户端,比较有代表性的有paho-mqtt、hbmqtt、gmqtt等,各有特色paho-mq
来源: Python MQTT客户端 paho-mqtt_全球信息
天天快资讯丨小孩睡觉尿床怎么回事?济南童康儿童医院可以治疗遗尿症吗?
头部血管瘤严重吗?注意事项有哪些?_焦点快看
首次亮相CMEF 中核粒子积极探索国产化之路
口碑见证 | 知心“季妈妈”,暖心陪伴获赞赏 全球即时
焦点热讯:“爸爸和继母离婚了,我还能跟继母一起生活吗?”法院判决:可以
每日快报!笔记本电脑关机快捷键是哪个_笔记本电脑关机快捷键
铀矿是可再生资源吗?铀矿上市公司有哪些?
工行闪酷卡是什么卡?工行闪酷卡有什么用?
塔防小游戏在线玩无需登录无需下载_塔防射箭小游戏 环球微头条
劲嘉股份(002191)5月15日主力资金净卖出654.33万元_世界观焦点
高档白酒三大标准是什么?高档白酒有哪些品牌?
丰谷酒是哪里产的?丰谷酒价格表一览2023
金马游乐:控股股东、实际控制人邓志毅累计质押约815万股,占其所持股份32.89% 世界百事通
股市风向标是什么意思?影响股市的指标有哪些?
特斯拉上海工厂2022年的产值1839亿元 同比增长49.7%
斗战神赏金模式怎么开?斗战神赏金模式能出活动材料吗?
番茄虾球是哪个地方的特色菜?家常菜番茄虾球的做法
芷江是哪个省哪个市?芷江的美食有哪些?
火烧赤壁打一词牌名是什么?词牌名格式大全及示例
成语八面玲珑最初是形容什么的?成语八面玲珑是褒义词还是贬义词?
“北向互换通”今日先行开通 开市半小时成交金额超18亿元-全球速讯
拉萨:树绿鸟欢生态好
环球今热点:西安安居乐筑科技路店什么时候装修好?
大众集团软件公司“地震”,或影响纯电新车上市
村镇银行应坚持线上线下融合 全面提升网点运营效率
天天观察:正宗炸肉丸子的做法_炸肉丸子的具体做法
大S将起诉汪小菲,反对公开孩子容貌,汪小菲以后见孩子难上加难|天天时快讯
每日观察!恭喜梅西!拉总官宣决定,可讨回欠薪+树立雕像,不受姆巴佩气了
【天天报资讯】男生眼睛小怎么变大 眼睛小怎么变大
盘点追星蔡徐坤90后总裁商业版图 追星蔡徐坤90后总裁实控40家企业_全球新动态
当前热讯:逾期48年还不起会影响征信吗
得意扬扬和得意洋洋有什么区别_得意扬扬和得意洋洋
嗓子疼流鼻涕是流感吗?能用连花清瘟治疗吗?
山东第一医科大学附属省立医院与平邑县中医医院举行医疗联合体签约揭牌仪式
世界快消息!江苏宝应:创新“融合”式服务 “一窗办结”更贴心
衡水学院广播电视学就业费
普者黑旅游景区交通指南-世界热点
陕西西安:千年古都 爱乐之城 当前速讯
环球热消息:最低1.606元/W!广东电力2023年第一批光伏组件采购中标候选人公示
“520”正值周六,在杨浦结婚登记预约方式→|焦点短讯
短讯!山东 | 济南市歌舞剧院:策划执行大型演出,展现铁军风采
今日看点:逆天战神1-70集免费看合集
新西兰航空购买8架新的波音787-9梦想飞机等 今日快讯
刷新行业天花板 魅族20系列提供3年超长质保-快播报
被子晒错等于白晒!被子的晾晒收纳指南来了!很多人做错~ 天天新要闻
太奇葩!20万费用支付不起?这家猎头将券商告上法庭,谁的过错?法院这样判
恶心!顾客买肉饼咬下去发现全是活蛆,商家:赔偿10元,网友怒了
【环球时快讯】我借给我朋友的钱。现在迟迟不还给我。算不算诈骗
川蜀专家杨明杰:“大医精诚”,为患者排忧解难是我的职责!|焦点短讯
“号源爆满,患者点赞”——北京骨科专家来院坐诊受欢迎 天天微速讯
重庆丰益肛肠医院:大便次数多该怎么办?-快报
迎5.12护士节,昆明东大肛肠医院积极参与健康中国公益活动 环球看热讯
贵阳精神病患街头持剪刀伤人,伤者均无生命危险-天天日报
520兰州城关区结婚登记要预约吗
《塞尔达传说王国之泪》新手村魔像打法教程 新手村BOSS方块魔像怎么打?
即时:便利店行业如何实现从跟随到引领?
qq怎么批量删除说说的历史_qq怎么批量删除说说 世界观天下
联想推出小新铝合金散热支架 Z2,首发价 79 元
2023年5月15日至16日云南省部分风景区天气预报
浙江省职工迎亚运英语口语大赛总决赛在杭州举办
催产素的副作用有哪些_催产素的副作用
【全球新视野】李家超:互换通将与债券通产生协同效应 释放投资内地资本市场潜力
天天信息:浙江送变电公司:应用焊接机器人提升施工质效
河南泌阳:支持起诉,为病弱老人维权
速讯:无线鼠标为啥不好使_为什么无线鼠标没反应
每日快播:夏季泡木耳用冷水还是热水?你做对了吗?
涉及145所大学,英罢工教师期末不阅卷,学生遭殃_天天报道
全球通讯!新技术密集落地!北上资金豪掷超20亿元加仓锂电龙头
每日简讯:日发精机(002520)5月12日主力资金净卖出221.05万元
通讯!浙c是哪里的车牌区号多少 浙c是哪里的车牌
土壤分层剖面图高清图_土壤分层
“去中国化”就是去机遇去发展-天天播报
线下演唱会回归 北京警方:提防购票诈骗三大套路|世界速看料
中育贝拉国际高中艺术课程都有哪些专业设置? -->_消息
全球观速讯丨挥别日化 广州浪奇二次创业
个税2021年税率表5000_个税2021年税率表_每日播报
上甘岭战役真实伤亡数字_上甘岭战役真实伤亡 观点
喝中药能吃牛肉吗?
哈兰德继续刷新进球纪录 曼城下轮取胜就提前2轮夺冠_世界速看料
满易贷贷款逾期3天延迟还款会影响征信吗 当前滚动
世界最新:中秋祝福语简洁大气图片_中秋祝福语简洁大气
不只输出产品更要输出技术、资本 新能源产业链公司加速“出海”
环球通讯!羽毛球场地标准尺寸是多少平方 羽毛球场地标准尺寸是多少
好利来的冰山和熔岩有什么区别?
2023上海博物馆日徐汇区免费(半价)开放名单
密保邮箱格式怎么写_密保邮箱大全|全球新视野
湖北汽车工业学院-科技学院(十堰汽车工业学院)-天天速递
每日头条!分辨率300dpi是几乘几_300dpi分辨率是多少像素
34度算低烧吗_35度算低烧吗 环球速递
【世界报资讯】怎么制作微课视频配音_怎么制作微课视频
世界微资讯!药物流产多少钱啊_药物流产多少钱
5
键盘上的三个灯一直闪是怎么回事_键盘上的三个灯 世界新资讯
当前快讯:“领跑”的中国力量|以岭连花清瘟入选首批“品牌建设领跑者”工程
天天信息:爱在母亲节│津力达陪伴左右,守护母亲血糖“甜度”不超标
环球今日报丨爱在母亲节│津力达陪伴左右,守护母亲血糖“甜度”不超标
四川友谊医院雷蕾做双眼皮怎么样? 世界资讯
驰援是什么意思啊(驰援是什么意思)
环球微头条丨萧的入门指法_怎么入门萧的指法