前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Netty】inBound和outBound事件的传播过程

【Netty】inBound和outBound事件的传播过程

作者头像
用户3467126
发布2019-07-03 19:10:58
3.4K0
发布2019-07-03 19:10:58
举报
文章被收录于专栏:爱编码

简介

上一节学习了 ChannelHandler添加和修改的过程。 至此我们已经了解了 pipelineChannelHandlerContextChannelHandler着三者之间的关系。pipeline通过维持一个链表结构,链表节点是 ChannelHandlerContext,该节点持有 ChannelHandler。部分对 ChannelHandler的操作直接暴露给 ChannelHandlerContext,因此我们可以直接操作 ChannelHandlerContext来间接操作 ChannelHandler

本节以 ChannelRead事件为例,学习 inBoundoutBound事件的传播过程。总体如下图

inBound事件的传播

handler之间的传播信息通过 fireXXX方法:其区别是从哪个节点开始传播。

ctx.fireChannelRead(msg); 从头节点HeadContext开始传播 ctx.pipeline().fireChannelRead(msg);从当前节点往下传播事件

代码语言:javascript
复制
@Override

public void  channelRead
(ChannelHandlerContext ctx,  Object msg) throws Exception {

//调用通道的fireChannelRead方法是从头节点HeadContext开始传播

ctx . fireChannelRead (msg);

//调用数据节点的传播方法是从当前节点往下传播事件

ctx . pipeline(). fireChannelRead(msg);

}
分析ctx.pipeline().fireChannelRead(msg);

1.首先获取当前context的pipeline对象, 然后通过pipeline对象调用自身的fireChannelRead方法进行传播, 因为默认创建的DefaultChannelpipeline

代码语言:javascript
复制
public final ChannelPipeline fireChannelRead(Object msg) {    
AbstractChannelHandlerContext.invokeChannelRead(head, msg);    
return this;
}

2.调用的是AbstractChannelHandlerContext类的静态方法invokeChannelRead, 参数传入head节点和事件的消息

代码语言:javascript
复制
    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {        
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);        
    EventExecutor executor = next.executor();        
    if (executor.inEventLoop()) {            
    next.invokeChannelRead(m);        
    } 
    else {            
    executor.execute(new Runnable() {                
    @Override                
    public void run() {                    
    next.invokeChannelRead(m);                
    }            
    });        
    }    
    }

3.这里的Object m m通常就是我们传入的msg, 而next, 目前是head节点, 然后再判断是否为当前eventLoop线程, 如果不是则将方法包装成task交给eventLoop线程处理。

跟到invokeChannelRead方法中:

代码语言:javascript
复制
private void invokeChannelRead(Object msg) {    
    if (invokeHandler()) {        
        try {            
            ((ChannelInboundHandler) handler()).channelRead(this, msg);         
            } catch (Throwable t) {            
                notifyHandlerException(t);        
            }    
    } else {        
    fireChannelRead(msg);    
    }
}

4.首先通过invokeHandler()判断当前handler是否已添加, 如果添加, 则执行当前handler的chanelRead方法, 通过fireChannelRead方法传递事件的过程中, 其实就是找到相关handler执行其channelRead方法, 由于我们在这里的handler就是head节点, 所以我们跟到HeadContext的channelRead方法中: HeadContext的channelRead方法:

代码语言:javascript
复制
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {    
      //向下传递channelRead事件    
      ctx.fireChannelRead(msg);
}

5.fireChannelRead()调用了findContextInbound()通过inbound属性轮询出下一个ChannelInboundHandler。

代码语言:javascript
复制
 @Override    
 public ChannelHandlerContext fireChannelRead(final Object msg) {
         //先找到下一个节点,再执行channelRead方法
        //findContextInbound : 找到下一个节点
        invokeChannelRead(findContextInbound(), msg);
        return this;
     }
 private AbstractChannelHandlerContext findContextInbound() {
       AbstractChannelHandlerContext ctx = this;
        //通过inbound属性轮询出下一个inboundHandlerContext
        do {
             ctx = ctx.next;
         } while (!ctx.inbound);
        return ctx;
    }

6.从头节点开始,逐个往下传递并触发用户回调函数,在这过程当中,最后传到尾节点TailContext

以channelRead为例,当走到这个方法则表明,通道内未对传播的内容进行处理,并且占用的内存未释放,在尾节点打印了日志并最终释放了内存。

分析 ctx.fireChannelRead(msg);

这种方式是直接从当前节点开始传播的。主要体现在以下,其余的步骤跟ctx.pipeline().fireChannelRead(msg);是一模一样。

第一步就直接来到了AbstractChannelHandlerContext#fireChannelRead

代码语言:javascript
复制
  @Override
      public ChannelHandlerContext fireChannelRead(final Object msg) {
         invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
         return this;
      }

与ctx.pipeline().fireChannelRead(msg);相比直接跳过了,传入head节点和事件的消息等操作。

代码语言:javascript
复制
 static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
         if (executor.inEventLoop()) { 
            next.invokeChannelRead(m);
         } else {
             executor.execute(new Runnable() {
                 @Override                
                 public void run() { 
                    next.invokeChannelRead(m);
                 } 
             });
            }
      }

区别如下图:

最终inbound事件的传播过程,是从头节点开始,逐个往下传递并触发用户回调函数,在这过程当中,可以手动调用pipeline的传播事件的方法,从任何一个节点开始从头开始触发传播事件,也可以直接通过ChannelHandlerContext的传播事件方法,一次从本节点开始往下传播事件。最后传到尾节点TailContext

outBound事件的传播

学习outbound事件的传播,和inbound事件有相似之处。ChannelOutboundHandler的执行顺序正好和ChannelInboundHandler相反,是倒序的。 以write事件为例,进行学习研究。

代码语言:javascript
复制
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    //写法1    ctx.channel().write("test data");    
    //写法2    ctx.write("test data");}

当然, 直接调用write方法是不能往对方channel中写入数据的, 因为这种方式只能写入到缓冲区, 还要调用flush方法才能将缓冲区数据刷到channel中, 或者直接调用writeAndFlush方法

1.跟踪ctx.channel().write("hello world");

代码语言:javascript
复制
 @Override    public ChannelFuture write(Object msg) {
         //从pipeline开始调用
        return pipeline.write(msg);
      }
    @Override
    public final ChannelFuture write(Object msg) {
         //从尾节点开始传播
          return tail.write(msg);
       }
      @Override
      public ChannelFuture write(Object msg) {
         //添加一个回调Promise,包装channel和executor
         return write(msg, newPromise());
     }

2.最终调用到AbstractChannelHandlerContext#write()方法,主要是做了两件事

1.findContextOutbound 方法找到下一个 2.ChannelOutboundHandlerContext 判断是否需要flush,选择执行write回调方法之后是否执行flush回调方法

代码语言:javascript
复制
  private void write(Object msg, boolean flush, ChannelPromise promise) {
          ObjectUtil.checkNotNull(msg, "msg");
         try {
              if (isNotValidPromise(promise, true)) {
                  ReferenceCountUtil.release(msg);
                 // cancelled
                 return;
               }
           } catch (RuntimeException e) { 
              ReferenceCountUtil.release(msg);
              throw e;
          }
          //查找下一个ChannelOutboundHandlerContext
          AbstractChannelHandlerContext next = findContextOutbound();
          final Object m = pipeline.touch(msg, next);
          EventExecutor executor = next.executor();
          if (executor.inEventLoop()) {
              //判断是否刷新
              if (flush) { 
                 //执行写并刷新方法
                 next.invokeWriteAndFlush(m, promise);
             } else {
                 //执行写方法
                 next.invokeWrite(m, promise);
             }
         } else { 
            final AbstractWriteTask task;
            if (flush) { 
               task = WriteAndFlushTask.newInstance(next, m, promise);
               }  else { 
                  task = WriteTask.newInstance(next, m, promise);
              }
              if (!safeExecute(executor, task, promise, m)) {
                  // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes
                 // and put it back in the Recycler for re-use later.
                // See https://github.com/netty/netty/issues/8343.
                task.cancel();
            } 
       } 
      }

3.findContextOutbound方法找到下一个ChannelOutboundHandlerContext

代码语言:javascript
复制
  private AbstractChannelHandlerContext findContextOutbound() {
          //循环往前查找,通过outbound属性判断
         AbstractChannelHandlerContext ctx = this;
         do { 
            ctx = ctx.prev;
         } while (!ctx.outbound);
         return ctx;
     }

4.执行write回调方法

代码语言:javascript
复制
 private void invokeWrite(Object msg, ChannelPromise promise) {
        //判断handler的状态是可以执行回调函数的
        if (invokeHandler()) {
             invokeWrite0(msg, promise);
        } else {
            write(msg, promise);
         }
    }
    private void invokeWrite0(Object msg, ChannelPromise promise) {
            try {
              //执行回调函数write
                ((ChannelOutboundHandler) handler()).write(this, msg, promise);
            } catch (Throwable t) {
              notifyOutboundHandlerException(t, promise);
            }
    }

5.invokeWriteAndFlush执行完write回调方法之后执行flush回调方法

代码语言:javascript
复制
 private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
         if (invokeHandler()) {
             //执行write
            invokeWrite0(msg, promise);
             //执行flush
              invokeFlush0();
         } else {
             writeAndFlush(msg, promise);
          }
     }
    private void invokeFlush0() {
         try {
            //回调flush方法
            ((ChannelOutboundHandler) handler()).flush(this);
          } catch (Throwable t) {
            notifyHandlerException(t);
         } 
    }

6.通过跟踪源码,也不难发现无论是从tail节点开始还是从当前节点开始调用write方法,最终都会到head节点。而头节点正是使用unsafe来具体完成这些操作的。

代码语言:javascript
复制
  @Override
          public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
             unsafe.write(msg, promise);        }
        @Override
                public void flush(ChannelHandlerContext ctx) {
            unsafe.flush(); 
        }

总结

学习了 inBoundoutBound事件的传播过程。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-06-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 爱编码 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 简介
    • inBound事件的传播
      • outBound事件的传播
        • 总结
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档