EMQX 是全球领先的开源物联网MQTT 5.0服务器,高并发、低延时,内置强大的规则引擎,支持边缘及云端部署,是5G时代大型物联网应用首选技术方案。IG902边缘计算网关,可以通过MQTT协议接入EMQ服务器,帮助您快速搭建IIoT计算方案。
1. IG902初始化配置
1)开启边缘计算安装相应APP device_supervisor并启动APP。
2)添加设备选择右侧栏目设备监控-》设备列表右上角操作中的“ ”号添加设备。
3)添加变量
2.模拟传感器数据上传:
1)使用软件modbus Slave
2) 添加模拟数据
3)启动数据发送connection-》connect
3.配置设备云服务及搭建EMQ服务器
1)在IG Web端查看接收数据
4.搭建EMQ服务器
2)以Linux系统为例注册账户下载免费的license 文件,替换etc目录下的对应文件。
3)安装目录bin目录下启动系统 通过./empx start命令启动。
4)浏览器访问http://服务器地址:18083/#/login 输入用户名和密码用户名admin 密码public登录管理页面
5.EMQ平台配置用户名认证
1)关闭匿名登录
配置匿名认证开关:
# 修改etc/emqx.conf
## Value:
true | false
allow_anonymous=
false
方式一:
修改etc/plugins文件夹下的
emqx_auth_username.conf文件
# etc/plugins/emqx_auth_username.conf
auth.user.password_hash = plain
## 第一组认证数据
auth.user
.1.username = admin
auth.user
.1.password =
public
## 第二组认证数据
auth.user
.2.username = test
auth.user
.2.password = test
启动认证服务
./emqx_ctlplugins loademqx_auth_username或在web界面开启服务
重启emqx服务
./bin/emqx stop
./bin/emqx start
查看运行状态
./bin/emqx_ctlstatus
方式二: 1)安装mysql 数据库
mysql安装
检查系统本身是否有预装的mysql
rpm -qa |
grep mysql
#检查是否安装了mysql
rpm -qa | grep mariadb #检查是否安装了mariadb
rpm -e xxx#一般使用此命令即可卸载成功
rpm -e --nodeps xxx #卸载不成功时使用此命令强制卸载)
安装
wget -i -c http://dev.mysql.com/get/mysql57-community-release-el7-10.noarch.rpm
//下载mysql的rpm的包
yum -y installmysql57-community-release-el7-10.noarch.rpm
yum -y installmysql-community-server
2)创建相关表格:
>mysql –u root –p
createdatabase mqtt;
CREATETABLE
`mqtt_user`(
`id`int(
11) unsignedNOTNULLAUTO_INCREMENT,
`username`varchar(
100) DEFAULTNULL,
`password`varchar(
100) DEFAULTNULL,
`salt`varchar(
35) DEFAULTNULL,
`is_superuser`tinyint(
1) DEFAULT
0,
`created`datetime DEFAULTNULL,
PRIMARY KEY(
`id`),
UNIQUEKEY
`mqtt_username`(
`username`)
) ENGINE=InnoDBDEFAULTCHARSET=utf8mb4;
3) 修改文件
# etc/plugins/emqx_auth_mysql.conf
## 服务器地址
auth.mysql.server = 127.0.0.1:3306
## 连接池大小
auth.mysql.pool = 8
auth.mysql.username = mysql数据库名
auth.mysql.password = 数据库密码
auth.mysql.database = mqtt数据库
auth.mysql.query_timeout = 5s
4)启动服务mysql认证服务emqx_ctl plugins load emqx_auth_mysql 或通过web管理端插件开启
注意:如果服务未启动查看报错内容解决。1.配置文件出错(检查后修改)。2.mysql未开启远程管理权限
5)添加测试用户
mysql -u root -p
usemysql;
updateusersethost=’%’ whereuser=‘root’ andhost=‘localhost’;
添加认证用户test密码test
INSERTINTO
`mqtt_user`(
`username`,
`password`,
`salt`)
VALUES
(
'test',
' f2ca1bb6c7e907d06dafe4687e579fce76b37e4e93b7605022da52e6ccc26fd2', NULL);
密码采用sha256加密
7)点击connect,提示连接成功
8)IG连接需要添加用户名密码
9)编辑发布脚本
相关发布脚本参考链接:
https://read-ts-en.readthedocs.io/en/latest/PLC%20Supervisor%E7%94%A8%E6%88%B7%E6%89%8B%E5%86%8C.html#mqtt
6.配置客户端数据存储到Mysql
1)手动新建数据表,或根据附件mqtt.sql导入数据库。
修改文件emqx_backend_mysql.conf
# etc/plugins/emqx_backend_mysql.conf
## 服务器地址
auth.mysql.server=127.0.0.1:3306
## 连接池大小
auth.mysql.pool=8
auth.mysql.username=mysql数据库名
auth.mysql.password=数据库密码
auth.mysql.database=mqtt数据库
auth.mysql.query_timeout=5s
2)建立相关表格MySQL设备在线状态表
mqtt_client 存储设备在线状态:
DROPTABLEIFEXISTS
`mqtt_client`;
CREATETABLE
`mqtt_client`(
`id`int(
11) unsignedNOTNULLAUTO_INCREMENT,
`clientid`varchar(
64) DEFAULTNULL,
`state`varchar(
3) DEFAULTNULL,
`node`varchar(
64) DEFAULTNULL,
`online_at`datetime DEFAULTNULL,
`offline_at`datetime DEFAULTNULL,
`created`timestampNULLDEFAULTCURRENT_TIMESTAMP,
PRIMARY KEY(
`id`),
KEY
`mqtt_client_idx`(
`clientid`),
UNIQUEKEY
`mqtt_client_key`(
`clientid`),
INDEXtopic_index(
`id`,
`clientid`)
) ENGINE=InnoDBDEFAULTCHARSET=utf8MB4;
1 rows in set(
0.
00sec)
MySQL主题订阅表
mqtt_sub 存储设备的主题订阅关系:
DROPTABLEIFEXISTS
`mqtt_sub`;
CREATETABLE
`mqtt_sub`(
`id`int(
11) unsignedNOTNULLAUTO_INCREMENT,
`clientid`varchar(
64) DEFAULTNULL,
`topic`varchar(
180) DEFAULTNULL,
`qos`tinyint(
1) DEFAULTNULL,
`created`timestampNULLDEFAULTCURRENT_TIMESTAMP,
PRIMARY KEY(
`id`),
KEY
`mqtt_sub_idx`(
`clientid`,
`topic`,
`qos`),
UNIQUEKEY
`mqtt_sub_key`(
`clientid`,
`topic`),
INDEXtopic_index(
`id`,
`topic`)
) ENGINE=InnoDBDEFAULTCHARSET=utf8MB4;
MySQL消息存储表
mqtt_msg 存储MQTT 消息:
DROPTABLEIFEXISTS
`mqtt_msg`;
CREATETABLE
`mqtt_msg`(
`id`int(
11) unsignedNOTNULLAUTO_INCREMENT,
`msgid`varchar(
64) DEFAULTNULL,
`topic`varchar(
180) NOTNULL,
`sender`varchar(
64) DEFAULTNULL,
`node`varchar(
64) DEFAULTNULL,
`qos`tinyint(
1) NOTNULLDEFAULT
'0',
`retain`tinyint(
1) DEFAULTNULL,
`payload`blob,
`arrived`datetime NOTNULL,
PRIMARY KEY(
`id`),
INDEXtopic_index(
`id`,
`topic`)
) ENGINE=InnoDBDEFAULTCHARSET=utf8MB4;
MySQL保留消息表
mqtt_retain 存储retain 消息:
DROPTABLEIFEXISTS
`mqtt_retain`;
CREATETABLE
`mqtt_retain`(
`id`int(
11) unsignedNOTNULLAUTO_INCREMENT,
`topic`varchar(
180) DEFAULTNULL,
`msgid`varchar(
64) DEFAULTNULL,
`sender`varchar(
64) DEFAULTNULL,
`node`varchar(
64) DEFAULTNULL,
`qos`tinyint(
1) DEFAULTNULL,
`payload`blob,
`arrived`timestampNOTNULLDEFAULTCURRENT_TIMESTAMP,
PRIMARY KEY(
`id`),
UNIQUEKEY
`mqtt_retain_key`(
`topic`),
INDEXtopic_index(
`id`,
`topic`)
) ENGINE=InnoDBDEFAULTCHARSET=utf8MB4;
select* frommqtt_retain wheretopic =
"retain";
MySQL消息确认表
mqtt_acked 存储客户端消息确认:
DROPTABLEIFEXISTS
`mqtt_acked`;
CREATETABLE
`mqtt_acked`(
`id`int(
11) unsignedNOTNULLAUTO_INCREMENT,
`clientid`varchar(
64) DEFAULTNULL,
`topic`varchar(
180) DEFAULTNULL,
`mid`int(
11) unsignedDEFAULTNULL,
`created`timestampNULLDEFAULTNULL,
PRIMARY KEY(
`id`),
UNIQUEKEY
`mqtt_acked_key`(
`clientid`,
`topic`),
INDEXtopic_index(
`id`,
`topic`)
) ENGINE=InnoDBDEFAULTCHARSET=utf8MB4;
3)启动服务mysql存储方法一:emqx_ctl plugins load emqx_backend_mysql方法二:通过插件启动对应插件emqx_backend_mysql
4)通过mysql查看客户端发送数据
7.数据显示通过软件Grafana
2)设置相关参数
3)测试连接mysql是否可以连接
4)创建视图表格
5)编辑表格
6)设置数据库及查询语句。
8.尝试修改上传值
查看传输的数据
9.EMQ平台下发数据到客户端
1)编辑订阅
2)填写代码
import logging
import json
defctl_test(topic, payload, wizard_api): #定义订阅主函数
logging.info(topic) #打印订阅主题,假定topic为write/plc
logging.info(payload) #打印订阅数据,假定payload数据为{
"method":
"setValue",
"TagName":
"SP1",
"TagValue":
12.3}
payload = json.loads(payload) #反序列化订阅数据
ifpayload[
"method"] ==
"setValue": #检测是否为写入数据
message = {payload[
"TagName"]:payload[
"TagValue"]} #定义下发消息,包括下发的变量名称和变量值
wizard_api.write_plc_values(message) #调用wizard_api模块中的write_plc_values方法,将message字典中的数据下发至指定变量
3)EMQ服务器端连接websocket
4)发布信息到客户端,填写对应客户端订阅主题,及修改字段{"method":"setValue", "TagName":"字段名", "TagValue":修改值为}此处修改客户端字段temperature修改值为12.3此处输入{"method":"setValue", "TagName":"temperature", "TagValue":12.3} 点击发布按钮
查看值是否修改(modbus中值和变量列表中值都被修改)
相关日志
扩展 python实现对数据的修改
import paho.mqtt.client
as mqtt
# 连接成功回调
defon_connect(client, userdata, flags, rc):
print(
'Connected with result code
'str(rc))
client.subscribe(
'to_service')
# 消息接收回调
defon_message(client, userdata, msg):
print(msg.topic
" "str(msg.payload))
client = mqtt.Client()
# 指定回调函数
client.on_connect = on_connect
client.on_message = on_message
# 建立连接
client.username_pw_set('用户名
','密码')
client.connect('服务器地址',
1883,
60)
# 发布消息修改对象修改值
client.publish(
'to_client',payload='{
"method":
"setValue",
"TagName":
"T1",
"TagValue":
12}',qos=
1)
client.loop_forever()
运行程序查看数据变化
其它:
相关启动命令
启动mysql: systemctl start mysqld
启动EMQ: ./emqx/bin/emqx start
启动grafana:service grafana-server start
如果访问服务器IP:端口无法访问的情况
1.检查服务是否开启
2.centos下防火墙开启相关接口
查看命令
开启相关端口例:开启tcp协议端口3306 firewall-cmd --permanent --add-port=3306/tcp