首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Netty ChannelHandler与ChannelPipeline源码解读

Netty ChannelHandler与ChannelPipeline源码解读

作者头像
良辰美景TT
发布于 2018-09-11 06:26:30
发布于 2018-09-11 06:26:30
56500
代码可运行
举报
运行总次数:0
代码可运行

ChannelHandler

  ChannelHandler基本上是我们第一次接触Netty就会碰到的对象,我们自定义的各种ChannelHandler主要用于处理我们系统的各种业务逻辑,比如发生了active事件后的处理逻辑,发生了读事件的处理逻辑,下面先来看一下ChannelHandler的类继承图:

image.png

  ChannelHandler被分为两部分,分别为ChannelOutboundHandler与ChannelInboundHandler。其中ChannelInboundHandler提供了一些方法再接收数据或Channel状态改变时被调用,ChannelOutboundHandler则提供了与网络I/O相关的方法。 同时Netty也提供了相应的Adapter,主要是为了我们编码的方便,我们可以通过继承Adapter,这样ChannelHandler里便只需要关注需要重写的方法。而不是实现所有接口的方法。

StringDecoder源码

  我们来关注一下StringDecoder这个类,StringDecoder用于对读入的数据根据指定的字符编码进行转换。StringDecoder继承MessageToMessageDecoder,而MessageToMessageDecoder继承ChannelInboundHandlerAdapter。StringDecoder便是一个典型的ChannelInboundHandler啦,先来看看MessageToMessageDecoder里都有那些内容,源码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter {

//matcher用于检验是否对msg进行Decoder
    private final TypeParameterMatcher matcher;

    /**
     * Create a new instance which will try to detect the types to match out of the type parameter of the class.
     */
    protected MessageToMessageDecoder() {
        matcher = TypeParameterMatcher.find(this, MessageToMessageDecoder.class, "I");
    }

    /**
     * Create a new instance
     *
     * @param inboundMessageType    The type of messages to match and so decode
     */
    protected MessageToMessageDecoder(Class<? extends I> inboundMessageType) {
        matcher = TypeParameterMatcher.get(inboundMessageType);
    }

    /**
     * Returns {@code true} if the given message should be handled. If {@code false} it will be passed to the next
     * {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     */
    public boolean acceptInboundMessage(Object msg) throws Exception {
        return matcher.match(msg);
    }

//可以看出MessageToMessageDecoder只对 channelRead进行了重写,这就是Adapter提供的好处
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//这里的out是个list对象
        CodecOutputList out = CodecOutputList.newInstance();
        try {
//acceptInboundMessage判断是否对msg进行解析
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                try {
//这是个留给子类实现的方法啦, 也就是我们的StringDecoder里会实现的方法啦
                    decode(ctx, cast, out);
                } finally {
                    ReferenceCountUtil.release(cast);
                }
            } else {
                out.add(msg);
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception e) {
            throw new DecoderException(e);
        } finally {
            int size = out.size();
//对out里的对象触发fireChannelRead,让其它的channelhandler处理
            for (int i = 0; i < size; i ++) {
                ctx.fireChannelRead(out.getUnsafe(i));
            }
            out.recycle();
        }
    }


    protected abstract void decode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
}

MessageToMessageDecoder方法只做了两件事:1:判断当前个对象是否需要调用decode方法,2:将decode结果的对象调用fireChannelRead方法交给其它的ChannelHandler处理。StringDecoder类里的方法就更简单了,源码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Sharable
public class StringDecoder extends MessageToMessageDecoder<ByteBuf> {

    // TODO Use CharsetDecoder instead.
//传入字节码
    private final Charset charset;

    /**
     * Creates a new instance with the current system character set.
     */
    public StringDecoder() {
        this(Charset.defaultCharset());
    }

    /**
     * Creates a new instance with the specified character set.
     */
    public StringDecoder(Charset charset) {
        if (charset == null) {
            throw new NullPointerException("charset");
        }
        this.charset = charset;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
//这里对msg进行处理
        out.add(msg.toString(charset));
    }
}

StringEncoder源码

  我们再来关注一下StringEncoder的处理流程,StringEncoder用于对需要写的数据进行字符编码,StringEncoder继承自MessageToMessageEncoder,而MessageToMessageEncoder又继承ChannelOutboundHandlerAdapter。下面是MessageToMessageEncoder的源码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter {

    private final TypeParameterMatcher matcher;

    /**
     * Create a new instance which will try to detect the types to match out of the type parameter of the class.
     */
    protected MessageToMessageEncoder() {
        matcher = TypeParameterMatcher.find(this, MessageToMessageEncoder.class, "I");
    }

    /**
     * Create a new instance
     *
     * @param outboundMessageType   The type of messages to match and so encode
     */
    protected MessageToMessageEncoder(Class<? extends I> outboundMessageType) {
        matcher = TypeParameterMatcher.get(outboundMessageType);
    }

    /**
     * Returns {@code true} if the given message should be handled. If {@code false} it will be passed to the next
     * {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
     */
    public boolean acceptOutboundMessage(Object msg) throws Exception {
        return matcher.match(msg);
    }

//只需要关注这个方法啦,这里会对面要写的数据进行encode
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        CodecOutputList out = null;
        try {
//跟上面decode一样,需要验证msg能不能处理
            if (acceptOutboundMessage(msg)) {
                out = CodecOutputList.newInstance();
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                try {
//具体的encode留给子类处理
                    encode(ctx, cast, out);
                } finally {
                    ReferenceCountUtil.release(cast);
                }

                if (out.isEmpty()) {
                    out.recycle();
                    out = null;
                    throw new EncoderException(
                            StringUtil.simpleClassName(this) + " must produce at least one message.");
                }
            } else {
                ctx.write(msg, promise);
            }
        } catch (EncoderException e) {
            throw e;
        } catch (Throwable t) {
            throw new EncoderException(t);
        } finally {
//out不为空的话,就会调用ctx的witer方法触发写数据的逻辑啦
            if (out != null) {
                final int sizeMinusOne = out.size() - 1;
                if (sizeMinusOne == 0) {
                    ctx.write(out.get(0), promise);
                } else if (sizeMinusOne > 0) {
                    // Check if we can use a voidPromise for our extra writes to reduce GC-Pressure
                    // See https://github.com/netty/netty/issues/2525
                    ChannelPromise voidPromise = ctx.voidPromise();
                    boolean isVoidPromise = promise == voidPromise;
                    for (int i = 0; i < sizeMinusOne; i ++) {
                        ChannelPromise p;
                        if (isVoidPromise) {
                            p = voidPromise;
                        } else {
                            p = ctx.newPromise();
                        }
                        ctx.write(out.getUnsafe(i), p);
                    }
                    ctx.write(out.getUnsafe(sizeMinusOne), promise);
                }
                out.recycle();
            }
        }
    }

   
    protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
}

  MessageToMessageEncoder类里也只做了三件事:1:判断当前的对象是否需要进行encoder。2:调用子类encoder方法对对象进行encoder。3:将encoder好了的对象调用发送逻辑。下面是StringEncoder源码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class StringEncoder extends MessageToMessageEncoder<CharSequence> {

    // TODO Use CharsetEncoder instead.
    private final Charset charset;

    /**
     * Creates a new instance with the current system character set.
     */
    public StringEncoder() {
        this(Charset.defaultCharset());
    }

    /**
     * Creates a new instance with the specified character set.
     */
    public StringEncoder(Charset charset) {
        if (charset == null) {
            throw new NullPointerException("charset");
        }
        this.charset = charset;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {
        if (msg.length() == 0) {
            return;
        }
//根据 charset将String转成ByteBuf对象
        out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset));
    }
}

ChannelPipeline

  ChannelPipeline用于组织ChannelHandlerContext(内部含有ChannelHandler),在Netty里采用的是双端链表的方式来管理ChannelHandlerContext。在ChannelPipeline里提供了各种对双端链表处理的方法,同时也提供了各种触发ChannelHandlerContext的方法,比如:fireChannelActive方法,下面是部分源码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class DefaultChannelPipeline implements ChannelPipeline {

//双端链表的head对象
    final AbstractChannelHandlerContext head;
//双端链表的tail对象
    final AbstractChannelHandlerContext tail;
//持用的channel对象
    private final Channel channel;

    private final ChannelFuture succeededFuture;
    private final VoidChannelPromise voidPromise;
    private final boolean touch = ResourceLeakDetector.isEnabled();

    private Map<EventExecutorGroup, EventExecutor> childExecutors;
    private volatile MessageSizeEstimator.Handle estimatorHandle;
    private boolean firstRegistration = true;

//的链表的未位增加一个ChannelHandler 
    public final ChannelPipeline addLast(ChannelHandler handler) {
        return addLast(null, handler);
    }

//的链表的未位增加一个ChannelHandler ,需要传入这个ChannelHandler的名称
    @Override
    public final ChannelPipeline addLast(String name, ChannelHandler handler) {
        return addLast(null, name, handler);
    }

//最终会调用到这个方法来对channelHandler处理
    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
//这是一个同步方法,需要锁住这个pipeline对象
        synchronized (this) {
//参数合法性验证
            checkMultiplicity(handler);
//这里会将ChannelHandler 包装成ChannelHandlerContext对象,这也就是为什么双端链表里存的是ChannelHandlerContext啦其中filterName会对为null的name生成一个名称
            newCtx = newContext(group, filterName(name, handler), handler);
//这里才是具体处理链表的方法啦
            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.

            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }
//下面的方法是对链表进行操作的代码
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

//pipeline里提供了类似fireChannelActive方法,这些方法最络会调用到channelHandler对应的方法上
    @Override
    public final ChannelPipeline fireChannelActive() {
        AbstractChannelHandlerContext.invokeChannelActive(head);
        return this;
    }
}

ChannelHandlerContext

  ChannelHandlerContext对于连接ChannelHandler与ChannelPipeline。 ChannelHandlerContext内部持有ChannelHandler对象,同时又是ChannelPipeline链表里的节点,串起了ChannelPipeline的整个逻辑,下面来看看ChannelHandlerContext最重要的类AbstractChannelHandlerContext源码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {
//当前ChannelHandlerContext指向的下一个ChannelHandlerContext
    volatile AbstractChannelHandlerContext next;
//当前ChannelHandlerContext指向的前一个ChannelHandlerContext
    volatile AbstractChannelHandlerContext prev;
//用于标识channelHanlder是否为inbound
    private final boolean inbound;
//用于标识channelHanlder是否为outbound
    private final boolean outbound;
//同时也持胡pipeline对象
    private final DefaultChannelPipeline pipeline;
//channelHandler取的名称
    private final String name;
//是否需要排序
    private final boolean ordered;

//构造方法如下
    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                                  boolean inbound, boolean outbound) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        this.inbound = inbound;
        this.outbound = outbound;
        // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }

//这个方法是个static方法,用于给pipeline对象调用,
    static void invokeChannelActive(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
//会触发ChannelHandlerContext的invokeChannelActive方法
            next.invokeChannelActive();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelActive();
                }
            });
        }
    }

//active的逻辑会调用到这个方法里
    private void invokeChannelActive() {
//确认当前channelhandler的状态 
        if (invokeHandler()) {
            try {
//最络会调用到channelhandler的channelActive方法,其中handler()方法是留给子类实现的可以看DefaultChannelHandlerContext源码部分
                ((ChannelInboundHandler) handler()).channelActive(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelActive();
        }
    }

}

DefaultChannelHandlerContext源码就很简单了,提供了一个handler方法用于得到当前的ChannelHandler和判断当前ChannelHandler的类型。代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package io.netty.channel;

import io.netty.util.concurrent.EventExecutor;

final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

    private final ChannelHandler handler;

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }

    @Override
    public ChannelHandler handler() {
        return handler;
    }

    private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler;
    }

    private static boolean isOutbound(ChannelHandler handler) {
        return handler instanceof ChannelOutboundHandler;
    }
}

ChannelHandlerContext里作为ChannelPipeline的链表节点,决定着事件是否进行向下流转,如果想让事件向下流转,只需要通过ChannelHandlerContext调用相应的fire方法就行了

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018.07.24 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Netty 之 ChannelPipeline 源码解析
ChannelPipleline 是 ChannelHandler 的管理容器,它内部维护了一个 ChannelHandler 的链表,可以方便的实现 ChannelHandler 的查找、添加、删除、替换、遍历等。
java404
2019/03/15
8600
Netty7# Netty之事件传递
前面的文章中写了Channel实例化、Channel初始化、Channel注册、异步通知机制、客户端发起连接、事件的轮询和处理机制。Netty作为client/server高效通信框架,事件在ChannelPipeline是如何传递的,本文就聊聊这事。
瓜农老梁
2020/12/29
7980
Netty7# Netty之事件传递
netty系列之:channelPipeline详解
我们在介绍channel的时候提到过,几乎channel中所有的实现都是通过channelPipeline进行的,作为一个pipline,它到底是如何工作的呢?
程序那些事
2022/02/28
7900
Netty源码—5.Pipeline和Handler一
可以在处理复杂的业务逻辑时避免if else的泛滥,可以实现对业务逻辑的模块化处理,不同的逻辑放置到单独的类中进行处理。最后将这些逻辑串联起来,形成一个完整的逻辑处理链。
东阳马生架构
2025/06/09
880
【Netty大动脉之ChannelPipeline】
1、Channel NioServerSocketChannel.class 每一个channel 对应唯一一个Channelpipeline
用户5640963
2020/03/19
6200
万字长文拆解Netty核心机制:ChannelHandler源码全解析
在上一篇文章中,我们深入探讨Netty框架中的NioEventLoop,分析它是如何通过高效的事件循环机制处理网络事件的
菜菜的后端私房菜
2025/02/27
3822
深入分析netty(二)ChannelPipeline
相信大家都知道了,在Netty中每个Channel 都有且仅有一个ChannelPipeline 与之对应,它们的组成关系如下:
周杰伦本人
2022/10/25
4020
深入分析netty(二)ChannelPipeline
Netty的ChannelPipline传播源码解析
上一章节 ,我们基本讲述了Netty对于新连接接入与管道数据处理的源码解析,大家可能发现,在我们前面学习中,涉及到了很多的有关pipeline的操作,在前面介绍这些的时候,我为了保证主线逻辑的清晰,基本都是一概而过,本章节将对pipeline做一个详细的讲解!
止术
2021/08/06
6110
Netty的ChannelPipline传播源码解析
第十五节 netty源码分析之客户端源码分析02
我们已经知道ChannelInitializer的添加过程其实,是会创建一个DefaultChannelHandlerContext然后添加到ChannelHandlerContext双线链表tail的前面 而我们自定义的handler就在ChannelInitializer的initChannel方法中,那么initChannel何时调用将我们的handler添加到链表中( p.addLast),下面我们开始分析 自定义 ChannelHandler 的添加过程, 发生在 AbstractUnsafe.register0 中, 在这个方法中调用了 pipeline.fireChannelRegistered()
用户1418372
2019/02/27
4820
【Netty】ChannelHandler的添加和删除(二)
主要讲述了ChannelPipeline和ChannelHandler的基本知识以及ChannelPipeline的创建,本文将学习ChannelHandler的添加和删除
用户3467126
2019/07/03
1.3K0
【Netty】ChannelHandler的添加和删除(二)
Netty Review - StringEncoder字符串编码器和StringDecoder 解码器的使用与源码解读
Netty是一个高性能的网络应用程序框架,它提供了丰富的功能,包括编解码器,这些编解码器用于在网络中发送和接收数据时进行数据的编码和解码。
小小工匠
2023/12/22
1.1K0
Netty Review - StringEncoder字符串编码器和StringDecoder 解码器的使用与源码解读
Netty源码—5.Pipeline和Handler二
Netty最大的特征之一就是ChannelHandler是可插拔的,可以动态编织ChannelPipeline。比如在客户端首次连接服务端时,需要进行权限认证,认证通过后就可以不用再认证了。下面的AuthHandler便实现了只对第一个传来的数据包进行认证校验。如果通过验证则删除此AuthHandler,这样后续传来的数据包便不会再校验了。
东阳马生架构
2025/06/09
870
第十九节 netty源码分析之 pipleline和handler以及pipeline的数据流向01
我们一路查看下去,找到重载的方法,且记住我们入参里group、和name都是null
用户1418372
2019/03/04
5500
深入Netty事件流程分析(下)
继上一篇Netty事件流程分析,本文主要讲述Netty的责任链创建,添加以及销毁流程,同时我们关注IO事件流程的分析,即监听连接事件,接收请求事件以及写出数据事件的流程,最后也会将结合channel/pipeline/handler的生命周期作一个小结.
小坤探游架构笔记
2020/05/07
8400
第十七节 netty源码分析之pipeline的来源
再看下他们的父类 AbstractChannelHandlerContext 的构造器, 分别以参数 inbound , outbound .来区分head和tail 结合他们实现的接口,header 是一个 outboundHandler, 而 tail 是一个inboundHandler。
用户1418372
2019/03/05
3620
敖丙肝了一个月的Netty知识点
高能预警,本文是我一个月前就开始写的,所以内容会非常长,当然也非常硬核,dubbo源码系列结束之后我就想着写一下netty系列的,但是netty的源码概念又非常多,所以才写到了现在。
敖丙
2021/01/08
1.1K0
敖丙肝了一个月的Netty知识点
Netty源码—4.客户端接入流程
答:boss线程第一个过程轮询出ACCEPT事件,然后boss线程第二个过程通过JDK底层Channel的accept()方法创建一条连接。
东阳马生架构
2025/05/29
1000
Netty Pipeline与ChannelHandler那些事
每个channel内部都会持有一个ChannelPipeline对象pipeline,pipeline默认实现DefaultChannelPipeline内部维护了一个DefaultChannelHandlerContext链表。
luoxn28
2019/12/12
7500
Netty Pipeline与ChannelHandler那些事
netty系列之:netty中常用的字符串编码解码器
字符串是我们程序中最常用到的消息格式,也是最简单的消息格式,但是正因为字符串string太过简单,不能附加更多的信息,所以在netty中选择的是使用byteBuf作为最底层的消息传递载体。
程序那些事
2022/05/12
6390
Netty(八)之Netty服务端启动流程
参数多传了一个DefaultSelectStrategyFactory.INSTANCE ,一个工厂类
CBeann
2023/12/25
3390
Netty(八)之Netty服务端启动流程
相关推荐
Netty 之 ChannelPipeline 源码解析
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档