RocketMQ源码分析之RemotingCommand网络通信协议源码分析
一、前言
在分析NameServer的请求和响应流程之前我们需要先看一下他的序列化协议是怎样的,RocketMQ支持的序列化协议有以下2种:JSON;RocketMQ自定义的协议;
json进行序列化其实是省力做法,效率是比较差的,序列化以后的数据格式是比较占用空间,一般成熟的中间件项目一般都会采用自定义的方式进行序列化和反序列化;二、RemotingCommand源码分析
RemotingCommand为RocketMQ中自定义协议组件,其中包含了序列化和反序列化代码逻辑;
但是不向服务直接提供调用,而是通过前文讲解的NettyRemotingServer 类中的NettyEncoder (编码器)和NettyDecoder (解码器)进行具体的调用;
序列化:就是将一段字节数组以固定的顺序的形式存放数据,第一个字节存放什么,后面4个字节存放什么,再后面几个字节存放什么;
反序列化:就是以固定的顺序取数据,你第一个字节存放的是消息的标志位,那你取出来就是消息的标志位,再后面4个为消息体的长度,那取出来就是消息体的长度,你再可以根据消息体的长度去获取对应长度字节的数据;1、数据模型public class RemotingCommand { public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type"; public static final String SERIALIZE_TYPE_ENV = "ROCKETMQ_SERIALIZE_TYPE"; public static final String REMOTING_VERSION_KEY = "rocketmq.remoting.version"; private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); private static final int RPC_TYPE = 0; // 0, REQUEST_COMMAND private static final int RPC_ONEWAY = 1; // 0, RPC private static final Map, Field[]> CLASS_HASH_MAP = new HashMap, Field[]>(); private static final Map CANONICAL_NAME_CACHE = new HashMap(); // 1, Oneway // 1, RESPONSE_COMMAND private static final Map NULLABLE_FIELD_CACHE = new HashMap(); private static final String STRING_CANONICAL_NAME = String.class.getCanonicalName(); private static final String DOUBLE_CANONICAL_NAME_1 = Double.class.getCanonicalName(); private static final String DOUBLE_CANONICAL_NAME_2 = double.class.getCanonicalName(); private static final String INTEGER_CANONICAL_NAME_1 = Integer.class.getCanonicalName(); private static final String INTEGER_CANONICAL_NAME_2 = int.class.getCanonicalName(); private static final String LONG_CANONICAL_NAME_1 = Long.class.getCanonicalName(); private static final String LONG_CANONICAL_NAME_2 = long.class.getCanonicalName(); private static final String BOOLEAN_CANONICAL_NAME_1 = Boolean.class.getCanonicalName(); private static final String BOOLEAN_CANONICAL_NAME_2 = boolean.class.getCanonicalName(); private static volatile int configVersion = -1; private static AtomicInteger requestId = new AtomicInteger(0); private static SerializeType serializeTypeConfigInThisServer = SerializeType.JSON; static { final String protocol = System.getProperty(SERIALIZE_TYPE_PROPERTY, System.getenv(SERIALIZE_TYPE_ENV)); if (!isBlank(protocol)) { try { serializeTypeConfigInThisServer = SerializeType.valueOf(protocol); } catch (IllegalArgumentException e) { throw new RuntimeException("parser specified protocol error. protocol=" + protocol, e); } } } // code编号,请求编号 private int code; private LanguageCode language = LanguageCode.JAVA; // 编程语言,java private int version = 0; // 版本号 private int opaque = requestId.getAndIncrement(); // 请求id private int flag = 0; // 标识 private String remark; // 备注 private HashMap extFields; // 扩展字段 private transient CommandCustomHeader customHeader; // 自定义header头 // 这一次rpc调用的序列化类型,默认就是json格式 private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer; // 消息体,会把真正的消息体序列化成字节数组 private transient byte[] body; }2、序列化
org.apache.rocketmq.remoting.netty.NettyEncoder#encodepublic void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out) throws Exception { try { ByteBuffer header = remotingCommand.encodeHeader(); out.writeBytes(header); byte[] body = remotingCommand.getBody(); if (body != null) { out.writeBytes(body); } } catch (Exception e) { log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e); if (remotingCommand != null) { log.error(remotingCommand.toString()); } RemotingUtil.closeChannel(ctx.channel()); } }public ByteBuffer encodeHeader() { return encodeHeader(this.body != null ? this.body.length : 0); } public ByteBuffer encodeHeader(final int bodyLength) { // 1> header length size int length = 4; // 2> header data length byte[] headerData; headerData = this.headerEncode(); length += headerData.length; // 3> body data length length += bodyLength; ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength); // length result.putInt(length); // header length result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); // header data result.put(headerData); result.flip(); return result; }
这里会去判断序列化协议的类型,json类型其实没什么好看的,JSON.toJSONString(obj, prettyFormat).getBytes(CHARSET_UTF8); 就没了,我们主要是看RocketMQ的自定义协议;private byte[] headerEncode() { // 把自定义headers放到一个ext fields map里去 this.makeCustomHeaderToNet(); if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) { return RocketMQSerializable.rocketMQProtocolEncode(this); } else { return RemotingSerializable.encode(this); } }public void makeCustomHeaderToNet() { if (this.customHeader != null) { // 通过反射获取到自定义header类里面的fields Field[] fields = getClazzFields(customHeader.getClass()); if (null == this.extFields) { this.extFields = new HashMap(); } // 对自定义header类的fields进行遍历 for (Field field : fields) { if (!Modifier.isStatic(field.getModifiers())) { String name = field.getName(); if (!name.startsWith("this")) { Object value = null; try { field.setAccessible(true); value = field.get(this.customHeader); } catch (Exception e) { log.error("Failed to access field [{}]", name, e); } // 自定义header这些fields都是放到ext fields里面去 if (value != null) { this.extFields.put(name, value.toString()); } } } } } }private Field[] getClazzFields(Class<? extends CommandCustomHeader> classHeader) { // 如果说你要是自定义了一套header以后,你搞一个类,实现接口 // 然后在这个自定义头的类里,可以定义一堆的field,这些field就是你的自定义的头 Field[] field = CLASS_HASH_MAP.get(classHeader); if (field == null) { // 通过反射直接获取到你自定义类里的头fields拿出来 field = classHeader.getDeclaredFields(); synchronized (CLASS_HASH_MAP) { CLASS_HASH_MAP.put(classHeader, field); } } return field; }public static byte[] markProtocolType(int source, SerializeType type) { byte[] result = new byte[4]; result[0] = type.getCode(); // header length里一共是4个字节,第一个字节是序列化类型code result[1] = (byte) ((source >> 16) & 0xFF); // 第二个字节开始到第四个字节,一共是3个字节都是跟header length是有关系的 result[2] = (byte) ((source >> 8) & 0xFF); result[3] = (byte) (source & 0xFF); return result; }
其实自定义序列化就是搞一个byte数组,采用固定的显示进行构建。
如:第一个字节放请求类型,后面四个字节放消息体总长度,在后面发具体的消息体。消息体前面几位为header长度,后面为header消息体等等,通过固定排列的顺序进行构建,这样解析的时候我们就可以根据字节顺序来读取消息了。public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) { // 用json进行序列化其实是省力做法,效率是比较差的,序列化以后的数据格式是比较占用空间一些 // 常规做法是自己对RemotingCommand协议数据对象进行序列化 // 编码,对象 -> 字节数组 // String remark byte[] remarkBytes = null; int remarkLen = 0; if (cmd.getRemark() != null && cmd.getRemark().length() > 0) { remarkBytes = cmd.getRemark().getBytes(CHARSET_UTF8); remarkLen = remarkBytes.length; } // HashMap extFields // ext fields,是我们可能的自定义headers就在这里,把扩展头序列化为字节数组 byte[] extFieldsBytes = null; int extLen = 0; if (cmd.getExtFields() != null && !cmd.getExtFields().isEmpty()) { extFieldsBytes = mapSerialize(cmd.getExtFields()); extLen = extFieldsBytes.length; } // 计算出来消息头总长度 int totalLen = calTotalLen(remarkLen, extLen); ByteBuffer headerBuffer = ByteBuffer.allocate(totalLen); // int code(~32767) headerBuffer.putShort((short) cmd.getCode()); // LanguageCode language headerBuffer.put(cmd.getLanguage().getCode()); // int version(~32767) headerBuffer.putShort((short) cmd.getVersion()); // int opaque headerBuffer.putInt(cmd.getOpaque()); // int flag headerBuffer.putInt(cmd.getFlag()); // String remark if (remarkBytes != null) { headerBuffer.putInt(remarkBytes.length); headerBuffer.put(remarkBytes); } else { headerBuffer.putInt(0); } // HashMap extFields; if (extFieldsBytes != null) { headerBuffer.putInt(extFieldsBytes.length); headerBuffer.put(extFieldsBytes); } else { headerBuffer.putInt(0); } return headerBuffer.array(); }3、反序列化
org.apache.rocketmq.remoting.netty.NettyDecoder#decodepublic Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { ByteBuf frame = null; try { frame = (ByteBuf) super.decode(ctx, in); if (null == frame) { return null; } ByteBuffer byteBuffer = frame.nioBuffer(); return RemotingCommand.decode(byteBuffer); } catch (Exception e) { log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e); RemotingUtil.closeChannel(ctx.channel()); } finally { if (null != frame) { frame.release(); } } return null; }public static RemotingCommand decode(final ByteBuffer byteBuffer) throws RemotingCommandException { // 解码的过程就是编码过程的逆向过程 int length = byteBuffer.limit(); // 总长度 int oriHeaderLen = byteBuffer.getInt(); // 头长度 int headerLength = getHeaderLength(oriHeaderLen); // 搞一个头长度的字节数组,一次性把headers都读出来放到字节数组里去 byte[] headerData = new byte[headerLength]; byteBuffer.get(headerData); // 对header要做一个解码 RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen)); int bodyLength = length - 4 - headerLength; byte[] bodyData = null; if (bodyLength > 0) { bodyData = new byte[bodyLength]; byteBuffer.get(bodyData); } cmd.body = bodyData; return cmd; }
这里判断header是用什么协议进行序列化的,就会使用什么协议进行反序列化;private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) throws RemotingCommandException { switch (type) { case JSON: RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class); resultJson.setSerializeTypeCurrentRPC(type); return resultJson; case ROCKETMQ: RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData); resultRMQ.setSerializeTypeCurrentRPC(type); return resultRMQ; default: break; } return null; }
我们之间看rocketMQ自定义的协议吧,其实就是一个逆向的过程,你之前放的什么,他就根据字节拿出来;public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) throws RemotingCommandException { RemotingCommand cmd = new RemotingCommand(); ByteBuffer headerBuffer = ByteBuffer.wrap(headerArray); // int code(~32767) cmd.setCode(headerBuffer.getShort()); // LanguageCode language cmd.setLanguage(LanguageCode.valueOf(headerBuffer.get())); // int version(~32767) cmd.setVersion(headerBuffer.getShort()); // int opaque cmd.setOpaque(headerBuffer.getInt()); // int flag cmd.setFlag(headerBuffer.getInt()); // String remark int remarkLength = headerBuffer.getInt(); if (remarkLength > 0) { if (remarkLength > headerArray.length) { throw new RemotingCommandException("RocketMQ protocol decoding failed, remark length: " + remarkLength + ", but header length: " + headerArray.length); } byte[] remarkContent = new byte[remarkLength]; headerBuffer.get(remarkContent); cmd.setRemark(new String(remarkContent, CHARSET_UTF8)); } // HashMap extFields int extFieldsLength = headerBuffer.getInt(); if (extFieldsLength > 0) { if (extFieldsLength > headerArray.length) { throw new RemotingCommandException("RocketMQ protocol decoding failed, extFields length: " + extFieldsLength + ", but header length: " + headerArray.length); } byte[] extFieldsBytes = new byte[extFieldsLength]; headerBuffer.get(extFieldsBytes); cmd.setExtFields(mapDeserialize(extFieldsBytes)); } return cmd; }三、总结RemotingCommand为rocketMQ的序列化和反序列化的组件,所有消息都需要使用他进行处理;序列化和反序列化是根据约定的协议存放数据,再根据约定的协议取数据;
北控加入高诗岩争夺者,愿送出潜力内线,高诗岩最终会花落谁家?今年休赛期的一个重头戏就是高诗岩要去哪?根据最新的消息来看,由于双方就高诗岩转会的问题上分歧比较大,所以辽宁方面已经叫停了谈判,目前双方陷入了僵局。当然了,我们考虑到之前郭艾伦主动
手把手教你用Jenkins自动部署SpringBoot1。什么是CICD1。1CI(ContinuousIntegration)1。2CD(ContinuousDeliveryContinuousDeployment)2。什么是Jen
嵌入式开发使用堆栈保护提高代码完整性开发可靠的嵌入式软件归结为计划最坏的情况,并确保嵌入式开发人员有适当的防护措施和陷阱来处理这些情况。嵌入式软件中一个被忽视的领域通常是堆栈。堆栈是微控制器用来存储局部变量函数调用返
东盟多国发表声明一个中国原则,天道好轮回,苍天饶过谁?东盟十国有老挝,越南,缅甸,柬埔寨,文莱,马来西亚,印度尼西亚,泰国,菲律宾,新加坡。一起看看为什么东盟会帮助中国发声,东南亚国家逐渐成为越来越多中国年轻人度假和旅游的首选,较低的
私募大佬但斌加仓最高增至9成!回撤仍在继续百亿私募大佬但斌曾被市场誉为茅台王子中国巴菲特,在踏空4月底以来反弹行情后,但斌将股票仓位迅速提升,不过一路来遭遇了加仓美股遇大跌加仓白酒遭回调加仓互联网股票屡受挫,而其基金净值波
心疼郭艾伦这几天被郭艾伦刷屏了,头条新闻全都是关于他申请转会的消息,有的说不该转,对不起辽宁队的培养,对不起辽宁球迷的支持,有的说该转,但没有队要他,俱乐部又发出声音说全力支持去更高水平的联
字节跳动Java岗一二三面全经过分享前言金三银四才过去没多久,眼看着便又要秋招了,所以为大家写了这篇文章,来自一个刚参加完字节面试并高分通过的朋友亲口所述,除了字节的offer,他还分别通过了京东百度以及腾讯阿里巴巴
Java开发掌握这些Linux命令就够了一导学掌握Linux命令是高级Java工程师必备的技能之一,但并不是每个人都能完全掌握,绝大部分Java初中级工程师只知道少数的cdlsmkdirrm等非常简单的命令,但是只会这些
王兴程维背后的男人,为何都是马化腾黄峥厉不厉害?在猫狗的夹缝里创造了拼多多,逼着各大巨头开始重视下沉市场,为普通民众带来福音。王兴厉不厉害?百团大战披荆斩棘,最后几乎一统本地生活江湖,从到店到外卖到万物到家,人们的
钙片饭前吃好,还是饭后吃好?如何判断是否缺钙?不可盲目补充钙片是用来给身体补钙的一种保健品,人和孩子都会选择吃钙片来进行补钙,除了每天定时定量服用之外,还需要选择恰当的时机。有的人比较坚信药补不如食补,所以也会从日常饮食进行改善,对于一些
极限竞速地平线5风火轮连接梦想与现实的云端冒险I感谢小黑盒提供的测评机会本文作者白羽夜茶会茗零前言上市四周年以后,曾经荣获德国科隆游戏展最佳竞速游戏奖和TGA2016最佳体育竞速游戏的极限竞速地平线3于2020年9月27日宣告
美国登月飞船也靠不住?拯救宇航员,NASA很大胆拼一个椅子上天现如今,美国的SLS重型火箭已经运载着无人飞船发射升空,正式开启了美国重返月球之旅的第一步。实际上,在1969年阿波罗11登上月面的那一时刻,美国就曾经制定过未来在月球的长期科考计
全职游戏搬砖等级划分?一青铜入门级选手,多是一些刚毕业的学生,或者因为意外离职的人,机缘巧合之下入坑,运气好有个不错的领路人,恰好有个好游戏,突破月入三千的底线。然而由于经验不足,加之行业的不确定性因素
滴,你的任务已完成,请进入页面查看我在无限游戏里封神(无限)作者壶鱼辣椒简介白柳在失业后被卷入一个无法停止的惊悚直播游戏中,游戏中充满了各种各样的怪物和蕴含杀意的玩家。一开始所有人都以为白柳只是个误入游戏的普通人。
孩子偷拿钱,父亲竟当街罚跪孩子偷拿钱,父亲竟当街罚跪,惩罚式家教真能让娃长记性?最近,看到这样一则新闻一个孩子偷拿了别人的钱,结果父亲当街罚跪50个小时,期间父亲不允许孩子吃饭,因为怕孩子会饿死。网友纷纷感
新型超导双量子比特处理器问世量子处理器的一部分。(图片来源谢尔盖格努斯科夫俄罗斯国家研究型技术大学)俄罗斯国家研究型技术大学和莫斯科国立鲍曼技术大学成功使用新型超导fluxonium量子比特实现了双量子比特操
运维,你还不会查看Linux系统cpu信息?CPU也称为微处理器或简称为处理器。就像大脑如何控制人体一样,CPU控制着计算机的所有部分。因此CPU被认为是计算机的大脑。那我们怎么在Linux系统中查看如IntelCorei3
当你被瞧不起的时候,如此反击才是上策如果你太过于在意别人的想法,那么你终将会在他人的声音当中迷失了自我。人生在世,不可能做到十全十美的事,让每一个人都对你满意,所以在人生这条必经之路上,受到他人的冷眼相待其实是一件再
做人说话办事要会打太极前段时间有个关系不错的朋友找我借钱,我心里已经答应借了,但我却说行,我今天给你想想办法,最迟明天给你答复。你放心!如果我直接了当,恩就小了,还钱难度也会变大,第二天把钱转过去了,让
三中往事这世间,哪里还有个字能比旧更让人回味无穷的?旧时的光阴,旧时的记忆,旧时的人事。这个旧字把我拉回到环境优雅迷人的株洲县三中。1993年的秋日,我与王照常杨元胜刘飞跃等人分配到了京广
从嫦娥奔月到夸父逐日,宇宙探索中你必须知道的中国浪漫近日,我国综合性太阳探测专用卫星夸父一号获得太阳硬X射线图像,并对外发布。这是我国首次获得也是目前国际上地球视角唯一的太阳硬X射线像,其质量达到了国际先进水平。感谢科学家们的努力,
我科学家在笼目超导体研究中有重大发现(记者马荣瑞通讯员王敏)北京时间24日零点,学术期刊自然刊发中国科学技术大学陈仙辉院士团队吴涛教授等人的最新发现利用高压下的核磁共振谱学技术,在笼目超导体铯钒锑中观察到一种由压力诱