在高性能网络编程领域,Netty 以其卓越的性能和灵活的架构脱颖而出
Netty 的核心组件之一 NioEventLoopGroup
,在处理网络事件和任务调度中扮演着至关重要的角色
本文将深入剖析 NioEventLoopGroup
的架构设计、初始化流程、事件循环机制以及任务处理逻辑,帮助你全面理解 Netty 的内部实现,提升你的网络编程能力
NioEventLoopGroup是Netty负责事件循环的入口,先来查看下它继承的类与实现的接口:
从图中可以大致得到几个信息:
从NioEventLoopGroup名称可以猜测,它通过组的方式管理内部的多个NioEventLoop来实现事件循环
接下来,再来查看NioEventLoop相关的接口与父类:
从图中可以看出:NioEventLoop是单线程的线程池(父类SingleThreadEventLoop)
从NioEventLoop字段中还可以看到,它有包含NIO的Selector(多路复用模型处理网络IO)
因此NioEventLoop是单线程的Selector,它会循环监听Channel上的事件并进行处理,以此实现事件循环
而NioEventLoopGroup通过管理多个NioEventLoop的方式来进行水平扩展,充分解耦
使用Netty时通常会采用两个NioEventLoopGroup分别来处理连接、读/写事件:
其中处理连接事件的NioEventLoopGroup称为Parent(Boss),处理读写事件的NioEventLoopGroup成为child(work)
它们各司其职,充分解耦,当一方成为系统瓶颈时,只需要增大组内EventLoop数量即可(硬件资源足够的情况)
了解组件负责的功能后,我们马上开始查看源码进行分析其内部实现,并总结出其实现的原理以及思路
要想了解其内部实现,还是要从初始化的源码开始分析
先来一张NioEventLoopGroup实例化的大致流程图,便于结合代码分析:
我们可以指定组内NioEventLoop的数量来初始化NioEventLoopGroup
NioEventLoopGroup parentGroup = new NioEventLoopGroup(1);
NioEventLoopGroup childGroup = new NioEventLoopGroup(10);
初始化NioEventLoopGroup,核心实现在其父类MultithreadEventExecutorGroup中(大致流程如下):
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
//group的线程池,用于创建、启动event loop的单线程
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
//group的children就是要管理的event loop
//初始化要管理的event loop
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//创建event loop
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
//创建失败停止、关闭所有event loop
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
//创建选择器,用于选择下一个事件哪个event loop来执行
chooser = chooserFactory.newChooser(children);
//添加终止回调
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
总的来说实例化Group就是:根据元数据创建Group内部组件:Executor、NioEventLoop、EventExecutorChooser
如果实例化时不传入Executor,默认情况下会使用ThreadPerTaskExecutor,它会使用工厂创建线程并执行
(这里的线程就是单线程EventLoop)
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
遍历children数组调用newChild根据元数据创建NioEventLoop
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
SelectorProvider selectorProvider = (SelectorProvider) args[0];
SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
EventLoopTaskQueueFactory taskQueueFactory = null;
EventLoopTaskQueueFactory tailTaskQueueFactory = null;
int argsLength = args.length;
if (argsLength > 3) {
taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
}
if (argsLength > 4) {
tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
}
return new NioEventLoop(this, executor, selectorProvider,
selectStrategyFactory.newSelectStrategy(),
rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}
NioEventLoop中会存在两个selector:
SelectionKey用于判断channel上发生什么事件,事件循环过程中经常需要遍历它来进行处理,因此使用数组代替哈希表进行优化
默认情况下采用优化后的selector
原生的SelectionKey由HashSet实现,优化后使用数组SelectedSelectionKeySetSelector
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
//封装未优化、优化的Selector
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
在实例化过程中并不会创建NioEventLoop处理事件循环的单线程,而是采用懒加载,执行execute才会由Group的executor来启动
分析完大致的实例化流程再来查看这张核心步骤的流程图回味一下:
注册时会启动事件循环线程,当任务交给NioEventLoop执行时,会调用父类SingleThreadEventExecutor的execute
SingleThreadEventExecutor.execute 中的startThread方法会使用Group的executor创建、启动处理事件循环的线程(未启动的情况下)
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
//当前线程是否为事件循环线程
boolean inEventLoop = inEventLoop();
//添加任务到队列
addTask(task);
if (!inEventLoop) {
//启动线程
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
}
if (reject) {
reject();
}
}
}
//唤醒
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
startThread 创建、启动EventLoop线程流程比较绕,需要debug,看代码不容易理解(看下图)
最终会调用Group的Executor的execute方法创建、启动线程
(期间ThreadExecutorMap记录当前eventloop作为上下文)
启动时的doStartThread方法中有一行核心方法,就是NioEventLoop线程启动后要执行的方法(也就是它的run方法)
SingleThreadEventExecutor.this.run();
NioEventLoop.run 处理事件循环的核心方法:
循环处理(for循环) + 阻塞(select) + 处理IO事件(processSelectedKeys) + 处理所有任务(runAllTasks)
大致流程图如下:
再来看下具体代码:
@Override
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
//执行selectNow 判断是否有事件就绪
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
//..
case SelectStrategy.SELECT:
//返回-1 无事件就绪 调用select阻塞
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
//阻塞 直到超时或事件就绪
strategy = select(curDeadlineNanos);
}
} finally {
nextWakeupNanos.lazySet(AWAKE);
}
default:
}
} catch (IOException e) {
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
}
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
// 处理io事件所耗时间 占总任务所耗时间的比例 默认50
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) {
try {
if (strategy > 0) {
//处理IO事件
processSelectedKeys();
}
} finally {
//处理所有任务
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
ranTasks = runAllTasks(0);
}
//unexpectedSelectorWakeup意外唤醒的情况重建selector
if (ranTasks || strategy > 0) {
} else if (unexpectedSelectorWakeup(selectCnt)) {
selectCnt = 0;
}
//...
} catch (CancelledKeyException e) {
//...
}
}
}
select和selectNow都会返回就绪事件的数量
selectNow会立即返回用于判断当前是否有事件就绪
select会阻塞等待直到超时或有事件就绪
processSelectedKeys方法会遍历SelectedKey处理IO事件,SelectedKey用于判断channel上什么事件就绪便于处理
IO事件指的是channel上会有读、写、连接、接收事件已就绪:
当读事件就绪时可以将数据读到ByteBuf再交给管道中的处理器链调用、当写事件就绪时可以将ByteBuf上的数据写入网卡响应回去、当连接事件就绪时可以进行网络连接...
默认情况下使用优化的selector处理IO事件(优化与未优化的区别就是不同的遍历方式)
优化使用数组processSelectedKeysOptimized,未优化使用Set哈希表processSelectedKeysPlain
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// 方便GC
selectedKeys.keys[i] = null;
//取出附件(附件可以存储任意对象) ,这里是channel
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
//处理事件
processSelectedKey(k, (AbstractNioChannel) a);
}
//...
}
}
处理IO事件时,根据连接、写、读/接收等事件调用不同的操作,这个过程中会去触发调用管道的处理器链channelHandler(后文再描述)
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
//失效的情况
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop != this || eventLoop == null) {
return;
}
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
//连接事件就绪
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
//处理连接
unsafe.finishConnect();
}
//写事件就绪
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
//数据刷入网卡写回
ch.unsafe().forceFlush();
}
//读事件或接收事件就绪
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
执行任务分为三个队列:
为了平衡IO任务与CPU任务,使用字段ioRatio平衡处理IO事件和执行任务,ioRatio为处理IO事件所占事件循环的百分比,默认50
//IO率 默认50
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
//非100的情况下会超时执行任务
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
runAllTasks用于执行队列中的任务,执行流程基本为:
protected boolean runAllTasks(long timeoutNanos) {
//从父类的定时任务队列取出定时任务放入当前任务队列
fetchFromScheduledTaskQueue();
//取出任务
Runnable task = pollTask();
if (task == null) {
//任务为空 执行尾队列中的任务
afterRunningAllTasks();
return false;
}
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
//循环处理任务,超时或者没任务退出
for (;;) {
//执行任务
safeExecute(task);
//统计数量
runTasks ++;
//每64个任务检查一下超时
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
//取出任务 便于下次循环 为空就退出
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
//执行尾队列中的任务
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
在早期JDK NIO存在空转的bug(8后偶发),事件循环期间会调用unexpectedSelectorWakeup将超过512次自转的Selector进行重建(rebuildSelector)以此来避免空转、CPU飙升
private boolean unexpectedSelectorWakeup(int selectCnt) {
if (Thread.interrupted()) {
//..
return true;
}
//SELECTOR_AUTO_REBUILD_THRESHOLD 由 io.netty.selectorAutoRebuildThreshold 决定 默认512
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
//重新创建selector 替换
rebuildSelector();
return true;
}
return false;
}
查看事件循环的核心代码,可以总结出以下流程图:
本篇文章主要描述NioEventLoopGroup以及NioEventLoop组件处理事件循环的核心流程
下篇文章再来看看pipeline中的ChannelHandler是如何进行调用的~
NioEventLoopGroup常分为parent和child,其中parent用于处理连接/接收事件,child用于处理读/写事件
NioEventLoopGroup负责管理多个NioEventLoop,达到瓶颈时水平扩展(增加数量)可提高吞吐量(硬件资源足够的情况)
NioEventLoop是单线程的Selector,负责事件循环的处理
实例化NioEventLoopGroup的过程会创建Executor(用于启动NioEventLoop线程)、NioEventLoop(只是创建,并没有启动线程)、选择器等内部组件
创建NioEventLoop时对原生select进行优化,使用数组存储取代哈希表存储SelectionKey
注册阶段会调用父类SingleThreadEventExecutor.execute方法,使用NioEventLoopGroup的Executor来创建、启动NioEventLoop事件循环线程
NioEventLoop事件循环处理由select/selectNow判断当前是否有事件就绪,selectNow会立即返回就绪事件数量,而select会阻塞到超时或有事件就绪
当事件就绪时,遍历就绪的事件(SelectionKey),根据连接、写、读/接收事件进行处理
最后执行队列任务来组成事件循环的实现,处理IO事件与任务的比例由ioRatio参数决定,避免一方“饥饿”
事件循环期间检测自传次数,超过阈值重建selector,避免出现bug导致空转让CPU飙升
😁我是菜菜,热爱技术交流、分享与写作,喜欢图文并茂、通俗易懂的输出知识
📚在我的博客中,你可以找到Java技术栈的各个专栏:Java并发编程与JVM原理、Spring和MyBatis等常用框架及Tomcat服务器的源码解析,以及MySQL、Redis数据库的进阶知识,同时还提供关于消息中间件和Netty等主题的系列文章,都以通俗易懂的方式探讨这些复杂的技术点
🏆除此之外,我还是掘金优秀创作者、腾讯云年度影响力作者、华为云年度十佳博主....
👫我对技术交流、知识分享以及写作充满热情,如果你愿意,欢迎加我一起交流(vx:CaiCaiJava666),也可以持续关注我的公众号:菜菜的后端私房菜,我会分享更多技术干货,期待与更多志同道合的朋友携手并进,一同在这条充满挑战与惊喜的技术之旅中不断前行
🤝如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~
📖本篇文章被收入专栏 Java常用框架、网络基石,感兴趣的朋友可以持续关注~
📝本篇文章、笔记以及案例被收入 Gitee-CaiCaiJava、 Github-CaiCaiJava,除此之外还有更多Java进阶相关知识,感兴趣的朋友可以star持续关注~
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有