上一节学习了 ChannelHandler
添加和修改的过程。 至此我们已经了解了 pipeline
和 ChannelHandlerContext
, ChannelHandler
着三者之间的关系。pipeline通过维持一个链表结构,链表节点是 ChannelHandlerContext
,该节点持有 ChannelHandler
。部分对 ChannelHandler
的操作直接暴露给 ChannelHandlerContext
,因此我们可以直接操作 ChannelHandlerContext
来间接操作 ChannelHandler
。
本节以 ChannelRead
事件为例,学习 inBound
和 outBound
事件的传播过程。总体如下图
handler
之间的传播信息通过 fireXXX
方法:其区别是从哪个节点开始传播。
ctx.fireChannelRead(msg); 从头节点HeadContext开始传播 ctx.pipeline().fireChannelRead(msg);从当前节点往下传播事件
@Override
public void channelRead
(ChannelHandlerContext ctx, Object msg) throws Exception {
//调用通道的fireChannelRead方法是从头节点HeadContext开始传播
ctx . fireChannelRead (msg);
//调用数据节点的传播方法是从当前节点往下传播事件
ctx . pipeline(). fireChannelRead(msg);
}
1.首先获取当前context的pipeline对象, 然后通过pipeline对象调用自身的fireChannelRead方法进行传播, 因为默认创建的DefaultChannelpipeline
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
2.调用的是AbstractChannelHandlerContext类的静态方法invokeChannelRead, 参数传入head节点和事件的消息
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
}
else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
3.这里的Object m m通常就是我们传入的msg, 而next, 目前是head节点, 然后再判断是否为当前eventLoop线程, 如果不是则将方法包装成task交给eventLoop线程处理。
跟到invokeChannelRead方法中:
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
4.首先通过invokeHandler()判断当前handler是否已添加, 如果添加, 则执行当前handler的chanelRead方法, 通过fireChannelRead方法传递事件的过程中, 其实就是找到相关handler执行其channelRead方法, 由于我们在这里的handler就是head节点, 所以我们跟到HeadContext的channelRead方法中: HeadContext的channelRead方法:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//向下传递channelRead事件
ctx.fireChannelRead(msg);
}
5.fireChannelRead()调用了findContextInbound()通过inbound属性轮询出下一个ChannelInboundHandler。
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
//先找到下一个节点,再执行channelRead方法
//findContextInbound : 找到下一个节点
invokeChannelRead(findContextInbound(), msg);
return this;
}
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
//通过inbound属性轮询出下一个inboundHandlerContext
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
6.从头节点开始,逐个往下传递并触发用户回调函数,在这过程当中,最后传到尾节点TailContext
以channelRead为例,当走到这个方法则表明,通道内未对传播的内容进行处理,并且占用的内存未释放,在尾节点打印了日志并最终释放了内存。
这种方式是直接从当前节点开始传播的。主要体现在以下,其余的步骤跟ctx.pipeline().fireChannelRead(msg);是一模一样。
第一步就直接来到了AbstractChannelHandlerContext#fireChannelRead
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
与ctx.pipeline().fireChannelRead(msg);相比直接跳过了,传入head节点和事件的消息等操作。
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
区别如下图:
最终inbound事件的传播过程,是从头节点开始,逐个往下传递并触发用户回调函数,在这过程当中,可以手动调用pipeline的传播事件的方法,从任何一个节点开始从头开始触发传播事件,也可以直接通过ChannelHandlerContext的传播事件方法,一次从本节点开始往下传播事件。最后传到尾节点TailContext
学习outbound事件的传播,和inbound事件有相似之处。ChannelOutboundHandler的执行顺序正好和ChannelInboundHandler相反,是倒序的。 以write事件为例,进行学习研究。
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//写法1 ctx.channel().write("test data");
//写法2 ctx.write("test data");}
当然, 直接调用write方法是不能往对方channel中写入数据的, 因为这种方式只能写入到缓冲区, 还要调用flush方法才能将缓冲区数据刷到channel中, 或者直接调用writeAndFlush方法
1.跟踪ctx.channel().write("hello world");
@Override public ChannelFuture write(Object msg) {
//从pipeline开始调用
return pipeline.write(msg);
}
@Override
public final ChannelFuture write(Object msg) {
//从尾节点开始传播
return tail.write(msg);
}
@Override
public ChannelFuture write(Object msg) {
//添加一个回调Promise,包装channel和executor
return write(msg, newPromise());
}
2.最终调用到AbstractChannelHandlerContext#write()方法,主要是做了两件事
1.findContextOutbound 方法找到下一个 2.ChannelOutboundHandlerContext 判断是否需要flush,选择执行write回调方法之后是否执行flush回调方法
private void write(Object msg, boolean flush, ChannelPromise promise) {
ObjectUtil.checkNotNull(msg, "msg");
try {
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return;
}
} catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
throw e;
}
//查找下一个ChannelOutboundHandlerContext
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
//判断是否刷新
if (flush) {
//执行写并刷新方法
next.invokeWriteAndFlush(m, promise);
} else {
//执行写方法
next.invokeWrite(m, promise);
}
} else {
final AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
if (!safeExecute(executor, task, promise, m)) {
// We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes
// and put it back in the Recycler for re-use later.
// See https://github.com/netty/netty/issues/8343.
task.cancel();
}
}
}
3.findContextOutbound方法找到下一个ChannelOutboundHandlerContext
private AbstractChannelHandlerContext findContextOutbound() {
//循环往前查找,通过outbound属性判断
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
4.执行write回调方法
private void invokeWrite(Object msg, ChannelPromise promise) {
//判断handler的状态是可以执行回调函数的
if (invokeHandler()) {
invokeWrite0(msg, promise);
} else {
write(msg, promise);
}
}
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
//执行回调函数write
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
5.invokeWriteAndFlush执行完write回调方法之后执行flush回调方法
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
//执行write
invokeWrite0(msg, promise);
//执行flush
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
private void invokeFlush0() {
try {
//回调flush方法
((ChannelOutboundHandler) handler()).flush(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
6.通过跟踪源码,也不难发现无论是从tail节点开始还是从当前节点开始调用write方法,最终都会到head节点。而头节点正是使用unsafe来具体完成这些操作的。
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
unsafe.write(msg, promise); }
@Override
public void flush(ChannelHandlerContext ctx) {
unsafe.flush();
}
学习了 inBound
和 outBound
事件的传播过程。