Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Netty源码—8.编解码原理二

Netty源码—8.编解码原理二

原创
作者头像
东阳马生架构
发布于 2025-06-13 06:30:43
发布于 2025-06-13 06:30:43
11900
代码可运行
举报
文章被收录于专栏:Netty应用与源码Netty应用与源码
运行总次数:0
代码可运行

大纲

1.读数据入口

2.拆包原理

3.ByteToMessageDecoder解码步骤

4.解码器抽象的解码过程总结

5.Netty里常见的开箱即用的解码器

6.writeAndFlush()方法的大体步骤

7.MessageToByteEncoder的编码步骤

8.unsafe.write()写队列

9.unsafe.flush()刷新写队列

10.如何把对象变成字节流写到unsafe底层

6.writeAndFlush()方法的大体步骤

(1)writeAndFlush()方法的调用入口

(2)writeAndFlush()方法的执行流程

(1)writeAndFlush()方法的调用入口

入口通常是:ctx.channel().writeAndFlush()。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    //网络连接tcp三次握手后,就会建立和封装一个Channel(网络连接的通信管道)
    //此时这个Channel就可以实现一个激活
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Channel Active......");
        ctx.channel().writeAndFlush("test5");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Channel Read: " + (String)msg);
        String response = "Hello World......";
        ByteBuf responseByteBuf = Unpooled.buffer();
        responseByteBuf.writeBytes(response.getBytes());

        ctx.channel().writeAndFlush(responseByteBuf);
        System.out.println("Channel Write: " + response);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Channel Read Complete......");
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

(2)writeAndFlush()方法的执行流程

首先从tail结点开始往前传播。然后逐个调用ChannelHandler的write()方法,直到某个ChannelHandler不再往前传播write事件。接着逐个调用ChannelHandler的flush()方法,直到某个ChannelHandler不再往前传播flush事件。

一般而言,只要每个ChannelHandler都往下传播write事件和flush事件,那么最后都会传播到HeadContext结点的write()方法和flush()方法,然后分别执行unsafe.write()和unsafe.flush()将数据通过底层的unsafe写到JDK底层的Channel。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    private final DefaultChannelPipeline pipeline;
    ...
    @Override
    public ChannelFuture writeAndFlush(Object msg) {
        return pipeline.writeAndFlush(msg);
    }
    ...
}

public class DefaultChannelPipeline implements ChannelPipeline {
    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;
    private final Channel channel;
    
    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        tail = new TailContext(this);
        head = new HeadContext(this);
        head.next = tail;
        tail.prev = head;
    }
    
    @Override
    public final ChannelFuture writeAndFlush(Object msg) {
        //从TailContext开始传播
        //但TailContext没有重写writeAndFlush()方法
        //所以会调用AbstractChannelHandlerContext的writeAndFlush()方法
        return tail.writeAndFlush(msg);
    }
    ...
}

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {
    volatile AbstractChannelHandlerContext next;
    volatile AbstractChannelHandlerContext prev;
    ...
    @Override
    public ChannelFuture writeAndFlush(Object msg) {
        return writeAndFlush(msg, newPromise());
    }
    
    @Override
    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        if (msg == null) throw new NullPointerException("msg");
        if (!validatePromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            return promise;
        }
        write(msg, true, promise);
        return promise;
    }
    
    private void write(Object msg, boolean flush, ChannelPromise promise) {
        //反向遍历链表进行查找
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        //最终都会由Reactor线程处理Channel的数据读写
        if (executor.inEventLoop()) {
            if (flush) {
                //调用结点的invokeWriteAndFlush()方法
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            safeExecute(executor, task, promise, m);
        }
    }
    
    private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            //逐个调用ChannelHandler结点的write()方法,但前提是当前ChannelHandler可以往下传
            //即write()方法在最后也像ChannelOutboundHandlerAdapter那样,调用了ctx.write()往下传播
            invokeWrite0(msg, promise);
            //逐个调用ChannelHandler结点的flush()方法,但前提是当前ChannelHandler可以往下传
            //即flush()方法在最后也像ChannelOutboundHandlerAdapter那样,调用了ctx.flush()往下传播
            invokeFlush0();
        } else {
            writeAndFlush(msg, promise);
        }
    }
    
    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            //逐个调用,最终回到HeadContext的write()方法
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }
    
    private void invokeFlush0() {
        try {
            //逐个调用,最终回到HeadContext的flush()方法
            ((ChannelOutboundHandler) handler()).flush(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    }
    ...
}

public class DefaultChannelPipeline implements ChannelPipeline {
    ...
    final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
        private final Unsafe unsafe;
        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, false, true);
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        }
        ...
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            unsafe.write(msg, promise);
        }

        @Override
        public void flush(ChannelHandlerContext ctx) throws Exception {
            unsafe.flush();
        }
    }
    ...
}

//Skeleton implementation of a ChannelOutboundHandler. This implementation just forwards each method call via the ChannelHandlerContext.
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
    //Calls ChannelHandlerContext#bind(SocketAddress, ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.
    @Override
    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        ctx.bind(localAddress, promise);
    }

    //Calls ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline. 
    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        ctx.connect(remoteAddress, localAddress, promise);
    }

    //Calls ChannelHandlerContext#disconnect(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.
    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        ctx.disconnect(promise);
    }

    //Calls ChannelHandlerContext#close(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        ctx.close(promise);
    }

    //Calls ChannelHandlerContext#deregister(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.
    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        ctx.deregister(promise);
    }

    //Calls ChannelHandlerContext#read() to forward to the next ChannelOutboundHandler in the ChannelPipeline.
    @Override
    public void read(ChannelHandlerContext ctx) throws Exception {
        ctx.read();
    }

    //Calls ChannelHandlerContext#write(Object, ChannelPromise)} to forward to the next ChannelOutboundHandler in the ChannelPipeline.
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ctx.write(msg, promise);
    }

    //Calls ChannelHandlerContext#flush() to forward to the next ChannelOutboundHandler in the ChannelPipeline.
    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}

7.MessageToByteEncoder的编码步骤

(1)编码的具体步骤

(2)编码步骤的总结

(3)子类实现编码的例子

(1)编码的具体步骤

步骤一:判断对象

判断当前ChannelHandler结点能否处理写入传入的Java对象。如果能处理,则往下执行,否则直接传递给下一个ChannelHandler结点进行处理。

步骤二:分配内存

给新创建的ByteBuf对象分配一块内存空间,这块内存空间将会存放由Java对象转换来的字节数据。

步骤三:调用encode

子类会实现MessageToByteEncoder的抽象方法encode()来定义自己的编码协议,子类的encode()方法会将Java对象转换来的字节数据写入ByteBuf。

步骤四:释放对象

由于传入的Java对象已经转换成ByteBuf字节流了,所以传入的Java对象已不再使用可进行释放。

步骤五:传播数据

当子类的encode()方法将数据写入了ByteBuf对象以及释放完对象之后,则会往前一个ChannelHandler结点传播该ByteBuf对象,否则往前一个ChannelHandler结点传播空对象。

步骤六:释放内存

如果出现异常或者ByteBuf没有写入数据或者ByteBuf在pipeline中已处理完,则释放分配给ByteBuf对象的内存。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//ChannelOutboundHandlerAdapter which encodes message in a stream-like fashion from one message to an ByteBuf.
//Example implementation which encodes Integers to a ByteBuf.
//public class IntegerEncoder extends MessageToByteEncoder<Integer> {
//    @code @Override
//    public void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception {
//        out.writeInt(msg);
//    }
//}
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {
    private final TypeParameterMatcher matcher;
    private final boolean preferDirect;

    protected MessageToByteEncoder() {
        this(true);
    }
    
    protected MessageToByteEncoder(Class<? extends I> outboundMessageType) {
        this(outboundMessageType, true);
    }

    //Create a new instance which will try to detect the types to match out of the type parameter of the class.
    //@param preferDirect,true if a direct ByteBuf should be tried to be used as target for the encoded messages. 
    //If false is used it will allocate a heap ByteBuf, which is backed by an byte array.
    protected MessageToByteEncoder(boolean preferDirect) {
        matcher = TypeParameterMatcher.find(this, MessageToByteEncoder.class, "I");
        this.preferDirect = preferDirect;
    }

    //Create a new instance
    //@param outboundMessageType,The tpye of messages to match
    //@param preferDirect,true if a direct ByteBuf should be tried to be used as target for the encoded messages. 
    //If false is used it will allocate a heap ByteBuf, which is backed by an byte array.
    protected MessageToByteEncoder(Class<? extends I> outboundMessageType, boolean preferDirect) {
        matcher = TypeParameterMatcher.get(outboundMessageType);
        this.preferDirect = preferDirect;
    }

    //Returns true if the given message should be handled. 
    //If false it will be passed to the next ChannelOutboundHandler in the ChannelPipeline.
    public boolean acceptOutboundMessage(Object msg) throws Exception {
        return matcher.match(msg);
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf buf = null;
        try {
            //步骤一:判断当前ChannelHandler能否处理写入的消息
            if (acceptOutboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I cast = (I) msg;//强制转换
                //步骤二:给ByteBuf对象分配内存
                buf = allocateBuffer(ctx, cast, preferDirect);
                try {
                    //步骤三:调用子类实现的encode()方法
                    encode(ctx, cast, buf);
                } finally {
                    //步骤四:释放对象
                    //既然自定义的Java对象msg已经转换为ByteBuf对象了,那么该对象已经没有用,需要释放掉了
                    //注意:当传入的msg的类型是ByteBuf类型时,则不需要释放
                    ReferenceCountUtil.release(cast);
                }
                //步骤五:如果buf中写入了数据,就把buf传到下一个ChannelHandler结点
                if (buf.isReadable()) {
                    ctx.write(buf, promise);
                } else {
                    //步骤六:如果buf中没有写入数据,则释放buf,并将一个空数据传到下一个ChannelHandler结点
                    buf.release();
                    ctx.write(Unpooled.EMPTY_BUFFER, promise);
                }
                buf = null;
            } else {
                ctx.write(msg, promise);
            }
        } catch (EncoderException e) {
            throw e;
        } catch (Throwable e) {
            throw new EncoderException(e);
        } finally {
            if (buf != null) {
                buf.release();//当buf在pipeline中处理完了,需要进行释放
            }
        }
    }

    //Allocate a ByteBuf which will be used as argument of #encode(ChannelHandlerContext, I, ByteBuf).
    //Sub-classes may override this method to returna ByteBuf with a perfect matching initialCapacity.
    protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg, boolean preferDirect) throws Exception { 
        if (preferDirect) {
            return ctx.alloc().ioBuffer();
        } else {
            return ctx.alloc().heapBuffer();
        }
    }

    //Encode a message into a ByteBuf. 
    //This method will be called for each written message that can be handled by this encoder.
    //@param ctx,the ChannelHandlerContext which this MessageToByteEncoder belongs to
    //@param msg,the message to encode
    //@param out,the ByteBuf into which the encoded message will be written
    //@throws Exception,is thrown if an error accour
    protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;
}

(2)编码步骤的总结

在MessageToByteEncoder的编码过程中,首先会判断当前ChannelHandler能否处理传入的Java对象,如果能处理就对新创建的ByteBuf对象分配一块内存空间。然后由子类的encode()方法实现具体的编码协议,并且把编码后的数据存放到分配给ByteBuf对象的内存空间中。最后把ByteBuf对象往前一个ChannelHandler结点进行传播。

如果在编码的过程中出现异常,那么就把已申请出来的、分配给ByteBuf对象的内存空间进行释放。

如果传入的Java对象就是一个ByteBuf对象,那么Netty在自定义编码结束后,会自动帮忙释放该对象,不需要在子类中对该对象进行释放。

(3)子类实现编码的例子

下面的Encoder便实现了将自定义的Response对象转换为字节流并写到Socket底层的效果。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class Encoder extends MessageToByteEncoder<Response> {
    protected void encode(ChannelHandlerContext ctx, Response response, ByteBuf out) throws Exception {
        out.writeByte(response.getVersion());
        out.writeInt(4+ response.getData().length);
        out.writeBytes(response.getData());    
    }
}

8.unsafe.write()将数据添加到写缓冲区

(1)unsafe.write()的入口

(2)unsafe.write()的主要逻辑

(3)写缓冲区(写队列)的数据结构

(1)unsafe.write()的入口

不管是ctx.channel().write()还是ctx.write(),最终都会来到pipeline中的head结点。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class DefaultChannelPipeline implements ChannelPipeline {
    ...
    final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
        private final Unsafe unsafe;
        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, false, true);
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        }
        ...
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            unsafe.write(msg, promise);
        }

        @Override
        public void flush(ChannelHandlerContext ctx) throws Exception {
            unsafe.flush();
        }
    }
    ...
}

(2)unsafe.write()的主要逻辑

unsafe.write()方法将数据添加到写缓冲区(写队列)的主要逻辑如下。

一.Direct化ByteBuf对象

如果传进来的ByteBuf对象不是堆外内存,那么就直接转换成堆外内存,并且估算出其大小。

二.添加到写缓冲区

转换成堆外内存的ByteBuf对象首先会被封装成一个Entry对象,然后再将该Entry对象添加到写缓冲区,其中会通过几个指针来标识写缓冲区的状态。

三.设置写状态

如果内存不足,那么是不可以一直往写缓冲区里添加ByteBuf对象的。如果写缓冲区已经大于默认的64KB的大小,则会通过自旋 + CAS设置当前Channel为不可写状态。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    private final DefaultChannelPipeline pipeline;
    ...
    protected abstract class AbstractUnsafe implements Unsafe {
        //写缓冲区(写队列)
        private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
        ...
        @Override
        public final void write(Object msg, ChannelPromise promise) {
            //确保该方法的调用是在Reactor线程中
            assertEventLoop();
            //写缓冲区
            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            ...
            int size;
            try {
                //转换成堆外内存
                msg = filterOutboundMessage(msg);
                //估算出需要写入的ByteBuf的size
                size = pipeline.estimatorHandle().size(msg);
                if (size < 0) {
                    size = 0;
                }
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                ReferenceCountUtil.release(msg);
                return;
            }
            //将转换成堆外内存的msg添加到写缓冲区outboundBuffer
            outboundBuffer.addMessage(msg, size, promise);
        }
        ...
    }    
    ...
}

public abstract class AbstractNioByteChannel extends AbstractNioChannel {
    ...
    @Override
    protected final Object filterOutboundMessage(Object msg) {
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            if (buf.isDirect()) {
                return msg;
            }
            return newDirectBuffer(buf);
        }
        if (msg instanceof FileRegion) {
            return msg;
        }
        throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); 
    }
    ...
}

(3)写缓冲区(写队列)的数据结构

ChannelOutboundBuffer里的数据结构是一个单向链表,单向链表的每个结点都是一个Entry对象。在一个Entry对象中会包含着待写出的ByteBuf对象及消息回调promise。flushedEntry指针表示第一个被写入Socket缓冲区的结点,unflushedEntry指针表示第一个未被写入Socket缓冲区的结点,tailEntry指针表示ChannelOutboundBuffer缓冲区的最后一个结点。

初次调用ChannelOutboundBuffer的addMessage()方法后,flushedEntry指针指向NULL,unflushedEntry指针和tailEntry指针都指向新添加的结点。调用多次ChannelOutboundBuffer的addMessage()方法后,如果flushedEntry指针一直指向NULL,则表示现在还没有结点的ByteBuf对象写出到Socket缓冲区。如果unflushedEntry指针之后有n个结点,则表示当前还有n个结点的ByteBuf对象还没写出到Socket缓冲区。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public final class ChannelOutboundBuffer {
    private final Channel channel;
    
    //Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
    //The Entry that is the first in the linked-list structure that was flushed
    private Entry flushedEntry;
    //The Entry which is the first unflushed in the linked-list structure
    private Entry unflushedEntry;
    //The Entry which represents the tail of the buffer
    private Entry tailEntry;
    ...
    
    ChannelOutboundBuffer(AbstractChannel channel) {
        this.channel = channel;
    }

    //Add given message to this ChannelOutboundBuffer. 
    //The given {@link ChannelPromise} will be notified once the message was written.
    public void addMessage(Object msg, int size, ChannelPromise promise) {
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        if (tailEntry == null) {
            flushedEntry = null;
            tailEntry = entry;
        } else {
            Entry tail = tailEntry;
            tail.next = entry;
            tailEntry = entry;
        }
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }

        //increment pending bytes after adding message to the unflushed arrays.
        incrementPendingOutboundBytes(size, false);
    }
    
    static final class Entry {
        private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
            @Override
            protected Entry newObject(Handle handle) {
                return new Entry(handle);
            }
        };
        private final Handle<Entry> handle;
        Entry next;
        Object msg;
        ByteBuffer[] bufs;
        ByteBuffer buf;
        ChannelPromise promise;
        long progress;
        long total;
        int pendingSize;
        int count = -1;
        boolean cancelled;
  
        private Entry(Handle<Entry> handle) {
            this.handle = handle;
        }

        static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
            Entry entry = RECYCLER.get();
            entry.msg = msg;
            entry.pendingSize = size;
            entry.total = total;
            entry.promise = promise;
            return entry;
        }
        ...
    }
}

9.unsafe.flush()刷新写缓冲区的数据

(1)unsafe.flush()的入口

(2)unsafe.flush()的主要逻辑

(1)unsafe.flush()的入口

不管是ctx.channel().flush()还是ctx.flush(),最终都会来到pipeline中的head结点。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class DefaultChannelPipeline implements ChannelPipeline {
    ...
    final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
        private final Unsafe unsafe;
        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, false, true);
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        }
        ...
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            unsafe.write(msg, promise);
        }

        @Override
        public void flush(ChannelHandlerContext ctx) throws Exception {
            unsafe.flush();
        }
    }
    ...
}

(2)unsafe.flush()的主要逻辑

步骤一:设置flushedEntry指针指向unflushedEntry指针所指向的Entry结点,并统计需要刷新的Entry结点的数量。

步骤二:遍历写缓冲区的Entry结点把对应的ByteBuf对象写到Socket,然后移除Entry结点。如果写缓冲区大小已经小于32KB,则通过自旋 + CAS设置Channel为可写状态。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    private final DefaultChannelPipeline pipeline;
    ...
    protected abstract class AbstractUnsafe implements Unsafe {
        private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this); 
        ...
        @Override
        public final void flush() {
            assertEventLoop();
            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                return;
            }
            //步骤一
            outboundBuffer.addFlush();
            //步骤二
            flush0();
        }
        protected void flush0() {
            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            ...
            doWrite(outboundBuffer);
            ...
        }
    }
    
    //Flush the content of the given buffer to the remote peer.
    protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
}

public abstract class AbstractNioByteChannel extends AbstractNioChannel {
    ...
    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        //默认自旋16次,以提高内存使用率和写的吞吐量
        int writeSpinCount = config().getWriteSpinCount();
        do {
            Object msg = in.current();
            if (msg == null) {
                //重新注册,不关注OP_WRITE事件
                clearOpWrite();
                return;
            }
            writeSpinCount -= doWriteInternal(in, msg);
        } while(writeSpinCount > 0);
        incompleteWrite(setOpWrite);
    }
    
    private int doWriteInternal(ChannelOutboundBuffer in, Object msg) {
        ...
        ByteBuf buf = (ByteBuf) msg;
        if (!buf.isReadable()) {
            //从写缓冲区(写队列)中移除结点
            in.remove();
            return 0;
        }
        //把ByteBuf对象写到Socket里
        final int localFlushedAmount = doWriteBytes(buf);
        if (localFlushedAmount > 0) {
            in.progress(localFlushedAmount);
            if (!buf.isReadable()) {
                //从写缓冲区(写队列)中移除结点
                in.remove();
            }
            return 1;
        }
        ...
    }
    
    protected final void clearOpWrite() {
        final SelectionKey key = selectionKey();
        //Check first if the key is still valid as it may be canceled as part of the deregistration from the EventLoop. 
        if (!key.isValid()) {
            return;
        }
        final int interestOps = key.interestOps();
        if ((interestOps & SelectionKey.OP_WRITE) != 0) {
            key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
        }
    }
    
    @Override
    protected int doWriteBytes(ByteBuf buf) throws Exception {
        final int expectedWrittenBytes = buf.readableBytes();
        return buf.readBytes(javaChannel(), expectedWrittenBytes);
    }
    ...
}

public final class ChannelOutboundBuffer {
    private final Channel channel;
    
    //Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
    //The Entry that is the first in the linked-list structure that was flushed
    private Entry flushedEntry;
    //The Entry which is the first unflushed in the linked-list structure
    private Entry unflushedEntry;
    //The Entry which represents the tail of the buffer
    private Entry tailEntry;
    
    private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER;
    @SuppressWarnings("UnusedDeclaration")
    private volatile int unwritable;
    
    //The number of flushed entries that are not written yet
    private int flushed;
    ...
    
    //设置flushedEntry指针指向unflushedEntry指针所指向的Entry结点,
    //并统计需要刷新的Entry结点的数量
    public void addFlush() {
        Entry entry = unflushedEntry;
        if (entry != null) {
            if (flushedEntry == null) {
                flushedEntry = entry;
            }
            do {
                flushed ++;//所要flush的结点数
                entry = entry.next;
            } while (entry != null);
            unflushedEntry = null;
        }
    }
    
    public boolean remove() {
        //获取当前正在被flush的结点
        Entry e = flushedEntry;
        Object msg = e.msg;
        //获取该结点的回调对象
        ChannelPromise promise = e.promise;
        int size = e.pendingSize;
        //从写缓冲队列中移除结点
        removeEntry(e);
        if (!e.cancelled) {
            ReferenceCountUtil.safeRelease(msg);
            safeSuccess(promise);
            //如果写缓冲区大小小于32KB,就通过自旋+CAS设置Channel状态为可写
            decrementPendingOutboundBytes(size, false, true);
        }
        //回收实体
        e.recycle();
        return true;
    }
    
    private void removeEntry(Entry e) {
        if (-- flushed == 0) {
            flushedEntry = null;
            if (e == tailEntry) {
                tailEntry = null;
                unflushedEntry = null;
            }
        } else {
            flushedEntry = e.next;
        }
    }

    //Return the current message to write or null if nothing was flushed before and so is ready to be written.
    public Object current() {
        Entry entry = flushedEntry;
        if (entry == null) {
            return null;
        }
        return entry.msg;
    }

    //Notify the ChannelPromise of the current message about writing progress.
    public void progress(long amount) {
        Entry e = flushedEntry;
        assert e != null;
        ChannelPromise p = e.promise;
        if (p instanceof ChannelProgressivePromise) {
            long progress = e.progress + amount;
            e.progress = progress;
            ((ChannelProgressivePromise) p).tryProgress(progress, e.total);
        }
    }
    
    private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
        if (size == 0) {
            return;
        }
        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
        if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
            setWritable(invokeLater);
        }
    }
    
    private void setWritable(boolean invokeLater) {
        for (;;) {
            final int oldValue = unwritable;
            final int newValue = oldValue & ~1;
            if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                if (oldValue != 0 && newValue == 0) {
                    fireChannelWritabilityChanged(invokeLater);
                }
                break;
            }
        }
    }
    ...
}

10.如何把对象变成字节流写到unsafe底层

当调用ctx.channel().writeAndFlush(user)将自定义的User对象沿着整个Pipeline进行传播时:

首先会调用tail结点的write()方法开始往前传播,传播到一个继承自MessageToByteEncoder的结点。该结点会实现MessageToByteEncoder的encode()方法来把自定义的User对象转换成一个ByteBuf对象。转换的过程首先会由MessageToByteEncoder分配一个ByteBuf对象,然后再调用其子类实现的抽象方法encode()将User对象填充到ByteBuf对象中。填充完之后继续调用write()方法把该ByteBuf对象往前进行传播,默认下最终会传播到head结点。

其中head结点的write()方法会通过底层的unsafe进行如下处理:把当前的ByteBuf对象添加到unsafe维护的一个写缓冲区里,同时计算写缓冲区大小是否超过64KB。如果写缓冲区大小超过了64KB,则设置当前Channel不可写。完成write()方法的传播后,head结点的unsafe对象维护的写缓冲区便对应着一个ByteBuf队列,它是一个单向链表。

然后会调用tail结点的flush()方法开始往前传播,默认下最终会传播到head结点。head结点在接收到flush事件时会通过底层的unsafe进行如下处理:首先进行指针调整,然后通过循环遍历从写缓冲区里把ByteBuf对象取出来。每拿出一个ByteBuf对象都会把它转化为JDK底层可以接受的ByteBuffer对象,最终通过JDK的Channel把该ByteBuffer对象写出去。每写完一个ByteBuffer对象都会把写缓冲区里的当前ByteBuf所在的Entry结点进行删除,并且判断如果当前写缓冲区里的大小已经小于32KB就通过自旋 + CAS重新设置Channel为可写。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Netty源码—5.Pipeline和Handler二
Netty最大的特征之一就是ChannelHandler是可插拔的,可以动态编织ChannelPipeline。比如在客户端首次连接服务端时,需要进行权限认证,认证通过后就可以不用再认证了。下面的AuthHandler便实现了只对第一个传来的数据包进行认证校验。如果通过验证则删除此AuthHandler,这样后续传来的数据包便不会再校验了。
东阳马生架构
2025/06/09
900
netty案例,netty4.1源码分析篇五《一行简单的writeAndFlush的都做了哪些事》
对于使用netty的小伙伴来说,ctx.writeAndFlush()再熟悉不过了,它可以将我们的消息发送出去。那么它都执行了那些行为呢,是怎么将消息发送出去的呢。
小傅哥
2020/07/14
7090
netty案例,netty4.1源码分析篇五《一行简单的writeAndFlush的都做了哪些事》
Netty 源码解析 ——— writeAndFlush流程分析
本文是Netty文集中“Netty 源码解析”系列的文章。主要对Netty的重要流程以及类进行源码解析,以使得我们更好的去使用Netty。Netty是一个非常优秀的网络框架,对其源码解读的过程也是不断学习的过程。 源码解析 本文主要对Netty的写数据流程进行分析。代码调用仅一句: ctx.writeAndFlush("from server : " + UUID.randomUUID()); 变量 ctx 指的是 ChannelHandlerContext对象,我们跟进ChannelHandlerC
tomas家的小拨浪鼓
2018/06/27
2.6K0
netty源码分析之pipeline(二)
netty源码分析之pipeline(一)中,我们已经了解了pipeline在netty中所处的角色,像是一条流水线,控制着字节流的读写,本文,我们在这个基础上继续深挖pipeline在事件传播,异常传播等方面的细节
平凡的学生族
2019/05/25
7300
Netty源码—9.性能优化和设计模式二
线程的Stack里有3指针:head、pre、cursor。往Stack中插入一个WeakOrderQueue都是往头部插入的(头插法)。head指向第一个WeakOrderQueue,cursor指向当前回收对象的WeakOrderQueue。
东阳马生架构
2025/06/16
790
Netty:io请求处理过程解析
文接上一篇。上篇讲到netty暴露一个端口出来,acceptor, handler, pipeline, eventloop 都已准备好。但是并没体现其如何处理接入新的网络请求,今天我们就一起来看看吧。
烂猪皮
2021/01/04
1.2K0
Netty:io请求处理过程解析
万字长文拆解Netty核心机制:ChannelHandler源码全解析
在上一篇文章中,我们深入探讨Netty框架中的NioEventLoop,分析它是如何通过高效的事件循环机制处理网络事件的
菜菜的后端私房菜
2025/02/27
3912
Netty 源码深度解析(九) - 编码概述1 抽象类 MessageToByteEncoder2 抽象类 MessageToMessageEncoder一个java对象最后是如何转变成字节流,写到s
编码器实现了ChannelOutboundHandler,并将出站数据从 一种格式转换为另一种格式,和我们方才学习的解码器的功能正好相反。Netty 提供了一组类, 用于帮助你编写具有以下功能的编码器:
JavaEdge
2018/12/19
1.7K0
Netty源码—5.Pipeline和Handler一
可以在处理复杂的业务逻辑时避免if else的泛滥,可以实现对业务逻辑的模块化处理,不同的逻辑放置到单独的类中进行处理。最后将这些逻辑串联起来,形成一个完整的逻辑处理链。
东阳马生架构
2025/06/09
960
【Netty 专栏】深入浅出 Netty write
上一章节中,分析了Netty如何处理read事件,本节分析Netty如何把数据写会客户端。
芋道源码
2018/07/31
8380
【Netty 专栏】深入浅出 Netty write
Netty 那些事儿 ——— 关于 “Netty 发送大数据包时 触发写空闲超时” 的一些思考
本文是Netty文集中“Netty 那些事儿”系列的文章。主要结合在开发实战中,我们遇到的一些“奇奇怪怪”的问题,以及如何正确且更好的使用Netty框架,并会对Netty中涉及的重要设计理念进行介绍。 本文是笔者和朋友(笔名:oojeek)一起讨论该问题的一个记录。文章以讨论过程中的思路来展现(也是我们解决问题的思路路线),因此可能会有些乱。再者,如果对Netty写数据流程不了解的朋友,可以先阅读Netty 源码解析 ——— writeAndFlush流程分析该篇文章,下面的讨论中会涉及不少这篇文章提
tomas家的小拨浪鼓
2018/06/27
4K0
Netty的高低水位
假如我们的底层使用Netty作为网络通信框架,业务流程在将业务数据发送到对端之前,实际先要将数据发送到Netty的缓冲区中,然后再从Netty的缓冲区发送到TCP的缓冲区,最后再到对端.
书唐瑞
2022/06/02
9390
Netty的高低水位
18-Netty 编解码器和Handler的调用机制
基本说明 Netty的组件设计: Netty的主要组件有Channel, EventLoop, ChannelFuture, ChannelHandler, ChannelPipeline等 ChannelHandler充当了处理入站和出站数据的应用程序逻辑的容器, 例如: 实现ChannelInboundHandler接口(或ChannelInboundHandlerAdapter), 你就可以接收入站事件和数据, 这些数据会被业务逻辑处理, 当要给客户端发送响应时, 也可以从ChannelInbound
彼岸舞
2022/02/18
1.3K0
18-Netty 编解码器和Handler的调用机制
深入Netty事件流程分析(下)
继上一篇Netty事件流程分析,本文主要讲述Netty的责任链创建,添加以及销毁流程,同时我们关注IO事件流程的分析,即监听连接事件,接收请求事件以及写出数据事件的流程,最后也会将结合channel/pipeline/handler的生命周期作一个小结.
小坤探游架构笔记
2020/05/07
8410
Netty中Channel与Unsafe源码解读
  Channel是netty网络操作抽象类,包括网络的读,写,链路关闭,发起连接等。我们拿出NioServerSocketChannel来进行分析,NioServerSocketChannel的类图如下所示:
良辰美景TT
2018/09/11
8110
Netty中Channel与Unsafe源码解读
【Netty】inBound和outBound事件的传播过程
上一节学习了 ChannelHandler添加和修改的过程。 至此我们已经了解了 pipeline和 ChannelHandlerContext, ChannelHandler着三者之间的关系。pipeline通过维持一个链表结构,链表节点是 ChannelHandlerContext,该节点持有 ChannelHandler。部分对 ChannelHandler的操作直接暴露给 ChannelHandlerContext,因此我们可以直接操作 ChannelHandlerContext来间接操作 ChannelHandler。
用户3467126
2019/07/03
3.6K0
Netty源码—8.编解码原理一
当客户端Channel的Reactor线程NioEventLoop检测到有读事件时,会执行NioByteUnsafe的read()方法。该方法会调用doReadBytes()方法将TCP缓冲区的数据读到由ByteBufAllocator分配的一个ByteBuf对象中,然后通过pipeline.fireChannelRead()方法带上这个ByteBuf对象向下传播ChannelRead事件。
东阳马生架构
2025/06/13
1290
netty源码分析四之客户端接入与数据写出
在上文Bootstrap初始化流程分析中我们知道,在NioServerSocketChannel进行register时,会调用eventLoop.execute方法:
山行AI
2019/10/24
1.2K0
Netty模拟OOM-Metaspace
在模拟OOM之前, 先简单说下Netty服务端向客户端发送数据的时候, 涉及两个存储数据的地方, 如下图所示
书唐瑞
2022/06/02
4310
Netty模拟OOM-Metaspace
netty 入门
我们一般会用Http客户端库来调用web服务,获取数据。如果一个东西是出于一般性目的设计出来的,那么他在某些方面可能就不是最合适的。比如获取大文件,收发邮件,展示实时的金融数据,游戏数据传输等。为了实现这些需求,需要一个为其高度优化的特定协议。还有一个无法避免的问题是你可能需要调用老系统的数据,但是他的协议又是特定。重点来了,如何在不牺牲可靠性和性能的前提下快速实现这么一个系统。
_淡定_
2020/05/12
8180
推荐阅读
相关推荐
Netty源码—5.Pipeline和Handler二
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验