前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >深入剖析 Netty 中的 NioEventLoopGroup:架构与实现

深入剖析 Netty 中的 NioEventLoopGroup:架构与实现

原创
作者头像
菜菜的后端私房菜
发布于 2025-02-25 09:21:41
发布于 2025-02-25 09:21:41
18700
代码可运行
举报
运行总次数:0
代码可运行

深入剖析 Netty 中的 NioEventLoopGroup:架构与实现

引言

在高性能网络编程领域,Netty 以其卓越的性能和灵活的架构脱颖而出

Netty 的核心组件之一 NioEventLoopGroup,在处理网络事件和任务调度中扮演着至关重要的角色

本文将深入剖析 NioEventLoopGroup 的架构设计、初始化流程、事件循环机制以及任务处理逻辑,帮助你全面理解 Netty 的内部实现,提升你的网络编程能力

简介

NioEventLoopGroup是Netty负责事件循环的入口,先来查看下它继承的类与实现的接口:

NioEventLoopGroup
NioEventLoopGroup

从图中可以大致得到几个信息:

  1. NioEventLoopGroup是线程池(实现Executor接口)
  2. NioEventLoopGroup可能是多线程的(父类MultithreadEventLoopGroup名字上看是多线程的)
  3. NioEventLoopGroup可以执行延迟/定时任务(实现ScheduledExecutorService接口)

从NioEventLoopGroup名称可以猜测,它通过组的方式管理内部的多个NioEventLoop来实现事件循环

接下来,再来查看NioEventLoop相关的接口与父类:

NioEventLoop
NioEventLoop

从图中可以看出:NioEventLoop是单线程的线程池(父类SingleThreadEventLoop)

从NioEventLoop字段中还可以看到,它有包含NIO的Selector(多路复用模型处理网络IO)

因此NioEventLoop是单线程的Selector,它会循环监听Channel上的事件并进行处理,以此实现事件循环

NioEventLoopGroup通过管理多个NioEventLoop的方式来进行水平扩展,充分解耦

使用Netty时通常会采用两个NioEventLoopGroup分别来处理连接、读/写事件:

其中处理连接事件的NioEventLoopGroup称为Parent(Boss),处理读写事件的NioEventLoopGroup成为child(work)

它们各司其职,充分解耦,当一方成为系统瓶颈时,只需要增大组内EventLoop数量即可(硬件资源足够的情况)

了解组件负责的功能后,我们马上开始查看源码进行分析其内部实现,并总结出其实现的原理以及思路

源码分析

要想了解其内部实现,还是要从初始化的源码开始分析

MultithreadEventExecutorGroup 实例化

先来一张NioEventLoopGroup实例化的大致流程图,便于结合代码分析

NioEventLoopGroup实例化流程
NioEventLoopGroup实例化流程

我们可以指定组内NioEventLoop的数量来初始化NioEventLoopGroup

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 NioEventLoopGroup parentGroup = new NioEventLoopGroup(1);
 NioEventLoopGroup childGroup = new NioEventLoopGroup(10);

初始化NioEventLoopGroup,核心实现在其父类MultithreadEventExecutorGroup中(大致流程如下):

  1. 初始化Group的线程池(用于创建、启动单线程NioEventLoop)
  2. 初始化组内的NioEventLoop (newChild)
  3. 初始化选择器(选择下一个EventLoop来执行任务,实现EventExecutorGroup接口的next方法)
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                         EventExecutorChooserFactory chooserFactory, Object... args) {
     if (nThreads <= 0) {
         throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
     }
     
     //group的线程池,用于创建、启动event loop的单线程
     if (executor == null) {
         executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
     }
 
     //group的children就是要管理的event loop
     //初始化要管理的event loop
     children = new EventExecutor[nThreads];
     for (int i = 0; i < nThreads; i ++) {
         boolean success = false;
         try {
             //创建event loop
             children[i] = newChild(executor, args);
             success = true;
         } catch (Exception e) {
             // TODO: Think about if this is a good exception type
             throw new IllegalStateException("failed to create a child event loop", e);
         } finally {
             if (!success) {
                 //创建失败停止、关闭所有event loop
                 for (int j = 0; j < i; j ++) {
                     children[j].shutdownGracefully();
                 }
 
                 for (int j = 0; j < i; j ++) {
                     EventExecutor e = children[j];
                     try {
                         while (!e.isTerminated()) {
                             e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                         }
                     } catch (InterruptedException interrupted) {
                         // Let the caller handle the interruption.
                         Thread.currentThread().interrupt();
                         break;
                     }
                 }
             }
         }
     }
 
     //创建选择器,用于选择下一个事件哪个event loop来执行
     chooser = chooserFactory.newChooser(children);
 
     //添加终止回调
     final FutureListener<Object> terminationListener = new FutureListener<Object>() {
         @Override
         public void operationComplete(Future<Object> future) throws Exception {
             if (terminatedChildren.incrementAndGet() == children.length) {
                 terminationFuture.setSuccess(null);
             }
         }
     };
     for (EventExecutor e: children) {
         e.terminationFuture().addListener(terminationListener);
     }
 
     Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
     Collections.addAll(childrenSet, children);
     readonlyChildren = Collections.unmodifiableSet(childrenSet);
 }

总的来说实例化Group就是:根据元数据创建Group内部组件:Executor、NioEventLoop、EventExecutorChooser

如果实例化时不传入Executor,默认情况下会使用ThreadPerTaskExecutor,它会使用工厂创建线程并执行

(这里的线程就是单线程EventLoop)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 public final class ThreadPerTaskExecutor implements Executor {
     private final ThreadFactory threadFactory;
 
     public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
         this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
     }
 
     @Override
     public void execute(Runnable command) {
         threadFactory.newThread(command).start();
     }
 }

遍历children数组调用newChild根据元数据创建NioEventLoop

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 @Override
 protected EventLoop newChild(Executor executor, Object... args) throws Exception {
     SelectorProvider selectorProvider = (SelectorProvider) args[0];
     SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
     RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
     EventLoopTaskQueueFactory taskQueueFactory = null;
     EventLoopTaskQueueFactory tailTaskQueueFactory = null;
 
     int argsLength = args.length;
     if (argsLength > 3) {
         taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
     }
     if (argsLength > 4) {
         tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
     }
     return new NioEventLoop(this, executor, selectorProvider,
             selectStrategyFactory.newSelectStrategy(),
             rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
 }

NioEventLoop中会存在两个selector:

  1. unwrappedSelector(未优化):selector + JDK原生HashSet存储SelectionKey
  2. selector(优化):selector + 数组存储SelectionKey

SelectionKey用于判断channel上发生什么事件,事件循环过程中经常需要遍历它来进行处理,因此使用数组代替哈希表进行优化

默认情况下采用优化后的selector

原生的SelectionKey由HashSet实现,优化后使用数组SelectedSelectionKeySetSelector

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
              SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
              EventLoopTaskQueueFactory queueFactory) {
     super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
             rejectedExecutionHandler);
     if (selectorProvider == null) {
         throw new NullPointerException("selectorProvider");
     }
     if (strategy == null) {
         throw new NullPointerException("selectStrategy");
     }
     provider = selectorProvider;
     //封装未优化、优化的Selector
     final SelectorTuple selectorTuple = openSelector();
     selector = selectorTuple.selector;
     unwrappedSelector = selectorTuple.unwrappedSelector;
     selectStrategy = strategy;
 }

在实例化过程中并不会创建NioEventLoop处理事件循环的单线程,而是采用懒加载,执行execute才会由Group的executor来启动

分析完大致的实例化流程再来查看这张核心步骤的流程图回味一下:

NioEventLoopGroup实例化核心流程
NioEventLoopGroup实例化核心流程
NioEventLoop线程启动

注册时会启动事件循环线程,当任务交给NioEventLoop执行时,会调用父类SingleThreadEventExecutor的execute

SingleThreadEventExecutor.execute 中的startThread方法会使用Group的executor创建、启动处理事件循环的线程(未启动的情况下)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 public void execute(Runnable task) {
     if (task == null) {
         throw new NullPointerException("task");
     }
     
     //当前线程是否为事件循环线程
     boolean inEventLoop = inEventLoop();
     //添加任务到队列
     addTask(task);
     if (!inEventLoop) {
         //启动线程
         startThread();
         if (isShutdown()) {
             boolean reject = false;
             try {
                 if (removeTask(task)) {
                     reject = true;
                 }
             } catch (UnsupportedOperationException e) {
             }
             if (reject) {
                 reject();
             }
         }
     }
     //唤醒
     if (!addTaskWakesUp && wakesUpForTask(task)) {
         wakeup(inEventLoop);
     }
 }

startThread 创建、启动EventLoop线程流程比较绕,需要debug,看代码不容易理解(看下图)

最终会调用Group的Executor的execute方法创建、启动线程

启动NioEventLoop线程
启动NioEventLoop线程

(期间ThreadExecutorMap记录当前eventloop作为上下文)

NioEventLoop事件循环核心方法

启动时的doStartThread方法中有一行核心方法,就是NioEventLoop线程启动后要执行的方法(也就是它的run方法)

SingleThreadEventExecutor.this.run();

NioEventLoop.run 处理事件循环的核心方法:

循环处理(for循环) + 阻塞(select) + 处理IO事件(processSelectedKeys) + 处理所有任务(runAllTasks)

大致流程图如下:

NioEventLoop处理事件循环大致流程
NioEventLoop处理事件循环大致流程

再来看下具体代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 @Override
 protected void run() {
     int selectCnt = 0;
     for (;;) {
         try {
             int strategy;
             try {
                 //执行selectNow 判断是否有事件就绪
                 strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                 switch (strategy) {
                 //..
                 case SelectStrategy.SELECT:
                     //返回-1 无事件就绪 调用select阻塞    
                     long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                     if (curDeadlineNanos == -1L) {
                         curDeadlineNanos = NONE; // nothing on the calendar
                     }
                     nextWakeupNanos.set(curDeadlineNanos);
                     try {
                         if (!hasTasks()) {
                             //阻塞 直到超时或事件就绪
                             strategy = select(curDeadlineNanos);
                         }
                     } finally {
                         nextWakeupNanos.lazySet(AWAKE);
                     }
                 default:
                 }
             } catch (IOException e) {
                 rebuildSelector0();
                 selectCnt = 0;
                 handleLoopException(e);
                 continue;
             }
             
             selectCnt++;
             cancelledKeys = 0;
             needsToSelectAgain = false;
             // 处理io事件所耗时间 占总任务所耗时间的比例 默认50
             final int ioRatio = this.ioRatio;
             boolean ranTasks;
             if (ioRatio == 100) {
                 try {
                     if (strategy > 0) {
                         //处理IO事件
                         processSelectedKeys();
                     }
                 } finally {
                     //处理所有任务
                     ranTasks = runAllTasks();
                 }
             } else if (strategy > 0) {
                 final long ioStartTime = System.nanoTime();
                 try {
                     processSelectedKeys();
                 } finally {
                     // Ensure we always run tasks.
                     final long ioTime = System.nanoTime() - ioStartTime;
                     ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                 }
             } else {
                 ranTasks = runAllTasks(0); 
             }
 
             //unexpectedSelectorWakeup意外唤醒的情况重建selector
             if (ranTasks || strategy > 0) {
             } else if (unexpectedSelectorWakeup(selectCnt)) { 
                 selectCnt = 0;
             }
             //...
         } catch (CancelledKeyException e) {
             //...
         }
     }
 }

select和selectNow都会返回就绪事件的数量

selectNow会立即返回用于判断当前是否有事件就绪

select会阻塞等待直到超时或有事件就绪

processSelectedKeys方法会遍历SelectedKey处理IO事件,SelectedKey用于判断channel上什么事件就绪便于处理

IO事件指的是channel上会有读、写、连接、接收事件已就绪:

当读事件就绪时可以将数据读到ByteBuf再交给管道中的处理器链调用、当写事件就绪时可以将ByteBuf上的数据写入网卡响应回去、当连接事件就绪时可以进行网络连接...

默认情况下使用优化的selector处理IO事件(优化与未优化的区别就是不同的遍历方式)

优化使用数组processSelectedKeysOptimized,未优化使用Set哈希表processSelectedKeysPlain

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 private void processSelectedKeysOptimized() {
     for (int i = 0; i < selectedKeys.size; ++i) {
         final SelectionKey k = selectedKeys.keys[i];
         // 方便GC
         selectedKeys.keys[i] = null;
 
         //取出附件(附件可以存储任意对象) ,这里是channel
         final Object a = k.attachment();
 
         if (a instanceof AbstractNioChannel) {
             //处理事件
             processSelectedKey(k, (AbstractNioChannel) a);
         } 
         
         //...
     }
 }

处理IO事件时,根据连接、写、读/接收等事件调用不同的操作,这个过程中会去触发调用管道的处理器链channelHandler(后文再描述)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
     final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
     //失效的情况
     if (!k.isValid()) {
         final EventLoop eventLoop;
         try {
             eventLoop = ch.eventLoop();
         } catch (Throwable ignored) {
             return;
         }
         if (eventLoop != this || eventLoop == null) {
             return;
         }
         unsafe.close(unsafe.voidPromise());
         return;
     }
 
     try {
         int readyOps = k.readyOps();
         //连接事件就绪
         if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
             int ops = k.interestOps();
             ops &= ~SelectionKey.OP_CONNECT;
             k.interestOps(ops);
             //处理连接
             unsafe.finishConnect();
         }
 
         //写事件就绪
         if ((readyOps & SelectionKey.OP_WRITE) != 0) {
             //数据刷入网卡写回
             ch.unsafe().forceFlush();
         }
 
         //读事件或接收事件就绪
         if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
             unsafe.read();
         }
     } catch (CancelledKeyException ignored) {
         unsafe.close(unsafe.voidPromise());
     }
 }

执行任务分为三个队列:

  1. scheduledTaskQueue:父类AbstractScheduledEventExecutor中,用于存放定时任务的队列
  2. taskQueue:父类SingleThreadEventExecutor中,用于存放真正要执行任务的队列
  3. tailTasks:父类SingleThreadEventLoop中,存放尾部任务,每次执行完任务后执行

为了平衡IO任务与CPU任务,使用字段ioRatio平衡处理IO事件和执行任务,ioRatio为处理IO事件所占事件循环的百分比,默认50

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 //IO率 默认50
 final int ioRatio = this.ioRatio;
 if (ioRatio == 100) {
     try {
         processSelectedKeys();
     } finally {
         // Ensure we always run tasks.
         runAllTasks();
     }
 } else {
     //非100的情况下会超时执行任务
     final long ioStartTime = System.nanoTime();
     try {
         processSelectedKeys();
     } finally {
         // Ensure we always run tasks.
         final long ioTime = System.nanoTime() - ioStartTime;
         runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
     }
 }

runAllTasks用于执行队列中的任务,执行流程基本为:

  1. 将定时任务放入taskQueue fetchFromScheduledTaskQueue
  2. 取出taskQueue任务进行执行
  3. 执行尾部任务 afterRunningAllTasks
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 protected boolean runAllTasks(long timeoutNanos) {
     //从父类的定时任务队列取出定时任务放入当前任务队列
     fetchFromScheduledTaskQueue();
     
     //取出任务
     Runnable task = pollTask();
     if (task == null) {
         //任务为空 执行尾队列中的任务
         afterRunningAllTasks();
         return false;
     }
 
     final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
     long runTasks = 0;
     long lastExecutionTime;
     //循环处理任务,超时或者没任务退出
     for (;;) {
         //执行任务
         safeExecute(task);
         //统计数量
         runTasks ++;
 
         //每64个任务检查一下超时
         if ((runTasks & 0x3F) == 0) {
             lastExecutionTime = ScheduledFutureTask.nanoTime();
             if (lastExecutionTime >= deadline) {
                 break;
             }
         }
         
         //取出任务 便于下次循环 为空就退出
         task = pollTask();
         if (task == null) {
             lastExecutionTime = ScheduledFutureTask.nanoTime();
             break;
         }
     }
 
     //执行尾队列中的任务
     afterRunningAllTasks();
     this.lastExecutionTime = lastExecutionTime;
     return true;
 }

在早期JDK NIO存在空转的bug(8后偶发),事件循环期间会调用unexpectedSelectorWakeup将超过512次自转的Selector进行重建(rebuildSelector)以此来避免空转、CPU飙升

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 private boolean unexpectedSelectorWakeup(int selectCnt) {
     if (Thread.interrupted()) {
         //..
         return true;
     }
     //SELECTOR_AUTO_REBUILD_THRESHOLD 由 io.netty.selectorAutoRebuildThreshold 决定 默认512
     if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
             selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
         //重新创建selector 替换
         rebuildSelector();
         return true;
     }
     return false;
 }

查看事件循环的核心代码,可以总结出以下流程图:

事件循环核心流程
事件循环核心流程

本篇文章主要描述NioEventLoopGroup以及NioEventLoop组件处理事件循环的核心流程

下篇文章再来看看pipeline中的ChannelHandler是如何进行调用的~

总结

NioEventLoopGroup常分为parent和child,其中parent用于处理连接/接收事件,child用于处理读/写事件

NioEventLoopGroup负责管理多个NioEventLoop,达到瓶颈时水平扩展(增加数量)可提高吞吐量(硬件资源足够的情况)

NioEventLoop是单线程的Selector,负责事件循环的处理

实例化NioEventLoopGroup的过程会创建Executor(用于启动NioEventLoop线程)、NioEventLoop(只是创建,并没有启动线程)、选择器等内部组件

创建NioEventLoop时对原生select进行优化,使用数组存储取代哈希表存储SelectionKey

注册阶段会调用父类SingleThreadEventExecutor.execute方法,使用NioEventLoopGroup的Executor来创建、启动NioEventLoop事件循环线程

NioEventLoop事件循环处理由select/selectNow判断当前是否有事件就绪,selectNow会立即返回就绪事件数量,而select会阻塞到超时或有事件就绪

当事件就绪时,遍历就绪的事件(SelectionKey),根据连接、写、读/接收事件进行处理

最后执行队列任务来组成事件循环的实现,处理IO事件与任务的比例由ioRatio参数决定,避免一方“饥饿”

事件循环期间检测自传次数,超过阈值重建selector,避免出现bug导致空转让CPU飙升

最后(点赞、收藏、关注求求啦~)

😁我是菜菜,热爱技术交流、分享与写作,喜欢图文并茂、通俗易懂的输出知识

📚在我的博客中,你可以找到Java技术栈的各个专栏:Java并发编程与JVM原理、Spring和MyBatis等常用框架及Tomcat服务器的源码解析,以及MySQL、Redis数据库的进阶知识,同时还提供关于消息中间件和Netty等主题的系列文章,都以通俗易懂的方式探讨这些复杂的技术点

🏆除此之外,我还是掘金优秀创作者、腾讯云年度影响力作者、华为云年度十佳博主....

👫我对技术交流、知识分享以及写作充满热情,如果你愿意,欢迎加我一起交流(vx:CaiCaiJava666),也可以持续关注我的公众号:菜菜的后端私房菜,我会分享更多技术干货,期待与更多志同道合的朋友携手并进,一同在这条充满挑战与惊喜的技术之旅中不断前行

🤝如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~

📖本篇文章被收入专栏 Java常用框架、络基石,感兴趣的朋友可以持续关注~

📝本篇文章、笔记以及案例被收入 Gitee-CaiCaiJava、 Github-CaiCaiJava,除此之外还有更多Java进阶相关知识,感兴趣的朋友可以star持续关注~

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 深入剖析 Netty 中的 NioEventLoopGroup:架构与实现
    • 引言
    • 简介
    • 源码分析
      • MultithreadEventExecutorGroup 实例化
      • NioEventLoop线程启动
      • NioEventLoop事件循环核心方法
    • 总结
      • 最后(点赞、收藏、关注求求啦~)
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档