RocketMQ源码分析之RemotingCommand网络通信协议源码分析
一、前言
在分析NameServer的请求和响应流程之前我们需要先看一下他的序列化协议是怎样的,RocketMQ支持的序列化协议有以下2种:JSON;RocketMQ自定义的协议;
json进行序列化其实是省力做法,效率是比较差的,序列化以后的数据格式是比较占用空间,一般成熟的中间件项目一般都会采用自定义的方式进行序列化和反序列化;二、RemotingCommand源码分析
RemotingCommand为RocketMQ中自定义协议组件,其中包含了序列化和反序列化代码逻辑;
但是不向服务直接提供调用,而是通过前文讲解的NettyRemotingServer 类中的NettyEncoder (编码器)和NettyDecoder (解码器)进行具体的调用;
序列化:就是将一段字节数组以固定的顺序的形式存放数据,第一个字节存放什么,后面4个字节存放什么,再后面几个字节存放什么;
反序列化:就是以固定的顺序取数据,你第一个字节存放的是消息的标志位,那你取出来就是消息的标志位,再后面4个为消息体的长度,那取出来就是消息体的长度,你再可以根据消息体的长度去获取对应长度字节的数据;1、数据模型public class RemotingCommand { public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type"; public static final String SERIALIZE_TYPE_ENV = "ROCKETMQ_SERIALIZE_TYPE"; public static final String REMOTING_VERSION_KEY = "rocketmq.remoting.version"; private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); private static final int RPC_TYPE = 0; // 0, REQUEST_COMMAND private static final int RPC_ONEWAY = 1; // 0, RPC private static final Map, Field[]> CLASS_HASH_MAP = new HashMap, Field[]>(); private static final Map CANONICAL_NAME_CACHE = new HashMap(); // 1, Oneway // 1, RESPONSE_COMMAND private static final Map NULLABLE_FIELD_CACHE = new HashMap(); private static final String STRING_CANONICAL_NAME = String.class.getCanonicalName(); private static final String DOUBLE_CANONICAL_NAME_1 = Double.class.getCanonicalName(); private static final String DOUBLE_CANONICAL_NAME_2 = double.class.getCanonicalName(); private static final String INTEGER_CANONICAL_NAME_1 = Integer.class.getCanonicalName(); private static final String INTEGER_CANONICAL_NAME_2 = int.class.getCanonicalName(); private static final String LONG_CANONICAL_NAME_1 = Long.class.getCanonicalName(); private static final String LONG_CANONICAL_NAME_2 = long.class.getCanonicalName(); private static final String BOOLEAN_CANONICAL_NAME_1 = Boolean.class.getCanonicalName(); private static final String BOOLEAN_CANONICAL_NAME_2 = boolean.class.getCanonicalName(); private static volatile int configVersion = -1; private static AtomicInteger requestId = new AtomicInteger(0); private static SerializeType serializeTypeConfigInThisServer = SerializeType.JSON; static { final String protocol = System.getProperty(SERIALIZE_TYPE_PROPERTY, System.getenv(SERIALIZE_TYPE_ENV)); if (!isBlank(protocol)) { try { serializeTypeConfigInThisServer = SerializeType.valueOf(protocol); } catch (IllegalArgumentException e) { throw new RuntimeException("parser specified protocol error. protocol=" + protocol, e); } } } // code编号,请求编号 private int code; private LanguageCode language = LanguageCode.JAVA; // 编程语言,java private int version = 0; // 版本号 private int opaque = requestId.getAndIncrement(); // 请求id private int flag = 0; // 标识 private String remark; // 备注 private HashMap extFields; // 扩展字段 private transient CommandCustomHeader customHeader; // 自定义header头 // 这一次rpc调用的序列化类型,默认就是json格式 private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer; // 消息体,会把真正的消息体序列化成字节数组 private transient byte[] body; }2、序列化
org.apache.rocketmq.remoting.netty.NettyEncoder#encodepublic void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out) throws Exception { try { ByteBuffer header = remotingCommand.encodeHeader(); out.writeBytes(header); byte[] body = remotingCommand.getBody(); if (body != null) { out.writeBytes(body); } } catch (Exception e) { log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e); if (remotingCommand != null) { log.error(remotingCommand.toString()); } RemotingUtil.closeChannel(ctx.channel()); } }public ByteBuffer encodeHeader() { return encodeHeader(this.body != null ? this.body.length : 0); } public ByteBuffer encodeHeader(final int bodyLength) { // 1> header length size int length = 4; // 2> header data length byte[] headerData; headerData = this.headerEncode(); length += headerData.length; // 3> body data length length += bodyLength; ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength); // length result.putInt(length); // header length result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); // header data result.put(headerData); result.flip(); return result; }
这里会去判断序列化协议的类型,json类型其实没什么好看的,JSON.toJSONString(obj, prettyFormat).getBytes(CHARSET_UTF8); 就没了,我们主要是看RocketMQ的自定义协议;private byte[] headerEncode() { // 把自定义headers放到一个ext fields map里去 this.makeCustomHeaderToNet(); if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) { return RocketMQSerializable.rocketMQProtocolEncode(this); } else { return RemotingSerializable.encode(this); } }public void makeCustomHeaderToNet() { if (this.customHeader != null) { // 通过反射获取到自定义header类里面的fields Field[] fields = getClazzFields(customHeader.getClass()); if (null == this.extFields) { this.extFields = new HashMap(); } // 对自定义header类的fields进行遍历 for (Field field : fields) { if (!Modifier.isStatic(field.getModifiers())) { String name = field.getName(); if (!name.startsWith("this")) { Object value = null; try { field.setAccessible(true); value = field.get(this.customHeader); } catch (Exception e) { log.error("Failed to access field [{}]", name, e); } // 自定义header这些fields都是放到ext fields里面去 if (value != null) { this.extFields.put(name, value.toString()); } } } } } }private Field[] getClazzFields(Class<? extends CommandCustomHeader> classHeader) { // 如果说你要是自定义了一套header以后,你搞一个类,实现接口 // 然后在这个自定义头的类里,可以定义一堆的field,这些field就是你的自定义的头 Field[] field = CLASS_HASH_MAP.get(classHeader); if (field == null) { // 通过反射直接获取到你自定义类里的头fields拿出来 field = classHeader.getDeclaredFields(); synchronized (CLASS_HASH_MAP) { CLASS_HASH_MAP.put(classHeader, field); } } return field; }public static byte[] markProtocolType(int source, SerializeType type) { byte[] result = new byte[4]; result[0] = type.getCode(); // header length里一共是4个字节,第一个字节是序列化类型code result[1] = (byte) ((source >> 16) & 0xFF); // 第二个字节开始到第四个字节,一共是3个字节都是跟header length是有关系的 result[2] = (byte) ((source >> 8) & 0xFF); result[3] = (byte) (source & 0xFF); return result; }
其实自定义序列化就是搞一个byte数组,采用固定的显示进行构建。
如:第一个字节放请求类型,后面四个字节放消息体总长度,在后面发具体的消息体。消息体前面几位为header长度,后面为header消息体等等,通过固定排列的顺序进行构建,这样解析的时候我们就可以根据字节顺序来读取消息了。public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) { // 用json进行序列化其实是省力做法,效率是比较差的,序列化以后的数据格式是比较占用空间一些 // 常规做法是自己对RemotingCommand协议数据对象进行序列化 // 编码,对象 -> 字节数组 // String remark byte[] remarkBytes = null; int remarkLen = 0; if (cmd.getRemark() != null && cmd.getRemark().length() > 0) { remarkBytes = cmd.getRemark().getBytes(CHARSET_UTF8); remarkLen = remarkBytes.length; } // HashMap extFields // ext fields,是我们可能的自定义headers就在这里,把扩展头序列化为字节数组 byte[] extFieldsBytes = null; int extLen = 0; if (cmd.getExtFields() != null && !cmd.getExtFields().isEmpty()) { extFieldsBytes = mapSerialize(cmd.getExtFields()); extLen = extFieldsBytes.length; } // 计算出来消息头总长度 int totalLen = calTotalLen(remarkLen, extLen); ByteBuffer headerBuffer = ByteBuffer.allocate(totalLen); // int code(~32767) headerBuffer.putShort((short) cmd.getCode()); // LanguageCode language headerBuffer.put(cmd.getLanguage().getCode()); // int version(~32767) headerBuffer.putShort((short) cmd.getVersion()); // int opaque headerBuffer.putInt(cmd.getOpaque()); // int flag headerBuffer.putInt(cmd.getFlag()); // String remark if (remarkBytes != null) { headerBuffer.putInt(remarkBytes.length); headerBuffer.put(remarkBytes); } else { headerBuffer.putInt(0); } // HashMap extFields; if (extFieldsBytes != null) { headerBuffer.putInt(extFieldsBytes.length); headerBuffer.put(extFieldsBytes); } else { headerBuffer.putInt(0); } return headerBuffer.array(); }3、反序列化
org.apache.rocketmq.remoting.netty.NettyDecoder#decodepublic Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { ByteBuf frame = null; try { frame = (ByteBuf) super.decode(ctx, in); if (null == frame) { return null; } ByteBuffer byteBuffer = frame.nioBuffer(); return RemotingCommand.decode(byteBuffer); } catch (Exception e) { log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e); RemotingUtil.closeChannel(ctx.channel()); } finally { if (null != frame) { frame.release(); } } return null; }public static RemotingCommand decode(final ByteBuffer byteBuffer) throws RemotingCommandException { // 解码的过程就是编码过程的逆向过程 int length = byteBuffer.limit(); // 总长度 int oriHeaderLen = byteBuffer.getInt(); // 头长度 int headerLength = getHeaderLength(oriHeaderLen); // 搞一个头长度的字节数组,一次性把headers都读出来放到字节数组里去 byte[] headerData = new byte[headerLength]; byteBuffer.get(headerData); // 对header要做一个解码 RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen)); int bodyLength = length - 4 - headerLength; byte[] bodyData = null; if (bodyLength > 0) { bodyData = new byte[bodyLength]; byteBuffer.get(bodyData); } cmd.body = bodyData; return cmd; }
这里判断header是用什么协议进行序列化的,就会使用什么协议进行反序列化;private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) throws RemotingCommandException { switch (type) { case JSON: RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class); resultJson.setSerializeTypeCurrentRPC(type); return resultJson; case ROCKETMQ: RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData); resultRMQ.setSerializeTypeCurrentRPC(type); return resultRMQ; default: break; } return null; }
我们之间看rocketMQ自定义的协议吧,其实就是一个逆向的过程,你之前放的什么,他就根据字节拿出来;public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) throws RemotingCommandException { RemotingCommand cmd = new RemotingCommand(); ByteBuffer headerBuffer = ByteBuffer.wrap(headerArray); // int code(~32767) cmd.setCode(headerBuffer.getShort()); // LanguageCode language cmd.setLanguage(LanguageCode.valueOf(headerBuffer.get())); // int version(~32767) cmd.setVersion(headerBuffer.getShort()); // int opaque cmd.setOpaque(headerBuffer.getInt()); // int flag cmd.setFlag(headerBuffer.getInt()); // String remark int remarkLength = headerBuffer.getInt(); if (remarkLength > 0) { if (remarkLength > headerArray.length) { throw new RemotingCommandException("RocketMQ protocol decoding failed, remark length: " + remarkLength + ", but header length: " + headerArray.length); } byte[] remarkContent = new byte[remarkLength]; headerBuffer.get(remarkContent); cmd.setRemark(new String(remarkContent, CHARSET_UTF8)); } // HashMap extFields int extFieldsLength = headerBuffer.getInt(); if (extFieldsLength > 0) { if (extFieldsLength > headerArray.length) { throw new RemotingCommandException("RocketMQ protocol decoding failed, extFields length: " + extFieldsLength + ", but header length: " + headerArray.length); } byte[] extFieldsBytes = new byte[extFieldsLength]; headerBuffer.get(extFieldsBytes); cmd.setExtFields(mapDeserialize(extFieldsBytes)); } return cmd; }三、总结RemotingCommand为rocketMQ的序列化和反序列化的组件,所有消息都需要使用他进行处理;序列化和反序列化是根据约定的协议存放数据,再根据约定的协议取数据;
日韩同时回家世界杯八强无一亚洲球队!西葡争最后两个名额北京时间12月6日凌晨,卡塔尔世界杯结束了两场18决赛,日本队在点球大战中不敌克罗地亚队,被淘汰出局,韩国队14惨败巴西队,无缘晋级。至此,亚洲球队已经全部被淘汰,世界杯八强产生6
全球财经连线横琴世界湾区论坛共探世界变革新趋势与湾区经济新机遇视频加载中南方财经全媒体记者赵越澳门报道放眼全球,湾区已经成为区域参与世界经济竞争的重要力量,也是高质量发展的代名词。粤港澳大湾区发展规划纲要发布实施三年多来,粤港澳三地不断凝聚各
一场大火导致紫禁城1000名太监集体被辞清皇室退位后,末代皇帝及一众人等仍在故宫中居住,同时在故宫中服侍皇室的太监虽然陆陆续续遣散了一千多名,但仍有一千一百余名太监留在宫内。1923年6月间,又发生了一千名太监集体被辞事
铁血中带着温柔俾斯麦为镇压工人运动,发明劳动法和社会保险1847年底,由于受到农业歉收和经济危机的影响,德意志各邦国到处掀起了罢工浪潮和饥民暴动,革命有一触即发之势。到了1848年2月,巴黎二月革命胜利的消息传来后,毗邻法国的部分德意志
燃烧的合水村燃烧的合水村卢新民汤文进今日合水村合水村是鄱阳湖畔余干县三塘乡的一个美丽村庄。据族谱记载,南宋末年有汤姓始迁于此,以居两条小溪汇合处而得名合水。1942年7月日寇占领余干期间,为抵
虚拟电厂,风口上的新千亿赛道撰文大蔚编辑凯旋11月25日,中国电信浙江公司与华能(浙江)能源开发有限公司上线了全国首个实时调度的5G虚拟电厂项目。该虚拟电厂1号机组顺利完成72小时试运行工作,标志着全国首台(
奈雪成乐乐茶第一大股东马斯克否认有自杀倾向自游家否认解散传闻今日行业看点一览苹果再失两名高管,最近几月失血严重据报道,近日,苹果又有两名副总裁离开。几天前JohnStauffer跳槽到游戏公司Roblox,他曾担任苹果互动媒体集团(Inte
浅谈高速相干光通信定义相干光通信,英文全称叫做CoherentOpticalCommunication,是光纤通信领域的一项技术,主要是发送端使用相干调制技术,接收端使用外差检测技术进行信息传输光的
一往无前小米十周年雷军公开演讲(1)1。1我们的梦想当时国内的手机市场,一类是诺基亚摩托罗拉和三星这样的国际巨头,一类是国产手机中华酷联,就是中兴华为这样的大公司,还有铺天盖地的山寨手机。中国市场主要被国际巨头把持,
Dart知识点异常处理头条创作挑战赛本文同步本人掘金平台的文章httpsjuejin。cnpost7129312940531908645推荐使用线上编辑器dartpad。cn进行学习,测试Dart将异常
流光岁月ThinkLongTerm(长期主义)首先感谢大家的关注。有人的地方就有江湖,有江湖的地方就有传说。在中国互联网蓬勃发展的这些年里,风云变幻,涌现了很多笑傲江湖的大佬。他们有的百战成名,有的一战封神有的功成身退,有的再