连志安老师,文章:https://hARMonyos.51cto.com/posts/1780
我们使用的是paho mqtt软件包,这里介绍一下怎么使用mqtt协议编程。关于鸿蒙系统的mqtt移植好的软件包,相关github链接如下:
https://gitee.com/qidiyun/harmony_mqtt
这里提供一个简单的编程示例:
这里我们使用MQTTClient编程模型,他支持多任务多线程,非常适合用在鸿蒙系统上。
1. 网络初始化
这里定义一个 Network 结构体,然后指定我们的MQTT服务器的IP和端口号。
- Network n;
- //初始化结构体
- NetworkInit(&n);
- //连接到指定的MQTT服务器IP、端口号
- NetworkConnect(&n, “XXX.XXX.XXX.XXX”, XXXX);
复制代码
2. 设置MQTT缓存和启动MQTT线程
我们这里使用的是MQTT线程功能。
- MQTTClientinit(&c, &n, 1000, buf, 100, readbuf, 100);
- MQTTStartTask(&c);
复制代码
3. 设置MQTT相关参数
接下来我们设置MQTT的相关参数,包括版本号、客户端ID、账户密码等
- MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
- data.willFlag = 0;
- //MQTT版本为 v3
- data.MQTTVersion = 3;
- //设置客户端ID
- data.clientID.cstring = opts.clientid;
- //设置客户端账户
- data.username.cstring = opts.username;
- //设置客户端密码
- data.password.cstring = opts.password;
- data.keepAliveInterval = 10;
- data.cleansession = 1;
- //连接到MQTT服务器
- rc = MQTTConnect(&c, &data);
复制代码
4. 订阅主题和接收消息
订阅主题可以使用如下函数
- MQTTSubscribe(&c, topic, opts.qos, messageArrived);
复制代码
它的函数原型如下:
DLLExport int MQTTSubscribe(MQTTClient* client, const char* topicFilter, enum QoS, messageHandler);
复制代码
其中:MQTTClient* c :我们前面定义的MQTTClient结构体
const char* topicFilter:订阅的主题
messageHandler messageHandler :接收到主题信息后的回调处理函数。
例如上面我们的回调函数是 messageArrived ,它的原型如下:
void messageArrived(MessageData* md)
- {
- MQTTMessage* message = md->message;
- //打印接收到的消息的长度、和消息内容
- printf("%.*s", (int)message->payloadlen, (char*)message->payload);
- }
复制代码
5. 发送消息
发送消息也比较简单,我们只需要设置好我们的主题和消息内容即可
- memset(&pubmsg, '\0', sizeof(pubmsg));
- //消息内容为 hello harmonyOS !
- pubmsg.payload = (void*)"hello harmonyOS !";
- //消息长度
- pubmsg.payloadlen = strlen((char*)pubmsg.payload);
- pubmsg.qos = QOS0;
- pubmsg.retained = 0;
- pubmsg.dup = 0;
- //推送消息,主题为 pubtest
- MQTTPublish(&c, "pubtest", &pubmsg);
复制代码
完整源码如下:
- #include
- #include
- #include "ohos_init.h"
- #include "cmsis_os2.h"
- #include
- #include "hi_wifi_api.h"
- //#include "wifi_sta.h"
- #include "lwip/ip_addr.h"
- #include "lwip/netifapi.h"
- #include "lwip/sockets.h"
- #include "MQTTClient.h"
- /**
- * MQTT URI farmat:
- * domain mode
- * tcp://iot.eclipse.org:1883
- *
- * ipv4 mode
- * tcp://192.168.10.1:1883
- * ssl://192.168.10.1:1884
- *
- * ipv6 mode
- * tcp://[fe80::20c:29ff:fe9a:a07e]:1883
- * ssl://[fe80::20c:29ff:fe9a:a07e]:1884
- */
- #define MQTT_URI "tcp://106.13.62.194:1883"
- struct opts_struct
- {
- char* clientid;
- int nodelimiter;
- char* delimiter;
- enum QoS qos;
- char* username;
- char* password;
- char* host;
- int port;
- int showtopics;
- } opts =
- {
- (char*)"stdout-subscriber", 0, (char*)"n", QOS2, NULL, NULL, (char*)"106.13.62.194", 1883, 1
- };
- void messageArrived(MessageData* md)
- {
- MQTTMessage* message = md->message;
- if (opts.showtopics)
- printf("%.*st", md->topicName->lenstring.len, md->topicName->lenstring.data);
- if (opts.nodelimiter)
- printf("%.*s", (int)message->payloadlen, (char*)message->payload);
- else
- printf("%.*s%s", (int)message->payloadlen, (char*)message->payload, opts.delimiter);
- //fflush(stdout);
- }
- unsigned char buf[100];
- unsigned char readbuf[100];
- int mqtt_test(void)
- {
- int rc = 0;
-
- MQTTMessage pubmsg;
-
- char* topic = "test";
- if (strchr(topic, '#') || strchr(topic, '+'))
- opts.showtopics = 1;
- if (opts.showtopics)
- printf("topic is %sn", topic);
- Network n;
- MQTTClient c;
- NetworkInit(&n);
- NetworkConnect(&n, opts.host, opts.port);
-
- MQTTClientInit(&c, &n, 1000, buf, 100, readbuf, 100);
- MQTTStartTask(&c);
- MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
- data.willFlag = 0;
- data.MQTTVersion = 3;
- data.clientID.cstring = opts.clientid;
- data.username.cstring = opts.username;
- data.password.cstring = opts.password;
- data.keepAliveInterval = 10;
- data.cleansession = 1;
- printf("Connecting to %s %dn", opts.host, opts.port);
-
- rc = MQTTConnect(&c, &data);
- printf("Connected %dn", rc);
-
-
- printf("Subscribing to %sn", topic);
- rc = MQTTSubscribe(&c, topic, opts.qos, messageArrived);
- printf("Subscribed %dn", rc);
- memset(&pubmsg, '\0', sizeof(pubmsg));
- pubmsg.payload = (void*)"hello harmonyOS !";
- pubmsg.payloadlen = strlen((char*)pubmsg.payload);
- pubmsg.qos = QOS0;
- pubmsg.retained = 0;
- pubmsg.dup = 0;
-
- while (1)
- {
- MQTTPublish(&c, "pubtest", &pubmsg);
- sleep(1);
- }
-
- printf("Stoppingn");
- MQTTDisconnect(&c);
- NetworkDisconnect(&n);
- return 0;
- }
复制代码
|