首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >Netty源码—9.性能优化和设计模式二

Netty源码—9.性能优化和设计模式二

原创
作者头像
东阳马生架构
发布2025-06-16 23:13:34
发布2025-06-16 23:13:34
1480
举报
文章被收录于专栏:Netty应用与源码Netty应用与源码

大纲

1.Netty的两大性能优化工具

2.FastThreadLocal的实现之构造方法

3.FastThreadLocal的实现之get()方法

4.FastThreadLocal的实现之set()方法

5.FastThreadLocal的总结

6.Recycler的设计理念

7.Recycler的使用

8.Recycler的四个核心组件

9.Recycler的初始化

10.Recycler的对象获取

11.Recycler的对象回收

12.异线程收割对象

13.Recycler的总结

14.Netty设计模式之单例模式

15.Netty设计模式之策略模式

16.Netty设计模式之装饰器模式

17.Netty设计模式之观察者模式

18.Netty设计模式之迭代器模式

19.Netty设计模式之责任链模式

12.异线程收割对象

说明一:

线程的Stack里有3指针:head、pre、cursor。往Stack中插入一个WeakOrderQueue都是往头部插入的(头插法)。head指向第一个WeakOrderQueue,cursor指向当前回收对象的WeakOrderQueue。

说明二:

scavenge()方法会从其他线程回收的对象中尝试转移对象。如果成功则返回,否则设置cursor为head,以便下次从头开始获取。

说明三:

scavengeSome()方法会首先判断cursor是否为null。如果cursor为null,则设置cursor为head结点。接着执行do while循环不断尝试从其他线程的WeakOrderQueue中,转移一些对象到当前线程的Stack的一个DefaultHandle数组中。

说明四:

cursor.transfer()方法会把WeakOrderQueue里的对象传输到当前线程的Stack的elements数组里。如果传输成功就结束do while循环,如果传输失败就获取cursor的下一个结点cursor.next继续处理。

说明五:

cursor.transfer()方法每次都只取WeakOrderQueue里的一个Link,然后传输Link里的elements数组元素到目标Stack的elements数组中。如果cursor = null或success = true,则do while循环结束。所有的WeakOrderQueue默认时最多总共可以有:2K / 16 = 128个Link。

代码语言:javascript
复制
public abstract class Recycler<T> {
    ...
    private static final class Stack<T> {
        private WeakOrderQueue cursor, prev;
        private volatile WeakOrderQueue head;
        ...
        //尝试从其他线程回收的对象中转移一些到elements数组
        private boolean scavenge() {
            if (scavengeSome()) {
                return true;
            }
            prev = null;
            cursor = head;
            return false;
        }

        private boolean scavengeSome() {
            WeakOrderQueue prev;
            WeakOrderQueue cursor = this.cursor;
            //首先判断cursor是否为null,如果cursor为null,则设置cursor为head结点
            if (cursor == null) {
                prev = null;
                cursor = head;
                if (cursor == null) {
                    return false;
                }
            } else {
                prev = this.prev;
            }

            boolean success = false;
            do {
                //从其他线程的WeakOrderQueue也就是cursor中,
                //转移一些对象到当前线程的Stack<T>的一个DefaultHandle数组中
                if (cursor.transfer(this)) {
                    success = true;
                    break;
                }
                WeakOrderQueue next = cursor.getNext();
                ...
                cursor = next;
            } while (cursor != null && !success);

            this.prev = prev;
            this.cursor = cursor;
            return success;
        }
    }
    
    private static final class WeakOrderQueue extends WeakReference<Thread> {
        ...
        //从当前的WeakOrderQueue中,转移一些对象到目标Stack<T>的一个DefaultHandle数组中
        boolean transfer(Stack<?> dst) {
            Link head = this.head.link;
            if (head == null) {
                return false;
            }

            if (head.readIndex == LINK_CAPACITY) {
                if (head.next == null) {
                    return false;
                }
                head = head.next;
                this.head.relink(head);
            }

            final int srcStart = head.readIndex;
            int srcEnd = head.get();
            final int srcSize = srcEnd - srcStart;
            if (srcSize == 0) {
                return false;
            }

            final int dstSize = dst.size;
            final int expectedCapacity = dstSize + srcSize;

            if (expectedCapacity > dst.elements.length) {
                final int actualCapacity = dst.increaseCapacity(expectedCapacity);
                srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
            }

            if (srcStart != srcEnd) {
                final DefaultHandle[] srcElems = head.elements;
                final DefaultHandle[] dstElems = dst.elements;
                int newDstSize = dstSize;
                for (int i = srcStart; i < srcEnd; i++) {
                    DefaultHandle<?> element = srcElems[i];
                    if (element.recycleId == 0) {
                        element.recycleId = element.lastRecycledId;
                    } else if (element.recycleId != element.lastRecycledId) {
                        throw new IllegalStateException("recycled already");
                    }
                    srcElems[i] = null;

                    if (dst.dropHandle(element)) {
                        //Drop the object.
                        continue;
                    }
                    element.stack = dst;
                    dstElems[newDstSize ++] = element;
                }

                if (srcEnd == LINK_CAPACITY && head.next != null) {
                    //Add capacity back as the Link is GCed.
                    this.head.relink(head.next);
                }

                head.readIndex = srcEnd;
                if (dst.size == newDstSize) {
                    return false;
                }
                dst.size = newDstSize;
                return true;
            } else {
                //The destination stack is full already.
                return false;
            }
        }
        ...
    }
    ...
}

13.Recycler的总结

(1)获取对象和回收对象的思路总结

(2)获取对象的具体步骤总结

(3)对象池的设计核心

(1)获取对象和回收对象的思路总结

对象池有两个重要的组成部分:Stack和WeakOrderQueue。

从Recycler获取对象时,优先从Stack中查找可用对象。如果Stack中没有可用对象,会尝试从WeakOrderQueue迁移一些对象到Stack中。

Recycler回收对象时,分为同线程对象回收和异线程对象回收这两种情况。同线程回收直接向Stack添加对象,异线程回收会向WeakOrderQueue中的最后一个Link添加对象。

同线程回收和异线程回收都会控制回收速率,默认每8个对象会回收一个,其他的全部丢弃。

(2)获取对象的具体步骤总结

如何从一个对象池里获取对象:

步骤一:首先通过FastThreadLocal方式拿到当前线程的Stack。

步骤二:如果这个Stack里的elements数组有对象,则直接弹出。如果这个Stack里的elements数组没有对象,则从当前Stack关联的其他线程的WeakOrderQueue里的Link结点的elements数组中转移对象,到当前Stack里的elements数组里。

步骤三:如果转移成功,那么当前Stack里的elements数组就有对象了,这时就可以直接弹出。如果转移失败,那么接下来就直接创建一个对象然后和当前Stack进行关联。

步骤四:关联之后,后续如果是当前线程自己进行对象回收,则将该对象直接存放到当前线程的Stack里。如果是其他线程进行对象回收,则将该对象存放到其他线程与当前线程的Stack关联的WeakOrderQueue里。

(3)对象池的设计核心

为什么要分同线程和异线程进行处理,并设计一套比较复杂的数据结构?因为对象池的使用场景一般是高并发的环境,希望通过对象池来减少对象的频繁创建带来的性能损耗。所以在高并发的环境下,从对象池中获取对象和回收对象就只能通过以空间来换时间的思路进行处理,而ThreadLocal恰好是通过以空间换时间的思路来实现的,因此引入了FastThreadLocal来管理对象池里的对象。但是如果仅仅使用FastThreadLocal管理同线程创建和回收的对象,那么并不能充分体现对象池的作用。所以通过FastThreadLocal获取的Stack对象,应该不仅可以管理同线程的对象,也可以管理异线程的对象。为此,Recycler便分同线程和异线程进行处理并设计了一套比较复杂的数据结构。

14.Netty设计模式之单例模式

(1)单例模式的特点

(2)单例模式的例子

(3)Netty中的单例模式

(1)单例模式的特点

一.一个类全局只有一个对象

二.延迟创建

三.避免线程安全问题

(2)单例模式的例子

代码语言:javascript
复制
public class Singleton {
    private volatile static Singleton singleton;
    private Singleton() {
    
    }
    public static Singleton getInstance() {
        if (singleton == null) {
            synchronized(Singleton.class) {
                if (singleton == null) {
                    singleton = new Singleton();
                }
            }
        }
        return singleton;
    }
}

(3)Netty中的单例模式

Netty中的单例模式大都使用饿汉模式,比如ReadTimeoutException、MqttEncoder。

代码语言:javascript
复制
public final class ReadTimeoutException extends TimeoutException {
    public static final ReadTimeoutException INSTANCE = 
        PlatformDependent.javaVersion() >= 7 ? new ReadTimeoutException(true) : new ReadTimeoutException();

    ReadTimeoutException() {

    }
    
    private ReadTimeoutException(boolean shared) {
        super(shared);
    }
}

@ChannelHandler.Sharable
public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
    public static final MqttEncoder INSTANCE = new MqttEncoder();
    private MqttEncoder() {
               
    }
    ...
}

15.Netty设计模式之策略模式

(1)策略模式的特点

(2)策略模式的例子

(3)Netty中的策略模式

(1)策略模式的特点

一.封装一系列可相互替换的算法家族

二.动态选择某一个策略

(2)策略模式的例子

代码语言:javascript
复制
public class Strategy {
    private Cache cacheMemory = new CacheMemoryImpl();
    private Cache cacheRedis = new CacheRedisImpl();
    
    public interface Cache {
        boolean add(String key, Object object);
    }
    
    public class CacheMemoryImpl implements Cache {
        public boolean add(String key, Object object) {
            //保存到Memory
            return false;
        }
    }
    
    public class CacheRedisImpl implements Cache {
        public boolean add(String key, Object object) {
            //保存到Redis
            return false;
        }
    }
    
    public Cache getCache(String key) {
        if (key.length() < 10) {
            return cacheRedis;
        } else {
            return cacheMemory;
        }
    }
}

(3)Netty中的策略模式

Netty的DefaultEventExecutorChooserFactory中的newChooser()方法就可以动态选择某具体策略;EventExecutorChooser接口中的next()方法就有两种实现算法:PowerOfTwoEventExecutorChooser.next()和GenericEventExecutorChooser.next()。

代码语言:javascript
复制
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
    private DefaultEventExecutorChooserFactory() {
    
    }

    @SuppressWarnings("unchecked")
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }
}

16.Netty设计模式之装饰器模式

(1)装饰器模式的特点

(2)装饰器模式的例子

(3)Netty中的装饰器模式

(1)装饰器模式的特点

一.装饰者和被装饰者继承自同一个接口

二.装饰者给被装饰者动态修改行为(丰富类的功能)

(2)装饰器模式的例子

代码语言:javascript
复制
public class Decorate {
    //优惠方案
    public interface OnSalePlan {
        float getPrice(float oldPrice);    
    }
    
    //无优惠
    public static class NonePlan implements OnSalePlan {//被装饰者
        static final OnSalePlan INSTANCE = new NonePlan();
        private NonePlan() {                   
        }
        public float getPrice(float oldPrice) {
            return oldPrice;
        }
    }
    
    //立减优惠
    public static class KnockPlan implements OnSalePlan {//被装饰者
        private float amount;//立减金额
        public KnockPlan(float amount) {
            this.amount = amount;
        }
        public float getPrice(float oldPrice) {
            return oldPrice - amount;
        }
    }
    
    //打折优惠
    public static class DiscountPlan implements OnSalePlan {//装饰者
        private int discount;//折扣
        private OnSalePlan previousPlan;//被装饰者
        //直接打折的构造方法
        public DiscountPlan(int discount) {
            this(discount, NonePlan.INSTANCE);
        }
        //在满减的基础上打折的构造方法
        public DiscountPlan(int discount, OnSalePlan previousPlan) {
            this.discount = discount;
            this.previousPlan = previousPlan;
        }
        public float getPrice(float oldPrice) {
            return previousPlan.getPrice(oldPrice) * discount;
        }
    }
    
    public static void main(String[] args) {
        KnockPlan previousPlan = new KnockPlan(50);//被装饰者
        DiscountPlan complexDiscountPlan = new DiscountPlan(5, previousPlan);
        System.out.println(complexDiscountPlan.getPrice(100));
    }
}

(3)Netty中的装饰器模式

Netty中的SimpleLeakAwareByteBuf、UnreleasableByteBuf、WrappedByteBuf便用到了装饰器模式。每次调用WrappedByteBuf的方法,都会委托到被装饰的ByteBuf。这个WrappedByteBuf其实是Netty里装饰ByteBuf的一个基类,它基本是直接使用了ByteBuf的方法。

WrappedByteBuf有两个子类:SimpleLeakAwareByteBuf和UnreleasableByteBuf。这两个子类都是装饰者,被装饰者都是ByteBuf,在它们的构造函数中传入。

代码语言:javascript
复制
class WrappedByteBuf extends ByteBuf {
    protected final ByteBuf buf;//被装饰者
    
    protected WrappedByteBuf(ByteBuf buf) {
        this.buf = ObjectUtil.checkNotNull(buf, "buf");
    }
    
    @Override
    public final int capacity() {
        return buf.capacity();
    }
    
    @Override
    public final ByteBufAllocator alloc() {
        return buf.alloc();
    }
    
    @Override
    public final int readableBytes() {
        return buf.readableBytes();
    }
    
    @Override
    public ByteBuf readBytes(byte[] dst) {
        buf.readBytes(dst);
        return this;
    }
    
    @Override
    public ByteBuf writeBytes(byte[] src) {
        buf.writeBytes(src);
        return this;
    }
    
    @Override
    public boolean release() {
        return buf.release();
    }
    ...
}

final class UnreleasableByteBuf extends WrappedByteBuf {
    ...
    UnreleasableByteBuf(ByteBuf buf) {
        super(buf instanceof UnreleasableByteBuf ? buf.unwrap() : buf);
    }
    ...
}

class SimpleLeakAwareByteBuf extends WrappedByteBuf {
    ...
    SimpleLeakAwareByteBuf(ByteBuf wrapped, ByteBuf trackedByteBuf, ResourceLeakTracker<ByteBuf> leak) {
        super(wrapped);
        this.trackedByteBuf = ObjectUtil.checkNotNull(trackedByteBuf, "trackedByteBuf");
        this.leak = ObjectUtil.checkNotNull(leak, "leak");
    }
    ...
}

17.Netty设计模式之观察者模式

(1)观察者模式的特点

(2)观察者模式的例子

(3)Netty中的观察者模式

(1)观察者模式的特点

一.观察者和被观察者

二.观察者订阅消息,被观察者发布消息

三.订阅则能收到消息,取消订阅则不能收到消息

(2)观察者模式的例子

女神是被观察者,男孩、男人、老男人是观察者。第一步需要注册观察者到被观察者的一个列表中,第二步当被观察者触发某个动作后需遍历观察者列表执行观察者的方法。

代码语言:javascript
复制
public class ObserverTest {
    //被观察者需要实现的接口
    public interface Observerable {
        void registerObserver(Observer o);
        void removeObserver(Observer o);
        void notifyObserver();
    }
    
    //观察者需要实现的接口
    public interface Observer {
        void notify(String message);
    }
    
    //女神:被观察者
    public static class Girl implements Observerable {
        private String message;
        List<Observer> observerList;
        
        public Girl() {
            observerList = new ArrayList();
        }
      
        public void registerObserver(Observer o) {
            observerList.add(o);
        }

        public void removeObserver(Observer o) {
            observerList.remove(o);
        }
      
        public void notifyObserver() {
            for (Observer observer : observerList) {
                observer.notify(message);
            }
        }
      
        public void gotoBathe() {
            message = "女神去洗澡了";
            notifyObserver();
        }
       
        public void gotoSleep() {
            message = "女神去睡觉了";
            notifyObserver();
        }
    }
    
    //男人:观察者
    public static class Man implements Observer {
        public void notify(String message) {
            //收到女神的情况,准备下一步行动
        }
    }
    
    public static void main(String[] args) {
        Girl girl = new Girl();
        Man man = new Man();
        girl.registerObserver(man);
        girl.gotoSleep();
    }
}

(3)Netty中的观察者模式

Netty的writeAndFlush()方法就是典型的观察者模式。Netty的Future或Promise模式实现了writeAndFlush()的异步化,并且每次写成功或者写失败都能收到回调。

我们在调用writeAndFlush()方法后,Netty会创建一个被观察者ChannelFuture。然后在调用channelFuture.addListener()方法时,其实就是往被观察者ChannelFuture中添加一系列的观察者。

代码语言:javascript
复制
public void write(NioSocketChannel channel, Object object) {
    ChannelFuture channelFuture = channel.writeAndFlush(object);
    channelFuture.addListener(future -> {
        if (future.isSuccess()) ...    
    });
    channelFuture.addListener(future -> {
        if (future.isSuccess()) ...    
    });
}

每一个writeAndFlush()方法被调用时都是从pipeline开始往前传播,也就是从tail结点开始执行writeAndFlush()方法并从后往前传播。tail结点的writeAndFlush()方法会去new一个Promise(),这个new Promise()就是创建一个被观察者DefaultChannelPromise。DefaultChannelPromise继承自ChannelPromise,ChannelPromise继承自ChannelFuture。

代码语言:javascript
复制
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    ...
    @Override
    public ChannelFuture writeAndFlush(Object msg) {
        return pipeline.writeAndFlush(msg);
    }
    ...
}

public class DefaultChannelPipeline implements ChannelPipeline {
    ...
    final AbstractChannelHandlerContext tail;
    @Override
    public final ChannelFuture writeAndFlush(Object msg) {
        //调用TailContext父类的writeAndFlush()方法
        return tail.writeAndFlush(msg);
    }
    
    final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
        ...
    }
    ...
}

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    ...
    @Override
    public ChannelFuture writeAndFlush(Object msg) {
        //传入一个由newPromise()创建的DefaultChannelPromise
        return writeAndFlush(msg, newPromise());
    }
    
    @Override
    public ChannelPromise newPromise() {
        return new DefaultChannelPromise(channel(), executor());
    }
    
    @Override
    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        //调用AbstractChannelHandlerContext的write()方法
        write(msg, true, promise);
        //返回传进来的DefaultChannelPromise,也就是ChannelFuture
        return promise;
    }
    ...
}

public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {
    private final Channel channel;
    ...
    //Creates a new instance.
    //@param channel,the Channel associated with this future
    public DefaultChannelPromise(Channel channel, EventExecutor executor) {
        super(executor);
        this.channel = checkNotNull(channel, "channel");
    }
    ...
}

TailContext父类AbstractChannelHandlerContext的writeAndFlush()方法执行源码:

代码语言:javascript
复制
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    ...
    @Override
    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        //调用AbstractChannelHandlerContext的write()方法
        write(msg, true, promise);
        //返回传进来的DefaultChannelPromise,也就是ChannelFuture
        return promise;
    }
    
    private void write(Object msg, boolean flush, ChannelPromise promise) {
        ObjectUtil.checkNotNull(msg, "msg");
        try {
            if (isNotValidPromise(promise, true)) {
                ReferenceCountUtil.release(msg);
                return;
            }
        } catch (RuntimeException e) {
            ReferenceCountUtil.release(msg);
            throw e;
        }
      
        //寻找下一个结点
        final AbstractChannelHandlerContext next = findContextOutbound(flush ? (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                //调用AbstractChannelHandlerContext的invokeWriteAndFlush()方法
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
            if (!safeExecute(executor, task, promise, m, !flush)) {
                task.cancel();
            }
        }
    }
    
    void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            invokeWrite0(msg, promise);
            invokeFlush0();
        } else {
            writeAndFlush(msg, promise);
        }
    }
    
    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            //最终会调用到HeadContext的write()方法
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }
    
    private void invokeFlush0() {
        try {
            //最终会调用到HeadContext的flush()方法
            ((ChannelOutboundHandler) handler()).flush(this);
        } catch (Throwable t) {
            invokeExceptionCaught(t);
        }
    }
    ...
}

public class DefaultChannelPipeline implements ChannelPipeline {
    ...
    final AbstractChannelHandlerContext head;
    final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
        private final Unsafe unsafe;
        ...
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
            //调用AbstractChannel.AbstractUnsafe的write()方法
            unsafe.write(msg, promise);
        }

        @Override
        public void flush(ChannelHandlerContext ctx) {
            //调用AbstractChannel.AbstractUnsafe的flush()方法
            unsafe.flush();
        }
    }
    ...
}

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    ...
    protected abstract class AbstractUnsafe implements Unsafe {
        ...
        @Override
        public final void flush() {
            assertEventLoop();
            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                return;
            }
            outboundBuffer.addFlush();
            flush0();
        }

        @SuppressWarnings("deprecation")
        protected void flush0() {
            ...
            //调用AbstractNioByteChannel的doWrite()方法
            doWrite(outboundBuffer);
            ...
        }
    }
    
    //Flush the content of the given buffer to the remote peer.
    protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
}

public abstract class AbstractNioByteChannel extends AbstractNioChannel {
    ...
    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        int writeSpinCount = config().getWriteSpinCount();
        do {
            Object msg = in.current();
            if (msg == null) {
                clearOpWrite();
                return;
            }
            writeSpinCount -= doWriteInternal(in, msg);
        } while (writeSpinCount > 0);
        incompleteWrite(writeSpinCount < 0);
    }
    
    private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            if (!buf.isReadable()) {
                //调用ChannelOutboundBuffer的remove()方法
                in.remove();
                return 0;
            }

            final int localFlushedAmount = doWriteBytes(buf);
            if (localFlushedAmount > 0) {
                in.progress(localFlushedAmount);
                if (!buf.isReadable()) {
                    //调用ChannelOutboundBuffer的remove()方法
                    in.remove();
                }
                return 1;
            }
        }
        ...
    }
    ...
}

public final class ChannelOutboundBuffer {
    private Entry flushedEntry;
    ...
    //Will remove the current message, mark its ChannelPromise as success and return true. 
    //If no flushed message exists at the time this method is called it will return false to signal that no more messages are ready to be handled.
    public boolean remove() {
        Entry e = flushedEntry;
        if (e == null) {
            clearNioBuffers();
            return false;
        }
        Object msg = e.msg;

        ChannelPromise promise = e.promise;
        int size = e.pendingSize;
        removeEntry(e);

        if (!e.cancelled) {
            //only release message, notify and decrement if it was not canceled before.
            ReferenceCountUtil.safeRelease(msg);
            //调用ChannelOutboundBuffer的safeSuccess()方法通知promise的观察者
            safeSuccess(promise);
            decrementPendingOutboundBytes(size, false, true);
        }

        //recycle the entry
        e.recycle();
        return true;
    }
    
    private static void safeSuccess(ChannelPromise promise) {
        //Only log if the given promise is not of type VoidChannelPromise as trySuccess(...) is expected to return false.
        //触发调用DefaultPromise的trySuccess()方法通知promise的观察者
        PromiseNotificationUtil.trySuccess(promise, null, promise instanceof VoidChannelPromise ? null : logger);
    }
    ...
}

DefaultPromise添加观察者和通知观察者的源码如下。注意:DefaultPromise.listeners是一个Object,第一次添加时listeners = listener,第二次添加时会将新增的和当前的listeners转为一个数组,然后再往数组里添加元素。

代码语言:javascript
复制
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
    private Object listeners;
    ...
    //添加观察者
    @Override
    public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        checkNotNull(listener, "listener");
        synchronized (this) {
            addListener0(listener);
        }
        if (isDone()) {
            notifyListeners();
        }
        return this;
    }
    
    private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
        if (listeners == null) {
            listeners = listener;
        } else if (listeners instanceof DefaultFutureListeners) {
            ((DefaultFutureListeners) listeners).add(listener);
        } else {
            listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
        }
    }
    
    ...
    
    //通知观察者
    @Override
    public boolean trySuccess(V result) {
        return setSuccess0(result);
    }
    
    private boolean setSuccess0(V result) {
        return setValue0(result == null ? SUCCESS : result);
    }
    
    private boolean setValue0(Object objResult) {
        if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
            RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
            if (checkNotifyWaiters()) {
                //调用DefaultPromise的notifyListeners()方法通知观察者
                notifyListeners();
            }
            return true;
        }
        return false;
    }
    
    private void notifyListeners() {
        EventExecutor executor = executor();
        if (executor.inEventLoop()) {
            final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
            final int stackDepth = threadLocals.futureListenerStackDepth();
            if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
                threadLocals.setFutureListenerStackDepth(stackDepth + 1);
                try {
                    notifyListenersNow();
                } finally {
                    threadLocals.setFutureListenerStackDepth(stackDepth);
                }
                return;
            }
        }

        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                notifyListenersNow();
            }
        });
    }
    
    private void notifyListenersNow() {
        Object listeners;
        synchronized (this) {
            if (notifyingListeners || this.listeners == null) {
                return;
            }
            notifyingListeners = true;
            listeners = this.listeners;
            this.listeners = null;
        }
        for (;;) {
            if (listeners instanceof DefaultFutureListeners) {
                notifyListeners0((DefaultFutureListeners) listeners);
            } else {
                notifyListener0(this, (GenericFutureListener<?>) listeners);
            }
            synchronized (this) {
                if (this.listeners == null) {
                    notifyingListeners = false;
                    return;
                }
                listeners = this.listeners;
                this.listeners = null;
            }
        }
    }

    private void notifyListeners0(DefaultFutureListeners listeners) {
        GenericFutureListener<?>[] a = listeners.listeners();
        int size = listeners.size();
        for (int i = 0; i < size; i ++) {
            notifyListener0(this, a[i]);
        }
    }

    @SuppressWarnings({ "unchecked", "rawtypes" })
    private static void notifyListener0(Future future, GenericFutureListener l) {
        try {
            l.operationComplete(future);
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
            }
        }
    }
    ...
}

writeAndFlush()方法返回的ChannelFuture添加观察者和通知观察者的调用路径总结如下:

代码语言:javascript
复制
//添加观察者
=> ChannelFuture.addListener() //最后会调用到DefaultPromise的addListener
=> DefaultChannelPromise.addListener()
=> DefaultPromise.addListener() //使用了同步块是因为可能有多个线程同时添加Listener
=> DefaultPromise.addListener0()

//通知观察者
=> AbstractChannel.writeAndFlush()
=> unsafe.write()和unsafe.flush()
=> in.remove()删除数据缓冲区
=> DefaultPromise.trySuccess()
=> DefaultPromise.notifyListenersNow()
=> DefaultPromise.notifyListeners0()

18.Netty设计模式之迭代器模式

(1)迭代器模式的特点

(2)迭代器模式的例子

(1)迭代器模式的特点

一.迭代器接口

二.对容器里各个对象进行访问

(2)迭代器模式的例子

通过迭代器,Netty能够实现内存的零拷贝。

代码语言:javascript
复制
public class IterableTest {
    public static void main(String[] args) {
        ByteBuf header = Unpooled.wrappedBuffer(new byte[]{1,2,3});
        ByteBuf body = Unpooled.wrappedBuffer(new byte[]{4,5,6});
        ByteBuf merge = merge(header, body);
        merge.forEachByte(value -> {
            System.out.println(value);
            return true;
        });
    }
    
    //实现一:对ByteBuf进行拷贝才能遍历各字节
    public static ByteBuf merge(ByteBuf header, ByteBuf body) {
        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.ioBuffer();
        byteBuf.writeBytes(header);
        byteBuf.writeBytes(body);
        return byteBuf;
    }
    
    //实现二:CompositeByteBuf将两个ByteBuf以组件的形式添加进去,可实现零拷贝
    public static ByteBuf merge(ByteBuf header, ByteBuf body) {
        CompositeByteBuf byteBuf = ByteBufAllocator.DEFAULT.compositeBuffer(2);//组件个数为2
        byteBuf.addComponent(true, header);
        byteBuf.addComponent(true, body);
        return byteBuf;
    }
}

CompositeByteBuf将两个ByteBuf以组件的形式添加进去,可实现零拷贝。CompositeByteBuf的forEachByte()方法利用迭代器模式实现了对容器里各对象的遍历访问。

代码语言:javascript
复制
public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements Iterable<ByteBuf> {
    private Component[] components;
    ...
    //Add the given ByteBuf and increase the writerIndex if increaseWriterIndex is true.
    //ByteBuf#release() ownership of buffer is transferred to this CompositeByteBuf.
    //@param buffer the ByteBuf to add. ByteBuf#release() ownership is transferred to this CompositeByteBuf. 
    public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) {
        return addComponent(increaseWriterIndex, componentCount, buffer);
    }
    
    public CompositeByteBuf addComponent(boolean increaseWriterIndex, int cIndex, ByteBuf buffer) {
        checkNotNull(buffer, "buffer");
        addComponent0(increaseWriterIndex, cIndex, buffer);
        consolidateIfNeeded();
        return this;
    }
    
    private int addComponent0(boolean increaseWriterIndex, int cIndex, ByteBuf buffer) {
        assert buffer != null;
        boolean wasAdded = false;
        try {
            checkComponentIndex(cIndex);
            Component c = newComponent(ensureAccessible(buffer), 0);
            int readableBytes = c.length();
            if (capacity() + readableBytes < 0) {
                throw new IllegalArgumentException("Can't increase by " + readableBytes);
            }

            addComp(cIndex, c);
            wasAdded = true;
            if (readableBytes > 0 && cIndex < componentCount - 1) {
                updateComponentOffsets(cIndex);
            } else if (cIndex > 0) {
                c.reposition(components[cIndex - 1].endOffset);
            }
            if (increaseWriterIndex) {
                writerIndex += readableBytes;
            }
            return cIndex;
        } finally {
            if (!wasAdded) {
                buffer.release();
            }
        }
    }
    
    private void addComp(int i, Component c) {
        shiftComps(i, 1);
        components[i] = c;
    }
    
    @Override
    public byte getByte(int index) {
        Component c = findComponent(index);
        return c.buf.getByte(c.idx(index));
    }
    
    private Component findComponent(int offset) {
        Component la = lastAccessed;
        if (la != null && offset >= la.offset && offset < la.endOffset) {
            ensureAccessible();
            return la;
        }
        checkIndex(offset);
        return findIt(offset);
    }
    
    private Component findIt(int offset) {
        //遍历components容器里的各个对象
        for (int low = 0, high = componentCount; low <= high;) {
            int mid = low + high >>> 1;
            Component c = components[mid];
            if (offset >= c.endOffset) {
                low = mid + 1;
            } else if (offset < c.offset) {
                high = mid - 1;
            } else {
                lastAccessed = c;
                return c;
            }
        }
        throw new Error("should not reach here");
    }
    ...
}

19.Netty设计模式之责任链模式

(1)责任链模式简介

(2)责任链模式的几个要素

(3)Netty中的责任链模式

(1)责任链模式简介

责任链模式可以使得多个对象都有机会处理同一个请求,从而避免请求的发送者何接收者之间的耦合。然后将这些对象连成一条链,并且沿着这条链传递这个请求,直到有一个对象可以处理它为止。在这个过程的处理中,每个对象都可以只处理它所关心的那一部分。如果其中有一个对象发现它不适合把这个事件继续往下传播,那么可以随时终止这个传播。

(2)责任链模式的几个要素

一.责任处理器接口

这个接口相当于责任链里的每道关卡,每道关卡都可以对一个请求进行相应处理。对应于Netty的ChannelHandler接口,有两类接口ChannelInboundHandler和ChannelOutboundHandler实现了对ChannelHandler的增强。

二.创建链的机制

创建完链之后能随时动态地添加删除责任处理器的接口,对应于Netty的ChannelPipeline链。

三.上下文的机制

当责任处理器接口在处理一些事件时,需要感知上下文,通过上下文能拿到它对应需要的一些对象。对应于Netty的ChannelHandlerContext,通过ChannelHandlerContext的channel()方法可以获取Channel,通过ChannelHandlerContext的executor()方法可以获取Reactor线程。

四.责任终止的机制

也就是每一个具体的责任处理器它都有权终止这个事件继续传播,对应于Netty的ChannelHandler接口实现中,只要不显式调用ctx.fireXXX()便可以实现XXX事件传播的终止。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档