Protel|AD|DXP论坛
直播中

郎晓

6年用户 3经验值
擅长:上位机
私信 关注
[经验]

物联网分布式架构[java实现]支持TCP/MODBUS转TCP

最近一直在做物联网相关的平台开发,好多朋友来咨询设计上的思路,我在这里总结了一下我在这方面的一些积累,供有需要的朋友参考,如果有更好的设计思路,欢迎一起探讨。QQ:66075945.


           首先是技术选型,我采用的是Apache Mina,Spring MVC,Redis,ActiveMQ 。

           ApacheMINA是一个网络应用程序框架,用来帮助用户简单地开发高性能和高可扩展性的网络应用程序。它提供了一个通过Java NIO在不同的传输例如TCP/IP和UDP/IP上抽象的事件驱动的异步API。具体查看Mina的技术文档。   

  架构中涉及了数据的接收,设备的远程控制,如下图。

         

          设备的数据通过协议接口识别协议类型,并转发到各个协议解析组件,在组件中会记录会话的session,供远程控制使用。为使协议解析组件保持无状态属性,在此处使用REDIS保持会话状态。

          远程控制使用ActiveMQ[或者使用Mina来传输指令]传递消息至指令转发模块,指令转发模块作为消息的消费者,收到消息后转发给相应的Session,通过此通道下发给设备,实现数据的双向通信

          下面介绍一下平台的关键技术点。

          1.协议识别组件的实现;

          2.协议解析组件的热插拔;

          3.分布式设计;


       一、协议识别组件的实现

         平台支持同时解析多协议的功能,需要识别每个协议簇,发送到特定的协议解码器。这就需要用到Mina提供的自定义解码器功能,

            acceptor.getFilterChain().addLast("codec",

                            new ProtocolCodecFilter(new MessageCodecFactory(Charset.forName("UTF-8"))));

          在MessageCodecFactory中,可以对应各个协议簇注册所属的解码器,如下所示:

           addMessageDecoder(tcpDecoder);
   addMessageEncoder(AbstractMessage.class, tcpEncoder);

           addMessageDecoder(modbusDecoder);
   addMessageEncoder(AbstractMessage.class, modbusEncoder);


          当服务器收到发来的消息后,将循环所有的解码器,如果某一解码器返回MessageDecoder.OK,说明本解码器是合适的解码器,消息将被发送到对应的解码器进行解码。解析出协议的包头,命令号等,以备后续处理。

        二、协议解析组件的热插拔

           上面协议识别组件仅仅是识别出了协议簇,以及哪个协议;具体的业务逻辑还需要交由协议解析组件来处理。在这里我引入了Spring来管理协议解析组件。系统启动后注册协议解析组件,通过getBeansOfType查找到对应的协议解析组件来处理协议逻辑。

具体代码:


  •         协议接收转发

         /**
* 数据通过此接口转发到各个解析模块
*/
@Override
public void onMsg(AbstractMessage message, IoSession session) {
logger.info("onMsg() cmd = " + message.getCmd() + ", protocolType = " + message.getProtocolType() );
ApplicationContextUtil.callMessageRequestProvider(message, session);
}


  •        查找门面  

public  static voidcallMessageRequestProvider(AbstractMessage message, IoSession session){
MessageManagerLogicHandler handler = null;
Map map = context.getBeansOfType(MessageManagerFacade.class);
for(Map.Entry entry : map.entrySet()){
MessageManagerFacade facade = entry.getValue();
if(facade.getFacadeMap() != null){
handler = (MessageManagerLogicHandler) facade.getFacadeMap().get(message.getProtocolType());
if(handler != null){
handler.doExec(message, session);
}
}
}
}

        上面代码中通过协议簇类型ProtocolType找到具体的协议簇注册类,然后分发给具体的业务逻辑处理类。如下所示:

         @PostConstruct
public void registry() {
logger.info("====================TcpMessageFacade Registry=======================");
// TCP协议解析组件
this.facadeMap.put(TcpNetCmd.LOGIN_ID, loginMessageClientHandler);
this.facadeMap.put(TcpNetCmd.READ_DATA_ID, dataGetMessageHandler);
this.facadeMap.put(TcpNetCmd.SET_TIME_ID, timeSetResponseHandler);
}

        三、分布式设计

            目前的架构设计,在无重的IO情况下,单台服务器可以支持8000+设备并发。若需接入更多的设备,采用分布式设计,则需要使门户系统变为无状态,通常的解决方案就是Redis,ActiveMQ等。


         本篇文章到此就结束了,欢迎交流指正!


更多回帖

发帖
×
20
完善资料,
赚取积分