前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty Review - 客户端流程源码解析

Netty Review - 客户端流程源码解析

作者头像
小小工匠
发布2024-05-26 12:27:18
370
发布2024-05-26 12:27:18
举报
文章被收录于专栏:小工匠聊架构小工匠聊架构

Pre

Netty Review - ServerBootstrap源码解析

Netty Review - NioServerSocketChannel源码分析

Netty Review - 服务端channel注册流程源码解析


Netty Client Code

Netty客户端的创建流程通常涉及以下步骤:

  1. 创建Bootstrap实例:使用Bootstrap类创建一个Netty客户端引导程序实例。Bootstrap负责配置和启动Netty客户端。
  2. 设置EventLoopGroup:为客户端引导程序指定一个EventLoopGroup。EventLoopGroup是一组处理I/O操作的线程池,通常包含一个用于处理连接的boss线程池和一个用于处理I/O事件的worker线程池。
  3. 指定Channel类型:通过指定Channel的实现类或提供一个Channel工厂来指定客户端将要使用的Channel类型。不同的Channel类型对应着不同的传输协议,如NIO、Epoll、KQueue等。
  4. 配置Channel选项:通过调用Bootstrap的option()方法来配置客户端Channel的选项,如TCP连接的参数、Socket参数等。
  5. 设置Channel处理器:调用Bootstrap的handler()方法设置ChannelPipeline中的ChannelHandler。ChannelHandler用于处理入站和出站事件,比如编解码、数据处理、日志等。
  6. 连接到服务器:调用Bootstrap的connect()方法连接到服务器。此时,客户端会尝试连接到指定的远程服务器,并返回一个ChannelFuture对象,用于异步等待连接的建立。
  7. 处理连接结果:通过ChannelFuture对象的addListener()方法添加一个监听器,监听连接操作的结果。一旦连接建立成功或失败,将执行相应的操作。
代码语言:javascript
复制
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class MyClient {
    public static void main(String[] args)  throws  Exception{

        EventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new MyMessageEncoder());
                            pipeline.addLast(new MyClientHandler());
                        }
                    });

            System.out.println("netty client start。。");
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            group.shutdownGracefully();
        }
    }
}

在这个示例中,我们创建了一个NIO的EventLoopGroup,使用NioSocketChannel作为客户端的Channel类型,设置了TCP连接的保持活动选项,并初始化ChannelPipeline。最后,通过connect()方法连接到远程服务器,并启动客户端。

代码语言:javascript
复制
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

public class MyClientHandler extends SimpleChannelInboundHandler<MyMessageProtocol> {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for(int i = 0; i< 2; i++) {
            String msg = "你好,我是artisan";
            //创建协议包对象
            MyMessageProtocol messageProtocol = new MyMessageProtocol();
            messageProtocol.setLen(msg.getBytes(CharsetUtil.UTF_8).length);
            messageProtocol.setContent(msg.getBytes(CharsetUtil.UTF_8));
            ctx.writeAndFlush(messageProtocol);
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyMessageProtocol msg) throws Exception {

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

Netty 客户端创建流程


源码分析

入口


客户端建立连接

当客户端连接时,服务器端会监听到一个 OP_ACCEPT 事件。这是由于服务器端的 NIO 通道(通常是 ServerSocketChannel)在接受客户端连接时,会触发 OP_ACCEPT 事件。这个事件通知服务器端,有一个新的连接已经准备好接受。

在 Netty 中,当服务器端监听到 OP_ACCEPT 事件时,会执行相应的处理逻辑。通常情况下,服务器端会执行以下步骤:

  1. 获取到服务器端的 Selector 对象。
  2. 通过 Selector 获取到发生事件的 SelectionKey
  3. SelectionKey 中获取到对应的通道(ServerSocketChannel)。
  4. 调用 ServerSocketChannelaccept() 方法,接受客户端的连接,返回一个新的 SocketChannel 对象,表示与客户端建立的连接。
  5. 将新建立的 SocketChannel 注册到 Selector 上,并注册 OP_READ 事件,以便读取客户端发送的数据。
  6. 处理客户端连接成功的逻辑,如记录日志、发送欢迎消息等。

这样,服务器端就能够接受客户端的连接,并与之建立通信。


NioMessageUnsafe#read 处理 OP_ACCEPT

这段代码是 Netty 中用于处理读取数据的方法。以下是对代码的中文注释:

代码语言:javascript
复制
@Override
public void read() {
    assert eventLoop().inEventLoop(); // 断言当前线程处于事件循环中

    // 获取通道配置和管道
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();

    // 获取用于分配接收字节缓冲区的句柄
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config); // 重置句柄状态

    boolean closed = false; // 标志是否已关闭连接
    Throwable exception = null; // 异常信息

    try {
        try {
            do {
                int localRead = doReadMessages(readBuf); // 读取消息到缓冲区
                if (localRead == 0) { // 如果未读取到任何数据
                    break;
                }
                if (localRead < 0) { // 如果读取到的数据小于0,表示连接已关闭
                    closed = true;
                    break;
                }

                allocHandle.incMessagesRead(localRead); // 增加读取的消息数量
            } while (allocHandle.continueReading()); // 循环读取数据,直到不再需要读取或者没有更多的数据
        } catch (Throwable t) {
            exception = t; // 记录异常信息
        }

        int size = readBuf.size(); // 获取读取到的数据数量
        for (int i = 0; i < size; i++) { // 遍历读取到的数据
            readPending = false; // 标记为未挂起状态
            pipeline.fireChannelRead(readBuf.get(i)); // 将读取到的数据传递给管道的下一个处理器
        }
        readBuf.clear(); // 清空读取缓冲区
        allocHandle.readComplete(); // 表示读取操作完成
        pipeline.fireChannelReadComplete(); // 通知管道读取操作已完成

        if (exception != null) { // 如果有异常发生
            closed = closeOnReadError(exception); // 根据异常情况判断是否需要关闭连接
            pipeline.fireExceptionCaught(exception); // 将异常信息传递给管道的异常处理器
        }

        if (closed) { // 如果连接已关闭
            inputShutdown = true; // 标记为输入流已关闭
            if (isOpen()) { // 如果通道仍然打开
                close(voidPromise()); // 关闭通道
            }
        }
    } finally {
        // 检查是否有未处理的读取挂起,可能的原因是:
        // * 用户在 channelRead(...) 方法中调用了 Channel.read() 或 ChannelHandlerContext.read()
        // * 用户在 channelReadComplete(...) 方法中调用了 Channel.read() 或 ChannelHandlerContext.read()
        // 详情参考:https://github.com/netty/netty/issues/2254
        if (!readPending && !config.isAutoRead()) { // 如果没有挂起的读取,并且未开启自动读取
            removeReadOp(); // 移除读取操作
        }
    }
}

这段代码负责从通道中读取数据,并将读取到的数据传递给管道中的下一个处理器。在读取数据的过程中,会处理可能发生的异常,并根据需要关闭连接。同时,还会处理是否需要继续读取数据,以及是否需要移除读取操作。


doReadMessages(readBuf)

io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages

代码语言:javascript
复制
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    // 从底层 SocketChannel 接受新连接
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
        if (ch != null) { // 如果成功接受到新连接
            // 创建一个新的 NioSocketChannel 实例,并添加到List中
            buf.add(new NioSocketChannel(this, ch));
            return 1; // 返回成功接受到一个连接
        }
    } catch (Throwable t) {
        // 处理异常
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            // 尝试关闭 SocketChannel
            ch.close();
        } catch (Throwable t2) {
            // 处理关闭异常
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0; // 返回没有接受到新连接
}

new NioSocketChannel(this, ch)

代码语言:javascript
复制
 public NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
        config = new NioSocketChannelConfig(this, socket.socket());
    }

接下来这段代码是 AbstractNioByteChannel 的构造函数,它调用了父类 AbstractNioChannel 的构造函数,并指定了感兴趣的事件为 SelectionKey.OP_READ,表示该通道对读取事件感兴趣。让我们逐行解释:

代码语言:javascript
复制
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    // 调用父类 AbstractNioChannel 的构造函数
    super(parent, ch, SelectionKey.OP_READ);
}

这个构造函数的作用是初始化 AbstractNioByteChannel,它接受一个 Channel 父类和一个 java.nio.channels.SelectableChannel 对象作为参数。在构造函数中,通过调用父类的构造函数,将 SelectableChannel 注册到了父类的 selector 中,并指定了对读取事件感兴趣。

剩下的逻辑如下

pipeline.fireChannelRead(readBuf.get(i))


客户端发送数据

当客户端向服务器端发送数据时,会触发 OP_READ 事件。这是由于服务器端的 NIO 通道在接收到客户端发送的数据时,会触发 OP_READ 事件。这个事件通知服务器端,有数据可读取。

在 Netty 中,当服务器端监听到 OP_READ 事件时,会执行相应的处理逻辑。通常情况下,服务器端会执行以下步骤:

  1. 获取到服务器端的 Selector 对象。
  2. 通过 Selector 获取到发生事件的 SelectionKey
  3. SelectionKey 中获取到对应的通道(SocketChannel)。
  4. SocketChannel 中读取数据,并将数据存储到缓冲区中。
  5. 处理从客户端接收到的数据,执行相应的业务逻辑,如解析请求、处理消息等。
  6. 如有必要,向客户端发送响应消息。

这样,服务器端就能够接收客户端发送的数据,并根据业务逻辑进行处理。

NioByteUnsafe#read 处理 OP_READ

这段代码是 Netty 中用于读取数据的方法。让我们逐行解释:

代码语言:javascript
复制
@Override
public final void read() {
    final ChannelConfig config = config(); // 获取通道配置
    if (shouldBreakReadReady(config)) { // 检查是否应该中断读取就绪状态
        clearReadPending(); // 清除读取挂起标志
        return;
    }
    final ChannelPipeline pipeline = pipeline(); // 获取通道管道
    final ByteBufAllocator allocator = config.getAllocator(); // 获取字节缓冲区分配器
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); // 获取接收缓冲区分配器的句柄
    allocHandle.reset(config); // 重置分配器

    ByteBuf byteBuf = null; // 字节缓冲区对象
    boolean close = false; // 是否关闭通道
    try {
        do {
            byteBuf = allocHandle.allocate(allocator); // 分配字节缓冲区
            allocHandle.lastBytesRead(doReadBytes(byteBuf)); // 读取字节到缓冲区
            if (allocHandle.lastBytesRead() <= 0) { // 检查是否有字节被读取
                // 如果没有读取到任何字节,释放缓冲区
                byteBuf.release();
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0; // 检查是否需要关闭通道
                if (close) { // 如果需要关闭通道
                    // 因为收到了 EOF,所以没有剩余可读取的数据。
                    readPending = false; // 标记读取完成
                }
                break; // 跳出循环
            }

            allocHandle.incMessagesRead(1); // 增加已读消息数
            readPending = false; // 标记读取完成
            pipeline.fireChannelRead(byteBuf); // 将读取到的字节缓冲区传递给通道的处理器链
            byteBuf = null;
        } while (allocHandle.continueReading()); // 继续读取直到满足条件

        allocHandle.readComplete(); // 读取完成
        pipeline.fireChannelReadComplete(); // 通知通道读取完成

        if (close) { // 如果需要关闭通道
            closeOnRead(pipeline); // 关闭通道
        }
    } catch (Throwable t) {
        // 处理读取过程中的异常
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
        // 检查是否有未处理的读取挂起
        if (!readPending && !config.isAutoRead()) {
            removeReadOp(); // 移除读取操作
        }
    }
}

从通道中读取数据,并将读取到的数据传递给通道的处理器链进行处理。在读取过程中可能会出现异常,需要进行相应的处理。最后,根据读取的结果来判断是否需要关闭通道。

里面的主要逻辑如下


源码图

图都给你画好了,戳这里

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-05-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Pre
  • Netty Client Code
  • Netty 客户端创建流程
  • 源码分析
    • 入口
      • 客户端建立连接
        • NioMessageUnsafe#read 处理 OP_ACCEPT
      • 客户端发送数据
        • NioByteUnsafe#read 处理 OP_READ
    • 源码图
    相关产品与服务
    数据保险箱
    数据保险箱(Cloud Data Coffer Service,CDCS)为您提供更高安全系数的企业核心数据存储服务。您可以通过自定义过期天数的方法删除数据,避免误删带来的损害,还可以将数据跨地域存储,防止一些不可抗因素导致的数据丢失。数据保险箱支持通过控制台、API 等多样化方式快速简单接入,实现海量数据的存储管理。您可以使用数据保险箱对文件数据进行上传、下载,最终实现数据的安全存储和提取。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档