基于Netty的高性能RPC框架Nifty协议传输层编解码全解析
ThriftCodecManager与对象读取
在编写服务端代码的时候,我们创建了 ThriftCodecManager 这个对象,该对象是用来管理编解码器ThriftCodec<?> 的,在初始化的时候会创建各种类型的编解码器放到缓存中, 以供服务处理器ThrfitServiceProcessor 使用。接下来我们就深入分析这个编解码器管理器。
通常我们使用的就是默认变成 ThriftCodec 数组构造方法来创建该对象,内部持有guava提供的缓存private final LoadingCache> typeCodecs; public ThriftCodecManager(ThriftCodec<?>... codecs) { this(new CompilerThriftCodecFactory(ThriftCodecManager.class.getClassLoader()), ImmutableSet.copyOf(codecs)); } public ThriftCodecManager(ThriftCodecFactory factory, Set> codecs){ this(factory, new ThriftCatalog(), codecs); }
主要分三步来添加编解码器到缓存typeCodecs中, 构造typeCodecs, 根据ThriftType来动态构建编解码器添加到缓存中;使用的是guava cache, 这个东西公司内部基础架构部门也用的不少,作为本地缓存提供了挺多的策略,非常推荐花半小时学习一下; 添加支持基本类型的编解码器,比如 StringThriftCodec, IntThriftCodec ;将我们自己定义的编解码器也加入到缓存中;
接下来按照顺序依次介绍 1.1. 读取集合类型参数typeCodecs = CacheBuilder.newBuilder().build(new CacheLoader>() { public ThriftCodec<?> load(ThriftType type) throws Exception { switch (type.getProtocolType()) { case STRUCT: { return factory.generateThriftTypeCodec(ThriftCodecManager.this, type.getStructMetadata()); } case MAP: { ThriftCodec<?> keyCodec = typeCodecs.get(type.getKeyType()); ThriftCodec<?> valueCodec = typeCodecs.get(type.getValueType()); return new MapThriftCodec<>(type, keyCodec, valueCodec); } case SET: { ThriftCodec<?> elementCodec = typeCodecs.get(type.getValueType()); return new SetThriftCodec<>(type, elementCodec); } case LIST: { ThriftCodec<?> elementCodec = typeCodecs.get(type.getValueType()); return new ListThriftCodec<>(type, elementCodec); } } } });
比如 ThriftType=MAP ,分别从缓存获取key和value的编解码器,从而来构建map的编解码器MapThriftCodec
在这之前需要知道编解码器其实就是提供了read和write方法,从协议中读取和写出数据。 public interface ThriftCodec{ /** * The Thrift type this codec supports. The Thrift type contains the Java generic Type of the * codec. */ public ThriftType getType(); /** * Reads a value from supplied Thrift protocol reader. * * @param protocol the protocol to read from * @return the value; not null * @throws Exception if any problems occurred when reading or coercing the value */ public T read(TProtocol protocol) throws Exception; /** * Writes a value to the supplied Thrift protocol writer. * * @param value the value to write; not null * @param protocol the protocol to write to * @throws Exception if any problems occurred when writing or coercing the value */ public void write(T value, TProtocol protocol) throws Exception; }
基本类型的编解码器在之前的博客中说过,这里就看下MapThriftCodec和基本类型的编解码器有什么不同。 public Map read(TProtocol protocol) throws Exception { return new TProtocolReader(protocol).readMap(keyCodec, valueCodec); }
继续往下看: TProtocolReader :public Map readMap(ThriftCodec keyCodec, ThriftCodec valueCodec) throws Exception { TMap tMap = protocol.readMapBegin(); Map map = new HashMap<>(); for (int i = 0; i < tMap.size; i++) { K key = keyCodec.read(protocol); V value = valueCodec.read(protocol); map.put(key, value); } protocol.readMapEnd(); return map; }
是不是很容易理解,拿到key和value的编解码器后,不停的往下读即可。
比较疑问的地方就是这里没有涉及到循环遍历i,实际上这是关于map的协议来决定的,protocal.readMapBegin在TBinaryProtocal中的实现是,依次读取一个字节,一个字节,4个字节分别表示key的类型,value的类型和 元素数量。得到了map中元素数量tMap.size后,由于每次读取key和value的时候buffer中的指针都会移动,读完了value后,能保证下次读到的就是下一个元素的key。最后将结果放到构建的hashMap中即可。 1.2. 读取结构体
方法的形参里面除了基本类型,集合类型外,经常还能遇到结构体类型,对于java来说就是对象,这个时候对应的处理如下。 case STRUCT: { return factory.generateThriftTypeCodec(ThriftCodecManager.this, type.getStructMetadata()); }
做点小改动,在这里使用一个新的 ThriftCodecManager 构造方法,传入编解码器工厂类ReflectionThriftCodecFactory ThriftCodecManager manager = new ThriftCodecManager(new ReflectionThriftCodecFactory());
其实还有个工厂类 CompilerThriftCodecFactory ,看名字是个自建编译器的工厂类,看过内部实现代码多而且不大好理解,所以这里使用反射方式的编解码器工厂类。
ReflectionThriftCodecFactory 继承自ThriftCodecFactory ,提供了获取编解码器的方法。public interface ThriftCodecFactory{ ThriftCodec<?> generateThriftTypeCodec(ThriftCodecManager codecManager, ThriftStructMetadata metadata); }
ReflectionThriftCodecFactory 的实现如下:public class ReflectionThriftCodecFactory implements ThriftCodecFactory { @Override public ThriftCodec<?> generateThriftTypeCodec(ThriftCodecManager codecManager, ThriftStructMetadata metadata) { switch (metadata.getMetadataType()) { case STRUCT: return new ReflectionThriftStructCodec<>(codecManager, metadata); case UNION: return new ReflectionThriftUnionCodec<>(codecManager, metadata); default: throw new IllegalStateException(format("encountered type %s", metadata.getMetadataType())); } } }
那么问题就是, ThriftStructMetadata 如何得到以及干嘛用的, 看名字就知道这是保存结构体元数据的. 由type.getStructMetadata() 得到, 那么问题就是如何构造的ThriftType . 其实在最开始构造ThriftServiceProcessor 的时候就构造好了,
创建过程比较复杂,简单来说就是:构建ThriftServiceProcessor的时候,会将构建ThriftServiceMetadata;而构建ThriftServiceMetadata的时候会构建ThriftMethodMetadata;构建 的时候会构建 List ; 每个ThriftFieldMetadata 都代表一个方法形参,内部持有一个ThriftType , 到这里就知道了ThriftType是属于ThriftFieldMetadata ,在构建ThriftMethodMetadata 的时候会得到ThriftType,如何得到?其实就是根据方法形参得到ThriftType的,从目录类ThriftCatalog 获取到,其内部存有一个Type和ThriftType的映射,来简单看下。
ThriftCatalog :private final ConcurrentMap typeCache = new ConcurrentHashMap<>(); public ThriftType getThriftType(Type javaType) throws IllegalArgumentException{ ThriftType thriftType = typeCache.get(javaType); if (thriftType == null) { thriftType = getThriftTypeUncached(javaType); typeCache.putIfAbsent(javaType, thriftType); } return thriftType; }
基于此,在构建 ThriftMethodProcessor 的时候,再跟进ThriftCodecManager 就能得到Map> ImmutableMap.Builder> builder = ImmutableMap.builder(); for (ThriftFieldMetadata fieldMetadata : methodMetadata.getParameters()) { builder.put(fieldMetadata.getId(), codecManager.getCodec(fieldMetadata.getThriftType())); } parameterCodecs = builder.build();
所以在处理接收到的数据的时候,可以根据fieldid来获取对应的编解码器了。
知道 ThriftType 如何获得后,再回过头来看ReflectionThriftStructCodec 如何解析结构体的。protected final ThriftStructMetadata metadata; protected final SortedMap> fields; @Override public T read(TProtocol protocol) throws Exception { TProtocolReader reader = new TProtocolReader(protocol); Map data = new HashMap<>(metadata.getFields().size()); while (reader.nextField()) { short fieldId = reader.getFieldId(); ThriftCodec<?> codec = fields.get(fieldId); Object value = reader.readField(codec); data.put(fieldId, value); } // build the struct return constructStruct(data); }
代码简化了很多,但是主体逻辑留了下来。从这部分代码可以看到,这和服务端读取数据的逻辑是一样的,具体的可以参考服务端读取数据。最后得到的data中key为 struct(即java对象)的fieldId,value则为属性的值。比如传入的对象 class Dog{int age = 5; String name = "tom"} 那么两个data中的数据为{1=5,2="tom"} ,最后在constructStruct 构建对象。编解码器
在前面其实介绍过编解码器 ThrfitCodec ,为了符合标题,这里再啰嗦一遍。
Thrift提供的编解码器顶层接口为 ThriftCodec ,提供了read和write 方法public interface ThriftCodec{ /** * The Thrift type this codec supports. The Thrift type contains the Java generic Type of the * codec. */ public ThriftType getType(); /** * Reads a value from supplied Thrift protocol reader. * * @param protocol the protocol to read from * @return the value; not null * @throws Exception if any problems occurred when reading or coercing the value */ public T read(TProtocol protocol) throws Exception; /** * Writes a value to the supplied Thrift protocol writer. * * @param value the value to write; not null * @param protocol the protocol to write to * @throws Exception if any problems occurred when writing or coercing the value */ public void write(T value, TProtocol protocol) throws Exception; }
同时Thrift也为我们提供了常用的编解码器,足以应付我们业务的使用。比如常见的基本类型的编解码器,String类型编解码器 StringThriftCodec :public class StringThriftCodec implements ThriftCodec { @Override public ThriftType getType() { return ThriftType.STRING; } @Override public String read(TProtocol protocol) throws Exception { return protocol.readString(); } @Override public void write(String value, TProtocol protocol) throws Exception{ protocol.writeString(value); } }
IntegerThriftCodec :public class IntegerThriftCodec implements ThriftCodec { @Override public ThriftType getType() { return ThriftType.I32; } @Override public Integer read(TProtocol protocol) throws Exception { return protocol.readI32(); } @Override public void write(Integer value, TProtocol protocol) throws Exception { Preconditions.checkNotNull(protocol, "protocol is null"); protocol.writeI32(value); } }
关于结构体编解码器 ReflectionThriftStructCodec 前面一大篇幅都是介绍这个。
不管是哪种编解码器都是非常依赖协议的,只是编解码器做了一层抽象屏蔽了细节,方便我们使用。 协议与传输
协议TProtocal和传输组件TTransport是紧密相连的,协议内部是持有TTransport的,而TTransport可以理解为传输层,是直接与输出数据容器buffer打交道的;比如使用最多的就是TNiftyTransport,内部会持有ChannelBuffer,包含了从netty数据流中获取的ChannelBuffer和之后写到客户端的空的ChannelBuffer。
我们先简单介绍协议定义了哪些接口,然后找个接口来看如何进行数据传输的。 /** * Protocol interface definition. * */ public abstract class TProtocol { protected TTransport trans_; protected TProtocol(TTransport trans) { trans_ = trans; } private boolean serverSide; private String serviceName; // getter, setter /** * Reading methods. */ public abstract TMessage readMessageBegin() throws TException; public abstract void readMessageEnd() throws TException; public abstract TStruct readStructBegin() throws TException; public abstract void readStructEnd() throws TException; public abstract TField readFieldBegin() throws TException; public abstract void readFieldEnd() throws TException; public abstract TMap readMapBegin() throws TException; public abstract void readMapEnd() throws TException; public abstract TList readListBegin() throws TException; public abstract void readListEnd() throws TException; public abstract TSet readSetBegin() throws TException; public abstract void readSetEnd() throws TException; public abstract boolean readBool() throws TException; public abstract byte readByte() throws TException; public abstract short readI16() throws TException; public abstract int readI32() throws TException; public abstract long readI64() throws TException; public abstract double readDouble() throws TException; public abstract String readString() throws TException; public abstract ByteBuffer readBinary() throws TException; /** * Writing methods. */ // ...
里面主要是数据的读和写方法,写和读方法是对应的就不贴了。读的方法基本都会配合 TProtocolReader 来使用,写的方法基本都会配合TProtocolWriter 来使用。开始和结束消息的读取,开始读取方法参数的时候会在初始和结尾进行调用, 可以获得方法的名字和请求的序号。 开始和结束结构体的读取,在正式读取方法参数值的时候和读取完毕后进行调用,在TBinaryProtocal中可以认为是空实现,readStructEnd通常在readMessageEnd之前。 开始和结束参数的读取,每次读取一个参数都会调用,readFieldBegin返回TField表示参数名称,类型和序号,基于此获取编解码器来读取参数值,最后再调用readFieldEnd。 开始和结束集合的读取;
挑个 readI32,readString 来看在TBinaryProtocal中的使用。private byte[] i32rd = new byte[4]; public int readI32() throws TException { byte[] buf = i32rd; int off = 0; if (trans_.getBytesRemainingInBuffer() >= 4) { buf = trans_.getBuffer(); off = trans_.getBufferPosition(); trans_.consumeBuffer(4); } else { readAll(i32rd, 0, 4); } return ((buf[off] & 0xff) << 24) | ((buf[off + 1] & 0xff) << 16) | ((buf[off + 2] & 0xff) << 8) | ((buf[off + 3] & 0xff)); }
这里的trans_在前面说过,就是TNiftyTransport,由nifty包提供的。 trans_.getBytesRemainingInBuffer() 表示的是内部持有的channelBuffer剩余字节数,即bufferEnd - bufferPosition; 如果有四个字节就读取4个字节,否则读取所有;buf = trans_.getBuffer() 是获取transport内部的的buffer数组;off = trans_.getBufferPosition(); 是获取transport内部buffer当前读取到的位置,即bufferPosition;trans_.consumeBuffer(4); 则是transport内部buffer消费4个字节,即bufferPosition += 4;关于返回值,注意到16进制的0xff就是二进制的11111111,最终结果就是将四个字节拼接在一起构成一个int值
关于readByte,readShort,readLong都是类似的。
再来看 readString :public String readString() throws TException { int size = readI32(); checkStringReadLength(size); if (trans_.getBytesRemainingInBuffer() >= size) { String s = new String(trans_.getBuffer(), trans_.getBufferPosition(), size, "UTF-8"); trans_.consumeBuffer(size); return s; } return readStringBody(size); }
首先读取四个字节构成的size,表示需要读取多少byte从而来构造string。在获取buffer和position,从而从buffer的position位置读取size个字节,构造出string;最后需要移动position再返回string结果。
EN71玩具检测报告办理流程,CE认证EN71测试项目一EN71认证简介EN71是欧盟市场玩具类产品规范标准。儿童是全社会关心和爱护的群体,儿童普遍喜爱的玩具市来场发展迅猛,玩具质量问题给儿童带的伤害也时有发生,因此世界各国对本国市场
CC排序算法之冒泡排序和选择法排序文章首发于CSDN,博主HanSmileLiao链接原文链接httpsblog。csdn。netNebulaChienarticledetails120436663转眼大二了,突然
奥迪RS7和S7西装狂徒怎么选从外看是奥迪A7的外形,往内瞧却是保时捷的强劲发动机和奥迪拿手的主动底盘加48V轻混系统,这就是奥迪RS7和S7作为西装狂徒的魅力。很多朋友感兴趣性能车RS7和S7该怎么选,跟着小
车美行业服务不可或缺的新载体随着市场经济的不断提升,人民的生活需求也变得更加丰富,特别是一二线城市,快节奏多元化生活已经成为常态,在人们消费的同时更加渴望快速与便捷,快递柜正是应需而生的产物,尤其是疫情的爆发
香飘飘披露三季度财报稳底盘保增长争突破10月29日晚间,香飘飘(603711。SH)发布三季报显示,其前三季度营收为19。74亿元,同比增长4。29,实现净利润3939。55万元。第三季度营收和净利实现环比双增,其中营
奥诺资本杨科马斯克嘲讽中的逆袭转身最近经常跟人聊天,都会聊到一个人,特斯拉的马斯克。埃隆马斯克有一个能力非常厉害。这能力不是他的创新能力星链计划脑机结口把100万人送上火星特斯拉汽车不是这些。是他的抗怼能力。人都会
Python中利用遍历法在列表中查找特定元素SearchingelementfromlistbytraversalfromrandomimportFlag1timesAppear0whileFlaglistLengthint
AppStore限免Wallax一起专注胶带纸等,共5款在今天限免的iOS限免应用中,波老师精选了以下5款限免应用。复制logo上方名字即可前往AppStore下载。如遇恢复原价,则表示限免已结束,请谨慎下载。具体下载方式复制logo上
4万级别纯电动微型车横评2020年下半年随着国家发布新能源汽车下乡的鼓励政策,一批价格在3万到4万元区间的纯电动微型车随着国家及地方政策成为了热门车型。正好小星早就有所关注这个市场,只不过之前体验的奇瑞E
定了!腾讯大王卡10G通用流量每月,刷够用了实际上腾讯大王卡是腾讯和联通公司一起推出的一款手机卡套餐。之前送出过手动领取通用流量,每月1G,但是要入网12个月或者话费余额超过300块钱。一张证件只能办一张卡,定向流量40G。
上汽奥迪A7L曝光亮点抢先分析最近很多消息指向了将要上市的上汽奥迪国产A7L。作为上汽集团和奥迪品牌的全新合作,很多朋友感兴趣国产以后新一代奥迪A7L相比进口A7会有什么区别呢?随着小星来了解一下上汽奥迪A7L