Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >【Netty】「NIO」(五)多线程优化

【Netty】「NIO」(五)多线程优化

作者头像
sidiot
发布于 2023-08-30 07:16:20
发布于 2023-08-30 07:16:20
40300
代码可运行
举报
文章被收录于专栏:技术大杂烩技术大杂烩
运行总次数:0
代码可运行

前言

本篇博文是《从0到1学习 Netty》中 NIO 系列的第五篇博文,主要内容是使用多线程对程序进行优化,充分利用 CPU 的能力,往期系列文章请访问博主的 Netty 专栏,博文中的所有代码全部收集在博主的 GitHub 仓库中;

引入

这前几篇文章中,都是采用单线程进行设计,虽然可以运行,但是没有充分利用 CPU 的性能,并且如果有一个事件的处理时间较长,则会影响其他事件的处理。

例如,开发一个项目,如果团队只有一个全栈工程师,那么他需要先完成前端,再完成后端,只能按部就班的完成任务,如果前端开发遭遇困难,花费了很多时间,则会大大拉长项目开发周期,而如果一个团队里有前端工程师和后端工程师,则前后端的开发能同步进行,这样会大大提高开发效率。

同理,对之前的代码进行优化,分两组选择器:

  • 选择一个线程配置一个选择器,作为 ‘Boss’,专门处理 accept 事件
  • 创建多个线程(最好与 CPU 核心数一直),作为 ‘Worker’,每个线程配置一个选择器,轮流处理 read**,**write 等事件

实现

1、创建一个 Boss 线程,负责处理 accept 事件类型:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Thread.currentThread().setName("Boss");
Selector boss = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
SelectionKey bossKey = ssc.register(boss, 0, null);
bossKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(7999));

while (true) {
    boss.select();
    Iterator<SelectionKey> iter = boss.selectedKeys().iterator();

    while (iter.hasNext()) {
        SelectionKey key = iter.next();
        iter.remove();
        if (key.isAcceptable()) {
            ServerSocketChannel channel = (ServerSocketChannel) key.channel();
            SocketChannel sc = channel.accept();
            sc.configureBlocking(false);
        }
    }
}

2、创建 Worker 类,用于初始化 Worker 线程和 Selector,负责处理 read 事件类型:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
class Worker implements Runnable{
    private Thread thread;
    private volatile Selector worker;
    private String name;

    public Worker(String name) {
        this.name = name;
    }

    public void register() throws IOException {
        this.thread = new Thread(this, this.name);
        this.worker = Selector.open();
        this.thread.start();
    }
    
    @Override
    public void run() {
        while (true) {
            try {
                this.worker.select();
                Iterator<SelectionKey> iter = this.worker.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    if (key.isReadable()) {
                        ByteBuffer buffer = ByteBuffer.allocate(16);
                        SocketChannel channel = (SocketChannel) key.channel();
                        channel.read(buffer);
                        buffer.flip();
                        debugAll(buffer);
                    }
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }

        }
    }
}

但是这里会有个问题,每次进行 register() 的时候会新创建一个线程,但我们只想一个 Worker 对应一个线程,所以我们需要对上述代码进行优化,使用标志符来进行判断是否完成过初始化:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private volatile boolean start = false;

public void register() throws IOException {
    if (!this.start) {
        this.thread = new Thread(this, this.name);
        this.selector = Selector.open();
        this.thread.start();
        this.start = true;
    }
}

注意,this.worker = Selector.open();this.thread.start(); 不要写反了,不然之后运行会出现空指针异常:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Exception in thread "worker-0" java.lang.NullPointerException
        at com.sidiot.netty.c3.MultiThreadServer$Worker.run(MultiThreadServer.java:75)
        at java.base/java.lang.Thread.run(Thread.java:832)

3、将 Worker 进行关联,先创建一个 worker 线程:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Worker worker0 = new Worker("worker-0");
worker0.register();

while (true) {
    ...
    while (iter.hasNext()) {
        ...
        if (key.isAcceptable()) {
            ...
            log.debug("connected... {}", sc.getRemoteAddress());
            log.debug("before register {}", sc.getRemoteAddress());
            sc.register(worker0.selector, SelectionKey.OP_READ, null);
            log.debug("after register {}", sc.getRemoteAddress());
        }
    }
}

4、编写客户端:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class MultiThreadClient {  
    public static void main(String[] args) throws IOException {  
        SocketChannel sc = SocketChannel.open();  
        sc.connect(new InetSocketAddress("localhost", 7999));  
        sc.write(Charset.defaultCharset().encode("Hello, World! --sidiot."));  
        System.in.read();  
    }  
}

5、运行服务端和客户端,运行结果如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
20:30:30 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - connected... /127.0.0.1:50612
20:30:30 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - before register /127.0.0.1:50612
20:30:30 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - after register /127.0.0.1:50612

发现 worker 并没有进行工作,或者说是客户端发送的数据并没有进入到 worker 的可读事件中,这是因为在 worker 的 run() 方法运行时,SocketChannel 还没有注册到 worker 的 selector 中,导致 worker 线程在 this.selector.select(); 的位置发生了阻塞;

6、由于 sc.register 发生在 boss 线程中,而 select 发生在 worker 线程中,无法确定两个线程的执行顺序,因此需要把两步操作都放入一个线程中;

SocketChannel 传到到 Worker 的 register() 方法中:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public void register(SocketChannel sc) throws IOException {  
    if (!this.start) {  
        this.thread = new Thread(this, this.name);  
        this.selector = Selector.open();  
        this.thread.start();  
        this.start = true;  
    }  
  
    sc.register(this.selector, SelectionKey.OP_READ, null);  
}

但这样还是不行的,因为 register() 方法还是在 boss 线程中执行,这就需要使用队列来完成线程间的通信了:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();

public void register(SocketChannel sc) throws IOException {  
    ...
  
    this.queue.add(() -> {  
        try {  
            sc.register(this.selector, SelectionKey.OP_READ, null);  
        } catch (ClosedChannelException e) {  
            throw new RuntimeException(e);  
        }  
    });  
    
    this.selector.wakeup();
}

@Override  
public void run() {  
    while (true) {  
        try {  
            this.selector.select();  
            Runnable task = this.queue.poll();  
            if (task != null) {  
                task.run();  
            }  
            Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator();
            ...
    }
}

注意,这里需要 this.selector.wakeup(); 来唤醒 selector 继续往下走;

还有另一种方法,参考代码点击这里;

7、将单线程 worker 转成多线程:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Worker[] workers = new Worker[4];  
for (int i = 0; i < workers.length; i++) {  
    workers[i] = new Worker("worker-" + i);  
}

同时使用计数器来实现各个 worker 线程的轮询使用:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
AtomicInteger index = new AtomicInteger();

while (true) {  
    ...
    while (iter.hasNext()) {  
        ...
        if (key.isAcceptable()) {  
            ...
            workers[index.getAndIncrement() % workers.length].register(sc);  
        }  
    }  
}

运行结果:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
22:36:13 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - connected... /127.0.0.1:54668
22:36:13 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - before register /127.0.0.1:54668
22:36:13 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - after register /127.0.0.1:54668
22:36:13 [DEBUG] [worker-0] c.s.n.c.MultiThreadServer - read... /127.0.0.1:54668
+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [7]
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 73 69 64 69 6f 74 2e 00 00 00 00 00 00 00 00 00 |sidiot..........|
+--------+-------------------------------------------------+----------------+

22:36:20 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - connected... /127.0.0.1:54676
22:36:20 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - before register /127.0.0.1:54676
22:36:20 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - after register /127.0.0.1:54676
22:36:20 [DEBUG] [worker-1] c.s.n.c.MultiThreadServer - read... /127.0.0.1:54676
+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [7]
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 73 69 64 69 6f 74 2e 00 00 00 00 00 00 00 00 00 |sidiot..........|
+--------+-------------------------------------------------+----------------+

22:36:30 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - connected... /127.0.0.1:54687
22:36:30 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - before register /127.0.0.1:54687
22:36:30 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - after register /127.0.0.1:54687
22:36:30 [DEBUG] [worker-0] c.s.n.c.MultiThreadServer - read... /127.0.0.1:54687
+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [7]
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 73 69 64 69 6f 74 2e 00 00 00 00 00 00 00 00 00 |sidiot..........|
+--------+-------------------------------------------------+----------------+

后记

综上所述,多线程优化是在 Java NIO 中提高系统性能和响应能力的关键手段。通过引入并发处理机制、合理的线程管理策略以及有效的同步与通信机制,可以充分发挥 NIO 框架的优势,提升系统的效率与可扩展性。

以上就是 多线程优化 的所有内容了,希望本篇博文对大家有所帮助!

参考:

📝 上篇精讲:「NIO」(四)消息边界与可写事件 💖 我是 𝓼𝓲𝓭𝓲𝓸𝓽,期待你的关注,创作不易,请多多支持; 👍 公众号:sidiot的技术驿站; 🔥 系列专栏:探索 Netty:源码解析与应用案例分享

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-05-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Netty入门之可写事件以及多线程版的通信
通过上述结果我们不难发现这个server端发送数据的时候并不是一次全部发送出去的,他尝试了很多次,效率很低, 并且有的时候Buffer是满的( server端打印0的时候,它是无法写的)他也无法发送,这样其实无法满足非阻塞模式的,接下来进行一个优化: 当buffer满的时候,我去进行别的操作,当buffer清空了触发一个写事件 上代码:
@派大星
2023/07/15
2100
Netty入门之可写事件以及多线程版的通信
【Netty】「NIO」(三)剖析 Selector
本篇博文是《从0到1学习 Netty》中 NIO 系列的第三篇博文,主要内容是介绍通过使用 Selector,一个单独的线程可以有效地监视多个通道,从而提高应用程序的处理效率,往期系列文章请访问博主的 Netty 专栏,博文中的所有代码全部收集在博主的 GitHub 仓库中;
sidiot
2023/08/30
3120
【Netty】「NIO」(三)剖析 Selector
【Netty】「NIO」(四)消息边界与可写事件
本篇博文是《从0到1学习 Netty》中 NIO 系列的第四篇博文,主要内容是介绍如何处理消息边界以及通过可写事件解决写入内容过多的问题,往期系列文章请访问博主的 Netty 专栏,博文中的所有代码全部收集在博主的 GitHub 仓库中;
sidiot
2023/08/30
2470
Netty-nio
channel 有一点类似于 stream,它就是读写数据的双向通道,可以从 channel 将数据读入 buffer,也可以将 buffer 的数据写入 channel,而之前的 stream 要么是输入,要么是输出,channel 比 stream 更为底层
sgr997
2022/11/10
7150
Netty-nio
【Netty】「NIO」(二)阻塞模式与非阻塞模式
本篇博文是《从0到1学习 Netty》中 NIO 系列的第二篇博文,主要内容是通过 NIO 来理解阻塞模式与非阻塞模式,往期系列文章请访问博主的 Netty 专栏,博文中的所有代码全部收集在博主的 GitHub 仓库中;
sidiot
2023/08/30
5080
【Netty】「NIO」(二)阻塞模式与非阻塞模式
Netty网络编程第一卷
channel 有一点类似于 stream,它就是读写数据的双向通道,可以从 channel 将数据读入 buffer,也可以将 buffer 的数据写入 channel,而之前的 stream 要么是输入,要么是输出,channel 比 stream 更为底层
大忽悠爱学习
2022/05/06
7110
Netty网络编程第一卷
NIO如何多线程操作 顶
因为NIO本身是非阻塞的,所以他的消息选择器Selector可以在单线程下连接多台客户端的访问。
算法之名
2019/08/20
4980
🎯 Java NIO 基础
✏️ 写在前面的话: Netty本质是一个NIO框架,适用于服务器通讯相关的多种应用场景。 Netty作为一款基于Java开发的高性能网络框架,想要从认识到熟悉再到掌握最终理解,因此我们需要从最基础的NIO开始学习。如果你已经学习并掌握了NIO相关知识,那么可以直接进入Netty相关文章的学习;如果没有了解过也没有关系,那我们就从当前文章开始学习吧!🎉🎉🎉 这里我们先简单了解一下这一篇文章中我们将要学习的内容: 首先是NIO的基本介绍,了解NIO的三大组件 ByteBuffer 字节缓冲区的基本使用
爱吃糖的范同学
2023/02/11
8260
「高并发通信框架Netty4 源码解读(八)」NIO应用——聊天案例及Reactor线程模式
上面的聊天案例无论是服务端还是客户端,都是单线程的,所有的链接及读写都是在一个main方法所在的主线程内运行。
源码之路
2020/09/04
1.8K0
「高并发通信框架Netty4 源码解读(八)」NIO应用——聊天案例及Reactor线程模式
NIO:为什么Selector的selectedKeys遍历处理事件后要移除?
接着,切换到客户端的调试模式窗口,按Alt+F8,或者点击Evalute图标,打开评估器,切换成代码模式:
借力好风
2021/10/27
1.4K0
NIO:为什么Selector的selectedKeys遍历处理事件后要移除?
Netty入门之消息边界处理以及ByteBuffer大小分配
以上三篇内容主要讲了NIO的三大组件、ByteBuffer、文件编程、阻塞、非阻塞、 Selector等,需要了解像详情的请移步查看。
@派大星
2023/06/28
2440
Netty入门之消息边界处理以及ByteBuffer大小分配
java架构之路-(netty专题)初步认识BIO、NIO、AIO
  本次我们主要来说一下我们的IO阻塞模型,只是不多,但是一定要理解,对于后面理解netty很重要的
小菜的不能再菜
2020/02/21
4570
Netty 入门篇 Day 3---网络编程
在阻塞模式下,会导致 线程暂停 ssc.accept(); // 阻塞的方法 会导致线程暂停,一直等到有client连接 才继续工作 channel.read(buffer); // 阻塞的方法 会导致线程暂停,一直等client发送信息 才继续进行读操作 服务器端的单线程模式下,阻塞方法会导致这个线程暂停(闲置); 同时 多个client相到受影响,几乎不能正确工作,需要服务器端的多线程支持 服务器端的多线程模式缺点:1) 占用内存多 2)多线程切换,带来比较大的内存开销
猫头虎
2024/04/08
1140
Netty 入门篇 Day 3---网络编程
netty系列之:NIO和netty详解
netty为什么快呢?这是因为netty底层使用了JAVA的NIO技术,并在其基础上进行了性能的优化,虽然netty不是单纯的JAVA nio,但是netty的底层还是基于的是nio技术。
程序那些事
2022/03/09
6430
Netty01-nio
channel 有一点类似于 stream,它就是读写数据的双向通道,可以从 channel 将数据读入 buffer,也可以将 buffer 的数据写入 channel,而之前的 stream 要么是输入,要么是输出,channel 比 stream 更为底层
海仔
2021/05/18
1.1K0
Netty01-nio
Java中的NIO基础知识
上一篇介绍了五种NIO模型,本篇将介绍Java中的NIO类库,为学习netty做好铺垫
Janti
2018/08/01
5480
Java中的NIO基础知识
Netty-优化与源码
提供两个实现,我这里直接将实现加入了枚举类 Serializer.Algorithm 中
sgr997
2022/11/10
5120
Netty-优化与源码
NIO源码阅读
  自己对着源码敲一遍练习,写上注释。发现NIO编程难度好高啊。。虽然很复杂,但是NIO编程的有点还是很多:
用户3003813
2018/09/06
5020
NIO源码阅读
NIO之多线程协作处理数据读写
经过前面几章的学习,我们已经 能够掌握了JDK NIO的开发方式,我们来总结一下NIO开发的流程:
止术
2021/07/16
8200
NIO之多线程协作处理数据读写
netty源码分析一之NioServerSocketChannel
这里调用的也是SelectorProvider.provider()的openServerSocketChannel方法。
山行AI
2019/10/21
7290
netty源码分析一之NioServerSocketChannel
相关推荐
Netty入门之可写事件以及多线程版的通信
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验