范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

RocketMQ源码分析之RemotingCommand网络通信协议源码分析

  一、前言
  在分析NameServer的请求和响应流程之前我们需要先看一下他的序列化协议是怎样的,RocketMQ支持的序列化协议有以下2种:JSON;RocketMQ自定义的协议;
  json进行序列化其实是省力做法,效率是比较差的,序列化以后的数据格式是比较占用空间,一般成熟的中间件项目一般都会采用自定义的方式进行序列化和反序列化;二、RemotingCommand源码分析
  RemotingCommand为RocketMQ中自定义协议组件,其中包含了序列化和反序列化代码逻辑;
  但是不向服务直接提供调用,而是通过前文讲解的NettyRemotingServer 类中的NettyEncoder (编码器)和NettyDecoder (解码器)进行具体的调用;
  序列化:就是将一段字节数组以固定的顺序的形式存放数据,第一个字节存放什么,后面4个字节存放什么,再后面几个字节存放什么;
  反序列化:就是以固定的顺序取数据,你第一个字节存放的是消息的标志位,那你取出来就是消息的标志位,再后面4个为消息体的长度,那取出来就是消息体的长度,你再可以根据消息体的长度去获取对应长度字节的数据;1、数据模型public class RemotingCommand {      public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type";     public static final String SERIALIZE_TYPE_ENV = "ROCKETMQ_SERIALIZE_TYPE";     public static final String REMOTING_VERSION_KEY = "rocketmq.remoting.version";     private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);     private static final int RPC_TYPE = 0; // 0, REQUEST_COMMAND     private static final int RPC_ONEWAY = 1; // 0, RPC     private static final Map, Field[]> CLASS_HASH_MAP =         new HashMap, Field[]>();     private static final Map CANONICAL_NAME_CACHE = new HashMap();     // 1, Oneway     // 1, RESPONSE_COMMAND     private static final Map NULLABLE_FIELD_CACHE = new HashMap();     private static final String STRING_CANONICAL_NAME = String.class.getCanonicalName();     private static final String DOUBLE_CANONICAL_NAME_1 = Double.class.getCanonicalName();     private static final String DOUBLE_CANONICAL_NAME_2 = double.class.getCanonicalName();     private static final String INTEGER_CANONICAL_NAME_1 = Integer.class.getCanonicalName();     private static final String INTEGER_CANONICAL_NAME_2 = int.class.getCanonicalName();     private static final String LONG_CANONICAL_NAME_1 = Long.class.getCanonicalName();     private static final String LONG_CANONICAL_NAME_2 = long.class.getCanonicalName();     private static final String BOOLEAN_CANONICAL_NAME_1 = Boolean.class.getCanonicalName();     private static final String BOOLEAN_CANONICAL_NAME_2 = boolean.class.getCanonicalName();     private static volatile int configVersion = -1;     private static AtomicInteger requestId = new AtomicInteger(0);      private static SerializeType serializeTypeConfigInThisServer = SerializeType.JSON;      static {         final String protocol = System.getProperty(SERIALIZE_TYPE_PROPERTY, System.getenv(SERIALIZE_TYPE_ENV));         if (!isBlank(protocol)) {             try {                 serializeTypeConfigInThisServer = SerializeType.valueOf(protocol);             } catch (IllegalArgumentException e) {                 throw new RuntimeException("parser specified protocol error. protocol=" + protocol, e);             }         }     }      // code编号,请求编号     private int code;     private LanguageCode language = LanguageCode.JAVA; // 编程语言,java     private int version = 0; // 版本号     private int opaque = requestId.getAndIncrement(); // 请求id     private int flag = 0; // 标识     private String remark; // 备注     private HashMap extFields; // 扩展字段     private transient CommandCustomHeader customHeader; // 自定义header头     // 这一次rpc调用的序列化类型,默认就是json格式     private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;     // 消息体,会把真正的消息体序列化成字节数组     private transient byte[] body; }2、序列化
  org.apache.rocketmq.remoting.netty.NettyEncoder#encodepublic void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)     throws Exception {     try {         ByteBuffer header = remotingCommand.encodeHeader();         out.writeBytes(header);         byte[] body = remotingCommand.getBody();         if (body != null) {             out.writeBytes(body);         }     } catch (Exception e) {         log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);         if (remotingCommand != null) {             log.error(remotingCommand.toString());         }         RemotingUtil.closeChannel(ctx.channel());     } }public ByteBuffer encodeHeader() {     return encodeHeader(this.body != null ? this.body.length : 0); }  public ByteBuffer encodeHeader(final int bodyLength) {     // 1> header length size     int length = 4;      // 2> header data length     byte[] headerData;     headerData = this.headerEncode();      length += headerData.length;      // 3> body data length     length += bodyLength;      ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);      // length     result.putInt(length);      // header length     result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));      // header data     result.put(headerData);      result.flip();      return result; }
  这里会去判断序列化协议的类型,json类型其实没什么好看的,JSON.toJSONString(obj, prettyFormat).getBytes(CHARSET_UTF8); 就没了,我们主要是看RocketMQ的自定义协议;private byte[] headerEncode() {     // 把自定义headers放到一个ext fields map里去     this.makeCustomHeaderToNet();     if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {         return RocketMQSerializable.rocketMQProtocolEncode(this);     } else {         return RemotingSerializable.encode(this);     } }public void makeCustomHeaderToNet() {     if (this.customHeader != null) {         // 通过反射获取到自定义header类里面的fields         Field[] fields = getClazzFields(customHeader.getClass());         if (null == this.extFields) {             this.extFields = new HashMap();         }          // 对自定义header类的fields进行遍历         for (Field field : fields) {             if (!Modifier.isStatic(field.getModifiers())) {                 String name = field.getName();                 if (!name.startsWith("this")) {                     Object value = null;                     try {                         field.setAccessible(true);                         value = field.get(this.customHeader);                     } catch (Exception e) {                         log.error("Failed to access field [{}]", name, e);                     }                      // 自定义header这些fields都是放到ext fields里面去                     if (value != null) {                         this.extFields.put(name, value.toString());                     }                 }             }         }     } }private Field[] getClazzFields(Class<? extends CommandCustomHeader> classHeader) {     // 如果说你要是自定义了一套header以后,你搞一个类,实现接口     // 然后在这个自定义头的类里,可以定义一堆的field,这些field就是你的自定义的头     Field[] field = CLASS_HASH_MAP.get(classHeader);      if (field == null) {         // 通过反射直接获取到你自定义类里的头fields拿出来         field = classHeader.getDeclaredFields();         synchronized (CLASS_HASH_MAP) {             CLASS_HASH_MAP.put(classHeader, field);         }     }     return field; }public static byte[] markProtocolType(int source, SerializeType type) {     byte[] result = new byte[4];      result[0] = type.getCode(); // header length里一共是4个字节,第一个字节是序列化类型code     result[1] = (byte) ((source >> 16) & 0xFF); // 第二个字节开始到第四个字节,一共是3个字节都是跟header length是有关系的     result[2] = (byte) ((source >> 8) & 0xFF);     result[3] = (byte) (source & 0xFF);     return result; }
  其实自定义序列化就是搞一个byte数组,采用固定的显示进行构建。
  如:第一个字节放请求类型,后面四个字节放消息体总长度,在后面发具体的消息体。消息体前面几位为header长度,后面为header消息体等等,通过固定排列的顺序进行构建,这样解析的时候我们就可以根据字节顺序来读取消息了。public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) {     // 用json进行序列化其实是省力做法,效率是比较差的,序列化以后的数据格式是比较占用空间一些     // 常规做法是自己对RemotingCommand协议数据对象进行序列化     // 编码,对象 -> 字节数组      // String remark     byte[] remarkBytes = null;     int remarkLen = 0;     if (cmd.getRemark() != null && cmd.getRemark().length() > 0) {         remarkBytes = cmd.getRemark().getBytes(CHARSET_UTF8);         remarkLen = remarkBytes.length;     }      // HashMap extFields     // ext fields,是我们可能的自定义headers就在这里,把扩展头序列化为字节数组     byte[] extFieldsBytes = null;     int extLen = 0;     if (cmd.getExtFields() != null && !cmd.getExtFields().isEmpty()) {         extFieldsBytes = mapSerialize(cmd.getExtFields());         extLen = extFieldsBytes.length;     }      // 计算出来消息头总长度     int totalLen = calTotalLen(remarkLen, extLen);      ByteBuffer headerBuffer = ByteBuffer.allocate(totalLen);     // int code(~32767)     headerBuffer.putShort((short) cmd.getCode());     // LanguageCode language     headerBuffer.put(cmd.getLanguage().getCode());     // int version(~32767)     headerBuffer.putShort((short) cmd.getVersion());     // int opaque     headerBuffer.putInt(cmd.getOpaque());     // int flag     headerBuffer.putInt(cmd.getFlag());     // String remark     if (remarkBytes != null) {         headerBuffer.putInt(remarkBytes.length);         headerBuffer.put(remarkBytes);     } else {         headerBuffer.putInt(0);     }     // HashMap extFields;     if (extFieldsBytes != null) {         headerBuffer.putInt(extFieldsBytes.length);         headerBuffer.put(extFieldsBytes);     } else {         headerBuffer.putInt(0);     }      return headerBuffer.array(); }3、反序列化
  org.apache.rocketmq.remoting.netty.NettyDecoder#decodepublic Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {     ByteBuf frame = null;     try {         frame = (ByteBuf) super.decode(ctx, in);         if (null == frame) {             return null;         }          ByteBuffer byteBuffer = frame.nioBuffer();          return RemotingCommand.decode(byteBuffer);     } catch (Exception e) {         log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);         RemotingUtil.closeChannel(ctx.channel());     } finally {         if (null != frame) {             frame.release();         }     }      return null; }public static RemotingCommand decode(final ByteBuffer byteBuffer) throws RemotingCommandException {     // 解码的过程就是编码过程的逆向过程     int length = byteBuffer.limit(); // 总长度     int oriHeaderLen = byteBuffer.getInt(); // 头长度     int headerLength = getHeaderLength(oriHeaderLen);      // 搞一个头长度的字节数组,一次性把headers都读出来放到字节数组里去     byte[] headerData = new byte[headerLength];     byteBuffer.get(headerData);      // 对header要做一个解码     RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));      int bodyLength = length - 4 - headerLength;     byte[] bodyData = null;     if (bodyLength > 0) {         bodyData = new byte[bodyLength];         byteBuffer.get(bodyData);     }     cmd.body = bodyData;      return cmd; }
  这里判断header是用什么协议进行序列化的,就会使用什么协议进行反序列化;private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) throws RemotingCommandException {     switch (type) {         case JSON:             RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);             resultJson.setSerializeTypeCurrentRPC(type);             return resultJson;         case ROCKETMQ:             RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);             resultRMQ.setSerializeTypeCurrentRPC(type);             return resultRMQ;         default:             break;     }      return null; }
  我们之间看rocketMQ自定义的协议吧,其实就是一个逆向的过程,你之前放的什么,他就根据字节拿出来;public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) throws RemotingCommandException {     RemotingCommand cmd = new RemotingCommand();     ByteBuffer headerBuffer = ByteBuffer.wrap(headerArray);     // int code(~32767)     cmd.setCode(headerBuffer.getShort());     // LanguageCode language     cmd.setLanguage(LanguageCode.valueOf(headerBuffer.get()));     // int version(~32767)     cmd.setVersion(headerBuffer.getShort());     // int opaque     cmd.setOpaque(headerBuffer.getInt());     // int flag     cmd.setFlag(headerBuffer.getInt());     // String remark     int remarkLength = headerBuffer.getInt();     if (remarkLength > 0) {         if (remarkLength > headerArray.length) {             throw new RemotingCommandException("RocketMQ protocol decoding failed, remark length: " + remarkLength + ", but header length: " + headerArray.length);         }         byte[] remarkContent = new byte[remarkLength];         headerBuffer.get(remarkContent);         cmd.setRemark(new String(remarkContent, CHARSET_UTF8));     }      // HashMap extFields     int extFieldsLength = headerBuffer.getInt();     if (extFieldsLength > 0) {         if (extFieldsLength > headerArray.length) {             throw new RemotingCommandException("RocketMQ protocol decoding failed, extFields length: " + extFieldsLength + ", but header length: " + headerArray.length);         }         byte[] extFieldsBytes = new byte[extFieldsLength];         headerBuffer.get(extFieldsBytes);         cmd.setExtFields(mapDeserialize(extFieldsBytes));     }          return cmd; }三、总结RemotingCommand为rocketMQ的序列化和反序列化的组件,所有消息都需要使用他进行处理;序列化和反序列化是根据约定的协议存放数据,再根据约定的协议取数据;

6连败!CBA昔日总冠军打回原形,广厦3驾马车爆发,赛季首次连胜4年内两次倒在总决赛之后,广厦新赛季重新扬帆起航,早早续约主力框架并签约外援,广厦的目标仍然瞄准了四强总决赛甚至是冲击总冠军。然而,广厦新赛季的起步并不算顺利,胡金秋热身赛受伤,小C919交付日正式确认,中国迈向高端制造业,以后我们直接生产技术在我国东航党组扩大会上,东航总经理表示,我国首架C919国产大飞机将于今年12月交付给东航。我国的国产大飞机即将实现商业化。(C919)我国商飞市场占比将提高在我国C919完成取证亚历山大338雷霆擒快船获赛季首胜!乔治莱昂纳德缺阵NBA常规赛10月26日继续进行,本场比赛快船这边乔治莫里斯和莱昂纳德缺阵,雷霆方面吉迪不打。最终,雷霆以10894战胜快船。雷霆也终结了自己的三连败迎来了赛季首胜。首节开始,特雷箩筐技术分享AI赋能遥感技术还有什么难点(图片源自网络)近几年来,国内外人工智能的发展和落地应用如火如荼,促成这种现象的原因可以归纳为两个关键词,即大数据与高算力。在地理空间数据分析与应用领域,这种变化也正在发生着,比如恭喜!女排又一美女领证结婚,曾是惠若琪队友,老公是男排主力最近中国女排好消息不断,先是在世锦赛里,靠着稳定的发挥用不全面的阵容拿下了第六的成绩,之后朱婷宣布回归,正式开始在意甲联赛打球!而最近国内各个俱乐部也纷纷开始迎接自己的外援球员,为辽宁队新消息丛明晨深夜就医小将卢梓杰获重用莫兰德表态辽宁队新消息三分射手深夜就医内线再获本土强援莫兰德表态!今天我们来关注一下辽宁队的最新消息,在昨天晚上结束的一场常规赛当中,辽宁队最终有惊无险的战胜了吉林队,在比赛之后呢,辽宁队也分享几款配置高的眼霜别看不上是小牌子,真心好用,关键便宜眼睛是心灵的窗户,眼部周围的肌肤状态直接决定着你看起来的年龄感以及年轻感。女孩子过了25岁以后,即使皮肤状态依然在线,眼周也都会有黑眼圈小细纹等困扰。面部衰老通常都是从脆弱的眼周开欧冠出线夜!皇马输球仍晋级,曼城多特携手出线,尤文险造大逆转欧冠联赛第5轮比赛多场比赛开打,在这个比赛日过后,关于出线权的争夺也初步明朗。各大豪门纷纷突围成功,顺利拿到了16强的席位,意甲曾经的霸主尤文则是意外翻车,在客场输给本菲卡队之后,不是失败阿雷格里,时隔9年被UCL小组赛淘汰也持肯定态度尤文图斯主教练马西米莉亚诺阿雷格里对此表示肯定。26日上午4时(韩国时间),尤文图斯在位于葡萄牙里斯本的埃斯塔迪奥达鲁兹举行的2022至23赛季欧洲足联(欧洲足联)冠军联赛(UCL中国故土贝加尔湖李健的那首歌贝加尔湖畔,我相信很多人最初听那首歌的时候,可能还不知道真的有这么一个地方。如今的贝加尔湖(baykal)汉名,源自英(俄)语的音译。这个湖泊,在中国古代曾被称作北海(中国财富报道光伏再现千亿大单视频加载中10月25日晚,大全能源公告,公司及全资子公司签订了43。2万吨原生多晶硅(太阳能级免洗硅料),合同约定2023年至2028年某客户预计共向公司及内蒙古大全采购43。2万
日韩同时回家世界杯八强无一亚洲球队!西葡争最后两个名额北京时间12月6日凌晨,卡塔尔世界杯结束了两场18决赛,日本队在点球大战中不敌克罗地亚队,被淘汰出局,韩国队14惨败巴西队,无缘晋级。至此,亚洲球队已经全部被淘汰,世界杯八强产生6全球财经连线横琴世界湾区论坛共探世界变革新趋势与湾区经济新机遇视频加载中南方财经全媒体记者赵越澳门报道放眼全球,湾区已经成为区域参与世界经济竞争的重要力量,也是高质量发展的代名词。粤港澳大湾区发展规划纲要发布实施三年多来,粤港澳三地不断凝聚各一场大火导致紫禁城1000名太监集体被辞清皇室退位后,末代皇帝及一众人等仍在故宫中居住,同时在故宫中服侍皇室的太监虽然陆陆续续遣散了一千多名,但仍有一千一百余名太监留在宫内。1923年6月间,又发生了一千名太监集体被辞事铁血中带着温柔俾斯麦为镇压工人运动,发明劳动法和社会保险1847年底,由于受到农业歉收和经济危机的影响,德意志各邦国到处掀起了罢工浪潮和饥民暴动,革命有一触即发之势。到了1848年2月,巴黎二月革命胜利的消息传来后,毗邻法国的部分德意志燃烧的合水村燃烧的合水村卢新民汤文进今日合水村合水村是鄱阳湖畔余干县三塘乡的一个美丽村庄。据族谱记载,南宋末年有汤姓始迁于此,以居两条小溪汇合处而得名合水。1942年7月日寇占领余干期间,为抵虚拟电厂,风口上的新千亿赛道撰文大蔚编辑凯旋11月25日,中国电信浙江公司与华能(浙江)能源开发有限公司上线了全国首个实时调度的5G虚拟电厂项目。该虚拟电厂1号机组顺利完成72小时试运行工作,标志着全国首台(奈雪成乐乐茶第一大股东马斯克否认有自杀倾向自游家否认解散传闻今日行业看点一览苹果再失两名高管,最近几月失血严重据报道,近日,苹果又有两名副总裁离开。几天前JohnStauffer跳槽到游戏公司Roblox,他曾担任苹果互动媒体集团(Inte浅谈高速相干光通信定义相干光通信,英文全称叫做CoherentOpticalCommunication,是光纤通信领域的一项技术,主要是发送端使用相干调制技术,接收端使用外差检测技术进行信息传输光的一往无前小米十周年雷军公开演讲(1)1。1我们的梦想当时国内的手机市场,一类是诺基亚摩托罗拉和三星这样的国际巨头,一类是国产手机中华酷联,就是中兴华为这样的大公司,还有铺天盖地的山寨手机。中国市场主要被国际巨头把持,Dart知识点异常处理头条创作挑战赛本文同步本人掘金平台的文章httpsjuejin。cnpost7129312940531908645推荐使用线上编辑器dartpad。cn进行学习,测试Dart将异常流光岁月ThinkLongTerm(长期主义)首先感谢大家的关注。有人的地方就有江湖,有江湖的地方就有传说。在中国互联网蓬勃发展的这些年里,风云变幻,涌现了很多笑傲江湖的大佬。他们有的百战成名,有的一战封神有的功成身退,有的再