Netty Review - ServerBootstrap源码解析
Netty Review - NioServerSocketChannel源码分析
Netty Review - 服务端channel注册流程源码解析
Netty客户端的创建流程通常涉及以下步骤:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class MyClient {
public static void main(String[] args) throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MyMessageEncoder());
pipeline.addLast(new MyClientHandler());
}
});
System.out.println("netty client start。。");
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();
channelFuture.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
}
在这个示例中,我们创建了一个NIO的EventLoopGroup,使用NioSocketChannel作为客户端的Channel类型,设置了TCP连接的保持活动选项,并初始化ChannelPipeline。最后,通过connect()方法连接到远程服务器,并启动客户端。
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class MyClientHandler extends SimpleChannelInboundHandler<MyMessageProtocol> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for(int i = 0; i< 2; i++) {
String msg = "你好,我是artisan";
//创建协议包对象
MyMessageProtocol messageProtocol = new MyMessageProtocol();
messageProtocol.setLen(msg.getBytes(CharsetUtil.UTF_8).length);
messageProtocol.setContent(msg.getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(messageProtocol);
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, MyMessageProtocol msg) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
当客户端连接时,服务器端会监听到一个 OP_ACCEPT
事件。这是由于服务器端的 NIO 通道(通常是 ServerSocketChannel
)在接受客户端连接时,会触发 OP_ACCEPT
事件。这个事件通知服务器端,有一个新的连接已经准备好接受。
在 Netty 中,当服务器端监听到 OP_ACCEPT
事件时,会执行相应的处理逻辑。通常情况下,服务器端会执行以下步骤:
Selector
对象。Selector
获取到发生事件的 SelectionKey
。SelectionKey
中获取到对应的通道(ServerSocketChannel
)。ServerSocketChannel
的 accept()
方法,接受客户端的连接,返回一个新的 SocketChannel
对象,表示与客户端建立的连接。SocketChannel
注册到 Selector
上,并注册 OP_READ
事件,以便读取客户端发送的数据。这样,服务器端就能够接受客户端的连接,并与之建立通信。
这段代码是 Netty 中用于处理读取数据的方法。以下是对代码的中文注释:
@Override
public void read() {
assert eventLoop().inEventLoop(); // 断言当前线程处于事件循环中
// 获取通道配置和管道
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
// 获取用于分配接收字节缓冲区的句柄
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config); // 重置句柄状态
boolean closed = false; // 标志是否已关闭连接
Throwable exception = null; // 异常信息
try {
try {
do {
int localRead = doReadMessages(readBuf); // 读取消息到缓冲区
if (localRead == 0) { // 如果未读取到任何数据
break;
}
if (localRead < 0) { // 如果读取到的数据小于0,表示连接已关闭
closed = true;
break;
}
allocHandle.incMessagesRead(localRead); // 增加读取的消息数量
} while (allocHandle.continueReading()); // 循环读取数据,直到不再需要读取或者没有更多的数据
} catch (Throwable t) {
exception = t; // 记录异常信息
}
int size = readBuf.size(); // 获取读取到的数据数量
for (int i = 0; i < size; i++) { // 遍历读取到的数据
readPending = false; // 标记为未挂起状态
pipeline.fireChannelRead(readBuf.get(i)); // 将读取到的数据传递给管道的下一个处理器
}
readBuf.clear(); // 清空读取缓冲区
allocHandle.readComplete(); // 表示读取操作完成
pipeline.fireChannelReadComplete(); // 通知管道读取操作已完成
if (exception != null) { // 如果有异常发生
closed = closeOnReadError(exception); // 根据异常情况判断是否需要关闭连接
pipeline.fireExceptionCaught(exception); // 将异常信息传递给管道的异常处理器
}
if (closed) { // 如果连接已关闭
inputShutdown = true; // 标记为输入流已关闭
if (isOpen()) { // 如果通道仍然打开
close(voidPromise()); // 关闭通道
}
}
} finally {
// 检查是否有未处理的读取挂起,可能的原因是:
// * 用户在 channelRead(...) 方法中调用了 Channel.read() 或 ChannelHandlerContext.read()
// * 用户在 channelReadComplete(...) 方法中调用了 Channel.read() 或 ChannelHandlerContext.read()
// 详情参考:https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) { // 如果没有挂起的读取,并且未开启自动读取
removeReadOp(); // 移除读取操作
}
}
}
这段代码负责从通道中读取数据,并将读取到的数据传递给管道中的下一个处理器。在读取数据的过程中,会处理可能发生的异常,并根据需要关闭连接。同时,还会处理是否需要继续读取数据,以及是否需要移除读取操作。
doReadMessages(readBuf)
io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
// 从底层 SocketChannel 接受新连接
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) { // 如果成功接受到新连接
// 创建一个新的 NioSocketChannel 实例,并添加到List中
buf.add(new NioSocketChannel(this, ch));
return 1; // 返回成功接受到一个连接
}
} catch (Throwable t) {
// 处理异常
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
// 尝试关闭 SocketChannel
ch.close();
} catch (Throwable t2) {
// 处理关闭异常
logger.warn("Failed to close a socket.", t2);
}
}
return 0; // 返回没有接受到新连接
}
new NioSocketChannel(this, ch)
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
接下来这段代码是 AbstractNioByteChannel
的构造函数,它调用了父类 AbstractNioChannel
的构造函数,并指定了感兴趣的事件为 SelectionKey.OP_READ
,表示该通道对读取事件感兴趣。让我们逐行解释:
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
// 调用父类 AbstractNioChannel 的构造函数
super(parent, ch, SelectionKey.OP_READ);
}
这个构造函数的作用是初始化 AbstractNioByteChannel
,它接受一个 Channel
父类和一个 java.nio.channels.SelectableChannel
对象作为参数。在构造函数中,通过调用父类的构造函数,将 SelectableChannel
注册到了父类的 selector
中,并指定了对读取事件感兴趣。
剩下的逻辑如下
pipeline.fireChannelRead(readBuf.get(i))
当客户端向服务器端发送数据时,会触发 OP_READ
事件。这是由于服务器端的 NIO 通道在接收到客户端发送的数据时,会触发 OP_READ
事件。这个事件通知服务器端,有数据可读取。
在 Netty 中,当服务器端监听到 OP_READ
事件时,会执行相应的处理逻辑。通常情况下,服务器端会执行以下步骤:
Selector
对象。Selector
获取到发生事件的 SelectionKey
。SelectionKey
中获取到对应的通道(SocketChannel
)。SocketChannel
中读取数据,并将数据存储到缓冲区中。这样,服务器端就能够接收客户端发送的数据,并根据业务逻辑进行处理。
这段代码是 Netty 中用于读取数据的方法。让我们逐行解释:
@Override
public final void read() {
final ChannelConfig config = config(); // 获取通道配置
if (shouldBreakReadReady(config)) { // 检查是否应该中断读取就绪状态
clearReadPending(); // 清除读取挂起标志
return;
}
final ChannelPipeline pipeline = pipeline(); // 获取通道管道
final ByteBufAllocator allocator = config.getAllocator(); // 获取字节缓冲区分配器
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); // 获取接收缓冲区分配器的句柄
allocHandle.reset(config); // 重置分配器
ByteBuf byteBuf = null; // 字节缓冲区对象
boolean close = false; // 是否关闭通道
try {
do {
byteBuf = allocHandle.allocate(allocator); // 分配字节缓冲区
allocHandle.lastBytesRead(doReadBytes(byteBuf)); // 读取字节到缓冲区
if (allocHandle.lastBytesRead() <= 0) { // 检查是否有字节被读取
// 如果没有读取到任何字节,释放缓冲区
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0; // 检查是否需要关闭通道
if (close) { // 如果需要关闭通道
// 因为收到了 EOF,所以没有剩余可读取的数据。
readPending = false; // 标记读取完成
}
break; // 跳出循环
}
allocHandle.incMessagesRead(1); // 增加已读消息数
readPending = false; // 标记读取完成
pipeline.fireChannelRead(byteBuf); // 将读取到的字节缓冲区传递给通道的处理器链
byteBuf = null;
} while (allocHandle.continueReading()); // 继续读取直到满足条件
allocHandle.readComplete(); // 读取完成
pipeline.fireChannelReadComplete(); // 通知通道读取完成
if (close) { // 如果需要关闭通道
closeOnRead(pipeline); // 关闭通道
}
} catch (Throwable t) {
// 处理读取过程中的异常
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// 检查是否有未处理的读取挂起
if (!readPending && !config.isAutoRead()) {
removeReadOp(); // 移除读取操作
}
}
}
从通道中读取数据,并将读取到的数据传递给通道的处理器链进行处理。在读取过程中可能会出现异常,需要进行相应的处理。最后,根据读取的结果来判断是否需要关闭通道。
里面的主要逻辑如下
图都给你画好了,戳这里