netty系列之netty中的核心MessageToMessage编码器
简介
在netty中我们需要传递各种类型的消息,这些message可以是字符串,可以是数组,也可以是自定义的对象。不同的对象之间可能需要互相转换,这样就需要一个可以自由进行转换的转换器,为了统一编码规则和方便用户的扩展,netty提供了一套消息之间进行转换的框架。本文将会讲解这个框架的具体实现。框架简介
netty为消息和消息之间的转换提供了三个类,这三个类都是抽象类,分别是MessageToMessageDecoder,MessageToMessageEncoder和MessageToMessageCodec。
先来看下他们的定义:public abstract class MessageToMessageEncoder extends ChannelOutboundHandlerAdapter 复制代码public abstract class MessageToMessageDecoder extends ChannelInboundHandlerAdapter 复制代码public abstract class MessageToMessageCodec extends ChannelDuplexHandler 复制代码
MessageToMessageEncoder继承自ChannelOutboundHandlerAdapter,负责向channel中写消息。
MessageToMessageDecoder继承自ChannelInboundHandlerAdapter,负责从channel中读取消息。
MessageToMessageCodec继承自ChannelDuplexHandler,它是一个双向的handler,可以从channel中读取消息,也可以向channel中写入消息。
有了这三个抽象类,我们再看下这三个类的具体实现。MessageToMessageEncoder
先看一下消息的编码器MessageToMessageEncoder,编码器中最重要的方法就是write,看下write的实现: public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { CodecOutputList out = null; try { if (acceptOutboundMessage(msg)) { out = CodecOutputList.newInstance(); @SuppressWarnings("unchecked") I cast = (I) msg; try { encode(ctx, cast, out); } finally { ReferenceCountUtil.release(cast); } if (out.isEmpty()) { throw new EncoderException( StringUtil.simpleClassName(this) + " must produce at least one message."); } } else { ctx.write(msg, promise); } } catch (EncoderException e) { throw e; } catch (Throwable t) { throw new EncoderException(t); } finally { if (out != null) { try { final int sizeMinusOne = out.size() - 1; if (sizeMinusOne == 0) { ctx.write(out.getUnsafe(0), promise); } else if (sizeMinusOne > 0) { if (promise == ctx.voidPromise()) { writeVoidPromise(ctx, out); } else { writePromiseCombiner(ctx, out, promise); } } } finally { out.recycle(); } } } } 复制代码
write方法接受一个需要转换的原始对象msg,和一个表示channel读写进度的ChannelPromise。
首先会对msg进行一个类型判断,这个判断方法是在acceptOutboundMessage中实现的。 public boolean acceptOutboundMessage(Object msg) throws Exception { return matcher.match(msg); } 复制代码
这里的matcher是一个TypeParameterMatcher对象,它是一个在MessageToMessageEncoder构造函数中初始化的属性: protected MessageToMessageEncoder() { matcher = TypeParameterMatcher.find(this, MessageToMessageEncoder.class, "I"); } 复制代码
这里的I就是要匹配的msg类型。
如果不匹配,则继续调用ctx.write(msg, promise); 将消息不做任何转换的写入到channel中,供下一个handler调用。
如果匹配成功,则会调用核心的encode方法:encode(ctx, cast, out);
注意,encode方法在MessageToMessageEncoder中是一个抽象方法,需要用户在继承类中自行扩展。
encode方法实际上是将msg对象转换成为要转换的对象,然后添加到out中。这个out是一个list对象,具体而言是一个CodecOutputList对象,作为一个list,out是一个可以存储多个对象的列表。
那么out是什么时候写入到channel中去的呢?
别急,在write方法中最后有一个finally代码块,在这个代码块中,会将out写入到channel里面。
因为out是一个List,可能会出现out中的对象部分写成功的情况,所以这里需要特别处理。
首先判断out中是否只有一个对象,如果是一个对象,那么直接写到channel中即可。如果out中多于一个对象,那么又分成两种情况,第一种情况是传入的promise是一个voidPromise,那么调用writeVoidPromise方法。
什么是voidPromise呢?
我们知道Promise有多种状态,可以通过promise的状态变化了解到数据写入的情况。对于voidPromise来说,它只关心一种失败的状态,其他的状态都不关心。
如果用户关心promise的其他状态,则会调用writePromiseCombiner方法,将多个对象的状态合并为一个promise返回。
事实上,在writeVoidPromise和writePromiseCombiner中,out中的对象都是一个一个的取出来,写入到channel中的,所以才会生成多个promise和需要将promise进行合并的情况: private static void writeVoidPromise(ChannelHandlerContext ctx, CodecOutputList out) { final ChannelPromise voidPromise = ctx.voidPromise(); for (int i = 0; i < out.size(); i++) { ctx.write(out.getUnsafe(i), voidPromise); } } private static void writePromiseCombiner(ChannelHandlerContext ctx, CodecOutputList out, ChannelPromise promise) { final PromiseCombiner combiner = new PromiseCombiner(ctx.executor()); for (int i = 0; i < out.size(); i++) { combiner.add(ctx.write(out.getUnsafe(i))); } combiner.finish(promise); } 复制代码MessageToMessageDecoder
和encoder对应的就是decoder了,MessageToMessageDecoder的逻辑和MessageToMessageEncoder差不多。
首先也是需要判断读取的消息类型,这里也定义了一个TypeParameterMatcher对象,用来检测传入的消息类型: protected MessageToMessageDecoder() { matcher = TypeParameterMatcher.find(this, MessageToMessageDecoder.class, "I"); } 复制代码
decoder中重要的方法是channelRead方法,我们看下它的实现: public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { CodecOutputList out = CodecOutputList.newInstance(); try { if (acceptInboundMessage(msg)) { @SuppressWarnings("unchecked") I cast = (I) msg; try { decode(ctx, cast, out); } finally { ReferenceCountUtil.release(cast); } } else { out.add(msg); } } catch (DecoderException e) { throw e; } catch (Exception e) { throw new DecoderException(e); } finally { try { int size = out.size(); for (int i = 0; i < size; i++) { ctx.fireChannelRead(out.getUnsafe(i)); } } finally { out.recycle(); } } } 复制代码
首先检测msg的类型,只有接受的类型才进行decode处理,否则将msg加入到CodecOutputList中。
最后在finally代码块中将out中的对象一个个取出来,调用ctx.fireChannelRead进行读取。
消息转换的关键方法是decode,这个方法也是一个抽象方法,需要在继承类中实现具体的功能。MessageToMessageCodec
前面讲解了一个编码器和一个解码器,他们都是单向的。最后要讲解的codec叫做MessageToMessageCodec,这个codec是一个双向的,即可以接收消息,也可以发送消息。
先看下它的定义:public abstract class MessageToMessageCodec extends ChannelDuplexHandler 复制代码
MessageToMessageCodec继承自ChannelDuplexHandler,接收两个泛型参数分别是INBOUND_IN和OUTBOUND_IN。
它定义了两个TypeParameterMatcher,分别用来过滤inboundMsg和outboundMsg: protected MessageToMessageCodec() { inboundMsgMatcher = TypeParameterMatcher.find(this, MessageToMessageCodec.class, "INBOUND_IN"); outboundMsgMatcher = TypeParameterMatcher.find(this, MessageToMessageCodec.class, "OUTBOUND_IN"); } 复制代码
分别实现了channelRead和write方法,用来读写消息: @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { decoder.channelRead(ctx, msg); } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { encoder.write(ctx, msg, promise); } 复制代码
这里的decoder和encoder实际上就是前面我们讲到的MessageToMessageDecoder和MessageToMessageEncoder: private final MessageToMessageEncoder
ITX夏日能否HOLD得住9900K风水结合LD03小钢炮装机展示说在前面其实对于这次装机完全是自己出于新奇心的一次瞎折腾,个人真正开始热衷于折腾电脑硬件就是始于银欣的这个结构机箱,初款的乌鸦1对于当初的机箱产业来说可谓是惊艳,颠覆传统机箱的结构
关于海燕T241收音机的打理改造想法国庆期间把家父这台T241打理完毕,遂产生了一些改机想法,目前还只是想法,将会不日开工,付诸实践,让老机更好的发挥余热,成功后再详细发文总结。交流声重。该机型使用220V供电,变压
德仕博1780简评与改造我之前对德仕博这个品牌毫无所知,猜想应该是东莞的某个小厂,1780应该是出口转内销的机型,因为我看到过国外同款马甲机型,再加上是DSP机,所以并不怎么关注。但是身边却一直有朋友推荐
索尼SW77的打理维修经验SW77是索尼90年代初的高端数调机,无论从外观,功能还是内部电路工艺都充满了设计感,有些技术甚至可以说是激进的,贴片电解电容在收音机上的使用就是一个例子。贴片电容体积小,可以提高
索尼SW77调幅AGC的起控点调整我之前有发过SW77的电路分析,文中提到它的调幅电路有完善的AGC控制措施,可以根据电台信号强度自动调整高放级增益而不是简单的衰减信号,这种手段似乎更科学合理一些。它的控制信号取自
SONYICF5900W电路分析前些天我分享了松下名机RF2200的电路特点,由此自然而然想到了它同期的对飙机型5波段的索尼ICF5900(W)。两台机同属于BCL经典机,同发布于1975年前后只相差几个月。作为
索尼CRF160电路分析索尼的便携机系列被命名为ICF,口袋耳机随身听为SRF,CRF系列是桌面级的专业收音机,因此灵敏度和选择性,音质等指标都堪称优秀。这台13波段的CRF160发布于70年代初,采用全
一台FM立体声收音机的制作手头多年积攒的各种零件实在太多,扔了又可惜,只能想办法利用了,于是有了这台调频立体声收音机。外壳是网上买的防水盒,价格便宜记得只要十来块钱但是光泽度好坚固耐用,最大好处是易于手工加
索尼便携机的同步检波单边带解调分析索尼常见的几款经典数调便携机均支持同步检波和单边带解调,例如SW77,SW07,SW7600G,SW7600GR,SW100,EX5等。当年这些高端功能在模拟便携机上实现是很不容易
德生PL660的中波假响应和啸叫分析我之前陆陆续续发了一些德生PL660中波某频点假响应或啸叫的现象视频和初步分析,随着验证的深入,这里总结整理一下我发现这个问题的来龙去脉和原因分析。起初发现啸叫问题,我还怀疑是我这
德生PL600电路分析德生PL600电路分析我之前写过德生9700DX,450,550,660,330,990,501,S2000的电路分析文章,因为它们都极具代表性9700DX五年磨一剑有着漂亮的刻度