1 MQTT简介
1.1 MQTT概念
MQTT (Message Queue Telemetry Transport),翻译成中文就是,遥测传输协议,其主要提供了订阅/发布两种消息模式,更为简约、轻量,易于使用,特别适合于受限环境(带宽低、网络延迟高、网络通信不稳定)的消息分发,属于物联网(Internet of Thing)的一个标准传输协议。
设计思想是开放、简单、轻量、易于实现。这些特点使它适用于受限环境。例如,但不仅限于此:
- 特别适合于网络代价昂贵,带宽低、不可靠的环境。
- 能在处理器和内存资源有限的嵌入式设备中运行。
- 使用发布/订阅消息模式,提供一对多的消息发布,从而解除应用程序耦合。
- 使用 TCP/IP 提供网络连接。
- 提供Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。
对消息中间件,估计大家不得不关心的就是消息的可靠性,也就是消息的发布服务质量,可喜的是,MQTT支持三种消息发布服务质量(QoS):
- “至多一次”(QoS==0),消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
- “至少一次”(QoS==1),确保消息到达,但消息重复可能会发生。
- “只有一次”(QoS==2),确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量。
1.2 MQTT协议的通信模型
下图是MQTT协议的通信模型。

Figure ‑ MQTT协议的通信模型
【基本概念】
Topic:Topic是UTF-8字符串,是发布/订阅(Pub/Sub)模型中消息的中介,可以向Topic发布或者订阅消息。
Topic类:同一产品下不同设备的Topic集合,用\{productkey\}和{deviceName}通配一个唯一的设备,一个Topic类对一个ProductKey下所有设备通用。
发布(Pub):操作Topic的权限类型,具有往Topic中发布消息的权限。
订阅(Sub):操作Topic的权限类型,具有从Topic中订阅消息的权限。

Figure ‑ MQTT协议的主题和消息
从上图中可以看出消息的订阅与发布,发小消息要带上主题和消息,MQTT的客户端既可以是消息的发布者也可以是消息的订阅者。

Figure ‑ MQTT协议的连接和会话
连接由客户端发起,会建立一个会话,把客户端附加到服务器上。服务器根据连接参数(ClientID,用户名,密码)对客户端进行鉴权和授权。连接的参数(CleanSession)决定此次会话是否支持持久会话(Persistent Session)。
2 MQTT服务端安装
2.1 安装Mosquitto相关的软件
# 安装 MQTT 代理
$ sudo apt install mosquitto mosquitto-clients
2.2 修改 Mosquitto 配置
要修改 Mosquitto 的监听地址和端口,需要编辑其配置文件。以下是详细步骤:
1、找到并编辑 Mosquitto 配置文件
配置文件位置
- Ubuntu/Debian: /etc/mosquitto/mosquitto.conf
- Windows: 安装目录下的 mosquitto.conf (如 C:\Program Files\Mosquitto\mosquitto.conf)
- macOS (Homebrew): /usr/local/etc/mosquitto/mosquitto.conf
编辑配置文件
sudo vi /etc/mosquitto/mosquitto.conf
2、修改监听设置
在配置文件中添加或修改以下内容:
# 监听所有网络接口的默认端口 1883
listener 1883 0.0.0.0
# pid_file /var/run/mosquitto.pid
# persistence true
# 防止重复消息
persistence false
allow_duplicate_messages false
persistence_location /var/lib/mosquitto/
log_dest file /var/log/mosquitto/mosquitto.log
include_dir /etc/mosquitto/conf.d
# 需要身份验证 (生产环境)
password_file /etc/mosquitto/passwd
allow_anonymous false
3、配置防火墙
# 开放 MQTT 端口 (1883)
sudo ufw allow 1883/tcp
# 如果使用SSL/TLS
sudo ufw allow 8883/tcp
4、重启 Mosquitto 服务
sudo systemctl restart mosquitto
5、验证配置
检查服务状态:
sudo systemctl status mosquitto
检查监听端口:

sudo netstat -tuln | grep 1883
6、创建用户 (如果需要身份验证)
# 创建密码文件
sudo touch /etc/mosquitto/passwd
sudo chmod 777 /etc/mosquitto/passwd
添加用户
sudo mosquitto_passwd -b /etc/mosquitto/passwd your_username your_password
在配置文件/etc/mosquitto/mosquitto.conf中启用认证
password_file /etc/mosquitto/passwd
allow_anonymous false
# 重启服务
sudo systemctl restart mosquitto
7、测试连接
使用命令行工具测试
# 订阅测试
$ mosquitto_sub -h your_server_ip -t "test" -u your_username -P your_password
# 发布测试 (在另一个终端)
$ mosquitto_pub -h your_server_ip -t "test" -m "Hello MQTT" -u your_username -P your_password
3 Paho-MQTT客户端移植
3.1 获取 MQTTClient 源码
可以从 GitHub 克隆 Paho MQTT C 客户端库,它包含 MQTTClient 实现:
$ git clone成都
$ cd paho.mqtt.c
$ git checkout v1.3.8 # 使用稳定版本
3.2 编译安装
配置和编译。
$ mkdir build
$ cd build
$ cmake -DPAHO_WITH_SSL=OFF -DPAHO_BUILD_STATIC=ON \
-DCMAKE_SYSTEM_NAME=Linux \
-DCMAKE_SYSTEM_PROCESSOR=arm64 \
-DCMAKE_C_COMPILER=aarch64-linux-gnu-gcc \
-DPAHO_BUILD_SHARED=OFF -DPAHO_BUILD_SAMPLES=ON \
-DCMAKE_INSTALL_PREFIX=./install ..
make
make
make install
将编译生成的库文件和示例程序复制到 开发板:
$ scp -r install/ root@192.168.101.179:/usr/
3.3 简单使用示例
发布者示例 (publisher.c)
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTClient.h"
#define ADDRESS "tcp://192.168.101.100:1883"
#define CLIENTID "Publisher"
#define TOPIC "test"
#define QOS 1
#define TIMEOUT 10000L
#define USERNAME "admin"
#define PASSWORD "password"
int main(int argc, char* argv[])
{
MQTTClient client;
MQTTClient_create(&client, ADDRESS, CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE, NULL);
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.username = USERNAME;
conn_opts.password = PASSWORD;
int rc;
if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
printf("Failed to connect, return code %d\n", rc);
printf("Error: %s\n", MQTTClient_strerror(rc));
return -1;
}
const char* message = " Hello, this is RK3568 Publisher";
MQTTClient_deliveryToken token;
rc = MQTTClient_publish(client, TOPIC, strlen(message),
message, QOS, 0, &token);
if (rc != MQTTCLIENT_SUCCESS)
{
printf("Failed to publish message, return code %d\n", rc);
}
else
{
printf("Message published successfully, delivery token %d\n", token);
MQTTClient_waitForCompletion(client, token, TIMEOUT);
}
MQTTClient_disconnect(client, TIMEOUT);
MQTTClient_destroy(&client);
return rc;
}
订阅者示例 (subscriber.c)
#include <stdlib.h>
#include <string.h>
#include "MQTTClient.h"
#define ADDRESS "tcp://mqtt.eclipseprojects.io:1883"
#define CLIENTID "MilkvDuoSsubscriber "
#define TOPIC "test"
#define QOS 1
#define TIMEOUT 10000L
#define USERNAME "admin"
#define PASSWORD "password"
volatile MQTTClient_deliveryToken deliveredtoken;
void delivered(void *context, MQTTClient_deliveryToken dt) {
printf("Message with token value %d delivery confirmed\n", dt);
deliveredtoken = dt;
}
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message) {
printf("Message arrived\n");
printf(" topic: %s\n", topicName);
printf(" message: %.*s\n", message->payloadlen, (char*)message->payload);
MQTTClient_free(topicName);
MQTTClient_freeMessage(&message);
return 1;
}
void connlost(void *context, char *cause) {
printf("\nConnection lost\n");
printf(" cause: %s\n", cause);
}
int main(int argc, char* argv[]) {
MQTTClient client;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
int rc;
int ch;
if ((rc = MQTTClient_create(&client, ADDRESS, CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS) {
printf("Failed to create client, return code %d\n", rc);
exit(EXIT_FAILURE);
}
MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.username = USERNAME;
conn_opts.password = PASSWORD;
if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
printf("Failed to connect, return code %d\n", rc);
MQTTClient_destroy(&client);
exit(EXIT_FAILURE);
}
printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n", TOPIC, CLIENTID, QOS);
if ((rc = MQTTClient_subscribe(client, TOPIC, QOS)) != MQTTCLIENT_SUCCESS) {
printf("Failed to subscribe, return code %d\n", rc);
MQTTClient_disconnect(client, 10000);
MQTTClient_destroy(&client);
exit(EXIT_FAILURE);
}
printf("Press Q<Enter> to quit\n\n");
do {
ch = getchar();
} while(ch!='Q' && ch != 'q');
MQTTClient_unsubscribe(client, TOPIC);
MQTTClient_disconnect(client, 10000);
MQTTClient_destroy(&client);
return EXIT_SUCCESS;
}
3.4 实验现象
订阅者订阅信息:

开发者发布消息:
