本帖最后由 小熊派开源社区 于 2020-2-26 10:17 编辑
1. LiteOS MQTT组件
概述MQTT AL用来解耦基于MQTT的业务和MQTT的具体实现,具体来说以后的MQTT业务层应该有且只能使用MQTT AL提供的相关功能(API 数据结构 流程等)。MQTT AL定义MQTT的标准,用来屏蔽各个MQTT协议实现的差异(如软件库 或者硬件),让上层业务无需关心MQTT的实现部分。
MQTT AL的api接口声明在中,使用相关的接口需要包含该头文件,关于函数的详细参数请参考该头文件的声明。 配置并连接对接服务器的所有信息保存在结构体mqtt_al_conpara_t中,其定义在mqtt_al.h中,如下:
- /** [url=home.php?mod=space&uid=2666770]@Brief[/url] defines the paramter for the mqtt connect */
- typedef struct
- {
- mqtt_al_string_t serveraddr; ///< mqtt server:support domain name and dot format
- int serverport; ///< mqtt server port
- mqtt_al_security_para_t *security; ///< if NULL,will use en_mqtt_security_none
- en_mqtt_al_verison version; ///< mqtt version will be used
- mqtt_al_string_t clientid; ///< mqtt connect client identifier
- mqtt_al_string_t user; ///< mqtt connect user
- mqtt_al_string_t passwd; ///< mqtt connect passwd
- int cleansession; ///< 1 clean the session while 0 not
- mqtt_al_willmsg_t *willmsg; ///< mqtt connect will message
- unsigned short keepalivetime;///< keep alive time
- char conret; ///< mqtt connect code, return by server
- int timeout; ///< how much time will be blocked
- }mqtt_al_conpara_t;
复制代码
其中的一些参数值已经使用枚举给出: - security:安全连接参数(使用此需要确保mbedtls组件开启)
枚举值如下:
- /** @brief this enum all the transport encode we support now*/
- typedef enum
- {
- en_mqtt_al_security_none = 0, ///< no encode
- en_mqtt_al_security_psk, ///< use the psk mode in transport layer
- en_mqtt_al_security_cas, ///< use the ca mode in transport layer,only check the server
- en_mqtt_al_security_cacs, ///< use the ca mode in transport layer,both check the server and client
- en_mqtt_al_security_end, ///< the end for the mqtt
- }en_mqtt_al_security_t;
复制代码
枚举值如下:
- /** @brief enum the mqtt version*/
- typedef enum
- {
- en_mqtt_al_version_3_1_0 = 0,
- en_mqtt_al_version_3_1_1,
- }en_mqtt_al_verison;
复制代码
另外,在复制的时候还需要注意,很多字符串参数都是使用mqtt_al_string_t类型,其定义如下:
- /** brief defines for all the ascii or data used in the mqtt engine */
- typedef struct
- {
- char *data; ///< buffer to storage the data
- int len; ///< buffer data length
- }mqtt_al_string_t; //used to represent any type string (maybe not ascii)
复制代码
在配置结构体完成之后,调用配置函数进行配置并连接,API如下:
- /**
- *@brief: you could use this function to connect to the mqtt server
- *
- *@param[in] conparam the parameter we will use in connect, refer to the data mqtt_al_conpara_t
- *@
- *@return: first you should check the return value then the return code in conparam
- *
- *@retval NULL which means you could not get the connect to the server,maybe network reason
- *@retval handle, which means you get the context, please check the conparam for more
- */
- void * mqtt_al_connect( mqtt_al_conpara_t *conparam);
复制代码
连接之后,首先应该检查返回的handle指针是否为空,其次应该检查mqtt_al_conpara_t结构体中conret的值,有以下枚举值:
- /** @brief defines for the mqtt connect code returned by the server */
- #define cn_mqtt_al_con_code_ok 0 ///< has been accepted by the server
- #define cn_mqtt_al_con_code_err_version 1 ///< server not support the version
- #define cn_mqtt_al_con_code_err_clientID 2 ///< client identifier is error
- #define cn_mqtt_al_con_code_err_netrefuse 3 ///< server service not ready yet
- #define cn_mqtt_al_con_code_err_u_p 4 ///< bad user name or password
- #define cn_mqtt_al_con_code_err_auth 5 ///< the client is not authorized
- #define cn_mqtt_al_con_code_err_unkown -1 ///< unknown reason
- #define cn_mqtt_al_con_code_err_network 0x80 ///< network reason,you could try once more
复制代码
订阅消息EMQ-X服务器有心跳机制,实际应用中订阅之前应该先检查连接状态,本实验中暂不检查。
连接成功后,首先订阅消息,设置回调函数,方便接收下发的命令。 订阅消息的API如下:
- /**
- * @brief you could use this function subscribe a topic from the server
- *
- * @param[in] handle the handle we get from mqtt_al_connect
- *
- * @param[in] subpara refer to the data mqtt_al_subpara_t
- *
- * @return 0 success -1 failed
- *
- */
- int mqtt_al_subscribe(void *handle, mqtt_al_subpara_t *subpara);
复制代码
两个参数中,handle参数是之前使用mqtt_al_connect时返回的指针,直接传入即可,subpara参数需要重点讲述。 mqtt_al_subpara_t的定义如下:
- /** @brief defines the mqtt subscribe parameter*/
- typedef struct
- {
- mqtt_al_string_t topic; ///< topic will be subscribe
- en_mqtt_al_qos_t qos; ///< qos requested
- fn_mqtt_al_msg_dealer dealer; ///< message dealer:used to deal the received message
- void *arg; ///< used for the message dealer
- char subret; ///< subscribe result code
- int timeout; ///< how much time will be blocked
- }mqtt_al_subpara_t;
复制代码
其中订阅消息质量qos的枚举值如下:
- /** @brief enum all the qos supported for the application */
- typedef enum
- {
- en_mqtt_al_qos_0 = 0, ///< mqtt QOS 0
- en_mqtt_al_qos_1, ///< mqtt QOS 1
- en_mqtt_al_qos_2, ///< mqtt QOS 2
- en_mqtt_al_qos_err
- }en_mqtt_al_qos_t;
复制代码
dealer是一个函数指针,接收到下发命令之后会被回调,arg是回调函数参数,其定义如下:
- /** @brief defines the mqtt received message dealer, called by mqtt engine*/
- typedef void (*fn_mqtt_al_msg_dealer)(void *arg,mqtt_al_msgrcv_t *msg);
复制代码
订阅之后,可以通过mqtt_al_subpara_t结构体中的subret值查看是否订阅成功。
发布消息发布消息的API如下:
- /**
- * @brief you could use this function to publish a message to the server
- *
- * @param[in] handle the handle we get from mqtt_al_connect
- *
- * @param[in] msg the message we will publish, see the data mqtt_al_pubpara_t
- *
- * @return 0 success -1 failed
- *
- */
- int mqtt_al_publish(void *handle, mqtt_al_pubpara_t *pubpara);
复制代码
两个参数中,handle参数是之前使用mqtt_al_connect时返回的指针,直接传入即可,pubpara参数需要重点讲述。 mqtt_al_pubpara_t的定义如下:
- /** @brief defines for the mqtt publish */
- typedef struct
- {
- mqtt_al_string_t topic; ///< selected publish topic
- mqtt_al_string_t msg; ///< message to be published
- en_mqtt_al_qos_t qos; ///< message qos
- int retain; ///< message retain :1 retain while 0 not
- int timeout; ///< how much time will blocked
- }mqtt_al_pubpara_t;
复制代码
MQTT组件自动初始化MQTT在配置之后,会自动初始化。 在SDK目录中的IoT_LINK_1.0.0iot_linklink_main.c文件中可以看到: 2. 配置准备
Makefile配置因为本次实验用到的组件较多: - AT框架
- ESP8266设备驱动
- 串口驱动框架
- cJSON组件
- SAL组件
- MQTT组件
这些实验代码全部编译下来,有350KB,而小熊派开发板所使用的主控芯片STM32L431RCT6的 Flash 仅有256KB,会导致编译器无法链接出可执行文件,所以要在makefile中修改优化选项,修改为-Os参数,即最大限度的优化代码尺寸,并去掉-g参数,即代码只能下载运行,无法调试,如图: ESP8266设备配置在工程目录中的OS_CONFIG/iot_link_config.h文件中,配置ESP8266设备的波特率和设备名称: WIFI对接信息配置SDK:C:UsersAdministrator.icodesdkIoT_LINK_1.0.0(其中Administrator是实验电脑的用户名)。
在SDK目录中的iot_linknetworktcpipesp8266_socketesp8266_socket_imp.c文件中,配置连接信息:
之后修改同路径下的esp8266_socket_imp.mk文件,如图,将 TOP_DIR 改为 SDK_DIR :
修改paho_mqtt文件路径在SDK目录中的iot_linknetworkmqttpaho_mqttpaho_mqtt.mk文件中,如图,将 TOP_DIR 改为 SDK_DIR :
3. 使用mqtt.fx对接EMQ-X
配置对接信息配置如下:
其中ClientID随机生成一个即可。 订阅主题使用mqtt.fx连接客户端,订阅本次实验中的两个主题: - 主题led_cmd:用于发布控制命令
- 主题lightness:用于上报亮度
4. 上云实验
编写实验文件在 Demo 文件夹下创建cloud_test_demo文件夹,在其中创建emqx_mqtt_demo.c文件。 编写代码:
- #include
- #include
- #include
- #define DEFAULT_LIFETIME 60
- #define DEFAULT_SERVER_IPV4 "122.51.89.94"
- #define DEFAULT_SERVER_PORT 1883
- #define CN_MQTT_EP_CLIENTID "emqx-test-001"
- #define CN_MQTT_EP_USERNAME "mculover666"
- #define CN_MQTT_EP_PASSWD "123456789"
- #define CN_MQTT_EP_SUB_TOPIC1 "led_cmd"
- #define CN_MQTT_EP_PUB_TOPIC1 "lightness"
- #define recv_buf_len 100
- static char recv_buffer[recv_buf_len]; //下发数据接收缓冲区
- static int recv_datalen; //表示接收数据长度
- osal_semp_t recv_sync; //命令接收回调函数和处理函数之间的信号量
- char lightness_buf[10];
- static void mqtt_al_msg_dealer(void *arg,mqtt_al_msgrcv_t *msg)
- {
- if((msg->msg.len) < recv_buf_len)
- {
- //保存数据
- memcpy(recv_buffer,msg->msg.data,msg->msg.len );
- recv_buffer[msg->msg.len] = '\0';
- recv_datalen = msg->msg.len;
- printf("recv buf: %s.rn", recv_buffer);
- //释放信号量,交由数据处理线程进行处理
- osal_semp_post(recv_sync);
- }
- else
- {
- printf("recv buf is too small, len = %d.rn", msg->msg.len);
- }
- }
- static int task_recv_cmd_entry(void *args)
- {
- while(1)
- {
- /* 阻塞等待信号量 */
- osal_semp_pend(recv_sync,cn_osal_timeout_forever);
- if(strstr(recv_buffer, "on"))
- {
- printf("-----------------LED ON !!! --------------------rn");
- }
- else if(strstr(recv_buffer, "off"))
- {
- printf("-----------------LED OFF !!! --------------------rn");
- }
- }
- return 0;
- }
- static int task_report_msg_entry(void *args)
- {
- int ret = -1;
- void *handle = NULL;
- mqtt_al_conpara_t config;
- mqtt_al_string_t str_temp;
- mqtt_al_subpara_t subpara_led_cmd;
- mqtt_al_pubpara_t pubpara_lightness;
- int lightness_value = 0;
- /* 配置结构体 */
- str_temp.data = DEFAULT_SERVER_IPV4;
- str_temp.len = sizeof(DEFAULT_SERVER_IPV4);
- config.serveraddr = str_temp;
- config.serverport = DEFAULT_SERVER_PORT;
- config.security = en_mqtt_al_security_none;
- config.version = en_mqtt_al_version_3_1_0;
- str_temp.data = CN_MQTT_EP_CLIENTID;
- str_temp.len = sizeof(CN_MQTT_EP_CLIENTID);
- config.clientid = str_temp;
- str_temp.data = CN_MQTT_EP_USERNAME;
- str_temp.len = sizeof(CN_MQTT_EP_USERNAME);
- config.user = str_temp;
- str_temp.data = CN_MQTT_EP_PASSWD;
- str_temp.len = sizeof(CN_MQTT_EP_PASSWD);
- config.passwd = str_temp;
- config.cleansession = 1;
- config.willmsg = NULL;
- config.keepalivetime = DEFAULT_LIFETIME;
- config.timeout = 30;
- /* 配置并连接服务器 */
- handle = mqtt_al_connect(&config);
- if(handle == NULL)
- {
- /* 连接出错 */
- printf("config error.rn");
- return -1;
- }
- else
- {
- /* 进一步检查服务器返回值 */
- if(config.conret != cn_mqtt_al_con_code_ok)
- {
- /* 服务器返回值出错 */
- printf("server return error, conret = %d.rn", config.conret);
- return -1;
- }
- else
- {
- printf("connect to server success.rn");
- }
- }
- /* 连接成功后,订阅led_cmd主题消息 */
- str_temp.data = CN_MQTT_EP_SUB_TOPIC1;
- str_temp.len = sizeof(CN_MQTT_EP_SUB_TOPIC1);
- subpara_led_cmd.topic = str_temp;
- subpara_led_cmd.qos = en_mqtt_al_qos_0;
- subpara_led_cmd.dealer = mqtt_al_msg_dealer;
- subpara_led_cmd.arg = NULL;
- subpara_led_cmd.timeout = 60;
- ret = mqtt_al_subscribe(handle, &subpara_led_cmd);
- if(ret < 0)
- {
- printf("sub topic %s fail.rn", subpara_led_cmd.topic.data);
- return -1;
- }
- else
- {
- /* 进一步判断是否订阅成功 */
- if(cn_mqtt_al_con_code_ok != subpara_led_cmd.subret)
- {
- printf("sub topic %s fail, subret = %d.rn", subpara_led_cmd.topic.data, subpara_led_cmd.subret);
- return -1;
- }
- else
- {
- printf("sub topic %s success.rn", subpara_led_cmd.topic.data);
- }
- }
-
- /* 每隔10s上报一次数据 */
- str_temp.data = CN_MQTT_EP_PUB_TOPIC1;
- str_temp.len = sizeof(CN_MQTT_EP_PUB_TOPIC1);
- pubpara_lightness.topic = str_temp;
- pubpara_lightness.qos = en_mqtt_al_qos_0;
- pubpara_lightness.retain = 0;
- pubpara_lightness.timeout = 30;
- while(1)
- {
- sprintf(lightness_buf, "%d", lightness_value);
- str_temp.data = lightness_buf;
- str_temp.len = strlen(lightness_buf);
- pubpara_lightness.msg = str_temp;
- ret = mqtt_al_publish(handle, &pubpara_lightness);
- if(ret < 0)
- {
- printf("publish topic %s fail.rn", pubpara_lightness.topic.data);
- return -1;
- }
- else
- {
- printf("publish topic %s success. payload = %s, lightness = %d.rn", pubpara_lightness.topic.data, pubpara_lightness.msg.data, lightness_value);
- }
- lightness_value++;
- osal_task_sleep(10*1000);
- }
- }
- int standard_app_demo_main()
- {
- /* 创建信号量 */
- osal_semp_create(&recv_sync,1,0);
- /* 创建任务 */
- osal_task_create("task_reportmsg",task_report_msg_entry,NULL,0x800,NULL,8);
- osal_task_create("task_recv_cmd",task_recv_cmd_entry,NULL,0x400,NULL,8);
- return 0;
- }
复制代码
添加路径在user_demo.mk中添加如下:
- #example for emqx_mqtt_demo
- ifeq ($(CONFIG_USER_DEMO), "emqx_mqtt_demo")
- user_demo_src = ${wildcard $(TOP_DIR)/targets/STM32L431_BearPi/Demos/cloud_test_demo/emqx_mqtt_demo.c}
- endif
复制代码
添加位置如下: 配置.sdkconfig
特别说明:实验时需要关闭shell组件,否则会因动态内存分配失败而无法连接。
数据上报实验结果编译下载之后,可以在串口助手中看到输出信息:
在订阅了该主题的客户端也可以看到上报数据:
命令下发实验结果在mqtt.fx中下发一条开启命令:
可以看到设备后作出回应:
再下发一条关闭命令:
可以看到设备后作出回应:
关注“小熊派开源社区”微信公众号,回复“通信模组”获取更多工具资料
-------------------------------------END--------------------------------------
0
|
|
|
|