From 01ef89ec437f12e1bb4dfc6ac8dc5c19542c98e2 Mon Sep 17 00:00:00 2001 From: emqx-ci-robot Date: Wed, 30 Oct 2024 04:37:00 +0000 Subject: [PATCH] sync blog --- README-ZH.md | 2 +- README.md | 1 + en/202410/mqttx-1-11-0-release-notes.md | 189 ++++++++++++ zh/202008/how-to-use-mqtt-in-python.md | 264 ---------------- zh/202410/how-to-use-mqtt-in-python.md | 391 ++++++++++++++++++++++++ 5 files changed, 582 insertions(+), 265 deletions(-) create mode 100644 en/202410/mqttx-1-11-0-release-notes.md delete mode 100644 zh/202008/how-to-use-mqtt-in-python.md create mode 100644 zh/202410/how-to-use-mqtt-in-python.md diff --git a/README-ZH.md b/README-ZH.md index f2a0f245..9a9f103b 100644 --- a/README-ZH.md +++ b/README-ZH.md @@ -98,6 +98,7 @@ Get to know the preferred protocol in IoT from beginner to master. ## [MQTT Programming](https://www.emqx.com/zh/blog/category/mqtt-programming) Best practice of MQTT in various clients. +- [如何在 Python3 中使用 MQTT 客户端库 Paho Client](https://www.emqx.com/zh/blog/how-to-use-mqtt-in-python) ([Edit](https://github.com/emqx/blog/blob/main/zh/202410/how-to-use-mqtt-in-python.md)) - [如何在 Rust 中通过 Rumqttc 实现 MQTT 通信](https://www.emqx.com/zh/blog/how-to-use-mqtt-in-rust) ([Edit](https://github.com/emqx/blog/blob/main/zh/202409/how-to-use-mqtt-in-rust.md)) - [在 Flutter 项目中使用 MQTT](https://www.emqx.com/zh/blog/using-mqtt-in-flutter) ([Edit](https://github.com/emqx/blog/blob/main/zh/202409/using-mqtt-in-flutter.md)) - [开发指南:使用 MQTTNet 库构建 .Net 物联网 MQTT 应用程序](https://www.emqx.com/zh/blog/connecting-to-serverless-mqtt-broker-with-mqttnet-in-csharp) ([Edit](https://github.com/emqx/blog/blob/main/zh/202402/connecting-to-serverless-mqtt-broker-with-mqttnet-in-csharp.md)) @@ -120,7 +121,6 @@ Best practice of MQTT in various clients. - [Python MQTT 客户端对比](https://www.emqx.com/zh/blog/comparision-of-python-mqtt-client) ([Edit](https://github.com/emqx/blog/blob/main/zh/202009/comparision-of-python-mqtt-client.md)) - [如何在 Vue 项目中使用 MQTT](https://www.emqx.com/zh/blog/how-to-use-mqtt-in-vue) ([Edit](https://github.com/emqx/blog/blob/main/zh/202009/how-to-use-mqtt-in-vue.md)) - [如何在 Golang 中使用 MQTT](https://www.emqx.com/zh/blog/how-to-use-mqtt-in-golang) ([Edit](https://github.com/emqx/blog/blob/main/zh/202009/how-to-use-mqtt-in-golang.md)) -- [如何在 Python 中使用 MQTT](https://www.emqx.com/zh/blog/how-to-use-mqtt-in-python) ([Edit](https://github.com/emqx/blog/blob/main/zh/202008/how-to-use-mqtt-in-python.md)) - [在树莓派上使用 MQTT](https://www.emqx.com/zh/blog/use-mqtt-with-raspberry-pi) ([Edit](https://github.com/emqx/blog/blob/main/zh/202008/use-mqtt-with-raspberry-pi.md)) - [Android 使用 Kotlin 连接 MQTT](https://www.emqx.com/zh/blog/android-connects-mqtt-using-kotlin) ([Edit](https://github.com/emqx/blog/blob/main/zh/202006/android-connects-mqtt-using-kotlin.md)) - [ESP8266 连接到免费的公共 MQTT 服务器](https://www.emqx.com/zh/blog/esp8266-connects-to-the-public-mqtt-broker) ([Edit](https://github.com/emqx/blog/blob/main/zh/202005/esp8266-connects-to-the-public-mqtt-broker.md)) diff --git a/README.md b/README.md index b875cedf..872dacd2 100644 --- a/README.md +++ b/README.md @@ -551,6 +551,7 @@ Build a reliable, efficient and industry-specific Internet of Vehicles platform ## [MQTTX](https://www.emqx.com/en/blog/category/mqttx) MQTTX is a Fully Open-source MQTT 5.0 cross-platform Desktop Client, makes it easy and quick to create multiple simultaneous online MQTT client connections, test the connection, publish, and subscribe functions of MQTT/TCP, MQTT/TLS, MQTT/WebSocket as well as other MQTT protocol features. +- [MQTTX 1.11.0 Released: Topic Tree and Avro Schema](https://www.emqx.com/en/blog/mqttx-1-11-0-release-notes) ([Edit](https://github.com/emqx/blog/blob/main/en/202410/mqttx-1-11-0-release-notes.md)) - [MQTTX 1.10.1 Released: One-Click CLI Installation via Desktop](https://www.emqx.com/en/blog/mqttx-1-10-1-release-notes) ([Edit](https://github.com/emqx/blog/blob/main/en/202407/mqttx-1-10-1-release-notes.md)) - [MQTTX Web Online Site Migration Announcement](https://www.emqx.com/en/blog/mqttx-web-migration-announcement) ([Edit](https://github.com/emqx/blog/blob/main/en/202406/mqttx-web-migration-announcement.md)) - [MQTTX 1.10.0 Released: Advanced File Management and Configuration in CLI](https://www.emqx.com/en/blog/mqttx-1-10-0-release-notes) ([Edit](https://github.com/emqx/blog/blob/main/en/202406/mqttx-1-10-0-release-notes.md)) diff --git a/en/202410/mqttx-1-11-0-release-notes.md b/en/202410/mqttx-1-11-0-release-notes.md new file mode 100644 index 00000000..9dd3b901 --- /dev/null +++ b/en/202410/mqttx-1-11-0-release-notes.md @@ -0,0 +1,189 @@ +We’re thrilled to announce that MQTTX 1.11.0 is now available! + +This release introduces Topic Tree visualization, a powerful new way to organize and monitor [MQTT topics](https://www.emqx.com/en/blog/advanced-features-of-mqtt-topics) hierarchically. We've also added support for Avro Schema and Message Pack formats in both Desktop and CLI clients, expanding MQTTX's data format handling capabilities. These updates provide users with enhanced tools for understanding their MQTT message flows and managing complex data structures. + +> ***Download the latest version here:*** [***https://mqttx.app/downloads***](https://mqttx.app/downloads**) + +## Topic Tree + +Starting from v1.11.0, MQTTX Desktop introduces the **Viewer** feature, providing MQTT data visualization capabilities to help users better understand and monitor message flows. + +The **Topic Tree**, currently in `Beta` version, is a powerful visualization tool that transforms MQTT topics and messages into an intuitive hierarchical structure. It displays your MQTT topic relationships and message flows in a tree-like format, making it easier to understand and manage your MQTT data. + +Located in the **Viewer** menu, the **Topic Tree** automatically updates as your subscribed topics receive messages. The interface features a search box for quick topic filtering, operation buttons for tree manipulation, and a detailed information panel that provides message-specific data. + +The Topic Tree organizes nodes into three levels: + +- Root nodes show client connection details, including client ID and host information. +- Intermediate nodes represent topic hierarchies with their subtopics. +- Leaf nodes contain message details, including reception time, Retain flag, and QoS value. + +![MQTT Topic Tree](https://assets.emqx.com/images/c6712e791d2df6025b60a1059d110986.png) + +Users can switch to the visual tree diagram mode for a different perspective. This mode offers a more graphical representation of the topic structure, providing adjustable expansion levels (default is 4 layers) and interactive features. Users can hover over nodes to display topic paths, message counts, sub-topic counts, and message content. + +![Visualize Topic Tree](https://assets.emqx.com/images/21f0b0ce3a37e7686c3f72f745cde078.png) + +> Note: The Topic Tree feature is currently in Beta and may have incomplete functionality. We welcome your feedback and suggestions. If you encounter any issues or have ideas for improvement, please visit https://github.com/emqx/MQTTX/issues to share your thoughts. + +## Avro Schema Support + +> This feature is made possible thanks to a community contribution from [**@LAST7**](https://github.com/LAST7) + +MQTTX 1.11.0 introduces Avro Schema support, expanding its message format testing capabilities. This enhancement enables developers to thoroughly test their MQTT applications that use Avro for data serialization, ensuring messages are correctly encoded and decoded in their communication flows. + +Before publishing messages to your production environment, you can now validate that your JSON data is properly converted to Avro format and verify that subscribers can successfully decode Avro messages back to JSON. This testing capability helps identify potential serialization issues early in your development cycle, saving time and reducing debugging efforts in production. + +### Desktop + +The Desktop client provides an intuitive interface for Avro schema testing in the `Script` → `Schema` page. Here you can: + +- Define your Avro schema using the standard `.avsc` format. +- Test JSON to Avro encoding before publishing. +- Validate Avro to JSON decoding for subscribed messages. +- Immediately see conversion results in the testing panel. + +Example schema: + +```json +{ + "type": "record", + "name": "Person", + "fields": [ + { "name": "id", "type": "int" }, + { "name": "name", "type": "string" } + ] +} +``` + +![`Script` → `Schema` page](https://assets.emqx.com/images/aef3b5b57c6b2b509814289767d6928b.png) + +### CLI + +For automated testing scenarios or command-line operations, MQTTX CLI integrates Avro support through the `-Ap` or `--avsc-path` option: + +| -Ap, --avsc-path | the path to the .avsc file that defines the avro schema for AVRO encoding | +| :---------------------- | :----------------------------------------------------------- | +| | | + +``` +# Publish: Test your JSON to Avro encoding +mqttx pub -t 'msg/avro' -m '{"id": 1, "name": "hello"}' -Ap ./Person.avsc + +# Subscribe: Verify Avro to JSON decoding +mqttx sub -t 'msg/avro' -Ap ./Person.avsc +``` + +This feature enhances MQTTX's capability for testing structured data communication. It's ideal for IoT device development and data pipeline construction, ensuring accurate Avro Schema message handling in MQTT applications. + +## Message Pack Format Support + +> This feature is made possible thanks to a community contribution from [**@lantica**](https://github.com/lantica) + +MQTTX 1.11.0 now supports Message Pack, a highly efficient binary serialization format. This addition enables you to work with MQTT applications that demand more efficient data transmission while preserving structured data formats. By supporting Message Pack alongside JSON, Base64, and CBOR, MQTTX broadens its capacity to handle diverse MQTT messaging scenarios. + +### Desktop + +The Desktop client fully integrates Message Pack format into both publishing and subscribing interfaces: + +- Publishing: Send MQTT messages in Message Pack format, automatically converting your JSON data into a compact binary format. +- Subscribing: Receive Message Pack formatted MQTT messages and view them as readable JSON, simplifying message inspection and comprehension. + +![Message Pack](https://assets.emqx.com/images/e876bab4b35b1c0a9c1448801000ea07.png) + +### CLI + +The CLI supports Message Pack format through the `-f` or `--format` option: + +```shell +# Send messages in Message Pack format +mqttx pub -t 'msg/msgpack' -m '{"value": 123}' -f msgpack + +# Receive Message Pack formatted messages +mqttx sub -t 'msg/msgpack' -f msgpack +``` + +This enhancement boosts MQTTX's versatility as a testing tool, particularly for IoT scenarios where message size efficiency is crucial. Whether you're testing systems that already use Message Pack or exploring more efficient message formats for your MQTT communications, MQTTX now offers the capability to work with this compact binary format. + +## WebSocket Headers Support in CLI + +MQTTX 1.11.0 introduces custom WebSocket headers support for the CLI client—a feature highly requested by the community. This enhancement allows users to add custom headers when establishing WebSocket connections, which is particularly useful for scenarios requiring special authentication or custom data transmission. + +Use the `-wh` or `--ws-header` option to specify custom WebSocket headers: + +```shell +mqttx sub -t test -wh "Authorization: Bearer token123" -wh "Custom-Header: value" -l ws -p 8083 +``` + +> Note: This feature is only available in the CLI client running in a Node.js environment. Due to browser security restrictions, the Desktop and Web versions cannot modify WebSocket headers—a known limitation in browser WebSocket implementations. + +This feature addresses a common need in WebSocket MQTT connections, particularly in scenarios such as: + +- Adding authentication tokens +- Including custom identification headers +- Setting up special connection parameters + +This addition makes MQTTX CLI a more flexible tool for testing MQTT over WebSocket connections, especially in enterprise environments where custom headers are required for security or routing purposes. + +## Configurable QoS 0 Message Storage + +MQTTX 1.11.0 introduces an option to ignore QoS 0 messages, optimizing performance for high-volume message flows. QoS 0 messages—the most basic form of message delivery without guaranteed delivery—can now be configured to display without local storage. + +In the Settings page under the Advanced section, you'll find the "Ignore QoS 0 Message" option, disabled by default. When enabled: + +- QoS 0 messages are displayed but not stored locally. +- Storage overhead is reduced in high-frequency messaging scenarios. +- Previously saved QoS 0 messages remain intact. + +This feature is especially useful when testing MQTT applications that generate large volumes of QoS 0 messages. It allows you to focus on messages requiring delivery guarantees while maintaining optimal performance and reducing storage space usage. + +![Ignore QoS 0 messages](https://assets.emqx.com/images/9a65db1c7939f1049c1f104bbd26f745.png) + +## Other Improvements + +### Enhanced Client Configuration + +- **Empty Client ID Support**: Introduced support for empty client IDs in MQTT 5.0, enabling brokers to automatically allocate client IDs. This enhancement allows for a more accurate simulation of MQTT 5.0 scenarios. +- **Default Session Expiry**: Adjusted the default session expiry interval to 7200 seconds (2 hours) for MQTT 5.0 connections with clean start set to false. This change aligns with EMQX's default configuration and optimizes server-side resource management. + +### Improved MQTT Operations + +- **Wildcard Topic Matching**: Resolved an issue where wildcard topic matching with '#' incorrectly matched unintended topics (e.g., 'foo/#' matching 'foobar/#'). +- **Version Notation**: Updated MQTT version notation from '5' to '5.0' in CLI, enhancing clarity and consistency. + +### AI Copilot Enhancements + +- **Default Model Update**: Switched the default model to GPT-4o, delivering improved performance and capabilities. +- **New Model Support**: Expanded support to include additional GPT models, such as GPT-4o-mini and o1-preview. +- **DeepSeek Integration**: Incorporated DeepSeek model support for users in China, offering a cost-effective language model alternative. + +### General Fixes + +- Resolved connection name style issues. +- Improved benchmark subscription topics logging. +- Corrected some typos throughout the Desktop client. + +## Roadmap + +- **Topic Tree Milestone-2**: Enhance organization and visualization of topics. +- **Payload Chart Visualization Enhancement - MQTTX Viewer**: + - **Diff View**: Compare different messages or payloads easily. + - **Dashboard View:** Offer a customizable overview of MQTT activities for personalized insights. + - **JSON View**: Improve handling and display of JSON formatted data. + - **System Topic View**: Specialized view for system-related [MQTT topics](https://www.emqx.com/en/blog/advanced-features-of-mqtt-topics). +- **Support for Configurable Disconnect Properties (MQTT 5.0)**: Enhance connection management with customizable disconnection settings. +- **IoT Scenario Data Simulation**: Bring this feature to the desktop client to ease IoT scenario testing. +- **Sparkplug B Support**: Extend MQTTX functionalities to include support for Sparkplug B. +- **MQTT GUI Debug Functionality**: New features to aid in debugging MQTT communications. +- **Plugin Functionality**: Introduction of a plugin system supporting protocol extensions like [CoAP](https://www.emqx.com/en/blog/coap-protocol) and [MQTT-SN](https://www.emqx.com/en/blog/connecting-mqtt-sn-devices-using-emqx). +- **JSON Schema**: Encoding and decoding capabilities for JSON Schema. +- **Script Test Automation (Flow)**: Simplify the creation and management of automated testing workflows. + + + +
+
+ Try MQTTX for Free +
+ Get Started → +
diff --git a/zh/202008/how-to-use-mqtt-in-python.md b/zh/202008/how-to-use-mqtt-in-python.md deleted file mode 100644 index 4b41b12d..00000000 --- a/zh/202008/how-to-use-mqtt-in-python.md +++ /dev/null @@ -1,264 +0,0 @@ -[Python](https://www.python.org/) 是一种广泛使用的解释型、高级编程、通用型编程语言。Python 的设计哲学强调代码的可读性和简洁的语法(尤其是使用空格缩进划分代码块,而非使用大括号或者关键词)。Python 让开发者能够用更少的代码表达想法,不管是小型还是大型程序,该语言都试图让程序的结构清晰明了。[^1] - -[MQTT](https://mqtt.org/) 是一种基于发布/订阅模式的 **轻量级物联网消息传输协议** ,可以用极少的代码和带宽为联网设备提供实时可靠的消息服务,它广泛应用于物联网、移动互联网、智能硬件、[车联网](https://www.emqx.com/zh/blog/category/internet-of-vehicles)、电力能源等行业。 - -本文主要介绍如何在 Python 项目中使用 **paho-mqtt** 客户端库 ,实现客户端与 MQTT 服务器的连接、订阅、取消订阅、收发消息等功能。 - - - -## 项目初始化 - -本项目使用 Python 3.6 进行开发测试,读者可用如下命令确认 Python 的版本。 - -``` -➜ ~ python3 --version -Python 3.6.7 -``` - -#### 选择 MQTT 客户端库 - -[paho-mqtt](https://www.eclipse.org/paho/clients/python/) 是目前 Python 中使用较多的 MQTT 客户端库,它在 Python 2.7 或 3.x 上为客户端类提供了对 MQTT v3.1 和 v3.1.1 的支持。它还提供了一些帮助程序功能,使将消息发布到 MQTT 服务器变得非常简单。 - -#### Pip 安装 Paho MQTT 客户端 - -Pip 是 Python 包管理工具,该工具提供了对 Python 包的查找、下载、安装、卸载的功能。 - -```bash -pip3 install -i https://pypi.doubanio.com/simple paho-mqtt -``` - - - -## Python MQTT 使用 - -### 连接 MQTT 服务器 - -本文将使用 EMQX 提供的 [免费公共 MQTT 服务器](https://www.emqx.com/zh/mqtt/public-mqtt5-broker),该服务基于 EMQX 的 [MQTT 物联网云平台](https://www.emqx.com/en/cloud) 创建。服务器接入信息如下: - -- Broker: **broker.emqx.io** -- TCP Port: **1883** -- Websocket Port: **8083** - -#### 导入 Paho MQTT客户端 - -```python -from paho.mqtt import client as mqtt_client -``` - -#### 设置 MQTT Broker 连接参数 - -设置 MQTT Broker 连接地址,端口以及 topic,同时我们调用 Python `random.randint` 函数随机生成 MQTT 客户端 id。 - -```python -broker = 'broker.emqx.io' -port = 1883 -topic = "/python/mqtt" -client_id = f'python-mqtt-{random.randint(0, 1000)}' -``` - -#### 编写 MQTT 连接函数 - -编写连接回调函数 `on_connect`,该函数将在客户端连接后被调用,在该函数中可以依据 `rc` 来判断客户端是否连接成功。通常同时我们将创建一个 MQTT 客户端,该客户端将连接到 `broker.emqx.io`。 - -```python -def connect_mqtt(): - def on_connect(client, userdata, flags, rc): - if rc == 0: - print("Connected to MQTT Broker!") - else: - print("Failed to connect, return code %d\n", rc) - # Set Connecting Client ID - client = mqtt_client.Client(client_id) - client.on_connect = on_connect - client.connect(broker, port) - return client -``` - -### 发布消息 - -首先定义一个 while 循环语句,在循环中我们将设置每秒调用 MQTT 客户端 `publish` 函数向 `/python/mqtt` 主题发送消息。 - -```python - def publish(client): - msg_count = 0 - while True: - time.sleep(1) - msg = f"messages: {msg_count}" - result = client.publish(topic, msg) - # result: [0, 1] - status = result[0] - if status == 0: - print(f"Send `{msg}` to topic `{topic}`") - else: - print(f"Failed to send message to topic {topic}") - msg_count += 1 -``` - -### 订阅消息 - -编写消息回调函数 `on_message`,该函数将在客户端从 MQTT Broker 收到消息后被调用,在该函数中我们将打印出订阅的 topic 名称以及接收到的消息内容。 - -```python -def subscribe(client: mqtt_client): - def on_message(client, userdata, msg): - print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic") - - client.subscribe(topic) - client.on_message = on_message -``` - -### 完整代码 - -**消息发布代码** - -```python -# python 3.6 - -import random -import time - -from paho.mqtt import client as mqtt_client - - -broker = 'broker.emqx.io' -port = 1883 -topic = "/python/mqtt" -# generate client ID with pub prefix randomly -client_id = f'python-mqtt-{random.randint(0, 1000)}' - - -def connect_mqtt(): - def on_connect(client, userdata, flags, rc): - if rc == 0: - print("Connected to MQTT Broker!") - else: - print("Failed to connect, return code %d\n", rc) - - client = mqtt_client.Client(client_id) - client.on_connect = on_connect - client.connect(broker, port) - return client - - -def publish(client): - msg_count = 0 - while True: - time.sleep(1) - msg = f"messages: {msg_count}" - result = client.publish(topic, msg) - # result: [0, 1] - status = result[0] - if status == 0: - print(f"Send `{msg}` to topic `{topic}`") - else: - print(f"Failed to send message to topic {topic}") - msg_count += 1 - - -def run(): - client = connect_mqtt() - client.loop_start() - publish(client) - - -if __name__ == '__main__': - run() - -``` - -**消息订阅代码** - -```python -# python3.6 - -import random - -from paho.mqtt import client as mqtt_client - - -broker = 'broker.emqx.io' -port = 1883 -topic = "/python/mqtt" -# generate client ID with pub prefix randomly -client_id = f'python-mqtt-{random.randint(0, 100)}' - - -def connect_mqtt() -> mqtt_client: - def on_connect(client, userdata, flags, rc): - if rc == 0: - print("Connected to MQTT Broker!") - else: - print("Failed to connect, return code %d\n", rc) - - client = mqtt_client.Client(client_id) - client.on_connect = on_connect - client.connect(broker, port) - return client - - -def subscribe(client: mqtt_client): - def on_message(client, userdata, msg): - print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic") - - client.subscribe(topic) - client.on_message = on_message - - -def run(): - client = connect_mqtt() - subscribe(client) - client.loop_forever() - - -if __name__ == '__main__': - run() -``` - - - - -## 测试 - -#### 消息发布 - -运行 MQTT 消息发布代码,我们将看到客户端连接成功,并且成功将消息发布。 - -```bash -python3 pub.py -``` - -![pub.png](https://assets.emqx.com/images/e4b472134f5c648220a04d29472bfecb.png) - -#### 消息订阅 - -运行 MQTT 消息订阅代码,我们将看到客户端连接成功,并且成功接收到发布的消息。 - -```bash -python3 sub.py -``` - -![sub.png](https://assets.emqx.com/images/770ba0ce419f2430db94d9e90cb30250.png) - - - -## 总结 - -至此,我们完成了使用 **paho-mqtt** 客户端连接到 [公共 MQTT 服务器](https://www.emqx.com/zh/mqtt/public-mqtt5-broker),并实现了测试客户端与 MQTT 服务器的连接、消息发布和订阅。 - -与 C ++ 或 Java 之类的高级语言不同,Python 比较适合设备侧的业务逻辑实现,使用 Python 您可以减少代码上的逻辑复杂度,降低与设备的交互成本。我们相信在物联网领域 Python 将会有更广泛的应用。 - -接下来,读者可访问 EMQ 提供的 [MQTT 入门与进阶](https://www.emqx.com/zh/mqtt-guide)系列文章学习 MQTT 主题及通配符、保留消息、遗嘱消息等相关概念,探索 MQTT 的更多高级应用,开启 MQTT 应用及服务开发。 - - - -[^1]: https://zh.wikipedia.org/wiki/Python - - - -
-
- 免费试用 EMQX Cloud -
全托管的云原生 MQTT 消息服务
-
- 开始试用 → -
diff --git a/zh/202410/how-to-use-mqtt-in-python.md b/zh/202410/how-to-use-mqtt-in-python.md new file mode 100644 index 00000000..e0675e96 --- /dev/null +++ b/zh/202410/how-to-use-mqtt-in-python.md @@ -0,0 +1,391 @@ +## 引言 + +[MQTT](https://www.emqx.com/zh/blog/the-easiest-guide-to-getting-started-with-mqtt) 是一种轻量级的消息传输协议,采用发布/订阅模式,使用最少的代码和带宽提供可靠的实时通信。它特别适用于资源有限和低带宽网络的设备,因此在物联网、移动互联网、车联网和电力行业得到了广泛应用。 + +Python 因其灵活性、易用性和丰富的库而在物联网中被广泛使用。由于能够处理大量数据,Python 非常适合智能家居自动化、环境监测和工业控制。它与微控制器兼容,使其成为开发物联网设备的重要工具。 + +本文主要介绍如何在 Python 项目中使用 paho-mqtt 客户端,实施与 MQTT 代理之间的连接、订阅、消息传递等功能。 + +## 为什么选择 Paho MQTT Python 客户端? + +Paho Python 客户端提供了一个支持 [MQTT 5.0](https://www.emqx.com/zh/blog/introduction-to-mqtt-5)、3.1.1 和 3.1 的客户端,适用于 Python 2.7 或 3.x。它还提供了一些辅助函数,使得向 [MQTT 服务器](https://www.emqx.com/zh/blog/the-ultimate-guide-to-mqtt-broker-comparison)发布单条消息变得非常简单。 + +作为 Python 社区中最受欢迎的 MQTT 客户端库,Paho MQTT Python 客户端具有以下优点: + +- 开源且得到社区支持; +- 简洁易用的 API,便于连接 MQTT 服务器并进行消息的发布/订阅; +- 支持多种安全机制; +- 积极开发和维护,以适应快速发展的物联网环境。 + +## Python MQTT 项目准备 + +### Python 版本 + +该项目在 Python 3.11 中开发和测试。请确认您安装了正确的 Python 版本,可以使用以下命令: + +```shell +$ python3 --version +Python 3.11.8 +``` + +### 安装 Paho MQTT 客户端 + +paho-mqtt 在 2024 年 2 月发布了 2.0.0 版本,相比 1.X 版本有一些重要更新。本文主要演示 1.X 版本的代码,同时也会提供 2.0.0 版本的相应代码,供读者选择合适的 paho-mqtt 版本。 + +> 有关 2.0.0 版本的详细变更,请参阅文档:[Migrations — Eclipse paho-mqtt documentation](https://eclipse.dev/paho/files/paho.mqtt.python/html/migrations.html) + +**使用 Pip 安装 paho-mqtt 1.X** + +```shell +pip3 install "paho-mqtt<2.0.0" +``` + +**使用 Pip 安装 paho-mqtt 2.X** + +```shell +pip3 install paho-mqtt +``` + +> 如果您需要安装 Pip 的帮助,请参考官方文档:[Installation - pip documentation v24.2](https://pip.pypa.io/en/stable/installation/) 。该资源提供了在不同操作系统和环境中安装 Pip 的详细说明。 + +## 准备 MQTT Broker + +在开始之前,请确保您有一个 [MQTT Broker](https://www.emqx.com/zh/blog/the-ultimate-guide-to-mqtt-broker-comparison) 用于通信和测试。我们建议使用 EMQX Platform 的 Serverless 版本。 + +EMQX Platform 是一个全托管的 MQTT 消息云服务,可以无缝连接您的物联网设备到任何云端,无需维护基础设施。EMQX Serverless 在安全、可扩展的集群上提供 MQTT 服务,并采用按量计费的定价模式,是适合快速开启 MQTT 项目的灵活经济的解决方案。 + +
+
+ 免费试用 EMQX Platform +
+ 开始试用 → +
+ +为简化流程,本文将使用[免费的公共 MQTT 服务器](https://www.emqx.com/zh/mqtt/public-mqtt5-broker): + +- 服务器:`http://broker.emqx.io` +- TCP 端口:`1883` +- WebSocket 端口:`8083` +- SSL/TLS 端口:`8883` +- 安全 WebSocket 端口:`8084` + +## Paho MQTT Python 客户端使用 + +导入 Paho MQTT 客户端: + +```python +from paho.mqtt import client as mqtt_client +``` + +### 创建 MQTT 连接 + +#### TCP 连接 + +我们需要指定 MQTT 连接的代理地址、端口和主题。此外,我们可以使用 Python 的 `random.randint` 函数生成随机的客户端 ID。 + +```python +broker = 'broker.emqx.io' +port = 1883 +topic = "python/mqtt" +client_id = f'python-mqtt-{random.randint(0, 1000)}' +# username = 'emqx' +# password = 'public' +``` + +> *了解更多请查看博客:* [*创建 MQTT 连接时如何设置参数*](https://www.emqx.com/zh/blog/how-to-set-parameters-when-establishing-an-mqtt-connection) + +接下来,我们需要编写 `on_connect` 回调函数,以便连接代理。此函数在客户端成功连接后被调用,我们可以使用 `rc` 参数检查连接状态。通常,我们还会创建一个同时连接到 `broker.emqx.io` 的客户端对象。 + +```shell +def connect_mqtt(): + def on_connect(client, userdata, flags, rc): + # For paho-mqtt 2.0.0, you need to add the properties parameter. + # def on_connect(client, userdata, flags, rc, properties): + if rc == 0: + print("Connected to MQTT Broker!") + else: + print("Failed to connect, return code %d\n", rc) + # Set Connecting Client ID + client = mqtt_client.Client(client_id) + + # For paho-mqtt 2.0.0, you need to set callback_api_version. + # client = mqtt_client.Client(client_id=client_id, callback_api_version=mqtt_client.CallbackAPIVersion.VERSION2) + + # client.username_pw_set(username, password) + client.on_connect = on_connect + client.connect(broker, port) + return client +``` + +#### 自动重连 + +在 [MQTT 客户端库](https://www.emqx.com/zh/mqtt-client-sdk)中,自动重连功能确保在不稳定的网络条件下,设备与代理之间可靠的通信,无需人工干预。当网络连接中断或代理暂时不可用时,客户端可以恢复发布或订阅主题,这对于汽车系统和医疗设备等高可靠性应用至关重要。 + +Paho MQTT 客户端的自动重连代码如下: + +```python +FIRST_RECONNECT_DELAY = 1 +RECONNECT_RATE = 2 +MAX_RECONNECT_COUNT = 12 +MAX_RECONNECT_DELAY = 60 + +def on_disconnect(client, userdata, rc): + logging.info("Disconnected with result code: %s", rc) + reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY + while reconnect_count < MAX_RECONNECT_COUNT: + logging.info("Reconnecting in %d seconds...", reconnect_delay) + time.sleep(reconnect_delay) + + try: + client.reconnect() + logging.info("Reconnected successfully!") + return + except Exception as err: + logging.error("%s. Reconnect failed. Retrying...", err) + + reconnect_delay *= RECONNECT_RATE + reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY) + reconnect_count += 1 + logging.info("Reconnect failed after %s attempts. Exiting...", reconnect_count) +``` + +然后,将其设置为客户端对象的 `on_disconnect`。 + +```python +client.on_disconnect = on_disconnect +``` + +客户端自动重连的完整代码请见:[GitHub](https://github.com/emqx/MQTT-Client-Examples/blob/master/mqtt-client-Python3/pub_sub_tcp.py). + +#### TLS/SSL + +在 MQTT 中使用 TLS 可以确保信息的机密性和完整性,防止信息泄露和篡改。TLS 认证可以分为单向认证和双向认证。 + +#### 单向认证 + +Paho MQTT 客户端的单向认证代码如下: + +```python +def connect_mqtt(): + client = mqtt_client.Client(CLIENT_ID) + client.tls_set(ca_certs='./broker.emqx.io-ca.crt') +``` + +**双向认证** + +Paho MQTT 客户端的双向认证代码如下: + +```python +def connect_mqtt(): + client = mqtt_client.Client(CLIENT_ID) + client.tls_set( + ca_certs='./server-ca.crt', + certfile='./client.crt', + keyfile='./client.key' + ) +``` + +### 发布消息 + +创建一个 while 循环,每秒向主题 `/python/mqtt` 发送一条消息,并在发送 5 条消息后退出循环。 + +```python + def publish(client): + msg_count = 1 + while True: + time.sleep(1) + msg = f"messages: {msg_count}" + result = client.publish(topic, msg) + # result: [0, 1] + status = result[0] + if status == 0: + print(f"Send `{msg}` to topic `{topic}`") + else: + print(f"Failed to send message to topic {topic}") + msg_count += 1 + if msg_count > 5: + break +``` + +### 订阅 + +创建消息回调函数 `on_message`,当客户端收到来自 MQTT Broker 的消息时触发。我们将在此函数中打印订阅主题的名称和收到的消息。 + +```python +def subscribe(client: mqtt_client): + def on_message(client, userdata, msg): + print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic") + + client.subscribe(topic) + client.on_message = on_message +``` + +## 完整代码 + +### MQTT 消息发布代码 + +```python +# python 3.11 + +import random +import time + +from paho.mqtt import client as mqtt_client + + +broker = 'broker.emqx.io' +port = 1883 +topic = "python/mqtt" +# Generate a Client ID with the publish prefix. +client_id = f'publish-{random.randint(0, 1000)}' +# username = 'emqx' +# password = 'public' + +def connect_mqtt(): + def on_connect(client, userdata, flags, rc): + if rc == 0: + print("Connected to MQTT Broker!") + else: + print("Failed to connect, return code %d\n", rc) + + client = mqtt_client.Client(client_id) + # client.username_pw_set(username, password) + client.on_connect = on_connect + client.connect(broker, port) + return client + + +def publish(client): + msg_count = 1 + while True: + time.sleep(1) + msg = f"messages: {msg_count}" + result = client.publish(topic, msg) + # result: [0, 1] + status = result[0] + if status == 0: + print(f"Send `{msg}` to topic `{topic}`") + else: + print(f"Failed to send message to topic {topic}") + msg_count += 1 + if msg_count > 5: + break + + +def run(): + client = connect_mqtt() + client.loop_start() + publish(client) + client.loop_stop() + + +if __name__ == '__main__': + run() +``` + +### MQTT 订阅代码 + +```python +# python 3.11 + +import random + +from paho.mqtt import client as mqtt_client + + +broker = 'broker.emqx.io' +port = 1883 +topic = "python/mqtt" +# Generate a Client ID with the subscribe prefix. +client_id = f'subscribe-{random.randint(0, 100)}' +# username = 'emqx' +# password = 'public' + + +def connect_mqtt() -> mqtt_client: + def on_connect(client, userdata, flags, rc): + if rc == 0: + print("Connected to MQTT Broker!") + else: + print("Failed to connect, return code %d\n", rc) + + client = mqtt_client.Client(client_id) + # client.username_pw_set(username, password) + client.on_connect = on_connect + client.connect(broker, port) + return client + + +def subscribe(client: mqtt_client): + def on_message(client, userdata, msg): + print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic") + + client.subscribe(topic) + client.on_message = on_message + + +def run(): + client = connect_mqtt() + subscribe(client) + client.loop_forever() + + +if __name__ == '__main__': + run() +``` + +## 测试 + +#### 订阅 + +运行 MQTT 订阅脚本 `sub.py`,我们将看到客户端成功连接并开始等待发布者发布消息。 + +```shell +python3 sub.py +``` + +![Subscribe to MQTT Topic](https://assets.emqx.com/images/f6fa795ecafac8e476b12018345ecf60.png) + +#### 发布消息 + +运行 MQTT 消息发布脚本 `pub.py`,我们会看到客户端成功连接并发布了 5 条消息。同时 `sub.py` 也会成功收到 5 条消息。 + +```shell +python3 pub.py +``` + +![Publish MQTT Messages](https://assets.emqx.com/images/cff08d70fe77b9a2391672f3816ba260.png) + +## Paho MQTT Python 客户端的常见问题 + +### 如果不执行 loop_stop() 会发生什么? + +Loop_stop() 用于停止 MQTT 客户端的消息循环并将其标记为已停止。此过程可确保客户端正常关闭,从而降低消息丢失、连接泄漏和异常程序行为等问题的风险。 + +例如,在本文提供的 pub.py 示例中,删除 `client.loop_stop()` 方法可能会导致 `sub.py` 脚本接收到的消息少于 5 条。 + +因此,正确使用 loop_stop() 方法来确保 MQTT 客户端正常关闭并防止由于未关闭连接而可能出现的任何潜在问题至关重要。 + +### connect_async() 是用来做什么的? + +`connect_async()` 在 MQTT 客户端应用程序需要长期 MQTT 连接或需要在后台保持 MQTT 连接处于活动状态而不阻塞主线程的情况下很有用。其主要使用场景有: + +- **长期 MQTT 连接**:`connect_async()` 有助于防止需要长期 MQTT 连接的 MQTT 客户端应用程序停滞或无响应,例如在工业应用程序中。 +- **网络连接不稳定**:在网络连接不确定或不稳定的环境中,可以使用 `connect_async()` 通过重试和延迟建立连接来提高应用程序的可靠性。 +- **频繁的连接和参数更改**:当连接参数或其他设置频繁更改时,`connect_async()` 有助于提高应用程序响应能力并防止卡顿。 +- **后台 MQTT 连接**:`connect_async()` 允许在应用程序运行其他进程时在后台建立 MQTT 连接,从而增强用户体验。 + +## 结语 + +本文介绍了如何使用 paho-mqtt 客户端连接到免费的公共 MQTT Broker。我们成功实现了连接过程,使用 `publish()` 方法将消息从测试客户端发送到 Broker,并使用 `subscribe()` 方法从 Broker 订阅消息。 + +接下来,您可以查看由 EMQ 提供的《[MQTT 教程](https://www.emqx.com/zh/mqtt-guide)》系列,了解 MQTT 协议的特性,探索更多 MQTT 的高级应用,进行 MQTT 应用与服务开发。 + + + +
+
+ 咨询 EMQ 技术专家 +
+ 联系我们 → +