STM32
直播中

贾埃罗

8年用户 1799经验值
私信 关注
[问答]

STM32F107是怎样通过LWIP实现MQTT发布和订阅框架的呢

怎样通过STM32CubeMX配置STM32F107VCTx的demo呢?
STM32F107是怎样通过LWIP实现MQTT发布和订阅框架的呢?



回帖(1)

张磊

2021-10-27 10:25:24
  原料
  STM32CubeMX 4.25.0
  KEIL5
  Stm32F107单片机硬件系统
  网线+路由器
  MQTT源码
  创建STM32F107的Basecode
  通过STM32CubeMX配置STM32F107VCTx的demo:
  1、配置有串口uart2,用于调试输出,其他串口不使用
  2、配置有网口RMII模式
  3、使用外部25MHZ晶振,独立看门狗
  4、USB–未使用
  5、GPIO系统运行灯
  6、调试口SW
  7、插件:USB/FATFS–未使用
  8、插件:FREERTOS
  9、插件:LWIP
  最后生成代码(附件的ico文件生成出来的代码与提供的源码是有些差别,以源码为准)
  
  MQTT源码获取
  源码目录中我们需要用到的只有文件夹:“MQTTPacket”中的源码;
  “MQTTPacket/src”是mqtt的库文件
  “MQTTPacket/samples”是各种用法的参考文件
  “MQTTPacket/samples/transport.c/h”是中间层文件,里面的接口需要我们实现,并在应用中调用;
  
  
  添加MQTT源码到工程
  直接复制MQTTPacket/src下的所有文件和“MQTTPacket/samples/transport.c/h”到工程目录:
  
  在keil工程中添加源文件和头文件的包含路径;
  修改mqtt接口文件(transport.c/h)
  注:因为命名规范不符合我的编码风格,所以修改了默认的接口命名,但是功能是一样的;
  需要实现的功能接口,直接使用lwip的socket接口:
  1、创建socket接口:
  int MqttOpen(const char* domain , unsigned char * ip, int port)
  会将接口设置为读取不阻塞的模式;
  2、读取指定socket数据(未使用到):
  int MqttBytesRecv(void sck, unsigned char buf, int count)
  3、读取默认的socket数据:
  int MqttBytesRecv(void sck, unsigned char buf, int count)
  4、向指定的socket发送数据:
  int MqttSend(int sock, const unsigned char* buf, int buflen)
  5、关闭socket:
  int MqttClose(int sock)
  6、transport.h对头文件声明
  建立mqtt的应用程序
  源码主要集中在network.c这个文件中:
  1、给MQTT建立一个独立的网络任务;
  2、实现建立mqtt连接;
  3、订阅和发布5个主题;
  4、周期性的发布订阅;
  5、接收处理订阅消息(输出);
  mqtt应用接口分析
  static void networkThreadTask(void const * argument)
  mqtt主任务,处理服务器的连接和发布、订阅、网络重连等
  static int mqttTaskCb(void)
  mqtt服务器事件响应的处理
  static int connectServerCb(void)
  连接服务器,此处通过域名连接到我自己的一个mqtt服务器,没有账号密码以及订阅权限要求,各位可以善意用于测试;
  static int mqttSubscribe(char topicIndex)
  订阅主题:topicTable
  static int mqttPublish(char * topic , const unsigned char * payload , int len)
  发布消息
  static int mqttPublishCb(void)
  周期发布消息的回调
  下面是这个文件的源码:
  /* Includes ------------------------------------------------------------------*/
  #include “main.h”
  #include “FreeRTOS.h”
  #include “task.h”
  #include “cmsis_os.h”
  #include “string.h”
  #include “stdio.h”
  #include “socket.h”
  #include “network.h”
  #include “MQTTPacket.h”
  #include “MQTTConnect.h”
  #include “MQTTPublish.h”
  #include “transport.h”
  #include “lwip/tcp.h”
  #include “lwip.h”
  #include “lwip/init.h”
  #include “lwip/netif.h”
  #include “lwip/netdb.h”
  #define MAX_TX_DATA 256
  #define MAX_RX_DATA 128
  #define SERVER_DNS “xushan.lxdream.com”
  #define SERVER_PORT 1883//默认的mqtt端口
  #define MAX_TOPIC 5
  #define MQTT_ID_STR “ID12345”
  #define MQTT_USER_STR “xushan”
  #define MQTT_PASW_STR “zxcvbnm123”
  extern struct netif gnetif;//网口对象
  osThreadId networkTaskHandle;
  uint32_t networkTaskBuffer[ 512 ];
  osStaticThreadDef_t networkTaskControlBlock;
  typedef struct{
  int connected : 1;//建立连接
  int login : 1;//登录连接
  int subIndex : 3;//订阅序号
  }NET_FLAG_TYPE;
  static NET_FLAG_TYPE netFlags = {0};
  static int mySocket = 0;
  static unsigned char sendBuf[MAX_TX_DATA] = {0};
  static unsigned char txBuf[MAX_TX_DATA] = {0};
  static unsigned char rxBuf[MAX_RX_DATA] = {0};
  static unsigned char *txData = txBuf;
  static char *topicTable[MAX_TOPIC] = {
  “/public/0”,
  “/public/1”,
  “/public/2”,
  “/public/3”,
  “/public/4”
  };
  //连接
  static int connectServerCb(void)
  {
  static unsigned int tick = 0;
  if(netFlags.connected == 0 && HAL_GetTick() - tick 》 3000)
  {//未连接
  static MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
  int len = 0;
  tick = HAL_GetTick();
  if(mySocket != -1)
  MqttClose(mySocket);
  mySocket = MqttOpen(SERVER_DNS,NULL,SERVER_PORT);
  if(mySocket 》= 0)
  {
  netFlags.connected = 1;
  netFlags.subIndex = 0;
  data.clientID.cstring = MQTT_ID_STR;
  data.keepAliveInterval = 20;
  data.cleansession = 1;
  data.username.cstring = MQTT_USER_STR;
  data.password.cstring = MQTT_PASW_STR;
  len = MQTTSerialize_connect(sendBuf, MAX_TX_DATA, &data);
  MqttSend(mySocket, sendBuf, len);
  return 0;
  }
  else
  return -1;
  }
  return -1;
  }
  //订阅事件
  static int mqttSubscribe(char topicIndex)
  {
  int len = 0;
  int req_qos = 0;
  static MQTTString topicString[MAX_TOPIC] = {MQTTString_initializer};
  if(topicIndex 》 MAX_TOPIC)
  return -1;
  topicString[topicIndex].cstring = topicTable[topicIndex];
  len = MQTTSerialize_subscribe(sendBuf, MAX_TX_DATA, 0, 0, 1 ,&topicString[topicIndex], &req_qos);
  if(MqttSend(mySocket, sendBuf, len) != len)
  {
  printf(“订阅发送失败[%d]n”,topicIndex);
  return -1;
  }
  else
  {
  printf(“订阅发送成功[%d]n”,topicIndex);
  return 0;
  }
  }
  //发布一个订阅
  static int mqttPublish(char * topic , const unsigned char * payload , int len)
  {
  MQTTString topicString = MQTTString_initializer;
  topicString.cstring = topic;
  len = MQTTSerialize_publish(sendBuf, MAX_TX_DATA, 0, 0, 0, 0, topicString, (unsigned char*)payload, len);
  if(MqttSend(mySocket, sendBuf, len) != len)
  {
  printf(“发布发送失败[%s]n”,topic);
  return -1;
  }
  else
  return 0;
  }
  //mqtt任务事件处理
  static int mqttTaskCb(void)
  {
  if(netFlags.connected)
  {
  int type = MQTTPacket_read(rxBuf, MAX_RX_DATA, MqttRecv);
  if(type == -1)
  return -1;
  switch (type)
  {
  case PUBLISH:{//接收到服务器来的发布
  unsigned char dup = 0;
  int qos = 0;
  unsigned char retained = 0;
  unsigned short msgid = 0;
  int payloadlen = 0;
  unsigned char* payload = NULL;
  int rc = 0;
  MQTTString receivedTopic;
  rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid,
  &receivedTopic,&payload, &payloadlen, rxBuf, MAX_RX_DATA);
  if(rc == 1)
  {
  printf(“---PUBLISH[dup=%d qos=%d retained=%d msgid=%d]n”,
  dup,qos,retained,msgid);
  printf(“message arrived=%.*sn”, payloadlen, payload);
  }
  else
  printf(“PUBLISH failn”);
  }break;
  case PUBACK://发布消息的回复
  case PUBREC:
  case PUBREL:
  case PUBCOMP:{//发布消息的回复
  printf(“---PUBACK/PUBREC/PUBREL/PUBCOMPn”);
  }break;
  case SUBACK:{//订阅主题回复
  unsigned short submsgid;
  int subcount;
  int granted_qos;
  MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos,rxBuf, MAX_RX_DATA);
  printf(“---SUBACK[submsgid=%d subcount=%d granted_qos=%d]n”,
  submsgid,subcount,granted_qos);
  if (granted_qos != 0)
  {
  printf(“granted qos != 0, %dn”, granted_qos);
  netFlags.connected = 0;
  netFlags.login = 0;
  return -1;
  }
  if(++netFlags.subIndex 《 MAX_TOPIC)
  mqttSubscribe(netFlags.subIndex);
  }break;
  case CONNACK:{//连接服务器成功
  unsigned char session, connack;
  printf(“---CONNACKn”);
  if(MQTTDeserialize_connack(&session, &connack, rxBuf, MAX_RX_DATA) != 1 ||
  connack != 0)
  {
  printf(“连接失败:%dn”, connack);
  return -1;
  }
  netFlags.login = 1;
  //连接成功后,订阅消息
  mqttSubscribe(netFlags.subIndex);
  }break;
  default:{
  printf(“msgType:%un”,type);
  }break;
  }
  return 0;
  }
  return -1;
  }
  //发布消息任务
  static int mqttPublishCb(void)
  {
  int res = 0;
  static char topicIndex = 0;
  static unsigned int tick = 0;
  if(HAL_GetTick() - tick 》 5000)
  {
  int len = 0;
  tick = HAL_GetTick();
  len = snprintf((char*)txData,MAX_TX_DATA,“%s--%10u”,__DATE__,tick);
  res = mqttPublish(topicTable[topicIndex],txData,len);
  if(++topicIndex 》= MAX_TOPIC)
  topicIndex = 0;
  }
  return res;
  }
  static void networkThreadTask(void const * argument)
  {
  unsigned int tick = 0;
  MX_LWIP_Init();
  memset(&netFlags,0,sizeof(netFlags));
  for(;;)
  {
  if(connectServerCb() == 0)
  tick = HAL_GetTick();
  else if(mqttTaskCb() == 0)
  tick = HAL_GetTick();
  else
  {
  if(HAL_GetTick() - tick 》 30000)
  {
  tick = HAL_GetTick();
  memset(&netFlags,0,sizeof(netFlags));
  }
  }
  if(netFlags.login && netFlags.subIndex 》= MAX_TOPIC)
  mqttPublishCb();
  osDelay(10);
  }
  }
  void InitNetWorkTask(void)
  {
  osThreadStaticDef(networkTask, networkThreadTask, osPriorityNormal, 0, ARRAY_SIZE(networkTaskBuffer), networkTaskBuffer, &networkTaskControlBlock);
  networkTaskHandle = osThreadCreate(osThread(networkTask), NULL);
  }
  疑点
  1、在订阅主题时mqttSubscribe,主题对象使用了静态的对象 static MQTTString topicString[MAX_TOPIC] = {MQTTString_initializer};
  不知道是否必须为静态全局,是否被释放;
  2、同样问题,在创建mqtt连接时,使用静态对象 static MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
  不知道是否必须,是否可以被释放。
举报

更多回帖

发帖
×
20
完善资料,
赚取积分