嵌入式技术论坛
直播中

河神大人

9年用户 1583经验值
擅长:电源/新能源
私信 关注
[问答]

新手求助kawaii mqtt使用注意事项有哪些

1.jpg

2.jpg

两个问题,在图中已经表达了,就不再手打一遍了

回帖(7)

李莉

2022-10-20 10:32:38
问题1
mqtt_publish失败,一般是掉线了,此处理逻辑因人而异。

kawaii-mqtt提供了自动重连机制,通过mqtt_set_reconnect_handler和mqtt_set_reconnect_data设置重连的句柄和数据。

我个人项目中,如果publish失败,则应用线程不做任何处理,继续发数据,避免ringbuffer被写满,自动重连上线后,发出的数据总是最新的。

具体看你的需求,如果数据不能丢,那就要另行做一段时间的缓存。

问题2
QOS1和QOS2的消息复制在mqtt_ack_handler_create函数中,从heap上开辟了空间,存放payload。所以,mqtt_publish函数返回成功,释放msg的内存就好。收到PUBACK/PUBCOMP后,会调用mqtt_puback_and_pubcomp_packet_handle函数unrecord ack。
保存payload的具体调用过程如下:
mqtt_ack_list_record
—> mqtt_ack_handler_create

mqtt_ack_handler_create的部分业务代码如下,内部自动暂存payload。

static ack_handlers_t *mqtt_ack_handler_create(mqtt_client_t* c, int type, uint16_t packet_id, uint16_t payload_len, message_handlers_t* handler)
{
    ack_handlers_t *ack_handler = NULL;
    ack_handler = (ack_handlers_t *) platform_memory_alloc(sizeof(ack_handlers_t) + payload_len);
    ......
    ack_handler->payload_len = payload_len;
    ack_handler->payload = (uint8_t *)ack_handler + sizeof(ack_handlers_t);
    memcpy(ack_handler->payload, c->mqtt_write_buf, payload_len);    /* save the data in ack handler*/
    return ack_handler;
}
举报

杨秀英

2022-10-20 10:32:52
第一个问题, 发送失败应该踢掉连接,但是mqtt本身有一个线程维持连接的 直接踢掉会出问题,尝试了 很多办法 效果都不好,我是在这边加一个标志位 连段断开 置位,在mqtt keeplive检测的地方 写入连接状态,然后他维持连接的现场自己就踢掉了

mqtt_mark =  mqtt_publish(client_fd,g_run_GW_parament.mqtt_setparament.pub_topic, &msg);
    rt_kprintf("nmqtt data:%dnn",mqtt_mark);
    cJSON_free(mqtt_modbus_data);
        if(mqtt_mark !=0)
        {
    client_fd->lcc_flag = 1;
    }
    然后
    int mqtt_keep_alive(mqtt_client_t* c)
{
    int rc = KAWAII_MQTT_SUCCESS_ERROR;
    rc = mqtt_is_connected(c);
    if (KAWAII_MQTT_SUCCESS_ERROR != rc)
    {
        RETURN_ERROR(rc);
    }
    if(c->lcc_flag==1)
    {
            c->lcc_flag = 0;
            KAWAII_MQTT_LOG_W(" send err and close socketrn");
            /*must realse the socket file descriptor zhaoshimin 20200629*/
            network_release(c->mqtt_network);
            mqtt_set_client_state(c, CLIENT_STATE_DISCONNECTED);
            rc = KAWAII_MQTT_NOT_CONNECT_ERROR; /* PINGRESP not received in keepalive interval */        
    }
mqtt线程本身有重连的功能 所以 置位了 他就自动重连了

第二个问题发送完了 你就不用管了 等待回复状态就好了,
mqtt demo自己做好了剩下的事情
举报

河神大人

2022-10-20 10:33:09
1.jpg

上图是针对第一个问题,我这mqtt_keep_alive函数内已经有和你给出的标志位方案的感觉类似的代码,我是否不需要关注mqtt_publish这个API的返回值了?

第二个问题,你的意思是调用mqtt_publish这个API以后,就可以释放掉msg.payload了吧
2.jpg
举报

杨秀英

2022-10-20 10:33:24
keeplive 那个本身的是检测不到心跳包才 断开连接的 可设置 一般是1min之内检测到,很慢 所以这个在keeplive之上的 检测到发送失败就断开连接

if(c->lcc_flag==1)
    {
            c->lcc_flag = 0;
            KAWAII_MQTT_LOG_W(" send err and close socketrn");
            /*must realse the socket file descriptor zhaoshimin 20200629*/
            network_release(c->mqtt_network);
            mqtt_set_client_state(c, CLIENT_STATE_DISCONNECTED);
            rc = KAWAII_MQTT_NOT_CONNECT_ERROR; /* PINGRESP not received in keepalive interval */        
    }
    else if (platform_timer_is_expired(&c->mqtt_last_sent) || platform_timer_is_expired(&c->mqtt_last_received))
    {
        if (c->mqtt_ping_outstanding)
        {
            KAWAII_MQTT_LOG_W("%s:%d %s()... ping outstanding", __FILE__, __LINE__, __FUNCTION__);
            /*must realse the socket file descriptor zhaoshimin 20200629*/
            network_release(c->mqtt_network);
            mqtt_set_client_state(c, CLIENT_STATE_DISCONNECTED);
            rc = KAWAII_MQTT_NOT_CONNECT_ERROR; /* PINGRESP not received in keepalive interval */
        }
        else
        {
            platform_timer_t timer;
            timer.time =10000;
            rt_kprintf("send keeplive datan");
//            rt_thread_mdelay(3000);
            /*add mutex sem protect zhaoshimin 20200630*/
            platform_mutex_lock(&c->mqtt_write_lock);
            int len = MQTTSerialize_pingreq(c->mqtt_write_buf, c->mqtt_write_buf_size);
            if (len > 0) // send the ping packet
            {
                /*when the socket error, mqtt_ping_outstanding counter must add one*/
                rc = mqtt_send_packet(c, len, &timer);
                c->mqtt_ping_outstanding++;
            }
            platform_mutex_unlock(&c->mqtt_write_lock);
        }
    }
第二个问题 发送完就可以释放了 我的代码就是在发送完释放的

    msg.qos = QOS0;
    msg.payload = mqtt_modbus_data;
    mqtt_mark =  mqtt_publish(client_fd,g_run_GW_parament.mqtt_setparament.pub_topic, &msg);
    rt_kprintf("nmqtt data:%dnn",mqtt_mark);
    cJSON_free(mqtt_modbus_data);
    if(mqtt_mark !=0)
    {
        client_fd->lcc_flag = 1;
    }
举报

刘秀英

2022-10-20 10:33:32
第一个问题最流氓的做法,检测mqtt的心跳,如果检测不到心跳,强制关闭mqtt连接或者tcp连接,然后重连
第二个问题,qos的问题,如果是qos为0,那么你publish消息后就可以清除信息,如果是qos1,他至少保证一次消息被接受,所以你应该在收到服务器的ack消息后清除信息,(同理如果你没有收到服务器的确认信息,那你应该一直发这条数据)。qos2同理,不过qos2我没用过,看协议等到服务器确认消息被接收后清除信息
举报

河神大人

2022-10-20 10:33:41
kawaii-mqtt并没有对用户提供收到服务器ack的PUBLIC API。不知该publish发布API内部是否进行了对msg消息的拷贝备份。
举报

刘秀英

2022-10-20 10:33:52
ack信息应该是数据头PUBACK了,发送数据的API它内部应该不会copy,一般是你把要发送的数据的地址传给他,他开辟内存是把你要发送的数据打包成mqtt协议数据时开辟的。
举报

更多回帖

发帖
×
20
完善资料,
赚取积分