范文健康探索娱乐情感热点
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

MQTT协议客户端开发入门指北

  引言
  Hello 大家好,这里是Anyin。
  前段时间因为工作涉及到和硬件设备打交道,做了一些MQTT相关的工作。今天在这里也做下简单的分享。 基础概念
  在做相关开发工作之前,我们先需要了解下什么是MQTT。
  MQTT是一个轻量的发布订阅模式消息传输协议,专门针对低带宽和不稳定网络环境的物联网应用设计。
  • MQTT官网[1]
  • MQTT V3.1.1协议规范[2]
  MQTT协议具有如下特点:
  •开放消息协议,简单易实现
  •发布订阅模式,一对多消息发布
  •基于TCP/IP网络连接
  •1字节固定报头,2字节心跳报文,报文结构紧凑
  •消息QoS支持,可靠传输保证
  MQTT主要的应用场景:
  •物联网M2M通信,物联网大数据采集
  •Android消息推送,WEB消息推送
  •移动即时消息,例如Facebook Messenger
  •智能硬件、智能家具、智能电器
  •车联网通信,电动车站桩采集
  •智慧城市、远程医疗、远程教育
  •电力、石油与能源等行业市场
  更多详细信息可以查看官网。这里就不复述了。
  对于MQTT服务端的安装,我们这里使用 EMQX  , 其官网地址:https://www.emqx.io/zh实现一个MQTT客户端
  当我们 EMQ  服务端安装之后,我们就可以进行编码我们的MQTT客户端,用于接收设备端的消息或者给设备端发送消息,整个过程都是异步  的。
  1. pom.xml   添加依赖               org.eclipse.paho          org.eclipse.paho.client.mqttv3          ${mqtt.version}      
  2.封装 MqttClient  实例 在步骤1,我们依赖了一个mqtt的第三方库,为了以防后续可能替换其他第三方库,我们需要对其MqttClient  进行一个简单的封装。
  新增一个 MQTTClient  类。    /**      * 实例化MQTT 实例      * @param properties 配置信息      * @param factory 扩展点工厂      */     public MQTTClient(MQTTProperties properties,                       IExtensionHandlerFactory factory,                       List subscribeTopics) throws MqttException {         if(CollectionUtils.isEmpty(subscribeTopics)){             throw new CommonBusinessException("-1","订阅列表不能为空");         }         this.subscribeTopics = subscribeTopics;         this.properties = properties;         this.factory = factory;         this.clientId = "SERVER_" + RandomUtil.randomString(8);         this.init();     }
  • MQTTProperties   是MQTT的相关配置熟悉
  • IExtensionHandlerFactory   扩展点工厂组件,在接收消息的时候,需要根据不同的指令进行业务处理,所以需要这个组件
  • List   客户端需要订阅设备端的topic列表
  接着,我们看看 init  方法。    /**      * 初始化MQTT Client 实例      */     public void init() throws MqttException {         String broker = "tcp://" + properties.getHost() + ":" + properties.getPort();         MemoryPersistence persistence = new MemoryPersistence();         try {             if(client == null){                 client = new MqttClient(broker, clientId, persistence);             }             MqttConnectOptions connOpts = this.getOptions();             if (client.isConnected()) {                 client.disconnect();             }             client.connect(connOpts);             client.setCallback(new MQTTSubscribe(this, factory, subscribeTopics));             // 订阅路径             client.subscribe(this.getSubscribeTopicList());         }catch (MqttException ex) {             log.error("mqtt服务初始化失败: {}", ex.getMessage(), ex);             throw ex;         }         log.info("mqtt服务连接成功");     }
  这里主要处理了客户端实例连接服务器的一些操作,主要有设置参数 connOpts  、设置接收消息的回调setCallback  ,设置订阅设备端的消息主题subscribe  。
  这里有地方需要特别注意下,在进行连接的时候之前,做了一个 client.isConnected()  的判断,如果连接的状态,则需要手动的断开连接client.disconnect()  。这里主要是为了做重连的时候,能够确保客户端是断开连接的状态,然后再进行重连。
  客户端连接的逻辑处理了,我们还需要处理下发送消息的逻辑,简单的封装下即可。     /**      * 发送消息      * @param path path      * @param deviceId 设备ID      * @param content 发送内容      */     public void publish(String path, String deviceId, byte[] content){         try {             MqttMessage message = new MqttMessage(content);             message.setQos(properties.getQos());             String topic = path + deviceId;             client.publish( topic, message);         }catch (Exception ex){             log.error("mqtt服务发送消息失败: deviceId: {}  {}",deviceId, ex.getMessage(), ex);         }     }
  3.处理订阅的消息
  基本的客户端实例化我们已经处理完了,接着需要处理上行的消息(就是订阅的消息)。
  对于不同厂商上来的业务消息可能不一样,有可能是 MQTT协议  包含着JSON的字符串的业务数据,也有可能是MQTT协议包含的是二进制的私有协议  。
  为了抽象上行的消息,我们定义了2个接口,分别抽象上行消息的整包对象和上行消息的某个指令。上行消息的整包对象就是从订阅接口返回的完整的 byte[]  数据包;而上行消息的某个指令是指在这个完整的数据包内肯定会有某个字段指明本次消息是属于什么业务的,可能是心跳、可能是状态等等。
  分别新增一个 MQTTProtocol  和Cmd  类。@Data public abstract class MQTTProtocol {     /**      * 设备ID      */     private String deviceId;       /**      * 消息唯一序号      */     private String msgId;     /**       *  具体某个业务的指令      */     private Cmd cmd; }   public interface Cmd {     /**      * 指令类型,上行的指令或者是下行的指令      */     CmdTypeEnum getCmdType();     /**      * 指令发送的目标topic      */     Topic getTopic(); }
  接着,我们再新增一个协议的处理器接口: MQTTProtocolHandler  public interface MQTTProtocolHandler extends IExtensionHandler {       String getDeviceId(String topic, byte[] payload);     /**      * 解码      * @param payload 原始数据      * @return 协议      */     T decode(byte[] payload);       /**      * 校验      * @param protocol 解析出来的基础协议      * @param payload 原始数据      * @return true 通过  false 不通过      */     boolean check(T protocol, byte[] payload);       /**      * 编码      * @param protocol 协议      * @param data 业务数据      * @return 编码数据      */     byte[] encode(T protocol, byte[] data);       /**      * 业务处理      * @param protocol 协议      */     byte[] handle(T protocol);       /**      * 错误响应      * @param protocol 协议      */     byte[] error(T protocol); }
  这个接口把整个消息的处理过程分为5个步骤:解码、校验、编码、业务处理、错误响应。该接口是一个扩展点,扩展点的枚举类是: BusinessType  ,它表示业务类型,即使不同的业务,可能会不同的编解码和处理规则。例如:JSON的数据和二进制的私有协议,它们的编解码就不一样。
  然后,我们再看看当接收到消息的时候,我们如何使用这个扩展点进行业务逻辑处理。 @Override     public void messageArrived(String subscribeTopic, MqttMessage message) throws Exception {         try {             // 根据topic解析不同的业务类型             BusinessType businessType = this.matchBusinessTypeBySubscribeTopic(subscribeTopic);             // 根据业务类型拿到具体的协议处理器             MQTTProtocolHandler protocolHandler = extensionHandlerFactory.getExtensionHandler(businessType, MQTTProtocolHandler.class);            // 获取设备ID             String deviceId = protocolHandler.getDeviceId(subscribeTopic, message.getPayload());             // 整包协议解码             MQTTProtocol protocol = protocolHandler.decode(message.getPayload());             if (protocol == null) {                 log.error("协议解析异常,无法进行应答");                 return;             }             // 指令             Cmd cmd = protocol.getCmd();             if(cmd == null){                 log.error("解析后指令为空,无法进行应答");                 return;             }             // 设置基础信息             protocol.setMsgId(String.valueOf(message.getId()));             protocol.setDeviceId(deviceId);               // 校验             boolean success = protocolHandler.check(protocol, message.getPayload());             if(!success){                 this.errorHandle(protocolHandler, protocol, cmd.getTopic());                 return;             }               try {                 // 业务处理                 byte[] result = protocolHandler.handle(protocol);                   // 应答                 if(CmdTypeEnum.DOWN == cmd.getCmdType()){                     log.info("下行消息,无需应答");                     return;                 }                 Topic topic = cmd.getTopic();                 if(topic == null){                     log.error("上行消息的发布Topic为空,无需进行应答");                     return;                 }                 // 编码后进行应答                 byte[] content = protocolHandler.encode(protocol, result);                 client.publish(topic.getTopic(), deviceId, content);             } catch (Exception ex) {                 log.error("业务逻辑处理异常: {}, 原始数据:{}", ex.getMessage(),  ByteUtil.byte2Str(message.getPayload()), ex);                 this.errorHandle(protocolHandler, protocol, cmd.getTopic());             }         }catch (Exception ex){             log.error("unknown error: {}, 原始数据:{}", ex.getMessage(),  ByteUtil.byte2Str(message.getPayload()), ex);         }     }
  4.处理需要发送的消息
  该步骤3,我们处理的是上行的消息,会涉及到解码、业务处理、编码、应答等步骤。接着我们需要处理发送的消息,即下行的消息。
  下行的消息处理会比较简单,只要拿到对应的 MQTTClient  实例和协议处理器即可编码之后,然后进行下发消息@Slf4j public class MQTTPublish {     private MQTTClient client;     private MQTTProtocolHandler protocolHandler;     public MQTTPublish(MQTTClient client, MQTTProtocolHandler protocolHandler) {         this.client = client;         this.protocolHandler = protocolHandler;     }     public void publish(MQTTProtocol protocol, byte[] data){         byte[] content = protocolHandler.encode(protocol, data);         String deviceId = protocol.getDeviceId();         String topic = protocol.getCmd().getTopic().getTopic();         client.publish(topic, deviceId, content);     } }
  以上的代码只能处理异步的下行协议,在某些场景下,下行协议还需要等待设备端的应答。那这个时候这段代码就无法满足需求。
  所以,我们还需要对这段代码再封装。我们设计一个扩展点,不同的业务类型具有不同的发送逻辑 public interface MQTTPublishHandler extends IExtensionHandler {      T handle(C cmd, Class clazz); }
  接着处理其实现类。 @Override     public  T handle(C cmd, Class clazz) {         CmdEnum cmdEnum = CmdEnum.get(cmd.getCmd());         // 编码         EncodeCmdHandler handler = factory.getExtensionHandler(cmdEnum, EncodeCmdHandler.class);         byte[] data = handler.encode(cmd);           // 根据业务类型,拿到具体的协议处理器         MQTTProtocolHandler protocolHandler = factory.getExtensionHandler(BusinessType.CHARGING, MQTTProtocolHandler.class);         MQTTPublish publish = new MQTTPublish(client, protocolHandler);         Long serial = this.getSerial(cmd.getDeviceId());           // TODO 这里是具体的实现类,需要具体业务实现         ChargingMQTTProtocol protocol = new ChargingMQTTProtocol();         protocol.setSerial(serial.shortValue());         protocol.setDeviceId(cmd.getDeviceId());         protocol.setVersion("10");         protocol.setMac(cmd.getDeviceId());         protocol.setCode(cmd.getCmd());         protocol.setCmd(cmd);         publish.publish(protocol, data);           // 阻塞应答         RedisMessageTask task = new RedisMessageTask();         RedisMessageListener listener = new RedisMessageListener(task);         try {             // 配置RedisKey             String key = MQTTRedisKeyUtil.callbackKey(cmd.getTopic().getTopic(), cmd.getDeviceId(), serial);             ChannelTopic topic = new ChannelTopic(key);             redisMessageListenerContainer.addMessageListener(listener, topic);             // 同步阻塞             Message message = (Message)task.getFuture().get(60000, TimeUnit.MILLISECONDS);             return JsonUtil.fromJson(message.toString(), clazz);         }catch (Exception ex){             log.error("消息获取失败: {}", ex.getMessage(), ex);             throw new CommonBusinessException("-1", "Redis应答失败: " + ex.getMessage());         } finally {             redisMessageListenerContainer.removeMessageListener(listener);         }     }
  这里是使用Java的 CompletableFuture  类进行异步阻塞的。另外我们通过使用Redis的MQ机制,在Redis实现一个监听,当有上行的消息是作为下行的应答的时候,则通过StringRedisTemplate#convertAndSend  发送消息,监听收到消息后,设置到CompletableFuture  中进行应答。
  RedisMessageTask   会持有一个CompletableFuture实例和RedisMessageListener的引用。它的代码如下:@Data public class RedisMessageTask{     private CompletableFuture future = new CompletableFuture<>();     // Redis的监听器     private RedisMessageListener listener;   }
  RedisMessageListener  持有RedisMessageTask  的引用,在收到消息的时候,把消息设置到CompletableFuture  中,则CompletableFuture  实例的阻塞就会收到应答。public class RedisMessageListener implements MessageListener {     private RedisMessageTask task;     public RedisMessageListener(RedisMessageTask task) {         this.task = task;     }     @Override     public void onMessage(Message message, byte[] bytes) { ;         task.getFuture().complete(message);     } }
  最后,在上行应答的时候发送消息即可。 stringRedisTemplate.convertAndSend(key, JsonUtil.toJson(data));最后
  好了,以上就是前端时间接触MQTT相关内容的一些笔记。相关代码因为在实现的过程中,未做更细致的设计和解耦,部分还是和业务耦合。但是后续也会整理成一个Lib包放在 Anyin Cloud[3] 项目中,敬请期待。
  如果您觉得文章对您有帮助,帮忙点关注哈References
  [1] MQTT官网:  http://mqtt.org
  [2] MQTT V3.1.1协议规范:  http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
  [3] Anyin Cloud:  https://gitee.com/anyin/anyin-cloud

这才叫学生手机,海信F50妈妈再也不用担心我沉迷手机了提到海信,相信大家都不会陌生,但在手机市场,海信手机远没有排名前几的厂商亮眼,但却一直有新款机型推出,该品牌旗下最具代表性的便是水墨屏彩墨屏手机。而本篇文章要跟大家聊的是,该品牌的印度向中国抛出橄榄枝,将2。6亿订单送给华为,我国会接受吗?前段时间,中印边境对峙事件在国际上闹的沸沸扬扬,而最终双方经过谈判,此事才得到平息。就在边境事件得到缓和之后,印度的巴帝电公司公开对外表示,要将一份2。6亿的电信基础扩张订单交给中初春就要一起轻薄!共赏vivoS9双色美图气温回升,当我们脱下厚重的外套,不免觉得手里的手机不合时宜从配色到重量都太过沉重的5G手机显然不是初春的最佳伴侣。近日,vivo最新的轻薄自拍旗舰vivoS9如约而至,不仅为追求潮央视315晚会曝光互联网医疗虚假广告保健品伪装药品,半数医美机构涉虚假宣传蓝鲸产经3月15日讯,中央广播电视总台联合国家政府部门为维护消费者权益举办的315晚会,于今晚8点正式开始,2021年315晚会的主题为提振消费从心开始。医疗虚假广告成为互联网虚假被迫放弃热爱的华为,这些原因,是你的烦恼么我觉得我是华为的忠粉,给家人和自己都用的华为,但是现在,我可能要放弃了,原因也很无奈,不知道有没有宝妈跟我有共鸣。下面是我的两点无奈1。华为手机在设计的时候可能没考虑过宝妈的感受,数字人民币发展迅猛,IPFSFilecoin成为风口数字人民币发展迅猛梳理2021年各地政府工作报告不难发现,数字货币成为高频词,多地不约而同布局数字人民币试点应用。北京市提出,将加快金融科技与专业服务创新示范区建设,推进数字货币试注意!前方雷区请绕行!坑你没商量的科技产品黑名单没错,看标题你就知道了,这是一年一度的315特辑,又到了揭露行业黑恶势力的时候了。根据某平台统计的结果,今晚的315晚会将重点瞄准以下六大领域开撕汽车直播带货短视频网约车教育机构手北京整顿线下校外培训,培训机构的好日子到头了?从今年1月23日开始,由于疫情管控的原因,直到现在,北京所有的线下培训机构都一直处于停摆的状态。至于什么时候可以开始恢复,没有人知道,官方也没有给出明确的通知。于是,关于北京大力整打压借口依旧老一套!美宣称华为中兴等5家中企对美国家安全构成威胁美国又在借口国家安全打压中国科技企业。据香港南华早报消息,当地时间12日,美国联邦通信委员会(FCC)下属的公共和国土安全局发布声明,宣称华为中兴等五家中国公司是对美国国家安全的威微软春季Surface活动将推出全新网络摄像头满足在家办公需求消息称微软计划在今年春季召开一场Surface主题活动,有望推出SurfaceLaptop4。外媒Petri编辑BradSams最新报告称,这次活动可能还会推出一些外设产品,尤其是刘强东的喜和忧作者老谢,财经作家在中国的互联网公司中,有把公司带入绝路的,比如ofo的戴威有把行业带入绝路的,比如罗永浩老师。但在公司陷入困境后,又能开着飞机换引擎让公司重新飞起来的,满打满算就
现在什么app最火?我不能确定什么app最火,我只能这样说我的手机上的软件都是我喜欢的软件,我只能把我手机上软件推荐给你们。我手机上的软件有许多然后呢?我手机上软件也是由许多的漫画,我差不多我这个人挺除湿机在家用机器方面的市场占有率有多少?目前全球除湿机市场还处在起步发展阶段,全球除湿机的主要产地集中在澳大利亚意大利日本和中国台湾等地。2016年家用除湿机的销量达到6110。1千台,预测到2025年全球的家用除湿机将全球气体行业正迅猛发展,未来几年工业气体行业发展如何?工业气体被喻为工业的血液。随着经济的发展,工业气体作为经济基础工业要素之一,在未来发展中需求会越来越大。工业气体是一种瓶装压缩液态气体,在常温常压下呈气态,是工业的血液。随着中国经黑莓正式停止系统运营手机秒变砖今天,黑莓层打造的OS系统正式停止运营,搭载该系统的所有黑莓手机都无法使用了,电话无法拨打短信无法接发,并且连紧急报警应急电话都无法拨通,一些专用的程序也都无法使用。而对于黑莓手机华为变身代工商,为中国移动生产自家品牌新机美国制裁导致华为推出5G手机变得相当困难,但华为的手机业务未有因此停止,更开拓了为其他品牌代工的业务。早前我们分享过他们代工的Hinova9手机,如今就连中国最大的通讯商中国移动也你认为今日头条在广大的百姓中还有很大推广空间吗?有!头条是个最大众化的平台,雅俗共赏,老少皆宜。高端大v们,大咖们,被它抓进来了(肯出高薪独家专访)。普通百姓们,被他吸进来了,可赏赐点小钱儿花花,门槛低,易迈进。婆婆妈妈,柴米油司机要下岗了吗?前几年吃铁饭碗的高速公路收费员下岗了,或许他们自己也没想到吧。现在据说司机有可能也要下岗了?这个怎么说呢?最近感觉科技变化的越来越快了,这几年特别火的无人驾驶虽然各种新闻不断,频道60008000元才是多数货车司机的真实收入对于货车司机而言,月收入多少才算是正常水平?据中国物流与采购联合会发布的2021年货车司机从业状况调查报告(以下简称报告)显示,月均收入在500010000元的货车司机合计占比最多家里买了电视盒子还有必要保留有线电视吗?电视可以说是每个家庭必备的家用产品之一,尤其是在后疫情时代待在家中的时间也变多了,那么使用电视的频率也相对增加,根据相关数据显示,2021年第一季度,在103个城市推及的全国电视收如何选择性价比高的打印机?只要记住这么几点前几天写了一篇关于亲戚朋友买打印机的文章,想不到大家反应很激烈。这里边除了人情世故的内容,我想最多的还是说明,打印机已经从办公需求,慢慢地成了大多数家庭的需求,特别是家里有学生的。今日科技热点华为突然开卖5G手机,怎么回事?近日,有不少网友发现,华为Mate40Pro5G手机居然有货了。京东淘宝官网都有。该手机于2020年10月22日发布,搭载5nm工艺的麒麟9000芯片。集成了153亿个晶体管,八核