netty 并发数,netty长度编码器

  netty 并发数,netty长度编码器

  00-1010编码器概述第一节:writeAndFlush的事件传播我们来看一个最简单的使用场景。让我们遵循writeAndFlush方法。让我们遵循invokeWriteAndFlush方法。我们来看看invokeFlush0方法。

  00-1010上一章我们介绍了解码器。在本章中,我们介绍了编码器。

  其实编码器和解码器差不多。编码器也是一个处理程序,它属于outbounfHandle,就是截取要发送出去的数据,截取后做相应的处理,然后再发送。如果理解了解码器,就更容易理解编码器的相关内容。

  

目录

 

  00-1010之前我们在学习pipeline的时候,讲解了write事件的传播过程,但是在实际使用中,我们一般不会调用channel的write方法,因为这个方法只会写到发送数据的缓存中,不会直接写到channel中。如果要写入通道,需要调用flush方法。

  在实际操作中,我们更多使用的是writeAndFlush方法,该方法不仅可以将数据写入发送缓存,还可以将其刷新到通道中。

  

概述

public void channel read(ChannelHandlerContext CTX,Object msg)抛出异常{ ctx.channel()。writeAndFlush(测试数据);}学过netty的同学对此并不陌生。通过这种方式,可以将数据发送到通道,而另一方可以收到响应。

 

  00-1010将首先转到AbstractChannel中的writeAndFlush:

  public channel future writeAndFlush(Object msg){ return pipeline . writeAndFlush(msg);}继续遵循DefualtChannelPipeline 3360中的writeAndFlush方法。

  public final channel future writeAndFlush(Object msg){ return tail . writeAndFlush(msg);}这里我们看到writeAndFlush是从尾节点传过来的。我们分析过pipeline中的事件传播,相信这并不陌生。

  继续,您将在AbstractChannelHandlerContext中看到writeAndFlush方法:

  public channel future writeAndFlush(Object msg){ return writeAndFlush(msg,new promise());}继续关注:

  public channel future writeAndFlush(Object msg,channel promise promise){ if(msg==null){ throw new NullPointerException( msg );}如果(!validatePromise(promise,true)){ reference count util . release(msg);//取消退货承诺;}写(味精,真,承诺);回报承诺;}继续按照写法:

  Private Write (Object Msg,Boolean Flush,Channel Promise Promise){//FindContextOutbound()查找上一个出站节点//最后到头节点AbstractChannelHandlerContext Next=FindContextOutbound();最终对象m=pipeline.touch(msg,next);event executor executor=next . executor();if(executor . ineventloop()){ if(flush)

   { next.invokeWriteAndFlush(m, promise); } else { //没有调flush next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); }}这里的逻辑我们也不陌生,找到下一个节点,因为writeAndFlush是从tail节点开始的,并且是outBound的事件,所以这里会找到tail节点的上一个outBoundHandler,有可能是编码器,也有可能是我们业务处理的handler

  if(executor.inEventLoop())判断是否是eventLoop线程,如果不是,则封装成task通过nioEventLoop异步执行,我们这里先按照是eventLoop线程分析

  首先,这里通过flush判断是否调用了flush,这里显然是true,因为我们调用的方法是writeAndFlush

  

 

  

我们跟到invokeWriteAndFlush中

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { if (invokeHandler()) { //写入 invokeWrite0(msg, promise); //刷新 invokeFlush0(); } else { writeAndFlush(msg, promise); }}

这里就真相大白了,其实在writeAndFlush中,首先调用write, write完成之后再调用flush方法进行的刷新

 

  首先跟到invokeWrite0方法中:

  

private void invokeWrite0(Object msg, ChannelPromise promise) { try { //调用当前handler的wirte()方法 ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); }}

该方法我们在pipeline中已经进行过分析,就是调用当前handler的write方法,如果当前handler中write方法是继续往下传播,在会继续传播写事件,直到传播到head节点,最后会走到HeadContext的write方法中

 

  跟到HeadContext的write方法中:

  

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise);}

这里通过当前channel的unsafe对象对将当前消息写到缓存中,写入的过程,我们之后的小节进行分析

 

  回到到invokeWriteAndFlush方法中:

  

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { if (invokeHandler()) { //写入 invokeWrite0(msg, promise); //刷新 invokeFlush0(); } else { writeAndFlush(msg, promise); }}

 

  

我们再看invokeFlush0方法

private void invokeFlush0() { try { ((ChannelOutboundHandler) handler()).flush(this); } catch (Throwable t) { notifyHandlerException(t); }}

同样,这里会调用当前handler的flush方法,如果当前handler的flush方法是继续传播flush事件,则flush事件会继续往下传播,直到最后会调用head节点的flush方法,如果我们熟悉pipeline的话,对这里的逻辑不会陌生

 

  跟到HeadContext的flush方法中:

  

public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush();}

这里同样,会通过当前channel的unsafe对象通过调用flush方法将缓存的数据刷新到channel中,有关刷新的逻辑,我们会在以后的小节进行剖析

 

  以上就是writeAndFlush的相关逻辑,整体上比较简单,熟悉pipeline的同学应该很容易理解

  更多关于Netty分布式编码器及写数据事件的资料请关注盛行IT其它相关文章!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: