前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >【Netty】「NIO」(四)消息边界与可写事件

【Netty】「NIO」(四)消息边界与可写事件

作者头像
sidiot
发布于 2023-08-30 07:15:42
发布于 2023-08-30 07:15:42
24600
代码可运行
举报
文章被收录于专栏:技术大杂烩技术大杂烩
运行总次数:0
代码可运行

前言

本篇博文是《从0到1学习 Netty》中 NIO 系列的第四篇博文,主要内容是介绍如何处理消息边界以及通过可写事件解决写入内容过多的问题,往期系列文章请访问博主的 Netty 专栏,博文中的所有代码全部收集在博主的 GitHub 仓库中;

消息边界

将缓冲区的大小设置为4个字节,发送以下消息时:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
sc.write(Charset.defaultCharset().encode("你好,sidiot!"));

运行结果:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
你�
���
�sid
iot�
��

这是因为 UTF-8 字符集下,1个汉字占用3个字节,此时缓冲区大小为4个字节,一次读时间无法处理完通道中的所有数据,所以会触发多次读事件。这导致其他几个中文字符被拆分开来发送,因此解码时就会出现如上问题。

一般的解决思路有以下三种:

  • 固定消息长度,数据包大小一样,服务器按预定长度读取,当发送的数据较少时,需要将数据进行填充,直到长度与消息规定长度一致,缺点是浪费带宽;
  • 按分隔符拆分,缺点是效率低,需要一个一个字符地去匹配分隔符;
  • TLV 格式,即 Type 类型、Length 长度、Value 数据,也就是在消息开头用一些空间存放后面数据的长度,如 HTTP 请求头中的 Content-Type 与 Content-Length。类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 的吞吐量;
代码语言:txt
AI代码解释
复制
- Http 1.1 是 TLV 格式;
- Http 2.0 是 LTV 格式;

接下来通过按分隔符拆分的方式来处理消息边界问题;

先编写一个 split() 函数,用于处理将 buffer 的内容按分隔符进行拆分,代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private static void split(ByteBuffer buffer) {
    buffer.flip();
    for(int i=0; i<buffer.limit(); i++) {
        if (buffer.get(i) == '\n') {
            int length = i + 1 - buffer.position();
            ByteBuffer target = ByteBuffer.allocate(length);
            for(int j=0; j<length; j++) {
                target.put(buffer.get());
            }
            debugAll(target);
        }
    }
    buffer.compact();
}

然后再看到 ByteBuffer 类,虽然 ByteBuffer 是线程安全的,但是它并不是设计用于多线程并发访问,如果多个线程同时访问同一个 ByteBuffer 对象,那么可能会出现数据竞争和一致性问题,因此,我们需要确保每个 Channel 都有自己的 ByteBuffer 对象,来避免共享;

这时就要看到 register 函数:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public abstract SelectionKey register(Selector sel, int ops, Object att)
    throws ClosedChannelException;

这第三个参数 att 表示附件的意思,即可以向其中放入一个 Object 类型的对象,该对象会与登记的 Channel 以及其对应的 SelectionKey 绑定,可以从 SelectionKey 获取到对应 Channel 的附件,我们可以将 buffer 放入其中:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
ByteBuffer buffer = ByteBuffer.allocate(16);
channel.register(selector, SelectionKey.OP_READ, buffer);

之后可以通过 SelectionKey 的 attachment() 方法获得附件:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
ByteBuffer buf = (ByteBuffer) key.attachment();

此外,还要注意 buffer 的大小,如果发送内容的大小要大于 buffer 的大小,则会出现消息丢失的情况,比如要发送 "Hello, World! --sid10t.\n",由于 buffer 为16,最后接收到的只有 Hello, World! --,但是因为采用了按分隔符拆分,控制台不会输出任何字符;

因此,需要对 buffer 进行动态扩容,代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
if (buf.position() == buf.limit()) {
    ByteBuffer newBuf = ByteBuffer.allocate(buf.capacity() * 2);
    buf.flip();
    newBuf.put(buf);
    key.attach(newBuf);
}

上述代码是考虑到 split() 函数中使用的是 compact() 方法,因此当 positionlimit 相等时,说明缓冲区中的数据并未被读取(容量太小),此时创建新的缓冲区,其大小扩大至原先的两倍。同时还要将旧缓冲区中的数据拷贝到新的缓冲区中,并调用 SelectionKey 的 attach() 方法,将新的缓冲区作为新的附件放入 SelectionKey 中;

整体代码如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Slf4j
public class MSGBoundary {

    public static void main(String[] args) {
        try {
            Selector selector = Selector.open();

            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false);

            SelectionKey sscKey = ssc.register(selector, SelectionKey.OP_ACCEPT);
            log.debug("Register Key: {}", sscKey);

            ssc.bind(new InetSocketAddress(7999));

            while (true) {
                selector.select();

                Set<SelectionKey> keySet = selector.selectedKeys();
                Iterator<SelectionKey> iter = keySet.iterator();
//                log.debug("count: {}", keySet.size());

                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    log.debug("Selection Key: {}", key);

                    if (key.isAcceptable()) {
                        ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                        SocketChannel sc = channel.accept();
                        sc.configureBlocking(false);
                        ByteBuffer buffer = ByteBuffer.allocate(16);
                        sc.register(selector, SelectionKey.OP_READ, buffer);
                        log.debug("sc Key: {}", sc);
                        iter.remove();
                    } else if (key.isReadable()) {
                        SocketChannel channel = (SocketChannel) key.channel();
                        ByteBuffer buf = (ByteBuffer) key.attachment();

                        try {
                            int read = channel.read(buf);
                            log.debug("read: {}", read);
                            if (read <= 0) {
                                key.cancel();
                                channel.close();
                            } else {
                                split(buf);
                                if (buf.position() == buf.limit()) {
                                    ByteBuffer newBuf = ByteBuffer.allocate(buf.capacity() * 2);
                                    buf.flip();
                                    newBuf.put(buf);
                                    key.attach(newBuf);
                                }
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                            key.cancel();
                        } finally {
                            iter.remove();
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static void split(ByteBuffer buffer) {
        buffer.flip();
        for(int i=0; i<buffer.limit(); i++) {
            if (buffer.get(i) == '\n') {
                int length = i + 1 - buffer.position();
                ByteBuffer target = ByteBuffer.allocate(length);
                for(int j=0; j<length; j++) {
                    target.put(buffer.get());
                }
                debugAll(target);
            }
        }
        buffer.compact();
    }
}

运行结果:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
22:18:16 [DEBUG] [main] c.s.n.c.MSGBoundary - Register Key: channel=sun.nio.ch.ServerSocketChannelImpl[unbound], selector=sun.nio.ch.WindowsSelectorImpl@4f51b3e0, interestOps=16, readyOps=0
22:18:20 [DEBUG] [main] c.s.n.c.MSGBoundary - Selection Key: channel=sun.nio.ch.ServerSocketChannelImpl[/[0:0:0:0:0:0:0:0]:7999], selector=sun.nio.ch.WindowsSelectorImpl@4f51b3e0, interestOps=16, readyOps=16
22:18:20 [DEBUG] [main] c.s.n.c.MSGBoundary - sc Key: java.nio.channels.SocketChannel[connected local=/127.0.0.1:7999 remote=/127.0.0.1:52604]
22:18:20 [DEBUG] [main] c.s.n.c.MSGBoundary - Selection Key: channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:7999 remote=/127.0.0.1:52604], selector=sun.nio.ch.WindowsSelectorImpl@4f51b3e0, interestOps=1, readyOps=1
22:18:20 [DEBUG] [main] c.s.n.c.MSGBoundary - read: 16
22:18:20 [DEBUG] [main] c.s.n.c.MSGBoundary - Selection Key: channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:7999 remote=/127.0.0.1:52604], selector=sun.nio.ch.WindowsSelectorImpl@4f51b3e0, interestOps=1, readyOps=1
22:18:20 [DEBUG] [main] c.s.n.c.MSGBoundary - read: 8

+--------+-------------------- all ------------------------+----------------+
position: [24], limit: [24]
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 65 6c 6c 6f 2c 20 57 6f 72 6c 64 21 20 2d 2d |Hello, World! --|
|00000010| 73 69 64 31 30 74 2e 0a                         |sid10t..        |
+--------+-------------------------------------------------+----------------+

这里还需要考虑一个问题,就是 Bytebuffer 的大小,ByteBuffer 不能太大,比如一个 ByteBuffer 的大小为 1MB 的话,如果要支持百万连接就要 1TB 内存,因此需要设计大小可变的 ByteBuffer:

  • 一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能;
  • 另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗;

可写事件

服务器通过 Buffer 向通道中写入数据时,可能会遇到通道容量小于 Buffer 中的数据大小,导致无法一次性将 Buffer 中的数据全部写入到 Channel 中,这时便需要分多次写入,通过 hasRemaining() 方法来判断 Buffer 中是否还有数据,代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
StringBuilder sb = new StringBuilder();  
for (int i = 0; i < 5000000; i++) {  
    sb.append("sidiot");  
}  
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());  
while (buffer.hasRemaining()) {  
    int write = sc.write(buffer);  
    System.out.println(write);  
}

客户端通过循环来接收数据,代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
int cnt = 0;  
while (true) {  
    ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);  
    cnt += sc.read(buffer);  
    System.out.println(cnt);  
    buffer.clear();  
}

运行结果:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# WriteServer
4718556
3014633
4063201
0
4718556
0
2490349
0
0
2621420
0
2621420
0
2621420
0
2621420
...
509025

上述结果出现 0 是因为缓冲区还没消费完, 无法进行写入,这样子会导致滞留在此,性能低下;

接下来,优化一下代码,通过 Selector 进行处理,提高效率:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
while (iter.hasNext()) {
    SelectionKey key = iter.next();
    iter.remove();
    if (key.isAcceptable()) {
        ServerSocketChannel channel = (ServerSocketChannel) key.channel();
        SocketChannel sc = channel.accept();
        sc.configureBlocking(false);
        SelectionKey scKey = sc.register(selector, 0, null);
        scKey.interestOps(SelectionKey.OP_READ);

        // 1. 向客户端发送大量数据
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 5000000; i++) {
            sb.append("sidiot");
        }
        ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());

        // 2. 返回值代表实际写入的字节数
        int write = sc.write(buffer);
        System.out.println(write);

        // 3. 判断是否有剩余内存
        if (buffer.hasRemaining()) {
            // 4. 关注可写事件 1+4
            scKey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE);
            // 5. 将未写完的数据挂到 scKey 上
            scKey.attach(buffer);
        }
    } else if (key.isWritable()) {
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        SocketChannel sc = (SocketChannel) key.channel();
        int write = sc.write(buffer);
        System.out.println(write);
        // 6. 清理操作
        if (!buffer.hasRemaining()) {
            key.attach(null);
            key.interestOps(key.interestOps() - SelectionKey.OP_WRITE); // 不在关注写事件
        }
    }
}

注意,这里需要使用组合事件类型,即 scKey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE);,如果只有 SelectionKey.OP_WRITE 意味着写事件覆盖原先的读事件;

运行结果:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# WriteServer
2097136
2621420
3014633
3801059
6815692
393213
2228207
393213
2228207
5242840
1164380

# WriteClient
131071
262142
393213
524284
...
29622046
29753117
29884188
30000000

后记

Java NIO 中,消息边界和可写事件是关键概念,它们对于构建高性能的网络应用程序至关重要。

通过合理处理消息边界,我们可以确保数据的正确性和完整性。另一方面,可写事件的处理对于保障系统的稳定性和可靠性同样至关重要。

通过充分理解和灵活运用这两个概念,我们能够设计出高效、稳定的网络系统,满足用户需求并提升用户体验。

以上就是 消息边界与可写事件 的所有内容了,希望本篇博文对大家有所帮助!

参考:

📝 上篇精讲:「NIO」(三)剖析 Selector 💖 我是 𝓼𝓲𝓭𝓲𝓸𝓽,期待你的关注,创作不易,请多多支持; 👍 公众号:sidiot的技术驿站; 🔥 系列专栏:探索 Netty:源码解析与应用案例分享

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-05-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
【Netty】「NIO」(三)剖析 Selector
本篇博文是《从0到1学习 Netty》中 NIO 系列的第三篇博文,主要内容是介绍通过使用 Selector,一个单独的线程可以有效地监视多个通道,从而提高应用程序的处理效率,往期系列文章请访问博主的 Netty 专栏,博文中的所有代码全部收集在博主的 GitHub 仓库中;
sidiot
2023/08/30
3100
【Netty】「NIO」(三)剖析 Selector
Netty入门之可写事件以及多线程版的通信
通过上述结果我们不难发现这个server端发送数据的时候并不是一次全部发送出去的,他尝试了很多次,效率很低, 并且有的时候Buffer是满的( server端打印0的时候,它是无法写的)他也无法发送,这样其实无法满足非阻塞模式的,接下来进行一个优化: 当buffer满的时候,我去进行别的操作,当buffer清空了触发一个写事件 上代码:
@派大星
2023/07/15
2090
Netty入门之可写事件以及多线程版的通信
Netty入门之消息边界处理以及ByteBuffer大小分配
以上三篇内容主要讲了NIO的三大组件、ByteBuffer、文件编程、阻塞、非阻塞、 Selector等,需要了解像详情的请移步查看。
@派大星
2023/06/28
2420
Netty入门之消息边界处理以及ByteBuffer大小分配
Netty-nio
channel 有一点类似于 stream,它就是读写数据的双向通道,可以从 channel 将数据读入 buffer,也可以将 buffer 的数据写入 channel,而之前的 stream 要么是输入,要么是输出,channel 比 stream 更为底层
sgr997
2022/11/10
7150
Netty-nio
🎯 Java NIO 基础
✏️ 写在前面的话: Netty本质是一个NIO框架,适用于服务器通讯相关的多种应用场景。 Netty作为一款基于Java开发的高性能网络框架,想要从认识到熟悉再到掌握最终理解,因此我们需要从最基础的NIO开始学习。如果你已经学习并掌握了NIO相关知识,那么可以直接进入Netty相关文章的学习;如果没有了解过也没有关系,那我们就从当前文章开始学习吧!🎉🎉🎉 这里我们先简单了解一下这一篇文章中我们将要学习的内容: 首先是NIO的基本介绍,了解NIO的三大组件 ByteBuffer 字节缓冲区的基本使用
爱吃糖的范同学
2023/02/11
8260
Netty 入门篇 Day 3---网络编程
在阻塞模式下,会导致 线程暂停 ssc.accept(); // 阻塞的方法 会导致线程暂停,一直等到有client连接 才继续工作 channel.read(buffer); // 阻塞的方法 会导致线程暂停,一直等client发送信息 才继续进行读操作 服务器端的单线程模式下,阻塞方法会导致这个线程暂停(闲置); 同时 多个client相到受影响,几乎不能正确工作,需要服务器端的多线程支持 服务器端的多线程模式缺点:1) 占用内存多 2)多线程切换,带来比较大的内存开销
猫头虎
2024/04/08
1120
Netty 入门篇 Day 3---网络编程
Netty网络编程第一卷
channel 有一点类似于 stream,它就是读写数据的双向通道,可以从 channel 将数据读入 buffer,也可以将 buffer 的数据写入 channel,而之前的 stream 要么是输入,要么是输出,channel 比 stream 更为底层
大忽悠爱学习
2022/05/06
7110
Netty网络编程第一卷
NIO:为什么Selector的selectedKeys遍历处理事件后要移除?
接着,切换到客户端的调试模式窗口,按Alt+F8,或者点击Evalute图标,打开评估器,切换成代码模式:
借力好风
2021/10/27
1.4K0
NIO:为什么Selector的selectedKeys遍历处理事件后要移除?
【Netty】「NIO」(五)多线程优化
本篇博文是《从0到1学习 Netty》中 NIO 系列的第五篇博文,主要内容是使用多线程对程序进行优化,充分利用 CPU 的能力,往期系列文章请访问博主的 Netty 专栏,博文中的所有代码全部收集在博主的 GitHub 仓库中;
sidiot
2023/08/30
4010
【Netty】「NIO」(五)多线程优化
Netty入门之网络编程
以上两篇内容主要讲了NIO的三大组件、ByteBuffer、文件编程等,需要了解像详情的请移步查看。
@派大星
2023/06/28
1630
Netty入门之网络编程
012. NIO 非阻塞网络编程
1. Java NIO ---- 始于 Java1.4,提供了新的 JAVA IO 操作非阻塞 API。用意是替代 Java IO 和 Java Networking 相关的 API。 三个核心组件 Buffer 缓冲区 Channel 通道 Selector 选择器 2. Buffer 缓冲区 ---- 1. 介绍 缓冲区本质上是一个可以写入数据的内存块(类似数组),然后可以再次读取。此内存块包含在 NIO Buffer 对象中,该对象提供了一组方法,可以更轻松地使用内存块。 相比较直接对数组的操
山海散人
2021/03/03
4050
012. NIO 非阻塞网络编程
NIO非阻塞网络编程三大核心理念
1.capacity 容量:作为一个内存块,Buffer具有一定的固定大小,也称为【容量】。 2.position 位置:写入模式时代表写数据的位置。读取模式时代表读取数据的位置。 3.limit 限制:写入模式,限制等于buffer的容量,读取模式下,limit等于写入的数据量。
IT架构圈
2020/12/02
3990
NIO非阻塞网络编程三大核心理念
Reactor模型-多线程程版
在Reactor单线程版本的设计中,I/O任务乃至业务逻辑都由Reactor线程来完成,这无疑增加了Reactor线程的负担,高负载情况下必然会出现性能瓶颈。此外,对于多处理器的服务器来说,单个Reactor线程也发挥不了多CPU的最大功效。下面我们对之前单线程版的Reactor进行改进。
topgunviper
2022/05/12
3440
NIO简述
一个 Buffer 本质上是内存中的一块,我们可以将数据写入这块内存,之后从这块内存获取数据
leobhao
2022/06/28
3070
NIO简述
java架构之路-(netty专题)初步认识BIO、NIO、AIO
  本次我们主要来说一下我们的IO阻塞模型,只是不多,但是一定要理解,对于后面理解netty很重要的
小菜的不能再菜
2020/02/21
4570
Java中的NIO基础知识
上一篇介绍了五种NIO模型,本篇将介绍Java中的NIO类库,为学习netty做好铺垫
Janti
2018/08/01
5460
Java中的NIO基础知识
Java NIO 实现网络通信
Java NIO 的相关资料很多,对 channel,buffer,selector 如何相关概念也有详细的阐述。但是,不亲自写代码调试一遍,对这些概念的理解仍然是一知半解。
水货程序员
2018/11/13
1K0
NIO全解析说明
Java NIO是一个用来替代标准Java IO API的新型数据传递方式,像现在分布式架构中会经常存在他的身影。其比传统的IO更加高效,非阻塞,异步,双向
迹_Jason
2019/05/30
8170
Java NIO Selector 使用
之前的文章已经把 Java 中 NIO 的 Buffer、Channel 讲解完了,不太了解的可以先回过头去看看。这篇文章我们就来聊聊 Selector —— 选择器。
SH的全栈笔记
2022/08/17
3330
Java NIO Selector 使用
终结全网!手写Netty面试题答案
创建一个线程,注册到 Selector,将 serversocketchannel 注册到Selector selectionKey 里就有具体的事件
JavaEdge
2021/10/18
2240
相关推荐
【Netty】「NIO」(三)剖析 Selector
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验