原料
STM32CubeMX 4.25.0
KEIL5
Stm32F107单片机硬件系统
网线+路由器
MQTT源码
创建STM32F107的Basecode
通过STM32CubeMX配置STM32F107VCTx的demo:
1、配置有串口uart2,用于调试输出,其他串口不使用
2、配置有网口RMII模式
3、使用外部25MHZ晶振,独立看门狗
4、USB–未使用
5、GPIO系统运行灯
6、调试口SW
7、插件:USB/FATFS–未使用
8、插件:FREERTOS
9、插件:LWIP
最后生成代码(附件的ico文件生成出来的代码与提供的源码是有些差别,以源码为准)

MQTT源码获取
源码目录中我们需要用到的只有文件夹:“MQTTPacket”中的源码;
“MQTTPacket/src”是mqtt的库文件
“MQTTPacket/samples”是各种用法的参考文件
“MQTTPacket/samples/transport.c/h”是中间层文件,里面的接口需要我们实现,并在应用中调用;

添加MQTT源码到工程
直接复制MQTTPacket/src下的所有文件和“MQTTPacket/samples/transport.c/h”到工程目录:

在keil工程中添加源文件和头文件的包含路径;
修改mqtt接口文件(transport.c/h)
注:因为命名规范不符合我的编码风格,所以修改了默认的接口命名,但是功能是一样的;
需要实现的功能接口,直接使用lwip的socket接口:
1、创建socket接口:
int MqttOpen(const char* domain , unsigned char * ip, int port)
会将接口设置为读取不阻塞的模式;
2、读取指定socket数据(未使用到):
int MqttBytesRecv(void sck, unsigned char buf, int count)
3、读取默认的socket数据:
int MqttBytesRecv(void sck, unsigned char buf, int count)
4、向指定的socket发送数据:
int MqttSend(int sock, const unsigned char* buf, int buflen)
5、关闭socket:
int MqttClose(int sock)
6、transport.h对头文件声明
建立mqtt的应用程序
源码主要集中在network.c这个文件中:
1、给MQTT建立一个独立的网络任务;
2、实现建立mqtt连接;
3、订阅和发布5个主题;
4、周期性的发布订阅;
5、接收处理订阅消息(输出);
mqtt应用接口分析
static void networkThreadTask(void const * argument)
mqtt主任务,处理服务器的连接和发布、订阅、网络重连等
static int mqttTaskCb(void)
mqtt服务器事件响应的处理
static int connectServerCb(void)
连接服务器,此处通过域名连接到我自己的一个mqtt服务器,没有账号密码以及订阅权限要求,各位可以善意用于测试;
static int mqttSubscribe(char topicIndex)
订阅主题:topicTable
static int mqttPublish(char * topic , const unsigned char * payload , int len)
发布消息
static int mqttPublishCb(void)
周期发布消息的回调
下面是这个文件的源码:
/* Includes ------------------------------------------------------------------*/
#include “main.h”
#include “FreeRTOS.h”
#include “task.h”
#include “cmsis_os.h”
#include “string.h”
#include “stdio.h”
#include “socket.h”
#include “network.h”
#include “MQTTPacket.h”
#include “MQTTConnect.h”
#include “MQTTPublish.h”
#include “transport.h”
#include “lwip/tcp.h”
#include “lwip.h”
#include “lwip/init.h”
#include “lwip/netif.h”
#include “lwip/netdb.h”
#define MAX_TX_DATA 256
#define MAX_RX_DATA 128
#define SERVER_DNS “xushan.lxdream.com”
#define SERVER_PORT 1883//默认的mqtt端口
#define MAX_TOPIC 5
#define MQTT_ID_STR “ID12345”
#define MQTT_USER_STR “xushan”
#define MQTT_PASW_STR “zxcvbnm123”
extern struct netif gnetif;//网口对象
osThreadId networkTaskHandle;
uint32_t networkTaskBuffer[ 512 ];
osStaticThreadDef_t networkTaskControlBlock;
typedef struct{
int connected : 1;//建立连接
int login : 1;//登录连接
int subIndex : 3;//订阅序号
}NET_FLAG_TYPE;
static NET_FLAG_TYPE netFlags = {0};
static int mySocket = 0;
static unsigned char sendBuf[MAX_TX_DATA] = {0};
static unsigned char txBuf[MAX_TX_DATA] = {0};
static unsigned char rxBuf[MAX_RX_DATA] = {0};
static unsigned char *txData = txBuf;
static char *topicTable[MAX_TOPIC] = {
“/public/0”,
“/public/1”,
“/public/2”,
“/public/3”,
“/public/4”
};
//连接
static int connectServerCb(void)
{
static unsigned int tick = 0;
if(netFlags.connected == 0 && HAL_GetTick() - tick 》 3000)
{//未连接
static MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
int len = 0;
tick = HAL_GetTick();
if(mySocket != -1)
MqttClose(mySocket);
mySocket = MqttOpen(SERVER_DNS,NULL,SERVER_PORT);
if(mySocket 》= 0)
{
netFlags.connected = 1;
netFlags.subIndex = 0;
data.clientID.cstring = MQTT_ID_STR;
data.keepAliveInterval = 20;
data.cleansession = 1;
data.username.cstring = MQTT_USER_STR;
data.password.cstring = MQTT_PASW_STR;
len = MQTTSerialize_connect(sendBuf, MAX_TX_DATA, &data);
MqttSend(mySocket, sendBuf, len);
return 0;
}
else
return -1;
}
return -1;
}
//订阅事件
static int mqttSubscribe(char topicIndex)
{
int len = 0;
int req_qos = 0;
static MQTTString topicString[MAX_TOPIC] = {MQTTString_initializer};
if(topicIndex 》 MAX_TOPIC)
return -1;
topicString[topicIndex].cstring = topicTable[topicIndex];
len = MQTTSerialize_subscribe(sendBuf, MAX_TX_DATA, 0, 0, 1 ,&topicString[topicIndex], &req_qos);
if(MqttSend(mySocket, sendBuf, len) != len)
{
printf(“订阅发送失败[%d]n”,topicIndex);
return -1;
}
else
{
printf(“订阅发送成功[%d]n”,topicIndex);
return 0;
}
}
//发布一个订阅
static int mqttPublish(char * topic , const unsigned char * payload , int len)
{
MQTTString topicString = MQTTString_initializer;
topicString.cstring = topic;
len = MQTTSerialize_publish(sendBuf, MAX_TX_DATA, 0, 0, 0, 0, topicString, (unsigned char*)payload, len);
if(MqttSend(mySocket, sendBuf, len) != len)
{
printf(“发布发送失败[%s]n”,topic);
return -1;
}
else
return 0;
}
//mqtt任务事件处理
static int mqttTaskCb(void)
{
if(netFlags.connected)
{
int type = MQTTPacket_read(rxBuf, MAX_RX_DATA, MqttRecv);
if(type == -1)
return -1;
switch (type)
{
case PUBLISH:{//接收到服务器来的发布
unsigned char dup = 0;
int qos = 0;
unsigned char retained = 0;
unsigned short msgid = 0;
int payloadlen = 0;
unsigned char* payload = NULL;
int rc = 0;
MQTTString receivedTopic;
rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid,
&receivedTopic,&payload, &payloadlen, rxBuf, MAX_RX_DATA);
if(rc == 1)
{
printf(“---PUBLISH[dup=%d qos=%d retained=%d msgid=%d]n”,
dup,qos,retained,msgid);
printf(“message arrived=%.*sn”, payloadlen, payload);
}
else
printf(“PUBLISH failn”);
}break;
case PUBACK://发布消息的回复
case PUBREC:
case PUBREL:
case PUBCOMP:{//发布消息的回复
printf(“---PUBACK/PUBREC/PUBREL/PUBCOMPn”);
}break;
case SUBACK:{//订阅主题回复
unsigned short submsgid;
int subcount;
int granted_qos;
MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos,rxBuf, MAX_RX_DATA);
printf(“---SUBACK[submsgid=%d subcount=%d granted_qos=%d]n”,
submsgid,subcount,granted_qos);
if (granted_qos != 0)
{
printf(“granted qos != 0, %dn”, granted_qos);
netFlags.connected = 0;
netFlags.login = 0;
return -1;
}
if(++netFlags.subIndex 《 MAX_TOPIC)
mqttSubscribe(netFlags.subIndex);
}break;
case CONNACK:{//连接服务器成功
unsigned char session, connack;
printf(“---CONNACKn”);
if(MQTTDeserialize_connack(&session, &connack, rxBuf, MAX_RX_DATA) != 1 ||
connack != 0)
{
printf(“连接失败:%dn”, connack);
return -1;
}
netFlags.login = 1;
//连接成功后,订阅消息
mqttSubscribe(netFlags.subIndex);
}break;
default:{
printf(“msgType:%un”,type);
}break;
}
return 0;
}
return -1;
}
//发布消息任务
static int mqttPublishCb(void)
{
int res = 0;
static char topicIndex = 0;
static unsigned int tick = 0;
if(HAL_GetTick() - tick 》 5000)
{
int len = 0;
tick = HAL_GetTick();
len = snprintf((char*)txData,MAX_TX_DATA,“%s--%10u”,__DATE__,tick);
res = mqttPublish(topicTable[topicIndex],txData,len);
if(++topicIndex 》= MAX_TOPIC)
topicIndex = 0;
}
return res;
}
static void networkThreadTask(void const * argument)
{
unsigned int tick = 0;
MX_LWIP_Init();
memset(&netFlags,0,sizeof(netFlags));
for(;;)
{
if(connectServerCb() == 0)
tick = HAL_GetTick();
else if(mqttTaskCb() == 0)
tick = HAL_GetTick();
else
{
if(HAL_GetTick() - tick 》 30000)
{
tick = HAL_GetTick();
memset(&netFlags,0,sizeof(netFlags));
}
}
if(netFlags.login && netFlags.subIndex 》= MAX_TOPIC)
mqttPublishCb();
osDelay(10);
}
}
void InitNetWorkTask(void)
{
osThreadStaticDef(networkTask, networkThreadTask, osPriorityNormal, 0, ARRAY_SIZE(networkTaskBuffer), networkTaskBuffer, &networkTaskControlBlock);
networkTaskHandle = osThreadCreate(osThread(networkTask), NULL);
}
疑点
1、在订阅主题时mqttSubscribe,主题对象使用了静态的对象 static MQTTString topicString[MAX_TOPIC] = {MQTTString_initializer};
不知道是否必须为静态全局,是否被释放;
2、同样问题,在创建mqtt连接时,使用静态对象 static MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
不知道是否必须,是否可以被释放。
原料
STM32CubeMX 4.25.0
KEIL5
Stm32F107单片机硬件系统
网线+路由器
MQTT源码
创建STM32F107的Basecode
通过STM32CubeMX配置STM32F107VCTx的demo:
1、配置有串口uart2,用于调试输出,其他串口不使用
2、配置有网口RMII模式
3、使用外部25MHZ晶振,独立看门狗
4、USB–未使用
5、GPIO系统运行灯
6、调试口SW
7、插件:USB/FATFS–未使用
8、插件:FREERTOS
9、插件:LWIP
最后生成代码(附件的ico文件生成出来的代码与提供的源码是有些差别,以源码为准)

MQTT源码获取
源码目录中我们需要用到的只有文件夹:“MQTTPacket”中的源码;
“MQTTPacket/src”是mqtt的库文件
“MQTTPacket/samples”是各种用法的参考文件
“MQTTPacket/samples/transport.c/h”是中间层文件,里面的接口需要我们实现,并在应用中调用;

添加MQTT源码到工程
直接复制MQTTPacket/src下的所有文件和“MQTTPacket/samples/transport.c/h”到工程目录:

在keil工程中添加源文件和头文件的包含路径;
修改mqtt接口文件(transport.c/h)
注:因为命名规范不符合我的编码风格,所以修改了默认的接口命名,但是功能是一样的;
需要实现的功能接口,直接使用lwip的socket接口:
1、创建socket接口:
int MqttOpen(const char* domain , unsigned char * ip, int port)
会将接口设置为读取不阻塞的模式;
2、读取指定socket数据(未使用到):
int MqttBytesRecv(void sck, unsigned char buf, int count)
3、读取默认的socket数据:
int MqttBytesRecv(void sck, unsigned char buf, int count)
4、向指定的socket发送数据:
int MqttSend(int sock, const unsigned char* buf, int buflen)
5、关闭socket:
int MqttClose(int sock)
6、transport.h对头文件声明
建立mqtt的应用程序
源码主要集中在network.c这个文件中:
1、给MQTT建立一个独立的网络任务;
2、实现建立mqtt连接;
3、订阅和发布5个主题;
4、周期性的发布订阅;
5、接收处理订阅消息(输出);
mqtt应用接口分析
static void networkThreadTask(void const * argument)
mqtt主任务,处理服务器的连接和发布、订阅、网络重连等
static int mqttTaskCb(void)
mqtt服务器事件响应的处理
static int connectServerCb(void)
连接服务器,此处通过域名连接到我自己的一个mqtt服务器,没有账号密码以及订阅权限要求,各位可以善意用于测试;
static int mqttSubscribe(char topicIndex)
订阅主题:topicTable
static int mqttPublish(char * topic , const unsigned char * payload , int len)
发布消息
static int mqttPublishCb(void)
周期发布消息的回调
下面是这个文件的源码:
/* Includes ------------------------------------------------------------------*/
#include “main.h”
#include “FreeRTOS.h”
#include “task.h”
#include “cmsis_os.h”
#include “string.h”
#include “stdio.h”
#include “socket.h”
#include “network.h”
#include “MQTTPacket.h”
#include “MQTTConnect.h”
#include “MQTTPublish.h”
#include “transport.h”
#include “lwip/tcp.h”
#include “lwip.h”
#include “lwip/init.h”
#include “lwip/netif.h”
#include “lwip/netdb.h”
#define MAX_TX_DATA 256
#define MAX_RX_DATA 128
#define SERVER_DNS “xushan.lxdream.com”
#define SERVER_PORT 1883//默认的mqtt端口
#define MAX_TOPIC 5
#define MQTT_ID_STR “ID12345”
#define MQTT_USER_STR “xushan”
#define MQTT_PASW_STR “zxcvbnm123”
extern struct netif gnetif;//网口对象
osThreadId networkTaskHandle;
uint32_t networkTaskBuffer[ 512 ];
osStaticThreadDef_t networkTaskControlBlock;
typedef struct{
int connected : 1;//建立连接
int login : 1;//登录连接
int subIndex : 3;//订阅序号
}NET_FLAG_TYPE;
static NET_FLAG_TYPE netFlags = {0};
static int mySocket = 0;
static unsigned char sendBuf[MAX_TX_DATA] = {0};
static unsigned char txBuf[MAX_TX_DATA] = {0};
static unsigned char rxBuf[MAX_RX_DATA] = {0};
static unsigned char *txData = txBuf;
static char *topicTable[MAX_TOPIC] = {
“/public/0”,
“/public/1”,
“/public/2”,
“/public/3”,
“/public/4”
};
//连接
static int connectServerCb(void)
{
static unsigned int tick = 0;
if(netFlags.connected == 0 && HAL_GetTick() - tick 》 3000)
{//未连接
static MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
int len = 0;
tick = HAL_GetTick();
if(mySocket != -1)
MqttClose(mySocket);
mySocket = MqttOpen(SERVER_DNS,NULL,SERVER_PORT);
if(mySocket 》= 0)
{
netFlags.connected = 1;
netFlags.subIndex = 0;
data.clientID.cstring = MQTT_ID_STR;
data.keepAliveInterval = 20;
data.cleansession = 1;
data.username.cstring = MQTT_USER_STR;
data.password.cstring = MQTT_PASW_STR;
len = MQTTSerialize_connect(sendBuf, MAX_TX_DATA, &data);
MqttSend(mySocket, sendBuf, len);
return 0;
}
else
return -1;
}
return -1;
}
//订阅事件
static int mqttSubscribe(char topicIndex)
{
int len = 0;
int req_qos = 0;
static MQTTString topicString[MAX_TOPIC] = {MQTTString_initializer};
if(topicIndex 》 MAX_TOPIC)
return -1;
topicString[topicIndex].cstring = topicTable[topicIndex];
len = MQTTSerialize_subscribe(sendBuf, MAX_TX_DATA, 0, 0, 1 ,&topicString[topicIndex], &req_qos);
if(MqttSend(mySocket, sendBuf, len) != len)
{
printf(“订阅发送失败[%d]n”,topicIndex);
return -1;
}
else
{
printf(“订阅发送成功[%d]n”,topicIndex);
return 0;
}
}
//发布一个订阅
static int mqttPublish(char * topic , const unsigned char * payload , int len)
{
MQTTString topicString = MQTTString_initializer;
topicString.cstring = topic;
len = MQTTSerialize_publish(sendBuf, MAX_TX_DATA, 0, 0, 0, 0, topicString, (unsigned char*)payload, len);
if(MqttSend(mySocket, sendBuf, len) != len)
{
printf(“发布发送失败[%s]n”,topic);
return -1;
}
else
return 0;
}
//mqtt任务事件处理
static int mqttTaskCb(void)
{
if(netFlags.connected)
{
int type = MQTTPacket_read(rxBuf, MAX_RX_DATA, MqttRecv);
if(type == -1)
return -1;
switch (type)
{
case PUBLISH:{//接收到服务器来的发布
unsigned char dup = 0;
int qos = 0;
unsigned char retained = 0;
unsigned short msgid = 0;
int payloadlen = 0;
unsigned char* payload = NULL;
int rc = 0;
MQTTString receivedTopic;
rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid,
&receivedTopic,&payload, &payloadlen, rxBuf, MAX_RX_DATA);
if(rc == 1)
{
printf(“---PUBLISH[dup=%d qos=%d retained=%d msgid=%d]n”,
dup,qos,retained,msgid);
printf(“message arrived=%.*sn”, payloadlen, payload);
}
else
printf(“PUBLISH failn”);
}break;
case PUBACK://发布消息的回复
case PUBREC:
case PUBREL:
case PUBCOMP:{//发布消息的回复
printf(“---PUBACK/PUBREC/PUBREL/PUBCOMPn”);
}break;
case SUBACK:{//订阅主题回复
unsigned short submsgid;
int subcount;
int granted_qos;
MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos,rxBuf, MAX_RX_DATA);
printf(“---SUBACK[submsgid=%d subcount=%d granted_qos=%d]n”,
submsgid,subcount,granted_qos);
if (granted_qos != 0)
{
printf(“granted qos != 0, %dn”, granted_qos);
netFlags.connected = 0;
netFlags.login = 0;
return -1;
}
if(++netFlags.subIndex 《 MAX_TOPIC)
mqttSubscribe(netFlags.subIndex);
}break;
case CONNACK:{//连接服务器成功
unsigned char session, connack;
printf(“---CONNACKn”);
if(MQTTDeserialize_connack(&session, &connack, rxBuf, MAX_RX_DATA) != 1 ||
connack != 0)
{
printf(“连接失败:%dn”, connack);
return -1;
}
netFlags.login = 1;
//连接成功后,订阅消息
mqttSubscribe(netFlags.subIndex);
}break;
default:{
printf(“msgType:%un”,type);
}break;
}
return 0;
}
return -1;
}
//发布消息任务
static int mqttPublishCb(void)
{
int res = 0;
static char topicIndex = 0;
static unsigned int tick = 0;
if(HAL_GetTick() - tick 》 5000)
{
int len = 0;
tick = HAL_GetTick();
len = snprintf((char*)txData,MAX_TX_DATA,“%s--%10u”,__DATE__,tick);
res = mqttPublish(topicTable[topicIndex],txData,len);
if(++topicIndex 》= MAX_TOPIC)
topicIndex = 0;
}
return res;
}
static void networkThreadTask(void const * argument)
{
unsigned int tick = 0;
MX_LWIP_Init();
memset(&netFlags,0,sizeof(netFlags));
for(;;)
{
if(connectServerCb() == 0)
tick = HAL_GetTick();
else if(mqttTaskCb() == 0)
tick = HAL_GetTick();
else
{
if(HAL_GetTick() - tick 》 30000)
{
tick = HAL_GetTick();
memset(&netFlags,0,sizeof(netFlags));
}
}
if(netFlags.login && netFlags.subIndex 》= MAX_TOPIC)
mqttPublishCb();
osDelay(10);
}
}
void InitNetWorkTask(void)
{
osThreadStaticDef(networkTask, networkThreadTask, osPriorityNormal, 0, ARRAY_SIZE(networkTaskBuffer), networkTaskBuffer, &networkTaskControlBlock);
networkTaskHandle = osThreadCreate(osThread(networkTask), NULL);
}
疑点
1、在订阅主题时mqttSubscribe,主题对象使用了静态的对象 static MQTTString topicString[MAX_TOPIC] = {MQTTString_initializer};
不知道是否必须为静态全局,是否被释放;
2、同样问题,在创建mqtt连接时,使用静态对象 static MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
不知道是否必须,是否可以被释放。
举报