嵌入式技术论坛
直播中

马占云

7年用户 1549经验值
私信 关注
[问答]

求助,mymqtt软件包是否支持多主题订阅及发布?

目前用my_mqtt软件包做的单主题订阅已经调通,但是当把主题数量增加时就会报错,想到之前看到个帖子说paho_mqtt V1.1.0只支持1个主题订阅,请教各位是这样的吗?是否有人使用过多主题订阅?
如果不支持的话,是否可以曲线救国,通过更改主题来实现多个主题的订阅?

回帖(2)

王磊

2022-5-9 09:47:09
Paho MQTT 和 myMQTT 都是支持订阅多个主题的,myMQTT 订阅多个主题需要在配置中对最大订阅主题的个数进行修改,方法如下所示。

比如说你定义的最大订阅个数是3,直接像下面这样写就行了
/* set subscribe table and event callback */
client.message_handlers[0].topicFilter = rt_strdup(MQTT_SUBTOPIC0);
client.message_handlers[0].callback = mqtt_sub_callback;
client.message_handlers[0].qos = QOS1;
/* set subscribe table and event callback */
client.message_handlers[1].topicFilter = rt_strdup(MQTT_SUBTOPIC1);
client.message_handlers[1].callback = mqtt_sub_callback;
client.message_handlers[1].qos = QOS1;
/* set subscribe table and event callback */
client.message_handlers[2].topicFilter = rt_strdup(MQTT_SUBTOPIC2);
client.message_handlers[2].callback = mqtt_sub_callback;
client.message_handlers[2].qos = QOS1;

举报

ss

2022-5-9 09:47:45
我没有用过umqtt,但是看了一下源码,umqtt是支持多主题订阅的。
umqtt_subscribe的过程可以简化为:
1.通过语句取得client的sub_rec_list,若链表不为空,则判断topic是否已存在;
_cnt = rt_list_isempty(&client->sub_recv_list);
rt_list_for_each(node, &client->sub_recv_list)
     ....
2.发送sub数据,若收到suback,则将新节点插入到client的sub_rec_list链表中
rt_list_insert_after(&client->sub_recv_list, &p_subtop->next_list);
完整的umqtt_subscrbie的代码如下:
int umqtt_subscribe(struct umqtt_client *client, const char *topic, enum umqtt_qos qos, umqtt_subscribe_cb callback)
{
    int _ret = 0;
    int _length = 0;
    int _cnt = 0;
    struct subtop_recv_handler *p_subtop = RT_NULL;
    rt_list_t *node = RT_NULL;
    struct umqtt_msg encode_msg = { 0 };
    struct umqtt_msg_ack msg_ack = { 0 };
    RT_ASSERT(client);
    RT_ASSERT(topic);
    _cnt = rt_list_isempty(&client->sub_recv_list);
    if (_cnt == 0)
    {
        _cnt = 0;
        rt_list_for_each(node, &client->sub_recv_list)
        {
            p_subtop = rt_list_entry(node, struct subtop_recv_handler, next_list);
            if (p_subtop->topicfilter
            && (rt_strcmp(p_subtop->topicfilter, topic) == 0))
            {
                LOG_D(" subscribe topic(%s) is already subscribed.", topic);
                goto exit;
            }
        }
        _length = rt_list_len(&client->sub_recv_list);
    }
    if (_length > client->sub_recv_list_len)
    {
        _ret = UMQTT_MEM_FULL;
        LOG_E(" subscribe size(%d) is not enough! now length(%d)!", client->sub_recv_list_len, _length);
        goto exit;
    }
    else
    {
        rt_memset(&encode_msg, 0, sizeof(encode_msg));
        encode_msg.header.bits.qos = UMQTT_QOS1;
        encode_msg.msg.subscribe.packet_id = get_next_packetID(client);
        encode_msg.msg.subscribe.topic_filter[0].topic_filter = topic;
        encode_msg.msg.subscribe.topic_filter[0].filter_len = strlen(topic);
        encode_msg.msg.subscribe.topic_filter[0].req_qos.request_qos = qos;
        encode_msg.msg.subscribe.topic_count = 1;
        rt_memset(client->send_buf, 0, sizeof(rt_uint8_t) * client->mqtt_info.send_size);
        _length = umqtt_encode(UMQTT_TYPE_SUBSCRIBE, client->send_buf, client->mqtt_info.send_size, &encode_msg);
        if (_length <= 0)
        {
            _ret = UMQTT_ENCODE_ERROR;
            LOG_E(" subscribe encode failed! topic: %s", topic);
            goto exit;
        }
        client->send_len = _length;
        _ret = umqtt_trans_send(client->sock, client->send_buf, client->send_len, client->mqtt_info.send_timeout);
        if (_ret < 0)
        {
            _ret = UMQTT_SEND_FAILED;
            LOG_E(" subscribe trans send failed!");
            goto exit;
        }
        set_uplink_recon_tick(client, UPLINK_LAST_TICK);
        rt_memset(&msg_ack, 0, sizeof(msg_ack));
        if (RT_EOK == rt_mq_recv(client->msg_queue,
                                &msg_ack, sizeof(struct umqtt_msg_ack),
                                rt_tick_from_millisecond(client->mqtt_info.send_timeout * 1000)))
        {
            if (msg_ack.msg_type == UMQTT_TYPE_SUBACK)
            {
                p_subtop = RT_NULL;
                p_subtop = (struct subtop_recv_handler *)rt_calloc(1, sizeof(struct subtop_recv_handler));
                RT_ASSERT(p_subtop);
                LOG_D(" start assign datas !");
                p_subtop->topicfilter = rt_strdup(topic);
                p_subtop->qos = qos;
                if (callback)
                {
                    p_subtop->callback = callback;
                }
                rt_list_insert_after(&client->sub_recv_list, &p_subtop->next_list);
                set_uplink_recon_tick(client, UPLINK_NEXT_TICK);
                _ret = UMQTT_OK;
                LOG_I("subscribe ack ok! ");
                goto exit;
            }
            else
            {
                _ret = UMQTT_READ_ERROR;
                LOG_E("subscribe ack error!");
                goto exit;
            }
        }
        else
        {
            _ret = UMQTT_READ_FAILED;
            LOG_E(" subscribe recv message timeout! topic: %s", topic);
            goto exit;
        }
    }
exit:
    return _ret;
}
此外,内部的umqtt_deliver_message函数中,也是遍历链表,然后执行回调的。
因此,umqtt支持同一client的多主题订阅。

举报

更多回帖

发帖
×
20
完善资料,
赚取积分