综合技术交流
直播中

听风说梦

9年用户 902经验值
擅长:可编程逻辑 嵌入式技术 EMC/EMI设计
私信 关注
[经验]

IG902如何上传数据到MQTT云平台EMQX ?

EMQX 是全球领先的开源物联网MQTT 5.0服务器,高并发、低延时,内置强大的规则引擎,支持边缘及云端部署,是5G时代大型物联网应用首选技术方案。IG902边缘计算网关,可以通过MQTT协议接入EMQ服务器,帮助您快速搭建IIoT计算方案。   

1. IG902初始化配置

1)开启边缘计算安装相应APP device_supervisor并启动APP。





2)添加设备选择右侧栏目设备监控-》设备列表右上角操作中的“ ”号添加设备。





3)添加变量






2.模拟传感器数据上传:

1)使用软件modbus Slave



   2) 添加模拟数据




3)启动数据发送connection-》connect

3.配置设备云服务及搭建EMQ服务器

1)在IG Web端查看接收数据



4.搭建EMQ服务器




2)以Linux系统为例注册账户下载免费的license 文件,替换etc目录下的对应文件。

3)安装目录bin目录下启动系统 通过./empx start命令启动。



  4)浏览器访问http://服务器地址:18083/#/login  输入用户名和密码用户名admin 密码public登录管理页面



  

5.EMQ平台配置用户名认证

1)关闭匿名登录

配置匿名认证开关:

# 修改etc/emqx.conf

## Value: true | false

allow_anonymous=false

方式一:

修改etc/plugins文件夹下的

emqx_auth_username.conf文件

# etc/plugins/emqx_auth_username.conf

auth.user.password_hash = plain

## 第一组认证数据

auth.user.1.username = admin

auth.user.1.password = public

## 第二组认证数据

auth.user.2.username = test

auth.user.2.password = test

启动认证服务

./emqx_ctlplugins loademqx_auth_username或在web界面开启服务

重启emqx服务

./bin/emqx stop

./bin/emqx start

查看运行状态

./bin/emqx_ctlstatus

方式二: 1)安装mysql 数据库

mysql安装

检查系统本身是否有预装的mysql

rpm -qa | grep mysql #检查是否安装了mysql

rpm -qa | grep mariadb #检查是否安装了mariadb

rpm -e xxx#一般使用此命令即可卸载成功

rpm -e --nodeps xxx #卸载不成功时使用此命令强制卸载)

安装

wget -i -c http://dev.mysql.com/get/mysql57-community-release-el7-10.noarch.rpm

//下载mysql的rpm的包

yum -y installmysql57-community-release-el7-10.noarch.rpm

yum -y installmysql-community-server

2)创建相关表格:

>mysql –u root –p

createdatabase mqtt;

CREATETABLE`mqtt_user`(

`id`int(11) unsignedNOTNULLAUTO_INCREMENT,

`username`varchar(100) DEFAULTNULL,

`password`varchar(100) DEFAULTNULL,

`salt`varchar(35) DEFAULTNULL,

`is_superuser`tinyint(1) DEFAULT0,

`created`datetime DEFAULTNULL,

PRIMARY KEY(`id`),

UNIQUEKEY`mqtt_username`(`username`)

) ENGINE=InnoDBDEFAULTCHARSET=utf8mb4;

3) 修改文件

# etc/plugins/emqx_auth_mysql.conf

## 服务器地址

auth.mysql.server = 127.0.0.1:3306

## 连接池大小

auth.mysql.pool = 8

auth.mysql.username = mysql数据库名

auth.mysql.password = 数据库密码

auth.mysql.database = mqtt数据库

auth.mysql.query_timeout = 5s

4)启动服务mysql认证服务emqx_ctl plugins load emqx_auth_mysql   或通过web管理端插件开启

注意:如果服务未启动查看报错内容解决。1.配置文件出错(检查后修改)。2.mysql未开启远程管理权限

5)添加测试用户

mysql -u root -p

usemysql;

updateusersethost=’%’ whereuser=‘root’ andhost=‘localhost’;

添加认证用户test密码test

INSERTINTO`mqtt_user`( `username`, `password`, `salt`)

VALUES

('test', ' f2ca1bb6c7e907d06dafe4687e579fce76b37e4e93b7605022da52e6ccc26fd2', NULL);

密码采用sha256加密



6)客户端测试使用软件MQTTX  链接:https://mqttx.app/cn/



7)点击connect,提示连接成功



8)IG连接需要添加用户名密码



9)编辑发布脚本


相关发布脚本参考链接:
https://read-ts-en.readthedocs.io/en/latest/PLC%20Supervisor%E7%94%A8%E6%88%B7%E6%89%8B%E5%86%8C.html#mqtt

6.配置客户端数据存储到Mysql

1)手动新建数据表,或根据附件mqtt.sql导入数据库。

修改文件emqx_backend_mysql.conf

# etc/plugins/emqx_backend_mysql.conf

## 服务器地址

auth.mysql.server=127.0.0.1:3306

## 连接池大小

auth.mysql.pool=8

auth.mysql.username=mysql数据库名

auth.mysql.password=数据库密码

auth.mysql.database=mqtt数据库

auth.mysql.query_timeout=5s


2)建立相关表格MySQL设备在线状态表

mqtt_client 存储设备在线状态:

DROPTABLEIFEXISTS`mqtt_client`;

CREATETABLE`mqtt_client`(

`id`int(11) unsignedNOTNULLAUTO_INCREMENT,

`clientid`varchar(64) DEFAULTNULL,

`state`varchar(3) DEFAULTNULL,

`node`varchar(64) DEFAULTNULL,

`online_at`datetime DEFAULTNULL,

`offline_at`datetime DEFAULTNULL,

`created`timestampNULLDEFAULTCURRENT_TIMESTAMP,

PRIMARY KEY(`id`),

KEY`mqtt_client_idx`(`clientid`),

UNIQUEKEY`mqtt_client_key`(`clientid`),

INDEXtopic_index(`id`, `clientid`)

) ENGINE=InnoDBDEFAULTCHARSET=utf8MB4;

1 rows in set(0.00sec)

MySQL主题订阅表

mqtt_sub 存储设备的主题订阅关系:

DROPTABLEIFEXISTS`mqtt_sub`;

CREATETABLE`mqtt_sub`(

`id`int(11) unsignedNOTNULLAUTO_INCREMENT,

`clientid`varchar(64) DEFAULTNULL,

`topic`varchar(180) DEFAULTNULL,

`qos`tinyint(1) DEFAULTNULL,

`created`timestampNULLDEFAULTCURRENT_TIMESTAMP,

PRIMARY KEY(`id`),

KEY`mqtt_sub_idx`(`clientid`,`topic`,`qos`),

UNIQUEKEY`mqtt_sub_key`(`clientid`,`topic`),

INDEXtopic_index(`id`, `topic`)

) ENGINE=InnoDBDEFAULTCHARSET=utf8MB4;

MySQL消息存储表

mqtt_msg 存储MQTT 消息:

DROPTABLEIFEXISTS`mqtt_msg`;

CREATETABLE`mqtt_msg`(

`id`int(11) unsignedNOTNULLAUTO_INCREMENT,

`msgid`varchar(64) DEFAULTNULL,

`topic`varchar(180) NOTNULL,

`sender`varchar(64) DEFAULTNULL,

`node`varchar(64) DEFAULTNULL,

`qos`tinyint(1) NOTNULLDEFAULT'0',

`retain`tinyint(1) DEFAULTNULL,

`payload`blob,

`arrived`datetime NOTNULL,

PRIMARY KEY(`id`),

INDEXtopic_index(`id`, `topic`)

) ENGINE=InnoDBDEFAULTCHARSET=utf8MB4;

MySQL保留消息表

mqtt_retain 存储retain 消息:

DROPTABLEIFEXISTS`mqtt_retain`;

CREATETABLE`mqtt_retain`(

`id`int(11) unsignedNOTNULLAUTO_INCREMENT,

`topic`varchar(180) DEFAULTNULL,

`msgid`varchar(64) DEFAULTNULL,

`sender`varchar(64) DEFAULTNULL,

`node`varchar(64) DEFAULTNULL,

`qos`tinyint(1) DEFAULTNULL,

`payload`blob,

`arrived`timestampNOTNULLDEFAULTCURRENT_TIMESTAMP,

PRIMARY KEY(`id`),

UNIQUEKEY`mqtt_retain_key`(`topic`),

INDEXtopic_index(`id`, `topic`)

) ENGINE=InnoDBDEFAULTCHARSET=utf8MB4;

select* frommqtt_retain wheretopic = "retain";

MySQL消息确认表

mqtt_acked 存储客户端消息确认:

DROPTABLEIFEXISTS`mqtt_acked`;

CREATETABLE`mqtt_acked`(

`id`int(11) unsignedNOTNULLAUTO_INCREMENT,

`clientid`varchar(64) DEFAULTNULL,

`topic`varchar(180) DEFAULTNULL,

`mid`int(11) unsignedDEFAULTNULL,

`created`timestampNULLDEFAULTNULL,

PRIMARY KEY(`id`),

UNIQUEKEY`mqtt_acked_key`(`clientid`,`topic`),

INDEXtopic_index(`id`, `topic`)

) ENGINE=InnoDBDEFAULTCHARSET=utf8MB4;
3)启动服务mysql存储方法一:emqx_ctl plugins load emqx_backend_mysql方法二:通过插件启动对应插件emqx_backend_mysql



4)通过mysql查看客户端发送数据




7.数据显示通过软件Grafana

1)下载链接:https://grafana.com/grafana/download安装完成后访问服务器地址:3000进入管理界面,此处访问10.5.20.245:3000



2)设置相关参数



3)测试连接mysql是否可以连接



4)创建视图表格



5)编辑表格



6)设置数据库及查询语句。



      8.尝试修改上传值



查看传输的数据



   9.EMQ平台下发数据到客户端

1)编辑订阅




2)填写代码

import logging

import json

defctl_test(topic, payload, wizard_api): #定义订阅主函数

logging.info(topic) #打印订阅主题,假定topic为write/plc

logging.info(payload) #打印订阅数据,假定payload数据为{"method":"setValue", "TagName":"SP1", "TagValue":12.3}

payload = json.loads(payload) #反序列化订阅数据

ifpayload["method"] == "setValue": #检测是否为写入数据

message = {payload["TagName"]:payload["TagValue"]} #定义下发消息,包括下发的变量名称和变量值

wizard_api.write_plc_values(message) #调用wizard_api模块中的write_plc_values方法,将message字典中的数据下发至指定变量

3)EMQ服务器端连接websocket



4)发布信息到客户端,填写对应客户端订阅主题,及修改字段{"method":"setValue", "TagName":"字段名", "TagValue":修改值为}此处修改客户端字段temperature修改值为12.3此处输入{"method":"setValue", "TagName":"temperature", "TagValue":12.3}  点击发布按钮



查看值是否修改(modbus中值和变量列表中值都被修改)



相关日志



扩展 python实现对数据的修改

import paho.mqtt.client as mqtt

# 连接成功回调

defon_connect(client, userdata, flags, rc):

print('Connected with result code 'str(rc))

client.subscribe('to_service')

# 消息接收回调

defon_message(client, userdata, msg):

print(msg.topic " "str(msg.payload))

client = mqtt.Client()

# 指定回调函数

client.on_connect = on_connect

client.on_message = on_message

# 建立连接

client.username_pw_set('用户名','密码')

client.connect('服务器地址', 1883, 60)

# 发布消息修改对象修改值

client.publish('to_client',payload='{"method":"setValue", "TagName":"T1", "TagValue":12}',qos=1)

client.loop_forever()

   Python paho-mqtt 模块使用   https://www.jianshu.com/p/ef546f476322  

运行程序查看数据变化




其它:

相关启动命令

启动mysql:  systemctl start mysqld

启动EMQ:   ./emqx/bin/emqx start

启动grafana:service grafana-server start

如果访问服务器IP:端口无法访问的情况

1.检查服务是否开启

2.centos下防火墙开启相关接口

查看命令



开启相关端口例:开启tcp协议端口3306 firewall-cmd --permanent --add-port=3306/tcp

更多回帖

发帖
×
20
完善资料,
赚取积分