[文章]OpenHarmony轻量系统开发【11】移植MQTT

阅读量0
0
2
摘要:本文简单介绍如何移植MQTT
适合群体:适用于润和Hi3861开发板
文中所有代码仓库:https://gitee.com/qidiyun/hihope-3861-smart-home-kit
11.1 MQTT介绍
MQTT 是当前最主流的物联网通信协议,需要物联网云平台,例如华为云、阿里云、移动OneNET都支持mqtt。而Hi3861则是一款专为IoT应用场景打造的芯片。本节主要讲如何在鸿蒙系统中通过移植第3方软件包 paho mqtt去实现MQTT协议功能,最后会给出测试验证。为后续的物联网项目打好基础。
友情预告,本节内容较多,源码也贴出来了,大家最好先看一遍,然后再操作一次。
已经移植好的MQTT源码:
https://gitee.com/qidiyun/harmony_mqtt
11.2 MQTT移植
如果不想要自己移植的,可以跳过本节
MQTT 全称为 Message Queuing Telemetry Transport(消息队列遥测传输)是一种基于发布/订阅范式的二进制“轻量级”消息协议,由IB公司发布。针对于网络受限和嵌入式设备而设计的一种数据传输协议。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。MQTT模型如图所示。
更多MQTT协议的介绍见这篇文章: MQTT 协议开发入门
1. 下载paho mqtt软件包,添加到鸿蒙代码中
paho mqtt-c 是基于C语言实现的MQTT客户端,非常适合用在嵌入式设备上。首先下载源码:
https://github.com/eclipse/paho.mqtt.embedded-c
下载之后解压,会得到这么一个文件夹:
如何在鸿蒙系统中移植 Paho-MQTT 实现MQTT协议-鸿蒙HarmonyOS技术社区
我们在鸿蒙系统源码的 third_party 文件夹下创建一个 pahomqtt 文件夹,然后把解压后的所有文件都拷贝到 pahomqtt 文件夹下
下一步,我们在pahomqtt 文件夹下面新建BUILD.gn文件,用来构建编译。其内容如下:
  1. # Copyright (c) 2020 Huawei Device Co., Ltd.
  2. # Licensed under the Apache License, Version 2.0 (the "License");
  3. # you may not use this file except in compliance with the License.
  4. # You may obtain a copy of the License at
  5. #
  6. #     [url]http://www.apache.org/licenses/LICENSE-2.0[/url]
  7. #
  8. # Unless required by applicable law or agreed to in writing, software
  9. # distributed under the License is distributed on an "AS IS" BASIS,
  10. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. # See the License for the specific language governing permissions and
  12. # limitations under the License.

  13. import("//build/lite/config/component/lite_component.gni")
  14. import("//build/lite/ndk/ndk.gni")

  15. config("pahomqtt_config") {
  16.     include_dirs = [
  17.         "MQTTPacket/src",
  18.         "MQTTClient-C/src",
  19.         "MQTTClient-C/src/liteOS",
  20.         "//kernel/liteos_m/components/cmsis/2.0",
  21.     ]
  22. }

  23. pahomqtt_sources = [
  24. "MQTTClient-C/src/liteOS/MQTTLiteOS.c",
  25. "MQTTClient-C/src/MQTTClient.c",

  26. "MQTTPacket/src/MQTTConnectClient.c",
  27. "MQTTPacket/src/MQTTConnectServer.c",
  28. "MQTTPacket/src/MQTTDeserializePublish.c",
  29. "MQTTPacket/src/MQTTFormat.c",
  30. "MQTTPacket/src/MQTTPacket.c",
  31. "MQTTPacket/src/MQTTSerializePublish.c",
  32. "MQTTPacket/src/MQTTSubscribeClient.c",
  33. "MQTTPacket/src/MQTTSubscribeServer.c",
  34. "MQTTPacket/src/MQTTUnsubscribeClient.c",
  35. "MQTTPacket/src/MQTTUnsubscribeServer.c",
  36. ]

  37. lite_library("pahomqtt_static") {
  38.     target_type = "static_library"
  39.     sources = pahomqtt_sources
  40.     public_configs = [ ":pahomqtt_config" ]
  41. }

  42. lite_library("pahomqtt_shared") {
  43.     target_type = "shared_library"
  44.     sources = pahomqtt_sources
  45.     public_configs = [ ":pahomqtt_config" ]
  46. }

  47. ndk_lib("pahomqtt_ndk") {
  48.     if (board_name != "hi3861v100") {
  49.         lib_extension = ".so"
  50.         deps = [
  51.             ":pahomqtt_shared"
  52.         ]
  53.     } else {
  54.         deps = [
  55.             ":pahomqtt_static"
  56.         ]
  57.     }
  58.     head_files = [
  59.         "//third_party/pahomqtt"
  60.     ]
  61. }
复制代码
2. 移植
我们使用到的是MQTTClient-C的代码,该代码支持多线程。
(1)创建LiteOS文件夹
MQTT已经提供了Linux和freertos的移植,这里我们参考,新建文件夹:
third_partypahomqttMQTTClient-CsrcliteOS
里面存放两个文件:
MQTTLiteOS.c 和 MQTTLiteOS.h

内容如下:
  1. #include "MQTTLiteOS.h"

  2. //用来创建线程
  3. int ThreadStart(Thread* thread, void (*fn)(void*), void* arg)
  4. {
  5.         int rc = 0;
  6.         thread = thread;

  7.         osThreadAttr_t attr;

  8.     attr.name = "MQTTTask";
  9.     attr.attr_bits = 0U;
  10.     attr.cb_mem = NULL;
  11.     attr.cb_size = 0U;
  12.     attr.stack_mem = NULL;
  13.     attr.stack_size = 2048;
  14.     attr.priority = osThreadGetPriority(osThreadGetId());

  15.     rc = (int)osThreadNew((osThreadFunc_t)fn, arg, &attr);

  16.         return rc;
  17. }
  18. //定时器初始化
  19. void TimerInit(Timer* timer)
  20. {
  21.         timer->end_time = (struct timeval){0, 0};
  22. }

  23. char TimerIsExpired(Timer* timer)
  24. {
  25.         struct timeval now, res;
  26.         gettimeofday(&now, NULL);
  27.         timersub(&timer->end_time, &now, &res);
  28.         return res.tv_sec < 0 || (res.tv_sec == 0 && res.tv_usec <= 0);
  29. }


  30. void TimerCountdownMS(Timer* timer, unsigned int timeout)
  31. {
  32.         struct timeval now;
  33.         gettimeofday(&now, NULL);
  34.         struct timeval interval = {timeout / 1000, (timeout % 1000) * 1000};
  35.         timeradd(&now, &interval, &timer->end_time);
  36. }


  37. void TimerCountdown(Timer* timer, unsigned int timeout)
  38. {
  39.         struct timeval now;
  40.         gettimeofday(&now, NULL);
  41.         struct timeval interval = {timeout, 0};
  42.         timeradd(&now, &interval, &timer->end_time);
  43. }


  44. int TimerLeftMS(Timer* timer)
  45. {
  46.         struct timeval now, res;
  47.         gettimeofday(&now, NULL);
  48.         timersub(&timer->end_time, &now, &res);
  49.         //printf("left %d msn", (res.tv_sec < 0) ? 0 : res.tv_sec * 1000 + res.tv_usec / 1000);
  50.         return (res.tv_sec < 0) ? 0 : res.tv_sec * 1000 + res.tv_usec / 1000;
  51. }




  52. void MutexInit(Mutex* mutex)
  53. {
  54.         mutex->sem = osSemaphoreNew(1, 1, NULL);
  55. }

  56. int MutexLock(Mutex* mutex)
  57. {
  58.         return osSemaphoreAcquire(mutex->sem, LOS_WAIT_FOREVER);
  59. }

  60. int MutexUnlock(Mutex* mutex)
  61. {
  62.         return osSemaphoreRelease(mutex->sem);
  63. }

  64. //接受数据
  65. int ohos_read(Network* n, unsigned char* buffer, int len, int timeout_ms)
  66. {
  67.         struct timeval interval = {timeout_ms / 1000, (timeout_ms % 1000) * 1000};
  68.         if (interval.tv_sec < 0 || (interval.tv_sec == 0 && interval.tv_usec <= 0))
  69.         {
  70.                 interval.tv_sec = 0;
  71.                 interval.tv_usec = 100;
  72.         }

  73.         setsockopt(n->my_socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&interval, sizeof(struct timeval));

  74.         int bytes = 0;
  75.         while (bytes < len)
  76.         {
  77.                 int rc = recv(n->my_socket, &buffer[bytes], (size_t)(len - bytes), 0);
  78.                 if (rc == -1)
  79.                 {
  80.                         if (errno != EAGAIN && errno != EWOULDBLOCK)
  81.                           bytes = -1;
  82.                         break;
  83.                 }
  84.                 else if (rc == 0)
  85.                 {
  86.                         bytes = 0;
  87.                         break;
  88.                 }
  89.                 else
  90.                         bytes += rc;
  91.         }
  92.         return bytes;
  93. }

  94. //写数据
  95. int ohos_write(Network* n, unsigned char* buffer, int len, int timeout_ms)
  96. {
  97.         struct timeval tv;

  98.         tv.tv_sec = 0;  /* 30 Secs Timeout */
  99.         tv.tv_usec = timeout_ms * 1000;  // Not init'ing this can cause strange errors

  100.         setsockopt(n->my_socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv,sizeof(struct timeval));
  101.         int        rc = send(n->my_socket, buffer, len, 0);
  102.         return rc;
  103. }

  104. //网络初始化
  105. void NetworkInit(Network* n)
  106. {
  107.         n->my_socket = 0;
  108.         n->mqttread = ohos_read;
  109.         n->mqttwrite = ohos_write;
  110. }

  111. //网络连接
  112. int NetworkConnect(Network* n, char* addr, int port)
  113. {
  114.         int type = SOCK_STREAM;
  115.         struct sockaddr_in address;
  116.         int rc = -1;
  117.         sa_family_t family = AF_INET;
  118.         struct addrinfo *result = NULL;
  119.         struct addrinfo hints = {0, AF_UNSPEC, SOCK_STREAM, IPPROTO_TCP, 0, NULL, NULL, NULL};

  120.         if ((rc = getaddrinfo(addr, NULL, &hints, &result)) == 0)
  121.         {
  122.                 struct addrinfo* res = result;

  123.                 /* prefer ip4 addresses */
  124.                 while (res)
  125.                 {
  126.                         if (res->ai_family == AF_INET)
  127.                         {
  128.                                 result = res;
  129.                                 break;
  130.                         }
  131.                         res = res->ai_next;
  132.                 }

  133.                 if (result->ai_family == AF_INET)
  134.                 {
  135.                         address.sin_port = htons(port);
  136.                         address.sin_family = family = AF_INET;
  137.                         address.sin_addr = ((struct sockaddr_in*)(result->ai_addr))->sin_addr;
  138.                 }
  139.                 else
  140.                         rc = -1;

  141.                 freeaddrinfo(result);
  142.         }

  143.         if (rc == 0)
  144.         {
  145.                 n->my_socket = socket(family, type, 0);
  146.                 if (n->my_socket != -1)
  147.                         rc = connect(n->my_socket, (struct sockaddr*)&address, sizeof(address));
  148.                 else
  149.                         rc = -1;
  150.         }

  151.         return rc;
  152. }


  153. void NetworkDisconnect(Network* n)
  154. {
  155.         close(n->my_socket);
  156. }

复制代码
至此我们移植基本结束
11.3 测试代码
测试代码比较好写。主要是3个文件,内容我都贴出来了:

(1)BUILD.gn文件内容:
static_library("mqtt_test") {
    sources = [
        "mqtt_test.c",
        "mqtt_entry.c"
    ]

    include_dirs = [
        "//utils/native/lite/include",
        "//kernel/liteos_m/components/cmsis/2.0",
        "//base/iot_hardware/interfaces/kits/wifiiot_lite",
        "//vendor/hisi/hi3861/hi3861/third_party/lwip_sack/include",
        "//foundation/communication/interfaces/kits/wifi_lite/wifiservice",
        "//third_party/pahomqtt/MQTTPacket/src",
        "//third_party/pahomqtt/MQTTClient-C/src",
        "//third_party/pahomqtt/MQTTClient-C/src/liteOS",
    ]
#表示需要a_myparty 软件包
    deps = [
        "//third_party/pahomqtt:pahomqtt_static",
    ]
}
(2)mqtt_entry.c文件
主要是进行热点连接,因为我们要使用MQTT需要用到网络。热点连接的代码之前在第9章已经讲说,这里就不完全贴了,代码仓库也有,主要的代码部分:
void wifi_sta_task(void *arg)
{
    arg = arg;
   
    //连接热点
    hi_wifi_start_sta();

    while(wifi_ok_flg == 0)
    {
        usleep(30000);
    }
   
   
    usleep(2000000);

    //开始进入MQTT测试
    mqtt_test();
}
(3)mqtt_test.c 文件则是编写了一个简单的MQTT测试代码

其中测试用的mqtt服务器是我自己的服务器:5.196.95.208
大家也可以改成自己的。
  1. #include <stdio.h>
  2. #include <unistd.h>
  3. #include "ohos_init.h"
  4. #include "cmsis_os2.h"

  5. #include "hi_wifi_api.h"
  6. #include "lwip/ip_addr.h"
  7. #include "lwip/netifapi.h"
  8. #include "lwip/sockets.h"

  9. #include "MQTTClient.h"


  10. static MQTTClient mq_client;

  11. unsigned char *onenet_mqtt_buf;
  12. unsigned char *onenet_mqtt_readbuf;
  13. int buf_size;

  14. Network n;
  15. MQTTPacket_connectData data = MQTTPacket_connectData_initializer;  

  16. //消息回调函数
  17. void mqtt_callback(MessageData *msg_data)
  18. {
  19.     size_t res_len = 0;
  20.     uint8_t *response_buf = NULL;
  21.     char topicname[45] = { "$crsp/" };

  22.     LOS_ASSERT(msg_data);

  23.     printf("topic %.*s receive a messagern", msg_data->topicName->lenstring.len, msg_data->topicName->lenstring.data);

  24.     printf("message is %.*srn", msg_data->message->payloadlen, msg_data->message->payload);

  25. }

  26. int mqtt_connect(void)
  27. {
  28.         int rc = 0;
  29.    
  30.         NetworkInit(&n);
  31.         NetworkConnect(&n, "5.196.95.208", 1883);

  32.     buf_size  = 2048;
  33.     onenet_mqtt_buf = (unsigned char *) malloc(buf_size);
  34.     onenet_mqtt_readbuf = (unsigned char *) malloc(buf_size);
  35.     if (!(onenet_mqtt_buf && onenet_mqtt_readbuf))
  36.     {
  37.         printf("No memory for MQTT client buffer!");
  38.         return -2;
  39.     }

  40.         MQTTClientInit(&mq_client, &n, 1000, onenet_mqtt_buf, buf_size, onenet_mqtt_readbuf, buf_size);
  41.        
  42.     MQTTStartTask(&mq_client);


  43.     data.keepAliveInterval = 30;
  44.     data.cleansession = 1;
  45.         data.clientID.cstring = "ohos_hi3861";
  46.         data.username.cstring = "123456";
  47.         data.password.cstring = "222222";

  48.         data.keepAliveInterval = 10;
  49.         data.cleansession = 1;
  50.        
  51.     mq_client.defaultMessageHandler = mqtt_callback;

  52.         //连接服务器
  53.         rc = MQTTConnect(&mq_client, &data);

  54.         //订阅消息,并设置回调函数
  55.         MQTTSubscribe(&mq_client, "ohossub", 0, mqtt_callback);

  56.         while(1)
  57.         {
  58.                 MQTTMessage message;

  59.                 message.qos = QOS1;
  60.                 message.retained = 0;
  61.                 message.payload = (void *)"openharmony";
  62.                 message.payloadlen = strlen("openharmony");

  63.                 //发送消息
  64.                 if (MQTTPublish(&mq_client, "ohospub", &message) < 0)
  65.                 {
  66.                         return -1;
  67.                 }
  68.         }

  69.         return 0;
  70. }


  71. void mqtt_test(void)
  72. {
  73.     mqtt_connect();
  74. }
复制代码
到这里就完成了代码部分,可以开始编译了。
11.4 实验这里我们需要先下载一个 Windows电脑端的 MQTT客户端,这样我们就可以用电脑订阅开发板的MQTT主题信息了。
电脑版的mqtt客户端下载链接: https://repo.eclipse.org/content ... .paho.ui.app/1.1.1/
我们选择这一个:

弄完后打开软件,按图操作:

此时我们去查看 我们电脑端的MQTT客户端软件,可以看到右边已经有接收MQTT信息了,主题未 ohospub,消息内容为 openharmony,说明实验成功。

电脑发送主题为ohossub,内容为123456,查看串口打印,可以看到也收到了数据




回帖

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容图片侵权或者其他问题,请联系本站作侵删。 侵权投诉
链接复制成功,分享给好友