1 MQTT概述
1.1 MQTT概念
MQTT (Message Queue Telemetry Transport),翻译成中文就是,遥测传输协议,其主要提供了订阅/发布两种消息模式,更为简约、轻量,易于使用,特别适合于受限环境(带宽低、网络延迟高、网络通信不稳定)的消息分发,属于物联网(Internet of Thing)的一个标准传输协议。
设计思想是开放、简单、轻量、易于实现。这些特点使它适用于受限环境。例如,但不仅限于此:
- 特别适合于网络代价昂贵,带宽低、不可靠的环境。
- 能在处理器和内存资源有限的嵌入式设备中运行。
- 使用发布/订阅消息模式,提供一对多的消息发布,从而解除应用程序耦合。
- 使用 TCP/IP 提供网络连接。
- 提供Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。
对消息中间件,估计大家不得不关心的就是消息的可靠性,也就是消息的发布服务质量,可喜的是,MQTT支持三种消息发布服务质量(QoS):
- “至多一次”(QoS==0),消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
- “至少一次”(QoS==1),确保消息到达,但消息重复可能会发生。
- “只有一次”(QoS==2),确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量。
1.2 MQTT协议的通信模型
下图是MQTT协议的通信模型。

Figure 1‑1 MQTT协议的通信模型
【基本概念】
Topic:Topic是UTF-8字符串,是发布/订阅(Pub/Sub)模型中消息的中介,可以向Topic发布或者订阅消息。
Topic类:同一产品下不同设备的Topic集合,用\{productkey\}和{deviceName}通配一个唯一的设备,一个Topic类对一个ProductKey下所有设备通用。
发布(Pub):操作Topic的权限类型,具有往Topic中发布消息的权限。
订阅(Sub):操作Topic的权限类型,具有从Topic中订阅消息的权限。

Figure 1‑2 MQTT协议的主题和消息
从上图中可以看出消息的订阅与发布,发小消息要带上主题和消息,MQTT的客户端既可以是消息的发布者也可以是消息的订阅者。

Figure 1‑3 MQTT协议的连接和会话
连接由客户端发起,会建立一个会话,把客户端附加到服务器上。服务器根据连接参数(ClientID,用户名,密码)对客户端进行鉴权和授权。连接的参数(CleanSession)决定此次会话是否支持持久会话(Persistent Session)。

Figure 1‑4 MQTT报文格式
固定报头,2-5个字节,所有报文都包含;可变报头,长度不固定,部分报文才包含;有效负载,长度不固定,部分报文才包含。

Figure 1‑5 报文类型
-## 1.3 MQTT版本
MQTT的协议最新的三个版本是:3.1.1,3.1.0和5.0的协议。
http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html
http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718029
http://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html
其中5.0还只是一个提案(2017年7月13日发布的一个草稿版的提案),所以本篇文章暂时不涉及MQTT 5.0的协议,考虑到目前主流和成熟的MQTT协议的应用都是MQTT 3.1.1的协议,所以笔者还是以MQTT 3.1.1的协议为基准,给大家分享,只要这个会了,其他的自然不在话下。需要注意的是,MQTT 3.1.0和3.1.1 规范,还是有小部分的区别的。比如可变头部中,在3.1.0的规范中,其关键字是,“MQISdP”。
1.4 支持MQTT协议消息中间件产品
目前有很多的MQTT消息中间件服务器,如下,都是MQTT协议的服务器端的实现。
- IBM Websphere
- MQ Telemetry
- IBM MessageSight
- Mosquitto
- Eclipse Paho
- emqttd Xively
- m2m.io
- webMethods
- Nirvana Messaging
- RabbitMQ
- Apache ActiveMQ
- Apache Apollo
- Moquette
- HiveMQ
- Mosca
- Litmus Automation Loop
- JoramMQ
- ThingMQ
- VerneMQ
1.5 支持一对多的消息订阅
MQTT协议,支持一对多的基于消息主题(Topic)的消息订阅,也就是一个MQTT的客户端,发送一条特定主题的消息,能同时支持多个订阅者同时订阅。
例如,笔记本和手机都订阅了消息主题为temperature的消息,当温度传感器发布了temperature的消息(当前温度为21 C), 笔记本和手机都会受到温度传感器发布的温度。
1.6 MQTT客户端的语言支持
目前市面上的主流语言都支持。
Java
Javascript
C/C++
Python
Ruby
Objective-C
…
2 MQTT API
MicroPython已经封装好了MQTT客户端的库文件。直接使用即可。
2.1 构造函数
client=mqtt.MQTTClient(client_id, server, port)
导入MQTT库和构建client客户端对象。
参数说明:
- client_id : 客户端ID,具有唯一性;
- server : MQTT服务器地址,可以是IP或者网址;
- port : MQTT服务器端口。(取决于MQTT服务器厂家)
2.2 使用方法
client.connect()
连接到MQTT服务器。
client.publish(topic,message)
发布信息。
- topic : 主题名称;
- message : 信息内容,例:'Hello 01Studio!'
client.subscribe(topic)
订阅;
client.set_callback(callback)
设置回调函数。
callback : 订阅后如果接收到信息,就执行项应名称的回调函数。
client.check_msg()
检查订阅信息。如收到信息就执行设置过的回调函数callback。
由于客户端分为发布者和订阅者角色,因此为了方便大家更好理解,本实验分开两个案例来编程,分别为发布者(publish)和订阅者(subscribe)。再结合MQTT网络调试助手来测试。代表编写流程图如下:
发布者(publish)代码流程:
订阅者(subscribe)代码流程:
3 实例
下面笔者讲解一个具体的实例。CanMV K230作为发布者,发布温度湿度信息,另外一个客户端作为订阅者。
3.1 AHT20温湿度读取
from machine import Pin
from machine import FPIOA, I2C
import time
AHT10_ADDR = 0x38
AHT10_TRIG_MEAS = 0xAC
class AHT10:
def __init__(self, hard_flag=True, i2c_bus=2, sda_pin=12, scl_pin=11, freq=400000):
if hard_flag:
fpioa = FPIOA()
fpioa.set_function(scl_pin, FPIOA.IIC2_SCL)
fpioa.set_function(sda_pin, FPIOA.IIC2_SDA)
self.i2c=I2C(i2c_bus, freq = freq)
print(self.i2c.scan())
else:
self.i2c = I2C(i2c_bus, scl=scl_pin, sda=sda_pin, freq=freq)
print(self.i2c.scan())
def aht10_get_data(self):
self.i2c.writeto(AHT10_ADDR, bytearray([AHT10_TRIG_MEAS])) # AHT10_TRIG_MEAS
timeout = 100
while timeout > 0:
status = self.i2c.readfrom(AHT10_ADDR, True)[0]
if not (status & 0x80): # 检查忙标志
break
time.sleep_ms(10)
timeout -= 1
if timeout <= 0:
raise OSError("AHT10 测量超时")
data = self.i2c.readfrom(AHT10_ADDR, 6)
hum_raw = ((data[1] << 16) | (data[2] << 8) | data[3]) >> 4
temp_raw = ((data[3] & 0x0F) << 16) | (data[4] << 8) | data[5]
humidity = (hum_raw / (2**20)) * 100
temperature = (temp_raw / (2**20)) * 200 - 50
return temperature, humidity
def run(self):
while True:
temp, hum = self.aht10_get_data()
print(f"温度: {temp:.1f}°C, 湿度: {hum:.1f}%")
time.sleep(2) # AHT10最小测量间隔0.5秒
3.2 WiFi联网
import network,time
from machine import Pin
WIFI_SSID = 'Wifi_2G'
PASSWORK = '12345678'
class WIFI:
def __init__(self, state_pin=52, ssid='Wifi_2G', passwork='12345678'):
self.wifi_led = Pin(state_pin, Pin.OUT)
self.wlan = network.WLAN(network.STA_IF)
self.ssid = ssid
self.passwork = passwork
def wifi_status(self):
return self.wlan.isconnected()
def wifi_connect(self):
self.wlan.active(True)
if not self.wifi_status():
print('connecting to network...')
for i in range(3):
self.wlan.connect(self.ssid, self.passwork)
if self.wlan.isconnected():
break
if self.wifi_status():
print('connect success')
self.wifi_led.value(1)
while self.wlan.ifconfig()[0] == '0.0.0.0':
pass
print('network information:', self.wlan.ifconfig())
else:
for i in range(10):
self.wifi_led.value(1)
time.sleep_ms(300)
self.wifi_led.value(0)
time.sleep_ms(300)
3.3 MQTT客户端
import network,time
from machine import Pin,Timer
SERVER = 'mq.tongxinmao.com'
PORT = 18830
CLIENT_ID = 'CanMV-K230'
TOPIC = 'canmv/sensor'
class MQTT:
def __init__(self, client_id, server, port):
self.client = MQTTClient(client_id, server, port)
self.client.connect()
def mqtt_pub(self, topic, message):
self.client.publish(topic, message)
最后主函数代码如下:
if __name__ == '__main__':
wf = WIFI(52, WIFI_SSID, PASSWORK)
wf.wifi_connect()
if wf.wifi_status():
mq = MQTT(CLIENT_ID, SERVER, PORT)
ah = AHT10(True, 2, 12, 11, 400 * 1000)
while True:
time.sleep(5)
temp, hum = ah.aht10_get_data()
message = f"temp: {temp:.1f}C, hum: {hum:.1f}%"
mq.mqtt_pub(TOPIC, message)
3.4 实验现象
接下来是配置MQTT助手,按下图配置。
1、点击网络;
2、点击MQTT;
3、点击启动;
4、输入订阅主题,这里 填发送者的主题 “/public/01studio/1”;
5、点击订阅主题按钮,等待接收信息;
6、订阅成功后左边提示订阅成功。

运行代码,打印信息如下:

可以看到订阅者接收到数据。
