本系列 Netty 源码解析文章基于 4.1.56.Final 版本
对于一个高性能网络通讯框架来说,最最重要也是最核心的工作就是如何高效的接收客户端连接,这就好比我们开了一个饭店,那么迎接客人就是饭店最重要的工作,我们要先把客人迎接进来,不能让客人一看人多就走掉,只要客人进来了,哪怕菜做的慢一点也没关系。
本文笔者就来为大家介绍下 netty 这块最核心的内容,看看 netty 是如何高效的接收客户端连接的。
下图为笔者在一个月黑风高天空显得那么深邃遥远的夜晚,闲来无事,于是捧起 Netty 关于如何接收连接这部分源码细细品读的时候,意外的发现了一个影响 Netty 接收连接吞吐的一个 Bug。
于是笔者就在 Github 提了一个Issue#11708,阐述了下这个 Bug 产生的原因以及导致的结果并和 Netty 的作者一起讨论了下修复措施。如上图所示。
Issue#11708:https://github.com/netty/netty/issues/11708
这里先不详细解释这个 Issue,也不建议大家现在就打开这个 Issue 查看,笔者会在本文的介绍中随着源码深入的解读慢慢的为大家一层一层地拨开迷雾。
之所以在文章的开头把这个拎出来,笔者是想让大家带着怀疑,审视,欣赏,崇敬,敬畏的态度来一起品读世界顶级程序员编写的代码。由衷的感谢他们在这一领域做出的贡献。
好了,问题抛出来后,我们就带着这个疑问来开始本文的内容吧~~~
按照老规矩,再开始本文的内容之前,我们先来回顾下前边几篇文章的概要内容帮助大家梳理一个框架全貌出来。
笔者这里再次想和读者朋友们强调的是本文可以独立观看,并不依赖前边系列文章的内容,只是大家如果对相关细节部分感兴趣的话,可以在阅读完本文之后在去回看相关文章。
在前边的系列文章中,笔者为大家介绍了驱动 Netty 整个框架运转的核心引擎 Reactor 的创建,启动,运行的全流程。从现在开始 Netty 的整个核心框架就开始运转起来开始工作了,本文要介绍的主要内容就是 Netty 在启动之后要做的第一件事件:监听端口地址,高效接收客户端连接。
在《聊聊Netty那些事儿之从内核角度看IO模型》一文中,我们是从整个网络框架的基石 IO 模型的角度整体阐述了下 Netty 的 IO 线程模型。
而 Netty 中的 Reactor 正是 IO 线程在 Netty 中的模型定义。Reactor 在 Netty 中是以 Group 的形式出现的,分为:
EventLoopGroup bossGroup
,main reactor group 中的 reactor 主要负责监听客户端连接事件,高效的处理客户端连接。也是本文我们要介绍的重点。EventLoopGroup workerGroup
,sub reactor group 中的 reactor 主要负责处理客户端连接上的 IO 事件,以及异步任务的执行。最后我们得出 Netty 的整个 IO 模型如下:
本文我们讨论的重点就是 MainReactorGroup 的核心工作上图中所示的步骤 1,步骤 2,步骤 3。
在从整体上介绍完 Netty 的 IO 模型之后,我们又在《Reactor在Netty中的实现(创建篇)》中完整的介绍了 Netty 框架的骨架主从 Reactor 组的搭建过程,阐述了 Reactor 是如何被创建出来的,并介绍了它的核心组件如下图所示:
thread
即为 Reactor 中的 IO 线程,主要负责监听 IO 事件,处理 IO 任务,执行异步任务。selector
则是 JDK NIO 对操作系统底层 IO 多路复用技术实现的封装。用于监听 IO 就绪事件。taskQueue
用于保存 Reactor 需要执行的异步任务,这些异步任务可以由用户在业务线程中向 Reactor 提交,也可以是 Netty 框架提交的一些自身核心的任务。scheduledTaskQueue
则是保存 Reactor 中执行的定时任务。代替了原有的时间轮来执行延时任务。tailQueue
保存了在 Reactor 需要执行的一些尾部收尾任务,在普通任务执行完后 Reactor 线程会执行尾部任务,比如对 Netty 的运行状态做一些统计数据,例如任务循环的耗时、占用物理内存的大小等等在骨架搭建完毕之后,我们随后又在在《详细图解Netty Reactor启动全流程》》一文中介绍了本文的主角服务端 NioServerSocketChannel 的创建,初始化,绑定端口地址,向 main reactor 注册监听OP_ACCEPT事件
的完整过程。
main reactor 如何处理 OP_ACCEPT 事件将会是本文的主要内容。
自此 Netty 框架的 main reactor group 已经启动完毕,开始准备监听 OP_accept 事件,当客户端连接上来之后,OP_ACCEPT 事件活跃,main reactor 开始处理 OP_ACCEPT 事件接收客户端连接了。
而 netty 中的 IO 事件分为:OP_ACCEPT 事件,OP_READ 事件,OP_WRITE 事件和 OP_CONNECT 事件,netty 对于 IO 事件的监听和处理统一封装在 Reactor 模型中,这四个 IO 事件的处理过程也是我们后续文章中要单独拿出来介绍的,本文我们聚焦 OP_ACCEPT 事件的处理。
而为了让大家能够对 IO 事件的处理有一个完整性的认识,笔者写了《一文聊透Netty核心引擎Reactor的运转架构》这篇文章,在文章中详细介绍了 Reactor 线程的整体运行框架。
Reactor 线程会在一个死循环中 996 不停的运转,在循环中会不断的轮询监听 Selector 上的 IO 事件,当 IO 事件活跃后,Reactor 从 Selector 上被唤醒转去执行 IO 就绪事件的处理,在这个过程中我们引出了上述四种 IO 事件的处理入口函数。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
//获取Channel的底层操作类Unsafe
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
......如果SelectionKey已经失效则关闭对应的Channel......
}
try {
//获取IO就绪事件
int readyOps = k.readyOps();
//处理Connect事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
//移除对Connect事件的监听,否则Selector会一直通知
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
//触发channelActive事件处理Connect事件
unsafe.finishConnect();
}
//处理Write事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
//处理Read事件或者Accept事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
本文笔者将会为大家重点介绍OP_ACCEPT事件
的处理入口函数unsafe.read()
的整个源码实现。
当客户端连接完成三次握手之后,main reactor 中的 selector 产生OP_ACCEPT事件
活跃,main reactor 随即被唤醒,来到了OP_ACCEPT事件
的处理入口函数开始接收客户端连接。
当Main Reactor
轮询到NioServerSocketChannel
上的OP_ACCEPT事件
就绪时,Main Reactor 线程就会从JDK Selector
上的阻塞轮询 APIselector.select(timeoutMillis)
调用中返回。转而去处理NioServerSocketChannel
上的OP_ACCEPT事件
。
public final class NioEventLoop extends SingleThreadEventLoop {
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
..............省略.................
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
..............处理OP_CONNECT事件.................
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
..............处理OP_WRITE事件.................
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
//本文重点处理OP_ACCEPT事件
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
}
processSelectedKey
中的参数AbstractNioChannel ch
正是 Netty 服务端NioServerSocketChannel
。因为此时的执行线程为 main reactor 线程,而 main reactor 上注册的正是 netty 服务端 NioServerSocketChannel 负责监听端口地址,接收客户端连接。ch.unsafe()
获取到的 NioUnsafe 操作类正是 NioServerSocketChannel 中对底层 JDK NIO ServerSocketChannel 的 Unsafe 底层操作类。
Unsafe接口
是 Netty 对 Channel 底层操作行为的封装,比如 NioServerSocketChannel 的底层 Unsafe 操作类干的事情就是绑定端口地址
,处理OP_ACCEPT事件
。
这里我们看到,Netty 将OP_ACCEPT事件
处理的入口函数封装在NioServerSocketChannel
里的底层操作类 Unsafe 的read
方法中。
而 NioServerSocketChannel 中的 Unsafe 操作类实现类型为NioMessageUnsafe
定义在上图继承结构中的AbstractNioMessageChannel父类中
。
下面我们到NioMessageUnsafe#read
方法中来看下 Netty 对OP_ACCPET事件
的具体处理过程:
我们还是按照老规矩,先从整体上把整个 OP_ACCEPT 事件的逻辑处理框架提取出来,让大家先总体俯视下流程全貌,然后在针对每个核心点位进行各个击破。
main reactor 线程是在一个do...while{...}
循环 read loop 中不断的调用 JDK NIO serverSocketChannel.accept()
方法来接收完成三次握手的客户端连接NioSocketChannel
的,并将接收到的客户端连接 NioSocketChannel 临时保存在List<Object> readBuf
集合中,后续会服务端 NioServerSocketChannel 的 pipeline 中通过 ChannelRead 事件来传递,最终会在 ServerBootstrapAcceptor 这个 ChannelHandler 中被处理初始化,并将其注册到 Sub Reator Group 中。
这里的 read loop 循环会被限定只能读取 16 次,当 main reactor 从 NioServerSocketChannel 中读取客户端连接 NioSocketChannel 的次数达到 16 次之后,无论此时是否还有客户端连接都不能在继续读取了。
因为我们在《一文聊透Netty核心引擎Reactor的运转架构》一文中提到,netty 对 reactor 线程压榨的比较狠,要干的事情很多,除了要监听轮询 IO 就绪事件,处理 IO 就绪事件,还需要执行用户和 netty 框架本省提交的异步任务和定时任务。
所以这里的 main reactor 线程不能在 read loop 中无限制的执行下去,因为还需要分配时间去执行异步任务,不能因为无限制的接收客户端连接而耽误了异步任务的执行。所以这里将 read loop 的循环次数限定为 16 次。
如果 main reactor 线程在 read loop 中读取客户端连接 NioSocketChannel 的次数已经满了 16 次,即使此时还有客户端连接未接收,那么 main reactor 线程也不会再去接收了,而是转去执行异步任务,当异步任务执行完毕后,还会在回来执行剩余接收连接的任务。
main reactor 线程退出 read loop 循环的条件有两个:
以上就是 Netty 在接收客户端连接时的整体核心逻辑,下面笔者将这部分逻辑的核心源码实现框架提取出来,方便大家根据上述核心逻辑与源码中的处理模块对应起来,还是那句话,这里只需要总体把握核心处理流程,不需要读懂每一行代码,笔者会在文章的后边分模块来各个击破它们。
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
private final class NioMessageUnsafe extends AbstractNioUnsafe {
//存放连接建立后,创建的客户端SocketChannel
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
//必须在Main Reactor线程中执行
assert eventLoop().inEventLoop();
//注意下面的config和pipeline都是服务端ServerSocketChannel中的
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
//创建接收数据Buffer分配器(用于分配容量大小合适的byteBuffer用来容纳接收数据)
//在接收连接的场景中,这里的allocHandle只是用于控制read loop的循环读取创建连接的次数。
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
//底层调用NioServerSocketChannel->doReadMessages 创建客户端SocketChannel
int localRead = doReadMessages(readBuf);
//已无新的连接可接收则退出read loop
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
//统计在当前事件循环中已经读取到得Message数量(创建连接的个数)
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());//判断是否已经读满16次
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
//在NioServerSocketChannel对应的pipeline中传播ChannelRead事件
//初始化客户端SocketChannel,并将其绑定到Sub Reactor线程组中的一个Reactor上
pipeline.fireChannelRead(readBuf.get(i));
}
//清除本次accept 创建的客户端SocketChannel集合
readBuf.clear();
allocHandle.readComplete();
//触发readComplete事件传播
pipeline.fireChannelReadComplete();
....................省略............
} finally {
....................省略............
}
}
}
}
}
这里首先要通过断言 assert eventLoop().inEventLoop()
确保处理接收客户端连接的线程必须为 Main Reactor 线程。
而 main reactor 中主要注册的是服务端 NioServerSocketChannel,主要负责处理OP_ACCEPT事件
,所以当前 main reactor 线程是在 NioServerSocketChannel 中执行接收连接的工作。
所以这里我们通过config()
获取到的是 NioServerSocketChannel 的属性配置类NioServerSocketChannelConfig
,它是在 Reactor 的启动阶段被创建出来的。
public NioServerSocketChannel(ServerSocketChannel channel) {
//父类AbstractNioChannel中保存JDK NIO原生ServerSocketChannel以及要监听的事件OP_ACCEPT
super(null, channel, SelectionKey.OP_ACCEPT);
//DefaultChannelConfig中设置用于Channel接收数据用的buffer->AdaptiveRecvByteBufAllocator
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
同理这里通过pipeline()
获取到的也是 NioServerSocketChannel 中的pipeline
。它会在 NioServerSocketChannel 向 main reactor 注册成功之后被初始化。
前边提到 main reactor 线程会被限定只能在 read loop 中向 NioServerSocketChannel 读取 16 次客户端连接,所以在开始 read loop 之前,我们需要创建一个能够保存记录读取次数的对象,在每次 read loop 循环之后,可以根据这个对象来判断是否结束 read loop。
这个对象就是这里的 RecvByteBufAllocator.Handle allocHandle
专门用于统计 read loop 中接收客户端连接的次数,以及判断是否该结束 read loop 转去执行异步任务。
当这一切准备就绪之后,main reactor 线程就开始在do{....}while(...)
循环中接收客户端连接了。
在 read loop 中通过调用doReadMessages函数
接收完成三次握手的客户端连接,底层会调用到 JDK NIO ServerSocketChannel 的 accept 方法,从内核全连接队列中取出客户端连接。
返回值localRead
表示接收到了多少客户端连接,客户端连接通过 accept 方法只会一个一个的接收,所以这里的localRead
正常情况下都会返回1
,当localRead <= 0
时意味着已经没有新的客户端连接可以接收了,本次 main reactor 接收客户端的任务到这里就结束了,跳出 read loop。开始新的一轮 IO 事件的监听处理。
public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
@Override
public SocketChannel run() throws IOException {
return serverSocketChannel.accept();
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
随后会将接收到的客户端连接占时存放到List<Object> readBuf
集合中。
private final class NioMessageUnsafe extends AbstractNioUnsafe {
//存放连接建立后,创建的客户端SocketChannel
private final List<Object> readBuf = new ArrayList<Object>();
}
调用allocHandle.incMessagesRead
统计本次事件循环中接收到的客户端连接个数,最后在 read loop 末尾通过allocHandle.continueReading
判断是否达到了限定的 16 次。从而决定 main reactor 线程是继续接收客户端连接还是转去执行异步任务。
main reactor 线程退出 read loop 的两个条件:
当满足以上两个退出条件时,main reactor 线程就会退出 read loop,由于在 read loop 中接收到的客户端连接全部暂存在List<Object> readBuf
集合中,随后开始遍历 readBuf,在 NioServerSocketChannel 的 pipeline 中传播 ChannelRead 事件。
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
//NioServerSocketChannel对应的pipeline中传播read事件
//io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor.channelRead
//初始化客户端SocketChannel,并将其绑定到Sub Reactor线程组中的一个Reactor上
pipeline.fireChannelRead(readBuf.get(i));
}
最终 pipeline 中的 ChannelHandler(ServerBootstrapAcceptor)会响应 ChannelRead 事件,并在相应回调函数中初始化客户端 NioSocketChannel,并将其注册到 Sub Reactor Group 中。此后客户端 NioSocketChannel 绑定到的 sub reactor 就开始监听处理客户端连接上的读写事件了。
Netty 整个接收客户端的逻辑过程如下图步骤 1,2,3 所示。
以上内容就是笔者提取出来的整体流程框架,下面我们来将其中涉及到的重要核心模块拆开,一个一个详细解读下。
Reactor 在处理对应 Channel 上的 IO 数据时,都会采用一个ByteBuffer
来接收 Channel 上的 IO 数据。而本小节要介绍的 RecvByteBufAllocator 正是用来分配 ByteBuffer 的一个分配器。
还记得这个RecvByteBufAllocator
在哪里被创建的吗??
在《聊聊Netty那些事儿之Reactor在Netty中的实现(创建篇)》一文中,在介绍NioServerSocketChannel
的创建过程中提到,对应 Channel 的配置类 NioServerSocketChannelConfig 也会随着 NioServerSocketChannel 的创建而创建。
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
在创建NioServerSocketChannelConfig
的过程中会创建RecvByteBufAllocator
。
public DefaultChannelConfig(Channel channel) {
this(channel, new AdaptiveRecvByteBufAllocator());
}
这里我们看到 NioServerSocketChannel 中的 RecvByteBufAllocator 实际类型为AdaptiveRecvByteBufAllocator
,顾名思义,这个类型的 RecvByteBufAllocator 可以根据 Channel 上每次到来的 IO 数据大小来自适应动态调整 ByteBuffer 的容量。
对于服务端 NioServerSocketChannel 来说,它上边的 IO 数据就是客户端的连接,它的长度和类型都是固定的,所以在接收客户端连接的时候并不需要这样的一个 ByteBuffer 来接收,我们会将接收到的客户端连接存放在List<Object> readBuf
集合中
对于客户端 NioSocketChannel 来说,它上边的 IO 数据时客户端发送来的网络数据,长度是不定的,所以才会需要这样一个可以根据每次 IO 数据的大小来自适应动态调整容量的 ByteBuffer 来接收。
那么看起来这个 RecvByteBufAllocator 和本文的主题不是很关联,因为在接收连接的过程中并不会怎么用到它,这个类笔者还会在后面的文章中详细介绍,之所以这里把它拎出来单独介绍是因为它和本文开头提到的 Bug 有关系,这个 Bug 就是由这个类引起的。
在本文中,我们是通过 NioServerSocketChannel 中的 unsafe 底层操作类来获取 RecvByteBufAllocator.Handle 的
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
protected abstract class AbstractUnsafe implements Unsafe {
@Override
public RecvByteBufAllocator.Handle recvBufAllocHandle() {
if (recvHandle == null) {
recvHandle = config().getRecvByteBufAllocator().newHandle();
}
return recvHandle;
}
}
我们看到最终会在 NioServerSocketChannel 的配置类 NioServerSocketChannelConfig 中获取到AdaptiveRecvByteBufAllocator
public class DefaultChannelConfig implements ChannelConfig {
//用于Channel接收数据用的buffer分配器 类型为AdaptiveRecvByteBufAllocator
private volatile RecvByteBufAllocator rcvBufAllocator;
}
AdaptiveRecvByteBufAllocator
中会创建自适应动态调整容量的 ByteBuffer 分配器。
public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
@Override
public Handle newHandle() {
return new HandleImpl(minIndex, maxIndex, initial);
}
private final class HandleImpl extends MaxMessageHandle {
.................省略................
}
}
这里的newHandle
方法返回的具体类型为MaxMessageHandle
,这个MaxMessageHandle
里边保存了每次从Channel
中读取IO数据
的容量指标,方便下次读取时分配合适大小的buffer
。
每次在使用allocHandle
前需要调用allocHandle.reset(config);
重置里边的统计指标。
public abstract class MaxMessageHandle implements ExtendedHandle {
private ChannelConfig config;
//每次事件轮询时,最多读取16次
private int maxMessagePerRead;
//本次事件轮询总共读取的message数,这里指的是接收连接的数量
private int totalMessages;
//本次事件轮询总共读取的字节数
private int totalBytesRead;
@Override
public void reset(ChannelConfig config) {
this.config = config;
//默认每次最多读取16次
maxMessagePerRead = maxMessagesPerRead();
totalMessages = totalBytesRead = 0;
}
}
ServerBootstrap
中通过ChannelOption.MAX_MESSAGES_PER_READ
选项设置。ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.MAX_MESSAGES_PER_READ, 自定义次数)
allocHandle.incMessagesRead
增加记录接收到的连接个数。 @Override
public final void incMessagesRead(int amt) {
totalMessages += amt;
}
@Override
public void lastBytesRead(int bytes) {
lastBytesRead = bytes;
if (bytes > 0) {
totalBytesRead += bytes;
}
}
MaxMessageHandler 中还有一个非常重要的方法就是在每次 read loop 末尾会调用allocHandle.continueReading()
方法来判断读取连接次数是否已满 16 次,来决定 main reactor 线程是否退出循环。
do {
//底层调用NioServerSocketChannel->doReadMessages 创建客户端SocketChannel
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
//统计在当前事件循环中已经读取到得Message数量(创建连接的个数)
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
红框中圈出来的两个判断条件和本文主题无关,我们这里不需要关注,笔者会在后面的文章详细介绍。
totalMessages < maxMessagePerRead
:在本文的接收客户端连接场景中,这个条件用于判断 main reactor 线程在 read loop 中的读取次数是否超过了 16 次。如果超过 16 次就会返回 false,main reactor 线程退出循环。totalBytesRead > 0
:用于判断当客户端 NioSocketChannel 上的 OP_READ 事件活跃时,sub reactor 线程在 read loop 中是否读取到了网络数据。以上内容就是 RecvByteBufAllocator.Handle 在接收客户端连接场景下的作用,大家这里仔细看下这个allocHandle.continueReading()
方法退出循环的判断条件,再结合整个do{....}while(...)
接收连接循环体,感受下是否哪里有些不对劲?Bug 即将出现~~~
netty 不论是在本文中处理接收客户端连接的场景还是在处理接收客户端连接上的网络数据场景都会在一个do{....}while(...)
循环 read loop 中不断的处理。
同时也都会利用在上一小节中介绍的RecvByteBufAllocator.Handle
来记录每次 read loop 接收到的连接个数和从连接上读取到的网络数据大小。
从而在 read loop 的末尾都会通过allocHandle.continueReading()
方法判断是否应该退出 read loop 循环结束连接的接收流程或者是结束连接上数据的读取流程。
无论是用于接收客户端连接的 main reactor 也好还是用于接收客户端连接上的网络数据的 sub reactor 也好,它们的运行框架都是一样的,只不过是具体分工不同。
所以 netty 这里想用统一的RecvByteBufAllocator.Handle
来处理以上两种场景。
而RecvByteBufAllocator.Handle
中的totalBytesRead
字段主要记录 sub reactor 线程在处理客户端 NioSocketChannel 中 OP_READ 事件活跃时,总共在 read loop 中读取到的网络数据,而这里是 main reactor 线程在接收客户端连接所以这个字段并不会被设置。totalBytesRead 字段的值在本文中永远会是0
。
所以无论同时有多少个客户端并发连接到服务端上,在接收连接的这个 read loop 中永远只会接受一个连接就会退出循环,因为allocHandle.continueReading()方法
中的判断条件totalBytesRead > 0
永远会返回false
。
do {
//底层调用NioServerSocketChannel->doReadMessages 创建客户端SocketChannel
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
//统计在当前事件循环中已经读取到得Message数量(创建连接的个数)
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
而 netty 的本意是在这个 read loop 循环中尽可能多的去接收客户端的并发连接,同时又不影响 main reactor 线程执行异步任务。但是由于这个 Bug,main reactor 在这个循环中只执行一次就结束了。这也一定程度上就影响了 netty 的吞吐。
让我们想象下这样的一个场景,当有 16 个客户端同时并发连接到了服务端,这时 NioServerSocketChannel 上的OP_ACCEPT事件
活跃,main reactor 从 Selector 上被唤醒,随后执行OP_ACCEPT事件
的处理。
public final class NioEventLoop extends SingleThreadEventLoop {
@Override
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
............省略.........
case SelectStrategy.BUSY_WAIT:
............省略.........
case SelectStrategy.SELECT:
............监听轮询IO事件.........
default:
}
} catch (IOException e) {
............省略.........
}
............处理IO就绪事件.........
............执行异步任务.........
}
}
但是由于这个 Bug 的存在,main reactor 在接收客户端连接的这个 read loop 中只接收了一个客户端连接就匆匆返回了。
private final class NioMessageUnsafe extends AbstractNioUnsafe {
do {
int localRead = doReadMessages(readBuf);
.........省略...........
} while (allocHandle.continueReading());
}
然后根据下图中这个 Reactor 的运行结构去执行异步任务,随后绕一大圈又会回到NioEventLoop#run
方法中重新发起一轮 OP_ACCEPT 事件轮询。
由于现在还有 15 个客户端并发连接没有被接收,所以此时 Main Reactor 线程并不会在selector.select()
上阻塞,最终绕一圈又会回到NioMessageUnsafe#read
方法的do{.....}while()
循环。在接收一个连接之后又退出循环。
本来我们可以在一次 read loop 中把这 16 个并发的客户端连接全部接收完毕的,因为这个 Bug,main reactor 需要不断的发起 OP_ACCEPT 事件的轮询,绕了很大一个圈子。同时也增加了许多不必要的 selector.select()系统调用开销
这时大家在看这个Issue#11708中的讨论是不是就清晰很多了~~
Issue#11708:https://github.com/netty/netty/issues/11708
笔者在写这篇文章的时候,Netty 最新版本是 4.1.68.final,这个 Bug 在 4.1.69.final 中被修复。
由于该 Bug 产生的原因正是因为服务端 NioServerSocketChannel(用于监听端口地址和接收客户端连接)和 客户端 NioSocketChannel(用于通信)中的 Config 配置类混用了同一个 ByteBuffer 分配器AdaptiveRecvByteBufAllocator
而导致的。
所以在新版本修复中专门为服务端 ServerSocketChannel 中的 Config 配置类引入了一个新的 ByteBuffer 分配器ServerChannelRecvByteBufAllocator
,专门用于服务端 ServerSocketChannel 接收客户端连接的场景。
在ServerChannelRecvByteBufAllocator
的父类DefaultMaxMessagesRecvByteBufAllocator
中引入了一个新的字段ignoreBytesRead
,用于表示是否忽略网络字节的读取,在创建服务端 Channel 配置类 NioServerSocketChannelConfig 的时候,这个字段会被赋值为true
。
当 main reactor 线程在 read loop 循环中接收客户端连接的时候。
private final class NioMessageUnsafe extends AbstractNioUnsafe {
do {
int localRead = doReadMessages(readBuf);
.........省略...........
} while (allocHandle.continueReading());
}
在 read loop 循环的末尾就会采用从ServerChannelRecvByteBufAllocator
中创建的MaxMessageHandle#continueReading
方法来判断读取连接次数是否超过了 16 次。由于这里的ignoreBytesRead == true
这回我们就会忽略totalBytesRead == 0
的情况,从而使得接收连接的 read loop 得以继续地执行下去。在一个 read loop 中一次性把 16 个连接全部接收完毕。
以上就是对这个 Bug 产生的原因,以及发现的过程,最后修复的方案一个全面的介绍,因此笔者也出现在了 netty 4.1.69.final 版本发布公告里的 thank-list 中。哈哈,真是令人开心的一件事情~~~
通过以上对 netty 接收客户端连接的全流程分析和对这个 Bug 来龙去脉以及修复方案的介绍,大家现在一定已经理解了整个接收连接的流程框架。
接下来笔者就把这个流程中涉及到的一些核心模块在单独拎出来从细节入手,为大家各个击破~~~
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
}
javaChannel()
获取封装在 Netty 服务端NioServerSocketChannel
中的JDK 原生 ServerSocketChannel
。 @Override
protected ServerSocketChannel javaChannel() {
return (ServerSocketChannel) super.javaChannel();
}
JDK NIO 原生
的ServerSocketChannel
的accept方法
获取JDK NIO 原生
客户端连接SocketChannel
。 public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
@Override
public SocketChannel run() throws IOException {
return serverSocketChannel.accept();
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
这一步就是我们在《聊聊Netty那些事儿之从内核角度看IO模型》介绍到的调用监听Socket
的accept方法
,内核会基于监听Socket
创建出来一个新的Socket
专门用于与客户端之间的网络通信这个我们称之为客户端连接Socket
。这里的ServerSocketChannel
就类似于监听Socket
。SocketChannel
就类似于客户端连接Socket
。
由于我们在创建NioServerSocketChannel
的时候,会将JDK NIO 原生
的ServerSocketChannel
设置为非阻塞
,所以这里当ServerSocketChannel
上有客户端连接时就会直接创建SocketChannel
,如果此时并没有客户端连接时accept调用
就会立刻返回null
并不会阻塞。
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
//设置Channel为非阻塞 配合IO多路复用模型
ch.configureBlocking(false);
} catch (IOException e) {
..........省略.............
}
}
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
.........省略.......
}
return 0;
}
}
这里会根据ServerSocketChannel
的accept
方法获取到JDK NIO 原生
的SocketChannel
(用于底层真正与客户端通信的 Channel),来创建 Netty 中的NioSocketChannel
。
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
}
创建客户端NioSocketChannel
的过程其实和之前讲的创建服务端NioServerSocketChannel
大体流程是一样的,我们这里只对客户端NioSocketChannel
和服务端NioServerSocketChannel
在创建过程中的不同之处做一个对比。
具体细节部分大家可以在回看下《详细图解Netty Reactor启动全流程》一文中关于
NioServerSocketChannel
的创建的详细细节。
在我们介绍 Reactor 的创建文章中,我们提到 Netty 中的Channel
是具有层次的。由于客户端 NioSocketChannel 是在 main reactor 接收连接时在服务端 NioServerSocketChannel 中被创建的,所以在创建客户端 NioSocketChannel 的时候会通过构造函数指定了 parent 属性为NioServerSocketChanel
。并将JDK NIO 原生
的SocketChannel
封装进 Netty 的客户端NioSocketChannel
中。
而在 Reactor 启动过程中创建NioServerSocketChannel
的时候parent属性
指定是null
。因为它就是顶层的Channel
,负责创建客户端NioSocketChannel
。
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
客户端 NioSocketChannel 向 Sub Reactor 注册的是SelectionKey.OP_READ事件
,而服务端 NioServerSocketChannel 向 Main Reactor 注册的是SelectionKey.OP_ACCEPT事件
。
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
}
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
public NioServerSocketChannel(ServerSocketChannel channel) {
//父类AbstractNioChannel中保存JDK NIO原生ServerSocketChannel以及要监听的事件OP_ACCEPT
super(null, channel, SelectionKey.OP_ACCEPT);
//DefaultChannelConfig中设置用于Channel接收数据用的buffer->AdaptiveRecvByteBufAllocator
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
}
客户端NioSocketChannel
继承的是AbstractNioByteChannel
,而服务端NioServerSocketChannel
继承的是AbstractNioMessageChannel
。它们继承的这两个抽象类一个前缀是Byte
,一个前缀是Message
有什么区别吗??
客户端
NioSocketChannel
主要处理的是服务端与客户端的通信,这里涉及到接收客户端发送来的数据,而Sub Reactor线程
从NioSocketChannel
中读取的正是网络通信数据单位为Byte
。服务端
NioServerSocketChannel
主要负责处理OP_ACCEPT事件
,创建用于通信的客户端NioSocketChannel
。这时候客户端与服务端还没开始通信,所以Main Reactor线程
从NioServerSocketChannel
的读取对象为Message
。这里的Message
指的就是底层的SocketChannel
客户端连接。
以上就是NioSocketChannel
与NioServerSocketChannel
创建过程中的不同之处,后面的过程就一样了。
SocketChannel
,并将其底层的 IO 模型设置为非阻塞
,保存需要监听的 IO 事件OP_READ
。 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
//设置Channel为非阻塞 配合IO多路复用模型
ch.configureBlocking(false);
} catch (IOException e) {
}
}
channelId
,创建客户端 NioSocketChannel 的底层操作类NioByteUnsafe
,创建 pipeline。 protected AbstractChannel(Channel parent) {
this.parent = parent;
//channel全局唯一ID machineId+processId+sequence+timestamp+random
id = newId();
//unsafe用于底层socket的读写操作
unsafe = newUnsafe();
//为channel分配独立的pipeline用于IO事件编排
pipeline = newChannelPipeline();
}
AdaptiveRecvByteBufAllocator
。 public DefaultChannelConfig(Channel channel) {
this(channel, new AdaptiveRecvByteBufAllocator());
}
在 Bug 修复后的版本中服务端 NioServerSocketChannel 的 RecvByteBufAllocator 类型设置为
ServerChannelRecvByteBufAllocator
最终我们得到的客户端NioSocketChannel
结构如下:
在前边介绍接收连接的整体核心流程框架的时候,我们提到 main reactor 线程是在一个do{.....}while(...)
循环 read loop 中不断的调用ServerSocketChannel#accept
方法来接收客户端的连接。
当满足退出 read loop 循环的条件有两个:
main reactor 就会退出 read loop 循环,此时接收到的客户端连接 NioSocketChannel 暂存与List<Object> readBuf
集合中。
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
try {
try {
do {
........省略.........
//底层调用NioServerSocketChannel->doReadMessages 创建客户端SocketChannel
int localRead = doReadMessages(readBuf);
........省略.........
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
........省略.........
} finally {
........省略.........
}
}
}
随后 main reactor 线程会遍历List<Object> readBuf
集合中的 NioSocketChannel,并在 NioServerSocketChannel 的 pipeline 中传播 ChannelRead 事件。
最终ChannelRead事件
会传播到ServerBootstrapAcceptor
中,这里正是 Netty 处理客户端连接的核心逻辑所在。
ServerBootstrapAcceptor
主要的作用就是初始化客户端NioSocketChannel
,并将客户端 NioSocketChannel 注册到Sub Reactor Group
中,并监听OP_READ事件
。
在 ServerBootstrapAcceptor 中会初始化客户端 NioSocketChannel 的这些属性。
比如:从 Reactor 组EventLoopGroup childGroup
,用于初始化NioSocketChannel
中的pipeline
用到的ChannelHandler childHandler
,以及NioSocketChannel
中的一些childOptions
和childAttrs
。
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
private final EventLoopGroup childGroup;
private final ChannelHandler childHandler;
private final Entry<ChannelOption<?>, Object>[] childOptions;
private final Entry<AttributeKey<?>, Object>[] childAttrs;
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
//向客户端NioSocketChannel的pipeline中
//添加在启动配置类ServerBootstrap中配置的ChannelHandler
child.pipeline().addLast(childHandler);
//利用配置的属性初始化客户端NioSocketChannel
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
/**
* 1:在Sub Reactor线程组中选择一个Reactor绑定
* 2:将客户端SocketChannel注册到绑定的Reactor上
* 3:SocketChannel注册到sub reactor中的selector上,并监听OP_READ事件
* */
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
}
正是在这里,netty 会将我们在《详细图解Netty Reactor启动全流程》的启动示例程序中在 ServerBootstrap 中配置的客户端 NioSocketChannel 的所有属性(child 前缀配置)初始化到 NioSocketChannel 中。
public final class EchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure the server.
//创建主从Reactor线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//配置主从Reactor
.channel(NioServerSocketChannel.class)//配置主Reactor中的channel类型
.option(ChannelOption.SO_BACKLOG, 100)//设置主Reactor中channel的option选项
.handler(new LoggingHandler(LogLevel.INFO))//设置主Reactor中Channel->pipline->handler
.childHandler(new ChannelInitializer<SocketChannel>() {//设置从Reactor中注册channel的pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server. 绑定端口启动服务,开始监听accept事件
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
以上示例代码中通过 ServerBootstrap 配置的 NioSocketChannel 相关属性,会在 Netty 启动并开始初始化NioServerSocketChannel
的时候将ServerBootstrapAcceptor
的创建初始化工作封装成异步任务
,然后在NioServerSocketChannel
注册到Main Reactor
中成功后执行。
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
@Override
void init(Channel channel) {
................省略................
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
................省略................
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
}
在经过ServerBootstrapAccptor#chanelRead回调
的处理之后,此时客户端 NioSocketChannel 中 pipeline 的结构为:
随后会将初始化好的客户端 NioSocketChannel 向 Sub Reactor Group 中注册,并监听OP_READ事件
。
如下图中的步骤 3 所示:
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
客户端 NioSocketChannel 向 Sub Reactor Group 注册的流程完全和服务端 NioServerSocketChannel 向 Main Reactor Group 注册流程一样。
关于服务端 NioServerSocketChannel 的注册流程,笔者已经在《详细图解Netty Reactor启动全流程》一文中做出了详细的介绍,对相关细节感兴趣的同学可以在回看下。
这里笔者在带大家简要回顾下整个注册过程并着重区别对比客户端 NioSocetChannel 与服务端 NioServerSocketChannel 注册过程中不同的地方。
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public EventExecutor next() {
return chooser.next();
}
}
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
@Override
public ChannelFuture register(Channel channel) {
//注册channel到绑定的Reactor上
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
//unsafe负责channel底层的各种操作
promise.channel().unsafe().register(this, promise);
return promise;
}
}
NioServerSocketChannel
的注册过程时,这里的promise.channel()
为NioServerSocketChannel
。底层的 unsafe 操作类为NioMessageUnsafe
。promise.channel()
为NioSocketChannel
。底层的 unsafe 操作类为NioByteUnsafe
。 @Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
..............省略....................
//此时这里的eventLoop为Sub Reactor
AbstractChannel.this.eventLoop = eventLoop;
/**
* 执行channel注册的操作必须是Reactor线程来完成
*
* 1: 如果当前执行线程是Reactor线程,则直接执行register0进行注册
* 2:如果当前执行线程是外部线程,则需要将register0注册操作 封装程异步Task 由Reactor线程执行
* */
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
..............省略....................
}
}
}
注意此时传递进来的 EventLoop eventLoop 为 Sub Reactor。
但此时的执行线程为Main Reactor线程
,并不是 Sub Reactor 线程(此时还未启动)。
所以这里的eventLoop.inEventLoop()
返回的是false
。
在else分支
中向绑定的 Sub Reactor 提交注册NioSocketChannel
的任务。
当注册任务提交后,此时绑定的
Sub Reactor线程
启动。
我们又来到了 Channel 注册的老地方register0方法
。在《详细图解Netty Reactor启动全流程》中我们花了大量的篇幅介绍了这个方法。这里我们只对比NioSocketChannel
与NioServerSocketChannel
不同的地方。
private void register0(ChannelPromise promise) {
try {
................省略..................
boolean firstRegistration = neverRegistered;
//执行真正的注册操作
doRegister();
//修改注册状态
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
if (isActive()) {
if (firstRegistration) {
//触发channelActive事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
................省略..................
}
}
这里 doRegister()方法
将 NioSocketChannel 注册到 Sub Reactor 中的Selector
上。
public abstract class AbstractNioChannel extends AbstractChannel {
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
...............省略...............
}
}
}
}
这里是 Netty 客户端NioSocketChannel
与 JDK NIO 原生 SocketChannel 关联的地方。此时注册的IO事件
依然是0
。目的也是只是为了获取 NioSocketChannel 在 Selector 中的SelectionKey
。
同时通过SelectableChannel#register
方法将 Netty 自定义的 NioSocketChannel(这里的 this 指针)附着在 SelectionKey 的 attechment 属性上,完成 Netty 自定义 Channel 与 JDK NIO Channel 的关系绑定。这样在每次对 Selector 进行 IO 就绪事件轮询时,Netty 都可以从 JDK NIO Selector 返回的 SelectionKey 中获取到自定义的 Channel 对象(这里指的就是 NioSocketChannel)。
随后调用pipeline.invokeHandlerAddedIfNeeded()
回调客户端 NioSocketChannel 上 pipeline 中的所有 ChannelHandler 的handlerAdded方法
,此时pipeline
的结构中只有一个ChannelInitializer
。最终会在ChannelInitializer#handlerAdded
回调方法中初始化客户端NioSocketChannel
的pipeline
。
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
if (initChannel(ctx)) {
//初始化工作完成后,需要将自身从pipeline中移除
removeState(ctx);
}
}
}
protected abstract void initChannel(C ch) throws Exception;
}
关于对 Channel 中 pipeline 的详细初始化过程,对细节部分感兴趣的同学可以回看下《详细图解Netty Reactor启动全流程》
此时客户端 NioSocketChannel 中的 pipeline 中的结构就变为了我们自定义的样子,在示例代码中我们自定义的ChannelHandler
为EchoServerHandler
。
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
当客户端 NioSocketChannel 中的 pipeline 初始化完毕后,netty 就开始调用safeSetSuccess(promise)方法
回调regFuture
中注册的ChannelFutureListener
,通知客户端 NioSocketChannel 已经成功注册到 Sub Reactor 上了。
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
在服务端 NioServerSocketChannel 注册的时候我们会在 listener 中向 Main Reactor 提交
bind绑定端口地址任务
。但是在NioSocketChannel
注册的时候,只会在listener
中处理一下注册失败的情况。
当 Sub Reactor 线程通知 ChannelFutureListener 注册成功之后,随后就会调用pipeline.fireChannelRegistered()
在客户端 NioSocketChannel 的 pipeline 中传播ChannelRegistered事件
。
这里笔者重点要强调下,在之前介绍 NioServerSocketChannel 注册的时候,我们提到因为此时 NioServerSocketChannel 并未绑定端口地址,所以这时的 NioServerSocketChannel 并未激活,这里的isActive()
返回false
。register0方法
直接返回。
服务端 NioServerSocketChannel 判断是否激活的标准为端口是否绑定成功。
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
@Override
public boolean isActive() {
return isOpen() && javaChannel().socket().isBound();
}
}
客户端
NioSocketChannel
判断是否激活的标准为是否处于Connected状态
。那么显然这里肯定是处于connected状态
的。
@Override
public boolean isActive() {
SocketChannel ch = javaChannel();
return ch.isOpen() && ch.isConnected();
}
NioSocketChannel
已经处于connected状态
,这里并不需要绑定端口,所以这里的isActive()
返回true
。
if (isActive()) {
/**
* 客户端SocketChannel注册成功后会走这里,在channelActive事件回调中注册OP_READ事件
* */
if (firstRegistration) {
//触发channelActive事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
.......省略..........
}
}
}
最后调用pipeline.fireChannelActive()
在 NioSocketChannel 中的 pipeline 传播ChannelActive事件
,最终在pipeline
的头结点HeadContext
中响应并注册OP_READ事件
到Sub Reactor
中的Selector
上。
public abstract class AbstractNioChannel extends AbstractChannel { {
@Override
protected void doBeginRead() throws Exception {
..............省略................
final int interestOps = selectionKey.interestOps();
/**
* 1:ServerSocketChannel 初始化时 readInterestOp设置的是OP_ACCEPT事件
* 2:SocketChannel 初始化时 readInterestOp设置的是OP_READ事件
* */
if ((interestOps & readInterestOp) == 0) {
//注册监听OP_ACCEPT或者OP_READ事件
selectionKey.interestOps(interestOps | readInterestOp);
}
}
}
注意这里的
readInterestOp
为客户端NioSocketChannel
在初始化时设置的OP_READ事件
。
到这里,Netty 中的Main Reactor
接收连接的整个流程,我们就介绍完了,此时 Netty 中主从 Reactor 组的结构就变为:
本文我们介绍了NioServerSocketChannel
处理客户端连接事件的整个过程。
NioSocketChannel
。NioSocketChannel
中的pipeline
。NioSocketChannel
向Sub Reactor
注册的过程其中我们也对比了NioServerSocketChannel
与NioSocketChannel
在创建初始化以及后面向Reactor
注册过程中的差异之处。
当客户端NioSocketChannel
接收完毕并向Sub Reactor
注册成功后,那么接下来Sub Reactor
就开始监听注册其上的所有客户端NioSocketChannel
的OP_READ事件
,并等待客户端向服务端发送网络数据。
后面Reactor
的主角就该变为Sub Reactor
以及注册在其上的客户端NioSocketChannel
了。
下篇文章,我们将会讨论 Netty 是如何接收网络数据的~~~~ 我们下篇文章见~~
领取专属 10元无门槛券
私享最新 技术干货