netty 编解码器,netty 编解码
00-1010概述第1节: ByteToMessageDecoder我们来看看他的定义,我们来看看它的channelRead方法,我们来看看cumulator属性,我们来回到channel read方法。
00-1010我们上一章遗留了一个问题,就是如果服务器在读取客户端的数据时触发了channelRead事件,Netty是如何处理这类问题的?本章将对此进行详细分析。
在前一章中,我们学习了管道,事件在管道中传输,处理程序可以拦截和处理事件。后面分析的编解码器其实就是一个处理程序,它截取byteBuf中的字节,然后形成业务需要的数据继续传播。
编码器,通常是基于自身的OutBoundHandler,对流出的数据进行处理,所以也叫编码器,通过编码发送数据。
解码器,通常是inboundHandler,在自己的基础上处理流向自己的数据,所以也叫解码器,在使用之前对相反的数据进行解码。
同样,在netty的编码器中,半包和粘性包的问题也将得到相应的处理。
什么是半包,顾名思义就是不完整的数据包,因为netty轮询read事件时,通道中读取的数据不一定每次都是完整的数据包。在这种情况下,它被称为半包。
贴包也不难理解。如果客户端向服务器发送数据包,如果频繁发送,很可能会有多个数据包的数据发送到通道。如果服务器读取它,它可能会读取一个以上的完整数据包,这就是所谓的粘着数据包。
半袋和粘袋请参考下图:
6-0-1
Netty对半包或粘性包的处理其实很简单。通过前面的研究,我们知道每个处理程序都是唯一绑定到通道的,一个处理程序只对应一个通道。因此,通道中的数据在被读取时被解析。如果它不是一个完整的数据包,解析就会失败。保存这个包,然后用这个包组装并解析它,供下次解析使用。在完整的数据包被解析之前,该数据包将被向下传递。
流程在代码中是如何体现的?我们进入源代码分析。
目录
ByteToMessageDecoder解码器,顾名思义,是一个将字节解析成消息的解码器,
00-1010公共抽象类bytethemessagedecoder扩展ChannelInboundHandlerAdapter {//类体省略}这里继承了channelinboundhandler适配器。根据前面的研究,我们知道它是一个入站处理程序,即处理流向自身的事件的处理程序。
其次,这个类由abstract关键字修饰,说明它是一个抽象类。当我们实际使用这个类时,我们使用它的子类而不是直接使用。类定义了解码器的骨架方法,具体实现逻辑交给子类。同样,在半包处理中也是由这个类实现的。
netty中的很多解码器都实现了这个类,我们也可以通过实现这个类来定制解码器。
让我们来关注一下这个类的一个属性,
累积字节数;该属性是与半包处理相关的关键属性。从概述中我们知道,netty会保存不完整的数据包,这个数据包存储在这个属性中。
从前面的学习中我们知道,在ByteBuf读取数据之后会传递channelRead事件,在传播过程中会调用handler的channelRead方法。ByteToMessageDecoder的channelRead方法是编码的关键部分。
概述
public void channel read(ChannelHandlerContext CTX,Msg)throws exception {//如果消息是byteBuf类型if (msg instanceof ByteBuf) {//简单地把它当作一个arrayList,用来存放解析的对象CodeCoutputList Out=CodeCoutputList。new instance();try { byte buf data=(byte buf)msg;
//当前累加器为空, 说明这是第一次从io流里面读取数据 first = cumulation == null; if (first) { //如果是第一次, 则将累加器赋值为刚读进来的对象 cumulation = data; } else { //如果不是第一次, 则把当前累加的数据和读进来的数据进行累加 cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); } //调用子类的方法进行解析 callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Throwable t) { throw new DecoderException(t); } finally { if (cumulation != null && !cumulation.isReadable()) { numReads = 0; cumulation.release(); cumulation = null; } else if (++ numReads >= discardAfterReads) { numReads = 0; discardSomeReadBytes(); } //记录list长度 int size = out.size(); decodeWasNull = !out.insertSinceRecycled(); //向下传播 fireChannelRead(ctx, out, size); out.recycle(); } } else { //不是byteBuf类型则向下传播 ctx.fireChannelRead(msg); }}这方法比较长, 带大家一步步剖析
首先判断如果传来的数据是ByteBuf, 则进入if块中
CodecOutputList out = CodecOutputList.newInstance()这里就当成一个ArrayList就好, 用于盛放解码完成的数据
ByteBuf data = (ByteBuf) msg这步将数据转化成ByteBuf
first = cumulation ==null这里表示如果cumulation == null, 说明没有存储板半包数据, 则将当前的数据保存在属性cumulation中
如果cumulation !=null, 说明存储了半包数据, 则通过cumulator.cumulate(ctx.alloc(), cumulation, data)将读取到的数据和原来的数据进行累加, 保存在属性cumulation中
我们看cumulator属性
private Cumulator cumulator = MERGE_CUMULATOR;
这里调用了其静态属性MERGE_CUMULATOR, 我们跟过去:
public static final Cumulator MERGE_CUMULATOR = new Cumulator() { @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { ByteBuf buffer; //不能到过最大内存 if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() cumulation.refCnt() > 1) { buffer = expandCumulation(alloc, cumulation, in.readableBytes()); } else { buffer = cumulation; } //将当前数据buffer buffer.writeBytes(in); in.release(); return buffer; }};
这里创建了Cumulator类型的静态对象, 并重写了cumulate方法, 这里cumulate方法, 就是用于将ByteBuf进行拼接的方法:
方法中, 首先判断cumulation的写指针+in的可读字节数是否超过了cumulation的最大长度, 如果超过了, 将对cumulation进行扩容, 如果没超过, 则将其赋值到局部变量buffer中
然后将in的数据写到buffer中, 将in进行释放, 返回写入数据后的ByteBuf
回到channelRead方法中:
最后通过callDecode(ctx, cumulation, out)方法进行解码, 这里传入了Context对象, 缓冲区cumulation和集合out:
我们跟到callDecode(ctx, cumulation, out)方法中:
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { //只要累加器里面有数据 while (in.isReadable()) { int outSize = out.size(); //判断当前List是否有对象 if (outSize > 0) { //如果有对象, 则向下传播事件 fireChannelRead(ctx, out, outSize); //清空当前list out.clear(); //解码过程中如ctx被removed掉就break if (ctx.isRemoved()) { break; } outSize = 0; } //当前可读数据长度 int oldInputLength = in.readableBytes(); //子类实现 //子类解析, 解析玩对象放到out里面 decode(ctx, in, out); if (ctx.isRemoved()) { break; } //List解析前大小 和解析后长度一样(什么没有解析出来) if (outSize == out.size()) { //原来可读的长度==解析后可读长度 //说明没有读取数据(当前累加的数据并没有拼成一个完整的数据包) if (oldInputLength == in.readableBytes()) { //跳出循环(下次在读取数据才能进行后续的解析) break; } else { //没有解析到数据, 但是进行读取了 continue; } } //out里面有数据, 但是没有从累加器读取数据 if (oldInputLength == in.readableBytes()) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); } if (isSingleDecode()) { break; } } } catch (DecoderException e) { throw e; } catch (Throwable cause) { throw new DecoderException(cause); }}
这里首先循环判断传入的ByteBuf是否有可读字节, 如果还有可读字节说明没有解码完成, 则循环继续解码
然后判断集合out的大小, 如果大小大于1, 说明out中盛放了解码完成之后的数据, 然后将事件向下传播, 并清空out
因为我们第一次解码out是空的, 所以这里不会进入if块, 这部分我们稍后分析, 这里继续往下看
通过intoldInputLength = in.readableBytes()获取当前ByteBuf, 其实也就是属性cumulation的可读字节数, 这里就是一个备份用于比较, 我们继续往下看:
decode(ctx, in, out)方法是最终的解码操作, 这部会读取cumulation并且将解码后的数据放入到集合out中, 在ByteToMessageDecoder中的该方法是一个抽象方法, 让子类进行实现, 我们使用的netty很多的解码都是继承了ByteToMessageDecoder并实现了decode方法从而完成了解码操作, 同样我们也可以遵循相应的规则进行自定义解码器, 在之后的小节中会讲解netty定义的解码器, 并剖析相关的实现细节, 这里我们继续往下看:
if(outSize == out.size())这个判断表示解析之前的out大小和解析之后out大小进行比较, 如果相同, 说明并没有解析出数据, 我们进入到if块中:
if(oldInputLength == in.readableBytes())表示cumulation的可读字节数在解析之前和解析之后是相同的, 说明解码方法中并没有解析数据, 也就是当前的数据并不是一个完整的数据包, 则跳出循环, 留给下次解析, 否则, 说明没有解析到数据, 但是读取了, 所以跳过该次循环进入下次循环
最后判断if(oldInputLength == in.readableBytes()), 这里代表out中有数据, 但是并没有从cumulation读数据, 说明这个out的内容是非法的, 直接抛出异常
我们回到channRead方法中
我们关注finally中的内容:
finally { if (cumulation != null && !cumulation.isReadable()) { numReads = 0; cumulation.release(); cumulation = null; } else if (++ numReads >= discardAfterReads) { numReads = 0; discardSomeReadBytes(); } //记录list长度 int size = out.size(); decodeWasNull = !out.insertSinceRecycled(); //向下传播 fireChannelRead(ctx, out, size); out.recycle();}
首先判断cumulation不为null, 并且没有可读字节, 则将累加器进行释放, 并设置为null
之后记录out的长度, 通过fireChannelRead(ctx, out, size)将channelRead事件进行向下传播, 并回收out对象
我们跟到fireChannelRead(ctx, out, size)方法中:
static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) { //遍历List for (int i = 0; i < numElements; i ++) { //逐个向下传递 ctx.fireChannelRead(msgs.getUnsafe(i)); }}
这里遍历out集合, 并将里面的元素逐个向下传递
以上就是有关解码的骨架逻辑
更多关于Netty分布式解码器读取数据的资料请关注盛行IT其它相关文章!
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。