首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >netty源码分析之EventLoop中的线程FastThreadLocalThread和队列

netty源码分析之EventLoop中的线程FastThreadLocalThread和队列

作者头像
山行AI
发布于 2020-04-07 10:18:18
发布于 2020-04-07 10:18:18
1.3K00
代码可运行
举报
文章被收录于专栏:山行AI山行AI
运行总次数:0
代码可运行

每个NioEventLoop有着自己的任务队列(taskQueue=mpscQueue和延迟队列PriorityQueue)和自己的处理线程(FastThreadLocalThread),同时也维护着自己的Selector。如果是bossGroup,在ServerBootstrap初始化时会去Selector上注册ServerSocketChannel,同时在pipeline中添加ServerBootstrapAcceptor。io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)方法中会在(readyOps & (SelectionKey.OPREAD | SelectionKey.OPACCEPT)) != 0 || readyOps == 0成立时进入unsafe.read(),此时也就是表明有连接进入,在io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read方法中会调用doReadMessages(readBuf)方法,在io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages方法中会通过NioSocketChannel nioSocketChannel = new NioSocketChannel(this, ch);buf.add(nioSocketChannel);的方式将NioSocketChannel放入List类型的readBuf中。此时回到io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read方法中,会遍历readBuf列表,调用pipeline.fireChannelRead(readBuf.get(i))方法,这样以来就会触发ServerBootstrapAcceptor中的channelRead方法,在ServerBootstrapAcceptor的channelRead方法中调用childGroup.register(child)方法,这个childGroup就是创建ServerBootstrap时传入的workerGroup,这个child就是NioSocketChannel类型的对象。在register之后,每个NioEventLoop线程都会在维护自身的task队列(普通任务队列与定时任务)的同时,在它的run方法中还会不停地执行select,在doRegister方法中会调用pipeline.fireChannelActive();方法,在方法里会在pipeline中进行传播,调用io.netty.channel.DefaultChannelPipeline.HeadContext#read方法,继而调用unsafe.beginRead()方法,在io.netty.channel.nio.AbstractNioChannel#doBeginRead方法中会进行selectionKey.interestOps(interestOps | readInterestOp)方法的调用。关于接入流程可以参考:https://www.cnblogs.com/ZhuChangwu/p/11210437.html,这些处理的主要是网络IO事件,它的任务事件是放入队列中来进行处理的。

io.netty.channel.nio.NioEventLoop

类继承关系:

它继承自SingleThreadEventLoop,它的超类是SingleThreadEventExecutor。而在下面你会发现NioEventLoopGroup中维护着多个SingleThreadEventExecutor。先来看下NioEventLoop和SingleThreadEventLoop、SingleThreadEventExecutor的代码。

SingleThreadEventExecutor

因为这个Executor的主要作用是维护其中的FastThreadLocalThread的生命周期,我们来依照这条线进行分析:

  • 线程创建:
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 protected SingleThreadEventExecutor(
 
 EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
 
 ----------------省略部分代码--------------------
 
 this.parent = parent;
 
 this.addTaskWakesUp = addTaskWakesUp;
 
 // 创建线程
 
          thread = threadFactory.newThread(new Runnable() {
 
 @Override
 
 public void run() {
 
 boolean success = false;
 
                  updateLastExecutionTime();
 
 try {
 
 // 回调executor的run方法
 
 SingleThreadEventExecutor.this.run();
 
                      success = true;
 
 } catch (Throwable t) {
 
                      logger.warn("Unexpected exception from an event executor: ", t);
 
 } finally {
 
 for (;;) {
 
 int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
 

使用threadFactory来创建线程,创建的是FastThreadLocalThread,这个在下文中会详细分析。在创建的线程的run方法中会回调SingleThreadEventExecutor的run方法。

  • 线程状态:
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 private static final int ST_NOT_STARTED = 1;
 
 private static final int ST_STARTED = 2;
 
 private static final int ST_SHUTTING_DOWN = 3;
 
 private static final int ST_SHUTDOWN = 4;
 
 private static final int ST_TERMINATED = 5;
 


 
 private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER;
 
 static {
 
 AtomicIntegerFieldUpdater<SingleThreadEventExecutor> updater = PlatformDependent.newAtomicIntegerFieldUpdater(SingleThreadEventExecutor.class, "state");
 
 if (updater == null) {
 
              updater = AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
 
 }
 
          STATE_UPDATER = updater;
 
 }
 

通过一个AtomicIntegerFieldUpdater变量来维护着线程的状态变化。

  • 线程唤醒,为什么要有线程唤醒呢,我们来看下这个SingleThreadEventExecutor的实现类NioEventLoop中对run方法的实现:
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 @Override
 
 protected void run() {
 
 for (;;) {
 
 boolean oldWakenUp = wakenUp.getAndSet(false);
 
 try {
 
 if (hasTasks()) {
 
                      selectNow();
 
 } else {
 
 // wakeup是要在调用selector.wakeup()之前来校检来减少wake-up发生,这是因为selector.wakeup()是一个昂贵的操作
 
                      select(oldWakenUp);
 
 ------------省略部分------
 
 if (wakenUp.get()) {
 
                          selector.wakeup();
 
 }
 
 }
 
                  cancelledKeys = 0;
 
                  needsToSelectAgain = false;
 
 final int ioRatio = this.ioRatio;
 
 if (ioRatio == 100) {
 
                      processSelectedKeys();
 
                      runAllTasks();
 
 } else {
 
 final long ioStartTime = System.nanoTime();
 
                      processSelectedKeys();
 
 final long ioTime = System.nanoTime() - ioStartTime;
 
                      runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
 
 }
 
 if (isShuttingDown()) {
 
                      closeAll();
 
 if (confirmShutdown()) {
 
 break;
 
 }
 
 }
 

NIO中的Selector封装了底层的系统调用,其中wakeup用于唤醒阻塞在select方法上的线程,它的实现很简单,在linux上就是创建一 个管道并加入poll的fd集合,wakeup就是往管道里写一个字节,那么阻塞的poll方法有数据可读就立即返回。

这里有必要再提一下ioRatio,这个参数提供了一个粗略的机制,用来大致控制处理IO相关(socket 读,链接,关闭,挂起等)和非IO相关任务的时间分配比.非IO任务是,由于使用Executor接口,例如Executor#execute(..),而在EventLoopGroup队列中的Runnable对象.参数值越小,越多的时间将消耗在非IO任务上.当前,100将禁止所有超时时间(详见源码runAllTasks(long timeoutNanos))并运行所有等待着的非IO任务.详情参考官方issue:https://github.com/netty/netty/issues/6058。

控制wakeup的属性为:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 private final boolean addTaskWakesUp;
 

这里关注下io.netty.util.concurrent.SingleThreadEventExecutor#execute方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 @Override
 
 public void execute(Runnable task) {
 
 ----------省略部分-------
 
 boolean inEventLoop = inEventLoop();
 
 if (inEventLoop) {//当前处理线程为EventLoop绑定线程时,放入队列
 
              addTask(task);
 
 } else {
 
              startThread();// 启动新的eventLoop线程
 
              addTask(task);//添加任务入队
 
 if (isShutdown() && removeTask(task)) {
 
                  reject();
 
 }
 
 }
 
 // addTaskWakesUp为true就代表只有在触发addTask(Runnable)方法时才会唤醒executor线程,默认为false
 
 if (!addTaskWakesUp && wakesUpForTask(task)) {
 
              wakeup(inEventLoop);
 
 }
 
 }
 
 // io.netty.channel.SingleThreadEventLoop#wakesUpForTask:
 
 @Override
 
 protected boolean wakesUpForTask(Runnable task) {
 
 return !(task instanceof NonWakeupRunnable);
 
 }
 
 // 添加wakeup任务
 
 protected void wakeup(boolean inEventLoop) {
 
 if (!inEventLoop || STATE_UPDATER.get(this) == ST_SHUTTING_DOWN) {
 
              taskQueue.add(WAKEUP_TASK);
 
 }
 
 }
 

该方法在之前的netty源码分析中详细地分析过,主要用于查看netty的IO线程的状态,当前处理线程为EventLoop绑定线程时,放入队列,否则启动新的EventLoop线程并将任务入队,并在线程处于shutdown状态时将任务出列并执行拒绝策略。如果上面添加的不是NonWakeupRunnable类型的task,并且当前执行线程不是EventLoop线程或者当前线程的状态为shutdown状态时,添加一个WAKEUPTASK,会在io.netty.util.concurrent.SingleThreadEventExecutor#takeTask方法从队列中取task时唤醒阻塞线程。

  • 关闭线程,在SingleThreadEventExecutor中有一个threadLock属性:
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 private final Semaphore threadLock = new Semaphore(0);

它的主要调用位于:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 @Override
 
 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
 
 ------------省略部分代码--------------
 
 if (threadLock.tryAcquire(timeout, unit)) {
 
          threadLock.release();
 
 }
 
 return isTerminated();
 
 }

threadLock是一个初始值为0的信号量。一个初值为0的信号量,当线程请求锁时只会阻塞,这有什么用呢?awaitTermination()方法揭晓答案,用来使其他线程阻塞等待原生线程关闭 。

那么EventLoop线程的作用又是什么呢?

处理IO事件

对于boss NioEventLoop也就是reactor线程来说,轮询到的是基本上是连接事件(OP_ACCEPT):

源码调用链:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
1. io.netty.bootstrap.ServerBootstrap#group(io.netty.channel.EventLoopGroup, io.netty.channel.EventLoopGroup)
 
 public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
 
 super.group(parentGroup);
 
 ----------
 
 this.childGroup = childGroup;}
 
2. 上面将parentGroup传入了super的group方法io.netty.bootstrap.AbstractBootstrap#group(io.netty.channel.EventLoopGroup)public B group(EventLoopGroup group) {
 
 -----------------------
 
 this.group = group;
 
 return (B) this;
 
 }
 
传给了AbstractBootstrap的group属性。在AbstractBootstrap内部的io.netty.bootstrap.AbstractBootstrap#bind()public ChannelFuture bind() {
 
 -----------------------
 
 return doBind(localAddress);
 
 }
 
 doBind方法:
 
 private ChannelFuture doBind(final SocketAddress localAddress) {
 
 final ChannelFuture regFuture = initAndRegister();
 
 final Channel channel = regFuture.channel();
 
 ---------------
 
  io.netty.bootstrap.AbstractBootstrap#initAndRegister方法:
 
 final ChannelFuture initAndRegister() {
 
 final Channel channel = channelFactory().newChannel();
 
 try {
 
            init(channel);
 
 ------------------------------
 
 }
 
 ChannelFuture regFuture = group().register(channel);//这里是parentGroup
 

这里的group调用的是初始时传入的parentGroup,紧接着进入io.netty.channel.EventLoopGroup#register(io.netty.channel.Channel)方法,该方法会根据传入的Channel为ServerSocketChannel和SocketChannel来决定注册不同的事件到Selector上,这里主要是accept事件。执行register方法时会从boosGroup的线程组中使用EventExecutorChooser选择出一个NioEventLoop来进行register操作,所以一般boosGroup中的线程数量都是一个。详细分析参考之前的关于netty源码分析的公众号文章。这里还有一点需要注意的是io.netty.bootstrap.ServerBootstrap#init方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 @Override
 
 void init(Channel channel) throws Exception {
 
 -------------------------------
 
        p.addLast(new ChannelInitializer<Channel>() {
 
 @Override
 
 public void initChannel(Channel ch) throws Exception {
 
                ch.pipeline().addLast(new ServerBootstrapAcceptor(
 
                        currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
 
 }
 
 });
 
 }
 

这里的ServerBootstrapAcceptor就是worker NioEventLoop工作的关键点了。

对于worker NioEventLoop来说,轮询到的基本上是IO读写事件(以OP_READ为例):

这里简要地过一遍它的源码流程:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//io.netty.bootstrap.ServerBootstrap#group(io.netty.channel.EventLoopGroup, io.netty.channel.EventLoopGroup)
 
 public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
 
 super.group(parentGroup);
 
 ----------
 
 this.childGroup = childGroup;}
 
//  这里赋值给了childGroup属性。
 
//接着看io.netty.bootstrap.ServerBootstrap#init方法,上面已经列出,主要是将childGroup传给了ServerBootstrapAcceptor的childGroup属性。我们来看下具体作用,在io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead方法:
 
 public void channelRead(ChannelHandlerContext ctx, Object msg) {
 
 final Channel child = (Channel) msg;
 
            child.pipeline().addLast(childHandler);
 
--------------------------------------------------------
 
 try {
 
                childGroup.register(child).addListener(new ChannelFutureListener() {
 
 -----------------------------------------
 
 });
 
在childGroup.register(child)中child对应的就是每个SocketChannel
 

ServerBootstrapAcceptor就是大名鼎鼎的reactor模型中的acceptor,这里childGroup.register(child)中child对应的就是每个SocketChannel,执行register方法时会从workerGroup的线程组中使用EventExecutorChooser选择出一个NioEventLoop来进行register操作,主要执行Selector的事件注册,这里主要是读写事件。关于EventExecutorChooser和register的介绍之前的文章中有过详细分析,这里不再赘述。

任务处理

处理用户产生的普通任务:

NioEventLoop中的Queue taskQueue被用来承载用户产生的普通Task。任务入列方法io.netty.util.concurrent.SingleThreadEventExecutor#addTask:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
protected void addTask(Runnable task) {
 
 if (task == null) {
 
 throw new NullPointerException("task");
 
 }
 
 if (isShutdown()) {
 
            reject();
 
 }
 
        taskQueue.add(task);
 
 }
 

taskQueue的创建是io.netty.channel.nio.NioEventLoop#newTaskQueue方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 @Override
 
 protected Queue<Runnable> newTaskQueue() {
 
 // This event loop never calls takeTask()
 
 return PlatformDependent.newMpscQueue();
 
 }
 

使用的是mpsc队列,也就是多生产者单消费者队列。

taskQueue被实现为netty的mpscQueue,即多生产者单消费者队列。netty使用该队列将外部用户线程产生的Task聚集,并在reactor线程内部用单线程的方式串行执行队列中的Task。

当用户在非IO线程调用Channel的各种方法执行Channel相关的操作时,比如channel.write()、channel.flush()等,netty会将相关操作封装成一个Task并放入taskQueue中,保证相关操作在IO线程中串行执行。

处理用户产生的定时任务:

关于定时任务就需要看下io.netty.util.concurrent.SingleThreadEventExecutor#schedule(java.lang.Runnable, long, java.util.concurrent.TimeUnit)方法代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 @Override
 
 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
 
 ----------省略部分代码---------------------
 
 return schedule(new ScheduledFutureTask<Void>(
 
 this, delayedTaskQueue, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
 
 }
 

ScheduledFutureTask中传入的队列为delayedTaskQueue:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
final Queue<ScheduledFutureTask<?>> delayedTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();

NioEventLoop中的Queue> delayedTaskQueue = new PriorityQueue被用来承载用户产生的定时Task。它是一个优先队列。

当用户在非IO线程需要产生定时操作时,netty将用户的定时操作封装成ScheduledFutureTask,即一个netty内部的定时Task,并将定时Task放入delayedTaskQueue中等待对应Channel的IO线程串行执行。

为了解决多线程并发写入delayedTaskQueue的问题,netty将添加ScheduledFutureTask到delayedTaskQueue中的操作封装成普通Task,放入taskQueue中,通过NioEventLoop的IO线程对delayedTaskQueue进行单线程写操作。

处理任务队列的逻辑:

  1. 将已到期的定时Task从delayedTaskQueue中转移到taskQueue中
  2. 计算本次循环执行的截止时间
  3. 循环执行taskQueue中的任务,每隔64个任务检查一下是否已过截止时间,直到taskQueue中任务全部执行完或者超过执行截止时间。

io.netty.util.concurrent.SingleThreadEventExecutor#takeTask方法:

在io.netty.util.concurrent.SingleThreadEventExecutor#fetchFromDelayedQueue方法内部进行任务迁移的操作:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 private void fetchFromDelayedQueue() {
 
 long nanoTime = 0L;
 
 for (;;) {
 
 ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
 
 if (delayedTask == null) {
 
 break;
 
 }
 


 
 if (nanoTime == 0L) {
 
                nanoTime = ScheduledFutureTask.nanoTime();
 
 }
 


 
 if (delayedTask.deadlineNanos() <= nanoTime) {
 
                delayedTaskQueue.remove();// 从delayedTaskQueue中移除
 
                taskQueue.add(delayedTask);// 添加到任务队列中
 
 } else {
 
 break;
 
 }
 
 }
 
 }
 

io.netty.channel.nio.NioEventLoopGroup

NioEventLoopGroup中主要维护一组EventLoop,EventLoop实现了Executor接口,里面维护着executor线程和方法。

NioEventLoopGroup的类继承关系:

MultithreadEventLoopGroup

在这个类的静态代码块中对EventLoopGroup的默认线程数进行了初始化:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 static {
 
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
 
 "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
 
 }
 

并且对ThreadFactory进行的初始化:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 @Override
 
 protected ThreadFactory newDefaultThreadFactory() {
 
 return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);
 
 }
 

io.netty.util.concurrent.DefaultThreadFactory#DefaultThreadFactory(java.lang.Class, int):

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
 
 if (poolName == null) {
 
 throw new NullPointerException("poolName");
 
 }
 
 if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
 
 throw new IllegalArgumentException(
 
 "priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
 
 }
 


 
        prefix = poolName + '-' + poolId.incrementAndGet() + '-';
 
 this.daemon = daemon;
 
 this.priority = priority;
 
 }
 

这里我们主要关注一下它的newThread方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 @Override
 
 public Thread newThread(Runnable r) {
 
 Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());
 
 --------------------------
 
 }
 


 
 protected Thread newThread(Runnable r, String name) {
 
 return new FastThreadLocalThread(r, name);
 
 }
 

这里创建的线程为FastThreadLocalThread,接着顺便来分析下FastThreadLocalThread,先来看下它与普通线程不一样的属性和方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 private InternalThreadLocalMap threadLocalMap;
 
 /**
 
     * Returns the internal data structure that keeps the thread-local variables bound to this thread.
 
     * Note that this method is for internal use only, and thus is subject to change at any time.
 
     */
 
 public final InternalThreadLocalMap threadLocalMap() {
 
 return threadLocalMap;
 
 }
 


 
 /**
 
     * Sets the internal data structure that keeps the thread-local variables bound to this thread.
 
     * Note that this method is for internal use only, and thus is subject to change at any time.
 
     */
 
 public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) {
 
 this.threadLocalMap = threadLocalMap;
 
 }
 

JDK 中自带的 ThreadLocal 在线程池使用环境中,有内存泄漏的风险,很明显,Netty 为了避免这个 bug,重新进行了封装。它主要用于与io.netty.util.concurrent.FastThreadLocal合用,就如同Thread与ThreadLocal合用一样(关于ThreadLocal、InheritThreadLocal和TransmitableThreadLocal的源码之前有一系列的文章分别分析过,需要详细了解的请翻阅历史文章)。我们知道解决hash冲突的办法主要有以下几种:

  1. 开放定址法(线性探测再散列,二次探测再散列,伪随机探测再散列)
  2. 再次哈希法(rehash)
  3. 链地址法
  4. 建立一个公共溢出区

Java中hashmap的解决办法就是采用的链地址法。这里我们补充一下hashmap中的知识点:

  • JDK1.8 HashMap的优化:一方面引入红黑树解决过长链表效率低的问题;另一方面重写resize方法,移除了alternative hashing相关方法,避免重新计算键的hash。
  • HashMap的线程不安全体现在:多线程同时put添加元素会丢失元素,多线程同时扩容会造成死循环。

关于FastThreadLocal,我们简要来分析几点:

缓冲行填充

io.netty.util.internal.InternalThreadLocalMap中有几个long类型的属性:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 // Cache line padding (must be public)
 
 // With CompressedOops enabled, an instance of this class should occupy at least 128 bytes.
 
 public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8, rp9;
 

对cpu缓存行进行填充,防止因为伪共享导致的缓存失效问题。

fastGet和slowGet方法

io.netty.util.internal.InternalThreadLocalMap#get:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 public static InternalThreadLocalMap get() {
 
 Thread thread = Thread.currentThread();
 
 if (thread instanceof FastThreadLocalThread) {
 
 return fastGet((FastThreadLocalThread) thread);
 
 } else {
 
 return slowGet();
 
 }
 
 }
 

会根据当前线程类型来决定走fastGet方法还是slowGet方法.

io.netty.util.internal.InternalThreadLocalMap#fastGet:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
 
 InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
 
 if (threadLocalMap == null) {
 
            thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
 
 }
 
 return threadLocalMap;
 
 }
 


 
// 构造方法:
 
 private InternalThreadLocalMap() {
 
 super(newIndexedVariableTable());
 
 }
 
 private static Object[] newIndexedVariableTable() {
 
 Object[] array = new Object[32];
 
 Arrays.fill(array, UNSET);
 
 return array;
 
 }
 


 
// super构造方法:
 
 UnpaddedInternalThreadLocalMap(Object[] indexedVariables) {
 
 this.indexedVariables = indexedVariables;
 
 }
 

在fastGet方法中针对的是FastThreadLocalThread线程,也就是netty的内部线程(与EventLoop关联使用的线程,用的是io.netty.util.internal.InternalThreadLocalMap。,这个 Map 内部维护的是一个数组,和 JDK 不同,JDK 维护的是一个使用线性探测法的 Map,可见,从底层数据结构上,JDK ThreadLocalMap就已经输了,他们的读取速度相差很大,特别是当数据量很大的时候,Netty 的数据结构速度依然不变,而 JDK ThreadLocalMap由于使用线性探测法,速度会相应的下降。

io.netty.util.internal.InternalThreadLocalMap#slowGet:

代码语言:javascript
代码运行次数:0
运行
复制
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 private static InternalThreadLocalMap slowGet() {
 
 ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
 
 if (slowThreadLocalMap == null) {
 
 UnpaddedInternalThreadLocalMap.slowThreadLocalMap =
 
                    slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
 
 }
 


 
 InternalThreadLocalMap ret = slowThreadLocalMap.get();
 
 if (ret == null) {
 
            ret = new InternalThreadLocalMap();
 
            slowThreadLocalMap.set(ret);
 
 }
 
 return ret;
 
 }
 
  • 这个方法针对的是普通线程,非FastThreadLocalThread线程。它使用的是ThreadLocal变量,在ThreadLocal变量内部存放的是InternalThreadLocalMap。在之前的文章中有详细分析过ThreadLocal,它的内部是基于ThreadLocalMap实现的,ThreadLocalMap内部Entry是一个WeakReference类型(弱引用级别比软引用更低。当对象根节点可及、无强引用和软引用、有弱引用指向对象时,若发生GC,该对象将直接被回收)的hashMap的结构。上上面提到过,hashmap是线性探测法的 Map。
  • 这个方法首先使用 JDK 的 ThreadLocal 获取一个 Netty 的 InternalThreadLocalMap,如果没有就创建一个,并将这个 InternalThreadLocalMap 设置到 JDK 的 ThreadLocal 中,然后返回这个 InternalThreadLocalMap。从这里可以看出,为了提高性能,Netty 还是避免使用了JDK 的 threadLocalMap,他的方式是曲线救国:在JDK 的 threadLocal 中设置 Netty 的 InternalThreadLocalMap ,然后,这个 InternalThreadLocalMap 中设置 Netty 的 FastThreadLcoal。
io.netty.util.concurrent.FastThreadLocal#set与get方法
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 /**
 
     * Set the value for the current thread.
 
     */
 
 public final void set(V value) {
 
 if (value != InternalThreadLocalMap.UNSET) {
 
            set(InternalThreadLocalMap.get(), value);
 
 } else {
 
            remove();
 
 }
 
 }
 


 
public final void set(InternalThreadLocalMap threadLocalMap, V value) {
 
 if (value != InternalThreadLocalMap.UNSET) {
 
 // 设置变量
 
 if (threadLocalMap.setIndexedVariable(index, value)) {
 
                addToVariablesToRemove(threadLocalMap, this);
 
 }
 
 } else {
 
            remove(threadLocalMap);
 
 }
 
 }
 


 
//index是在FastThreadLocal的构造方法中初始化的:
 
public FastThreadLocal() {
 
        index = InternalThreadLocalMap.nextVariableIndex();
 
 }
 


 
//io.netty.util.internal.InternalThreadLocalMap#nextVariableIndex:
 
 public static int nextVariableIndex() {
 
 // 通过AtomicInteger维护
 
 int index = nextIndex.getAndIncrement();
 
 if (index < 0) {
 
            nextIndex.decrementAndGet();
 
 throw new IllegalStateException("too many thread-local indexed variables");
 
 }
 
 return index;
 
 }
 

它的每个变量值在set进去后可以根据index快速定位到指定index在数组中的值,看下get方法就很清晰了:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 /**
 
     * Returns the current value for the current thread
 
     */
 
 public final V get() {
 
 return get(InternalThreadLocalMap.get());
 
 }
 


 
 /**
 
     * Returns the current value for the specified thread local map.
 
     * The specified thread local map must be for the current thread.
 
     */
 
 @SuppressWarnings("unchecked")
 
 public final V get(InternalThreadLocalMap threadLocalMap) {
 
 Object v = threadLocalMap.indexedVariable(index);
 
 if (v != InternalThreadLocalMap.UNSET) {
 
 return (V) v;
 
 }
 


 
 return initialize(threadLocalMap);
 
 }
 


 
// io.netty.util.internal.InternalThreadLocalMap#indexedVariable
 
 public Object indexedVariable(int index) {
 
 Object[] lookup = indexedVariables;
 
 return index < lookup.length? lookup[index] : UNSET;
 
 }
 

它能够根据index的值快速定位到数组中的元素,而它的索引是通过AtomicInteger来维护的.

拓容
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private void expandIndexedVariableTableAndSet(int index, Object value) {
 
 Object[] oldArray = indexedVariables;
 
 final int oldCapacity = oldArray.length;
 
 int newCapacity = index;
 
        newCapacity |= newCapacity >>> 1;
 
        newCapacity |= newCapacity >>> 2;
 
        newCapacity |= newCapacity >>> 4;
 
        newCapacity |= newCapacity >>> 8;
 
        newCapacity |= newCapacity >>> 16;
 
        newCapacity ++;
 


 
 Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
 
 Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
 
        newArray[index] = value;
 
        indexedVariables = newArray;
 
 }
 

这里和hashMap的扩容对比着看就很好理解了,这段代码的作用就是按原来的容量扩容2倍。并且保证结果是2的幂次方。这里 Netty 的做法和 HashMap 一样,按照原来的容量扩容到最近的 2 的幂次方大小,比如原来32,就扩容到64,然后,将原来数组的内容填充到新数组中,剩余的填充 空对象,然后将新数组赋值给成员变量 indexedVariables。完成了一次扩容。

从上面几点可以看出FastThreadLocalThread与FastThreadLocal合并时的主要特点是快,更多细节请参考:https://www.jianshu.com/p/3fc2fbac4bb7和https://www.jianshu.com/p/6adfa89ed06e。而DefaultThreadFactory创建的线程都是FastThreadLocalThread类型的.

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-03-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发架构二三事 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
国产!全志T113-i 双核Cortex-A7@1.2GHz 工业开发板—视频开发案例
本文主要介绍基于创龙科技TLT113-EVM评估板的视频开发案例,适用开发环境如下。
创龙科技Tronlong
2025/08/26
2750
国产!全志T113-i 双核Cortex-A7@1.2GHz 工业开发板—视频开发案例
基于FPGA的CameraLink视频开发案例
CameraLink协议是一种专门针对机器视觉应用领域的串行通信协议,它使用低压差分信号(LVDS)进行数据的传输和通信。CameraLink标准是在ChannelLink标准的基础上多加了6对差分信号线,其中4对用于并行传输相机控制信号,另外2对用于相机和图像采集卡之间的串行通信(本质就是UART的两根线)。
FPGA开源工作室
2021/02/24
2.3K0
基于FPGA的CameraLink视频开发案例
基于FPGA的两种SDI视频方案(GTX+外接芯片)
SDI接口,全称是“数字分量串行接口(Serial Digital Interface)”。按速率可分为标准清新度SD-SDI、高清标准HD-SDI和3G-SDI,其对应速率分别是270Mb/s、1.485Gb/s和2.97Gb/s。目前在航空航天、军事、医疗、交通等领域,SDI的应用广泛度仅次于CameraLink接口。
FPGA开源工作室
2021/02/24
7.3K0
基于FPGA的两种SDI视频方案(GTX+外接芯片)
基于TI Sitara系列AM5728工业开发板——FPGA视频开发案例分享
本文主要介绍FPGA视频开发案例的使用说明,适用开发环境:Windows 7/10 64bit、Xilinx Vivado 2017.4、Xilinx SDK 2017.4。
创龙科技Tronlong
2023/05/04
7070
基于TI Sitara系列AM5728工业开发板——FPGA视频开发案例分享
基于ZYNQ的CameraLink图像采集与边缘检测开发详解
(1) PL端接入CameraLink相机,通过Base模式采集图像(1280*1024),然后通过VDMA缓存到PS端DDR。
FPGA开源工作室
2020/05/20
5.3K1
最新最全的DSP+FPGA视频/高速AD开发案例合集来了,附上源码
TI C6678 + Xilinx Kintex-7作为DSP+FPGA架构的经典组合,凭借FPGA的高速采集和DSP的高性能算法处理完美结合的特性,一直被广泛应用于视频追踪、图像处理、软件无线电、雷达探测、光电探测、水下探测以及定位导航等嵌入式应用场景。
创龙科技Tronlong
2022/08/30
2.1K0
最新最全的DSP+FPGA视频/高速AD开发案例合集来了,附上源码
嵌入式HLS 案例开发步骤分享——基于Zynq-7010/20工业开发板(4)
本文主要介绍 HLS 案例的使用说明,适用开发环境: Windows 7/10 64bit、Xilinx Vivado
用户8594645
2023/01/02
4220
嵌入式HLS 案例开发步骤分享——基于Zynq-7010/20工业开发板(4)
嵌入式硬件开发学习教程——Xilinx Vivado HLS案例 (3)
本文主要介绍HLS案例的使用说明,适用开发环境:Windows 7/10 64bit、Xilinx Vivado 2017.4、Xilinx Vivado HLS 2017.4、Xilinx SDK 2017.4。
创龙科技Tronlong
2021/11/11
1.9K0
嵌入式硬件开发学习教程——Xilinx Vivado HLS案例 (3)
基于FPGA的光口通信开发案例
自著名华人物理学家高锟先生提出“光传输理论”,实用化的光纤传输产品始于1976年,经历了PDH→SDH→DWDM→ASON→MSTP的发展历程。本世纪初期,ASON/OADM 技术已在通信技术当中广泛应用,逐渐发展成为以骨干网络传输为介质的ROADM技术。
FPGA开源工作室
2021/01/27
2.4K0
基于FPGA的光口通信开发案例
FPGA案例开发资料(下)——TMS320C6678+Kintex-7开发板
本文主要介绍基于Vivado的FPGA案例的使用说明,适用开发环境:Windows 7/10 64bit、Xilinx Vivado 2017.4。其中案例包括led_flash案例、key_test案例、ibert_eyescan案例、udp_10g_echo案例、fmc_ad9706_ad9613案例、bram_srio_target案例。
创龙科技Tronlong
2022/08/14
1.8K0
FPGA案例开发资料(下)——TMS320C6678+Kintex-7开发板
基于 FPGA 的低成本、低延时成像系统
《优秀的IC/FPGA开源项目》是新开的系列,旨在介绍单一项目,会比《优秀的 Verilog/FPGA开源项目》内容介绍更加详细,包括但不限于综合、上板测试等。两者相辅相成,互补互充~
碎碎思
2023/09/01
6920
基于 FPGA 的低成本、低延时成像系统
惊艳!200MSPS采样率,RK3588F高速AD采集与实时显示案例来了!
科技飞速发展,高速数据采集与实时显示技术成为众多领域的关键需求。今天给大家分享一个基于瑞芯微RK3588J + FPGA的高速AD采集与实时显示案例。适用开发环境如下:
创龙科技Tronlong
2025/02/25
3370
惊艳!200MSPS采样率,RK3588F高速AD采集与实时显示案例来了!
Zynq7020 使用 Video Processing Subsystem 实现图像缩放
没玩过图像缩放都不好意思说自己玩儿过FPGA,这是CSDN某大佬说过的一句话,鄙人深信不疑。。。
芯动大师
2024/01/16
7990
Zynq7020 使用 Video Processing Subsystem 实现图像缩放
FPGA Xilinx Zynq 系列(三十二)AXI 接口
今天给大侠带来FPGA Xilinx Zynq 系列第三十二篇,开启十九章, 带来 AXI 接口等相关内容,本篇内容目录简介如下:
FPGA技术江湖
2020/12/30
2.9K0
基于TMS320C6678开发板的ZYNQ PS + PL异构多核案例开发手册(2)
本文主要介绍ZYNQ PS + PL异构多核案例的使用说明,适用开发环境:Windows 7/10 64bit、Xilinx Vivado 2017.4、Xilinx SDK 2017.4。其中测试板卡为TMS320C6678开发板,文章内容包含多个特色案例,如axi_gpio_led_demo案例、axi_timer_pwm_demo案例、axi_uart_demo案例、emio_gpio_led_demo案例、mig_dma案例等,由于篇幅过长,文章分为上下6个小节展示,欢迎大家按照顺序进行文章内容查看。
创龙科技Tronlong
2022/07/31
1.6K0
基于TMS320C6678开发板的ZYNQ PS + PL异构多核案例开发手册(2)
【高清视频方案分享】12G-SDI与CameraLink输入输出,基于RK3588J+FPGA工业平台
CameraLink是一种用于机器视觉和工业成像应用的标准化数字接口协议。它由自动化成像协会(Automated Imaging Association)开发,旨在解决传统模拟视频接口的局限性,提供一种高效、可靠且易于使用的数字解决方案,以实现相机与图像处理系统之间的高速数据传输。
创龙科技Tronlong
2024/12/23
5960
【高清视频方案分享】12G-SDI与CameraLink输入输出,基于RK3588J+FPGA工业平台
FPGA项目开发之AXI Stream FIFO IP
Xilinx Vivado中提供了AXI FIFO和AXI virtual FIFO类似IP,这篇文章主要通过实例来讲解这两个IP的使用方法。
碎碎思
2022/11/14
3.3K0
FPGA项目开发之AXI Stream FIFO IP
ZYNQ7035 PL Cameralink回环实现
本文主要介绍说明XQ6657Z35-EVM评估板Cameralink回环例程的功能、使用步骤以及各个例程的运行效果。
Xines广州星嵌
2023/02/24
9921
ZYNQ7035 PL Cameralink回环实现
解锁4K,Xilinx MPSoC ARM + FPGA高清视频采集与显示方案!
当下,随着数字化多媒体技术以令人惊叹的速度不断演进,高清视频处理成为众多领域关注的焦点。今天为大家分享4K HDMI 高清视频方案,基于Xilinx UltraScale+ MPSoC XCZU7EV高性能平台。
创龙科技Tronlong
2024/12/23
3510
解锁4K,Xilinx MPSoC ARM + FPGA高清视频采集与显示方案!
基于TMS320C6678开发板的ZYNQ PS + PL异构多核案例开发手册(3)
本文主要介绍ZYNQ PS + PL异构多核案例的使用说明,适用开发环境:Windows 7/10 64bit、Xilinx Vivado 2017.4、Xilinx SDK 2017.4。其中测试板卡为TMS320C6678开发板,文章内容包含多个特色案例,如axi_gpio_led_demo案例、axi_timer_pwm_demo案例、axi_uart_demo案例、emio_gpio_led_demo案例、mig_dma案例等,由于篇幅过长,文章分为上下6个小节展示,欢迎大家按照顺序进行文章内容查看。
创龙科技Tronlong
2022/07/31
1.3K0
基于TMS320C6678开发板的ZYNQ PS + PL异构多核案例开发手册(3)
推荐阅读
相关推荐
国产!全志T113-i 双核Cortex-A7@1.2GHz 工业开发板—视频开发案例
更多 >
LV.0
这个人很懒,什么都没有留下~
目录
  • io.netty.channel.nio.NioEventLoop
    • 类继承关系:
    • SingleThreadEventExecutor
    • 处理IO事件
      • 任务处理
  • io.netty.channel.nio.NioEventLoopGroup
    • NioEventLoopGroup的类继承关系:
    • MultithreadEventLoopGroup
      • 缓冲行填充
      • fastGet和slowGet方法
      • io.netty.util.concurrent.FastThreadLocal#set与get方法
      • 拓容
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档