发 帖  
原厂入驻New
LiteOS云端对接教程03-LiteOS基于MQTT对接EMQ-X服务器
2020-2-26 10:15:58  693 单片机 物联网 华为云 小熊派BearPi 嵌入式
分享
本帖最后由 小熊派开源社区 于 2020-2-26 10:17 编辑

1. LiteOS MQTT组件

概述
MQTT AL用来解基于MQTT的业务和MQTT的具体实现,具体来说以后的MQTT业务层应该有且只能使用MQTT AL提供的相关功能(API 数据结构 流程等)。MQTT AL定义MQTT的标准,用来屏蔽各个MQTT协议实现的差异(如软件库 或者硬件),让上层业务无需关心MQTT的实现部分。

MQTT AL的api接口声明在<mqtt_al.h>中,使用相关的接口需要包含该头文件,关于函数的详细参数请参考该头文件的声明。
配置并连接
对接服务器的所有信息保存在结构体mqtt_al_conpara_t中,其定义在mqtt_al.h中,如下:
  1. /** [url=home.php?mod=space&uid=2666770]@Brief[/url] defines the paramter for the mqtt connect */
  2. typedef struct
  3. {
  4.         mqtt_al_string_t               serveraddr;   ///< mqtt server:support domain name and dot format
  5.         int                            serverport;   ///< mqtt server port
  6.         mqtt_al_security_para_t       *security;     ///< IF NULL,will use en_mqtt_security_none
  7.         en_mqtt_al_verison             version;      ///< mqtt version will be used
  8.         mqtt_al_string_t               clientid;     ///< mqtt connect client identifier
  9.         mqtt_al_string_t               user;         ///< mqtt connect user
  10.         mqtt_al_string_t               passwd;       ///< mqtt connect passwd
  11.         int                            cleansession; ///< 1 clean the session while 0 not
  12.         mqtt_al_willmsg_t             *willmsg;      ///< mqtt connect will message
  13.         unsigned short                 keepalivetime;///< keep alive time
  14.         char                           conret;       ///< mqtt connect code, return by server
  15.         int                            timeout;      ///< how much time will be blocked
  16. }mqtt_al_conpara_t;
复制代码
其中的一些参数值已经使用枚举给出:
  • security:安全连接参数(使用此需要确保mbedtls组件开启)
枚举值如下:
  1. /** @brief  this enum all the transport encode we support now*/
  2. typedef enum
  3. {
  4.         en_mqtt_al_security_none = 0,    ///< no encode
  5.         en_mqtt_al_security_psk,         ///< use the psk mode in transport layer
  6.         en_mqtt_al_security_cas,             ///< use the ca mode in transport layer,only check the server
  7.         en_mqtt_al_security_cacs,             ///< use the ca mode in transport layer,both check the server and client
  8.         en_mqtt_al_security_end,         ///< the end for the mqtt
  9. }en_mqtt_al_security_t;
复制代码
  • version:使用的MQTT协议版本
枚举值如下:
  1. /** @brief enum the mqtt version*/
  2. typedef enum
  3. {
  4.         en_mqtt_al_version_3_1_0 = 0,
  5.         en_mqtt_al_version_3_1_1,
  6. }en_mqtt_al_verison;
复制代码
另外,在复制的时候还需要注意,很多字符串参数都是使用mqtt_al_string_t类型,其定义如下:
  1. /** brief defines for all the ascii or data used in the mqtt engine */
  2. typedef struct
  3. {
  4.         char *data;      ///< buffer to storage the data
  5.         int   len;       ///< buffer data length
  6. }mqtt_al_string_t;   //used to represent any type string (maybe not ascii)
复制代码
在配置结构体完成之后,调用配置函数进行配置并连接,API如下:
  1. /**
  2. *@brief: you could use this function to connect to the mqtt server
  3. *
  4. *@param[in] conparam  the parameter we will use in connect, refer to the data mqtt_al_conpara_t
  5. *@
  6. *@return: first you should check the return value then the return code in conparam
  7. *
  8. *@retval NULL which means you could not get the connect to the server,maybe network reason
  9. *@retval handle, which means you get the context, please check the conparam for more
  10. */
  11. void * mqtt_al_connect( mqtt_al_conpara_t *conparam);
复制代码
连接之后,首先应该检查返回的handle指针是否为空,其次应该检查mqtt_al_conpara_t结构体中conret的值,有以下枚举值:
  1. /** @brief defines for the mqtt connect code returned by the server */
  2. #define cn_mqtt_al_con_code_ok                0   ///< has been accepted by the server
  3. #define cn_mqtt_al_con_code_err_version       1   ///< server not support the version
  4. #define cn_mqtt_al_con_code_err_clientID      2   ///< client identifier is error
  5. #define cn_mqtt_al_con_code_err_netrefuse     3   ///< server service not ready yet
  6. #define cn_mqtt_al_con_code_err_u_p           4   ///< bad user name or password
  7. #define cn_mqtt_al_con_code_err_auth          5   ///< the client is not authorized
  8. #define cn_mqtt_al_con_code_err_unkown        -1  ///< unknown reason
  9. #define cn_mqtt_al_con_code_err_network      0x80 ///< network reason,you could try once more
复制代码
订阅消息
EMQ-X服务器有心跳机制,实际应用中订阅之前应该先检查连接状态,本实验中暂不检查。
连接成功后,首先订阅消息,设置回调函数,方便接收下发的命令
订阅消息的API如下:
  1. /**
  2. * @brief you could use this function subscribe a topic from the server
  3. *
  4. * @param[in] handle the handle we get from mqtt_al_connect
  5. *
  6. * @param[in] subpara  refer to the data mqtt_al_subpara_t
  7. *
  8. * @return 0 success  -1  faiLED
  9. *
  10. */
  11. int mqtt_al_subscribe(void *handle, mqtt_al_subpara_t *subpara);
复制代码
两个参数中,handle参数是之前使用mqtt_al_connect时返回的指针,直接传入即可,subpara参数需要重点讲述。
mqtt_al_subpara_t的定义如下:
  1. /** @brief defines the mqtt subscribe parameter*/
  2. typedef struct
  3. {
  4.         mqtt_al_string_t       topic;     ///< topic will be subscribe
  5.         en_mqtt_al_qos_t       qos;       ///< qos requested
  6.         fn_mqtt_al_msg_dealer  dealer;    ///< message dealer:used to deal the received message
  7.         void                  *arg;       ///< used for the message dealer
  8.         char                   subret;    ///< subscribe result code
  9.         int                    timeout;   ///< how much time will be blocked
  10. }mqtt_al_subpara_t;
复制代码
其中订阅消息质量qos的枚举值如下:
  1. /** @brief enum all the qos supported for the application */
  2. typedef enum
  3. {
  4.         en_mqtt_al_qos_0 = 0,     ///< mqtt QOS 0
  5.         en_mqtt_al_qos_1,         ///< mqtt QOS 1
  6.         en_mqtt_al_qos_2,         ///< mqtt QOS 2
  7.         en_mqtt_al_qos_err
  8. }en_mqtt_al_qos_t;
复制代码
dealer是一个函数指针,接收到下发命令之后会被回调,arg是回调函数参数,其定义如下:
  1. /** @brief  defines the mqtt received message dealer, called by mqtt engine*/
  2. typedef void (*fn_mqtt_al_msg_dealer)(void *arg,mqtt_al_msgrcv_t *msg);
复制代码
订阅之后,可以通过mqtt_al_subpara_t结构体中的subret值查看是否订阅成功。

发布消息
发布消息的API如下:
  1. /**
  2. * @brief you could use this function to publish a message to the server
  3. *
  4. * @param[in] handle the handle we get from mqtt_al_connect
  5. *
  6. * @param[in] msg  the message we will publish, see the data mqtt_al_pubpara_t
  7. *
  8. * @return 0 success  -1  failed
  9. *
  10. */
  11. int        mqtt_al_publish(void *handle, mqtt_al_pubpara_t *pubpara);
复制代码
两个参数中,handle参数是之前使用mqtt_al_connect时返回的指针,直接传入即可,pubpara参数需要重点讲述。
mqtt_al_pubpara_t的定义如下:
  1. /** @brief defines for the mqtt publish */
  2. typedef struct
  3. {
  4.         mqtt_al_string_t    topic;    ///< selected publish topic
  5.         mqtt_al_string_t    msg;      ///< message to be published
  6.         en_mqtt_al_qos_t    qos;      ///< message qos
  7.         int                 retain;   ///< message retain :1 retain while 0 not
  8.         int                 timeout;  ///< how much time will blocked
  9. }mqtt_al_pubpara_t;
复制代码
MQTT组件自动初始化
MQTT在配置之后,会自动初始化。
在SDK目录中的IoT_LINK_1.0.0\iot_link\link_main.c文件中可以看到:
2. 配置准备

Makefile配置
因为本次实验用到的组件较多:
  • AT框架
  • ESP8266设备驱动
  • 串口驱动框架
  • cJSON组件
  • SAL组件
  • MQTT组件

这些实验代码全部编译下来,有350KB,而小熊派开发板所使用的主控芯片STM32L431RCT6的 Flash 仅有256KB,会导致编译器无法链接出可执行文件,所以要在makefile中修改优化选项,修改为-OS参数,即最大限度的优化代码尺寸,并去掉-g参数,即代码只能下载运行,无法调试,如图:
ESP8266设备配置
在工程目录中的OS_CONFIG/iot_link_config.h文件中,配置ESP8266设备的波特率和设备名称:
WIFI对接信息配置
SDK:C:\Users\Administrator\.icode\sdk\IoT_LINK_1.0.0(其中Administrator是实验电脑的用户名)。
在SDK目录中的iot_link\network\tcpip\esp8266_socket\esp8266_socket_imp.c文件中,配置连接信息:

之后修改同路径下的esp8266_socket_imp.mk文件,如图,将 TOP_DIR 改为 SDK_DIR :

修改paho_mqtt文件路径
在SDK目录中的iot_link\network\mqtt\paho_mqtt\paho_mqtt.mk文件中,如图,将 TOP_DIR 改为 SDK_DIR :

3. 使用mqtt.fx对接EMQ-X

配置
对接信息配置如下:

其中ClientID随机生成一个即可。
订阅主题
使用mqtt.fx连接客户端,订阅本次实验中的两个主题:
  • 主题led_cmd:用于发布控制命令
  • 主题lightness:用于上报亮度

4. 上云实验

编写实验文件
在 Demo 文件夹下创建cloud_test_demo文件夹,在其中创建emqx_mqtt_demo.c文件。
编写代码:
  1. #include <osal.h>
  2. #include <mqtt_al.h>
  3. #include <string.h>
  4. #define DEFAULT_LIFETIME            60
  5. #define DEFAULT_SERVER_ipv4         "122.51.89.94"
  6. #define DEFAULT_SERVER_PORT         1883
  7. #define CN_MQTT_EP_CLIENTID         "emqx-test-001"
  8. #define CN_MQTT_EP_USERNAME         "mculover666"
  9. #define CN_MQTT_EP_PASSWD           "123456789"
  10. #define CN_MQTT_EP_SUB_TOPIC1       "led_cmd"
  11. #define CN_MQTT_EP_PUB_TOPIC1       "lightness"

  12. #define recv_buf_len 100
  13. static char recv_buffer[recv_buf_len];   //下发数据接收缓冲区
  14. static int  recv_datalen;                //表示接收数据长度

  15. osal_semp_t recv_sync;  //命令接收回调函数和处理函数之间的信号量

  16. char lightness_buf[10];

  17. static void mqtt_al_msg_dealer(void *arg,mqtt_al_msgrcv_t *msg)
  18. {
  19.     if((msg->msg.len) < recv_buf_len)
  20.     {
  21.         //保存数据
  22.         memcpy(recv_buffer,msg->msg.data,msg->msg.len );
  23.         recv_buffer[msg->msg.len] = '\0';
  24.         recv_datalen = msg->msg.len;
  25.         printf("recv buf: %s.\r\n", recv_buffer);
  26.         //释放信号量,交由数据处理线程进行处理
  27.         osal_semp_post(recv_sync);
  28.     }
  29.     else
  30.     {
  31.         printf("recv buf is too small, len = %d.\r\n", msg->msg.len);
  32.     }
  33. }

  34. static int task_recv_cmd_entry(void *args)
  35. {
  36.     while(1)
  37.     {
  38.         /* 阻塞等待信号量 */
  39.         osal_semp_pend(recv_sync,cn_osal_timeout_forever);

  40.         if(strstr(recv_buffer, "on"))
  41.         {
  42.                 printf("-----------------LED ON !!! --------------------\r\n");
  43.         }
  44.         else if(strstr(recv_buffer, "off"))
  45.         {
  46.                 printf("-----------------LED OFF !!! --------------------\r\n");
  47.         }
  48.     }
  49.     return 0;
  50. }

  51. static int task_report_msg_entry(void *args)
  52. {
  53.     int ret = -1;
  54.     void *handle = NULL;

  55.     mqtt_al_conpara_t config;
  56.     mqtt_al_string_t str_temp;
  57.     mqtt_al_subpara_t subpara_led_cmd;

  58.     mqtt_al_pubpara_t pubpara_lightness;

  59.     int lightness_value = 0;


  60.     /* 配置结构体 */
  61.     str_temp.data = DEFAULT_SERVER_IPV4;
  62.     str_temp.len  = sizeof(DEFAULT_SERVER_IPV4);
  63.     config.serveraddr = str_temp;
  64.     config.serverport = DEFAULT_SERVER_PORT;
  65.     config.security   = en_mqtt_al_security_none;
  66.     config.version    = en_mqtt_al_version_3_1_0;
  67.     str_temp.data = CN_MQTT_EP_CLIENTID;
  68.     str_temp.len  = sizeof(CN_MQTT_EP_CLIENTID);
  69.     config.clientid   = str_temp;
  70.     str_temp.data = CN_MQTT_EP_USERNAME;
  71.     str_temp.len  = sizeof(CN_MQTT_EP_USERNAME);
  72.     config.user       = str_temp;
  73.     str_temp.data = CN_MQTT_EP_PASSWD;
  74.     str_temp.len  = sizeof(CN_MQTT_EP_PASSWD);
  75.     config.passwd     = str_temp;
  76.     config.cleansession = 1;
  77.     config.willmsg    = NULL;
  78.     config.keepalivetime = DEFAULT_LIFETIME;
  79.     config.timeout    = 30;

  80.     /* 配置并连接服务器 */
  81.     handle = mqtt_al_connect(&config);
  82.     if(handle == NULL)
  83.     {
  84.         /* 连接出错 */
  85.         printf("config error.\r\n");
  86.         return -1;
  87.     }
  88.     else
  89.     {
  90.         /* 进一步检查服务器返回值 */
  91.         if(config.conret != cn_mqtt_al_con_code_ok)
  92.         {
  93.             /* 服务器返回值出错 */
  94.             printf("server return error, conret = %d.\r\n", config.conret);
  95.             return -1;
  96.         }
  97.         else
  98.         {
  99.             printf("connect to server success.\r\n");
  100.         }
  101.     }


  102.     /* 连接成功后,订阅led_cmd主题消息 */
  103.     str_temp.data = CN_MQTT_EP_SUB_TOPIC1;
  104.     str_temp.len  = sizeof(CN_MQTT_EP_SUB_TOPIC1);
  105.     subpara_led_cmd.topic = str_temp;
  106.     subpara_led_cmd.qos = en_mqtt_al_qos_0;
  107.     subpara_led_cmd.dealer = mqtt_al_msg_dealer;
  108.     subpara_led_cmd.arg = NULL;
  109.     subpara_led_cmd.timeout = 60;
  110.     ret =  mqtt_al_subscribe(handle, &subpara_led_cmd);
  111.     if(ret < 0)
  112.     {
  113.         printf("sub topic %s fail.\r\n", subpara_led_cmd.topic.data);
  114.         return -1;
  115.     }
  116.     else
  117.     {
  118.         /* 进一步判断是否订阅成功 */
  119.         if(cn_mqtt_al_con_code_ok != subpara_led_cmd.subret)
  120.         {
  121.             printf("sub topic %s fail, subret = %d.\r\n", subpara_led_cmd.topic.data, subpara_led_cmd.subret);
  122.             return -1;
  123.         }
  124.         else
  125.         {
  126.             printf("sub topic %s success.\r\n", subpara_led_cmd.topic.data);
  127.         }
  128.     }
  129.    
  130.     /* 每隔10s上报一次数据 */
  131.     str_temp.data = CN_MQTT_EP_PUB_TOPIC1;
  132.     str_temp.len  = sizeof(CN_MQTT_EP_PUB_TOPIC1);
  133.     pubpara_lightness.topic = str_temp;
  134.     pubpara_lightness.qos = en_mqtt_al_qos_0;
  135.     pubpara_lightness.retain = 0;
  136.     pubpara_lightness.timeout = 30;
  137.     while(1)
  138.     {
  139.         sprintf(lightness_buf, "%d", lightness_value);
  140.         str_temp.data = lightness_buf;
  141.         str_temp.len  = strlen(lightness_buf);
  142.         pubpara_lightness.msg = str_temp;

  143.         ret = mqtt_al_publish(handle, &pubpara_lightness);
  144.         if(ret < 0)
  145.         {
  146.             printf("publish topic %s fail.\r\n", pubpara_lightness.topic.data);
  147.             return -1;
  148.         }
  149.         else
  150.         {
  151.             printf("publish topic %s success. payload = %s, lightness = %d.\r\n", pubpara_lightness.topic.data, pubpara_lightness.msg.data, lightness_value);
  152.         }
  153.         lightness_value++;
  154.         osal_task_sleep(10*1000);
  155.     }
  156. }


  157. int standard_app_demo_main()
  158. {
  159.     /* 创建信号量 */
  160.     osal_semp_create(&recv_sync,1,0);

  161.     /* 创建任务 */
  162.     osal_task_create("task_reportmsg",task_report_msg_entry,NULL,0x800,NULL,8);
  163.     osal_task_create("task_recv_cmd",task_recv_cmd_entry,NULL,0x400,NULL,8);

  164.     return 0;
  165. }
复制代码
添加路径
在user_demo.mk中添加如下:
  1.         #example for emqx_mqtt_demo
  2.         ifeq ($(CONFIG_USER_DEMO), "emqx_mqtt_demo")        
  3.                 user_demo_src  = ${wildcard $(TOP_DIR)/targets/STM32L431_BearPi/Demos/cloud_test_demo/emqx_mqtt_demo.c}
  4.         endif
复制代码
添加位置如下:
配置.sdkconfig

特别说明:实验时需要关闭shell组件,否则会因动态内存分配失败而无法连接。


数据上报实验结果
编译下载之后,可以在串口助手中看到输出信息:

在订阅了该主题的客户端也可以看到上报数据:

命令下发实验结果
在mqtt.fx中下发一条开启命令:

可以看到设备后作出回应:

再下发一条关闭命令:

可以看到设备后作出回应:

关注“小熊派开源社区”微信公众号,回复“通信模组”获取更多工具资料




-------------------------------------END--------------------------------------
0
分享淘帖 显示全部楼层

评论

高级模式
您需要登录后才可以回帖 登录 | 注册

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容图片侵权或者其他问题,请联系本站作侵删。 侵权投诉
发资料
关闭

站长推荐 上一条 /7 下一条

快速回复 返回顶部 返回列表