Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >并发设计模式实战系列(18):反应器(Reactor)

并发设计模式实战系列(18):反应器(Reactor)

作者头像
摘星.
发布于 2025-05-20 07:08:07
发布于 2025-05-20 07:08:07
10100
代码可运行
举报
文章被收录于专栏:博客专享博客专享
运行总次数:0
代码可运行
🌟 大家好,我是摘星! 🌟

今天为大家带来的是并发设计模式实战系列,第十八章反应器(Reactor),废话不多说直接开始~

一、核心原理深度拆解

1. 事件驱动架构
  • Reactor核心:单线程事件循环(while(true) { selector.select(); }
  • 事件分离器:通过Selector实现I/O多路复用(Linux epoll/kqueue)
  • 事件处理器:实现ChannelHandler接口处理具体业务
2. 高性能关键设计
  • 非阻塞I/O:所有Channel必须配置为non-blocking
  • 零拷贝优化:使用ByteBuffer直接读写内核缓冲区
  • 避免线程切换:I/O操作与业务处理在同一线程完成

二、生活化类比:餐厅点餐系统

系统组件

现实类比

核心行为

Reactor

前台接待员

监听顾客举手信号

Selector

座位呼叫器

哪个桌位需要服务就亮灯

EventHandler

服务员

处理具体点餐、上菜请求

  • 高效原理:1个接待员管理10个服务员(传统模式:1顾客配1服务员)

三、Java代码实现(NIO原生版)

1. 完整可运行代码
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import java.nio.*;
import java.nio.channels.*;
import java.util.*;

public class Reactor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocket;

    // 启动方法
    public Reactor(int port) throws Exception {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port));
        serverSocket.configureBlocking(false);
        SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        sk.attach(new Acceptor());
    }

    // 事件循环核心
    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set<SelectionKey> selected = selector.selectedKeys();
                Iterator<SelectionKey> it = selected.iterator();
                while (it.hasNext()) {
                    dispatch(it.next());
                }
                selected.clear();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    void dispatch(SelectionKey key) {
        Runnable r = (Runnable) key.attachment();
        if (r != null) r.run();
    }

    // 连接处理器
    class Acceptor implements Runnable {
        public void run() {
            try {
                SocketChannel c = serverSocket.accept();
                if (c != null) new Handler(selector, c);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

// 业务处理器
class Handler implements Runnable {
    final SocketChannel socket;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(1024);

    Handler(Selector sel, SocketChannel c) throws Exception {
        socket = c;
        c.configureBlocking(false);
        sk = socket.register(sel, 0);
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_READ);
        sel.wakeup(); // 唤醒selector
    }

    public void run() {
        try {
            if (sk.isReadable()) read();
            else if (sk.isWritable()) write();
        } catch (Exception e) {
            close();
        }
    }

    void read() throws Exception {
        socket.read(input);
        if (inputIsComplete()) {
            process();
            sk.interestOps(SelectionKey.OP_WRITE);
        }
    }

    void write() throws Exception {
        if (outputIsComplete()) sk.cancel();
    }

    boolean inputIsComplete() { /* 解析协议头判断 */ return true; }
    boolean outputIsComplete() { /* 判断写入完成 */ return true; }
    void process() { /* 业务处理逻辑 */ }
    void close() { /* 资源释放 */ }
}

// 启动类
class Main {
    public static void main(String[] args) throws Exception {
        new Thread(new Reactor(8080)).start();
    }
}
2. 关键配置说明
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 重要参数调优
serverSocket.setOption(StandardSocketOptions.SO_REUSEADDR, true);
socket.setOption(StandardSocketOptions.TCP_NODELAY, true);

// Buffer分配策略
ByteBuffer.allocateDirect(1024); // 直接内存,减少拷贝

四、横向对比表格

1. Reactor变体对比

类型

线程模型

适用场景

JDK实现案例

单线程Reactor

所有操作单线程

轻量级应用

Redis单线程模型

多线程Reactor

I/O多路复用+线程池

计算密集型业务

Netty主从线程组

多Reactor

多Selector分级处理

超高并发连接

Nginx事件处理

2. 与传统模式对比

指标

Thread-Per-Connection

Reactor模式

连接数支持

数百级

百万级

上下文切换

频繁

极少

内存消耗

每个连接1MB栈

共享少量缓冲区

延迟敏感性

一般

极佳


五、高级优化技巧

1. 多Reactor线程组
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 主从Reactor配置
Selector[] selectors = new Selector[Runtime.getRuntime().availableProcessors()];
for (int i = 0; i < selectors.length; i++) {
    selectors[i] = Selector.open();
    new Thread(new SubReactor(selectors[i])).start();
}
2. 零拷贝优化
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/ FileChannel.transferTo实现零拷贝
fileChannel.transferTo(position, count, socketChannel);
3. 内存池化技术
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 使用Netty的ByteBuf内存池
ByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;
ByteBuf buffer = alloc.directBuffer(1024);
4. 监控指标
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 关键指标采集
selector.keys().size();      // 注册通道数
selector.selectNow();        // 就绪事件数
bufferPool.usedMemory();     // 内存使用量

六、Reactor模式进阶优化

6.1 多Reactor线程组实战
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 主从Reactor线程组实现
class MasterSlaveReactor {
    private final Selector masterSelector;
    private final Selector[] slaveSelectors;
    private final ExecutorService masterExecutor;
    private final ExecutorService[] slaveExecutors;

    public MasterSlaveReactor(int slaveCount) throws Exception {
        // 主Reactor负责连接接入
        masterSelector = Selector.open();
        masterExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "Master-Reactor"));
        
        // 从Reactor负责IO读写
        slaveSelectors = new Selector[slaveCount];
        slaveExecutors = new ExecutorService[slaveCount];
        for (int i = 0; i < slaveCount; i++) {
            slaveSelectors[i] = Selector.open();
            slaveExecutors[i] = Executors.newSingleThreadExecutor(r -> new Thread(r, "Slave-Reactor-" + i));
        }
    }

    public void start() {
        // 主Reactor启动
        masterExecutor.execute(() -> {
            while (!Thread.interrupted()) {
                try {
                    masterSelector.select();
                    Set<SelectionKey> keys = masterSelector.selectedKeys();
                    Iterator<SelectionKey> it = keys.iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        if (key.isAcceptable()) {
                            // 分配连接到从Reactor
                            int slaveIndex = key.attachment().hashCode() % slaveSelectors.length;
                            dispatchToSlave(slaveIndex, key);
                        }
                        it.remove();
                    }
                } catch (Exception e) { /* 处理异常 */ }
            }
        });

        // 从Reactor启动
        for (int i = 0; i < slaveSelectors.length; i++) {
            final int index = i;
            slaveExecutors[i].execute(() -> {
                while (!Thread.interrupted()) {
                    try {
                        slaveSelectors[index].select();
                        Set<SelectionKey> keys = slaveSelectors[index].selectedKeys();
                        // 处理IO读写(同基础版Handler逻辑)
                    } catch (Exception e) { /* 处理异常 */ }
                }
            });
        }
    }
}

优化点说明

  • 连接分配策略:采用哈希取模实现简单负载均衡
  • 线程隔离:读写操作分散到不同线程,避免单个Selector饱和
  • 资源控制:每个从Reactor独立线程处理,避免竞争

七、协议解析优化策略

7.1 零拷贝解析HTTP请求
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 基于FileChannel的零拷贝传输
void sendFile(SocketChannel channel, File file) throws Exception {
    try (FileInputStream fis = new FileInputStream(file)) {
        FileChannel fileChannel = fis.getChannel();
        long position = 0;
        long remaining = fileChannel.size();
        while (remaining > 0) {
            long transferred = fileChannel.transferTo(position, remaining, channel);
            position += transferred;
            remaining -= transferred;
        }
    }
}

// 内存映射解析大文件
ByteBuffer mapFile(String path) throws Exception {
    try (RandomAccessFile raf = new RandomAccessFile(path, "r")) {
        return raf.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, raf.length());
    }
}
7.2 自定义协议设计模板
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 协议帧结构示例
class ProtocolFrame {
    byte magic;    // 魔数标识
    int length;    // 数据长度
    byte type;     // 协议类型
    byte[] data;   // 有效载荷

    // 使用ByteBuffer解析
    public static ProtocolFrame decode(ByteBuffer buffer) {
        ProtocolFrame frame = new ProtocolFrame();
        frame.magic = buffer.get();
        frame.length = buffer.getInt();
        frame.type = buffer.get();
        frame.data = new byte[frame.length];
        buffer.get(frame.data);
        return frame;
    }
}

八、生产环境问题解决方案

8.1 常见问题处理方案

问题现象

根本原因

解决方案

CPU 100%

空轮询Bug

1. 升级JDK版本 2. 添加select()超时时间 3. 使用Netty等成熟框架

内存泄漏

ByteBuffer未释放

1. 使用内存池技术 2. 实现引用计数 3. 添加JVM参数-XX:+DisableExplicitGC

连接数不均衡

哈希分配不均匀

1. 改用一致性哈希 2. 动态监测负载调整分配策略

8.2 性能监控指标采集
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 关键指标采集示例
class ReactorMetrics {
    void collectMetrics(Selector selector) {
        // 连接数监控
        int connectionCount = selector.keys().size() - 1; // 排除ServerSocketChannel
        
        // 事件处理耗时
        long start = System.nanoTime();
        selector.select(100);
        long latency = System.nanoTime() - start;
        
        // 内存使用监控
        long directMemory = ((sun.misc.VM) Class.forName("sun.misc.VM")
            .getMethod("maxDirectMemory").invoke(null));
    }
}

九、与Proactor模式对比

9.1 原理差异图解
9.2 工程选择建议

场景

推荐模式

理由

Linux平台

Reactor

原生支持epoll,社区方案成熟(Netty/libuv)

Windows平台

Proactor

IOCP是系统级实现,性能更优

混合业务

分层架构

底层用Reactor处理IO,上层用Proactor处理磁盘操作


十、现代框架中的演进

10.1 Netty的增强设计
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// Netty线程模型配置示例
EventLoopGroup bossGroup = new NioEventLoopGroup(1);  // 主Reactor
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 从Reactor

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) {
         ch.pipeline().addLast(new HttpServerCodec());
         ch.pipeline().addLast(new CustomHandler());
     }
 });

Netty的优化

  1. 无锁化设计:每个Channel绑定固定EventLoop
  2. 内存池:Recycler对象池减少GC
  3. FastThreadLocal:比JDK实现快3倍
10.2 云原生适配
  • Kubernetes就绪探针:基于活跃连接数判断
  • 服务网格集成:通过xDS API动态调整线程池大小
  • Serverless适配:冷启动时延迟创建线程池
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2025-05-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
Netty基础—4.NIO的使用简介
在NIO中,所有的数据都是通过使用Buffer缓冲区来处理的。如果要通过NIO,将数据写到文件和网络或从文件和网络中读取数据,那么就需要使用Buffer缓冲区来进行处理。
东阳马生架构
2025/05/20
590
3万字加50张图,带你深度解析 Netty 架构与原理(上)
接下来我们会学习一个 Netty 系列教程,Netty 系列由「架构与原理」,「源码」,「架构」三部分组成,今天我们先来看看第一部分:Netty 架构与原理初探,大纲如下:
烂猪皮
2021/01/28
1K0
3万字加50张图,带你深度解析 Netty 架构与原理(上)
netty Reactor模式(源码死磕
1. 为什么是Reactor模式 2. Reactor模式简介 3. 多线程IO的致命缺陷 4. 单线程Reactor模型 4.1. 什么是单线程Reactor呢? 4.2. 单线程Reactor的参考代码 4.3. 单线程模式的缺点: 5. 多线程的Reactor 5.1. 基于线程池的改进 5.2. 改进后的完整示意图 5.3. 多线程Reactor的参考代码 6. Reactor持续改进 7. Reactor编程的优点和缺点 7.1. 优点 7.2. 缺点
py3study
2020/01/15
2.7K0
Netty系列| Netty创始人告诉你为什么选择NIO
NIO模型 同步非阻塞 NIO有同步阻塞和同步非阻塞两种模式,一般讲的是同步非阻塞,服务器实现模式为一个请求一个线程,但客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求时才启动一个线程进行处理。
狼王编程
2021/06/01
1.4K0
Netty系列| Netty创始人告诉你为什么选择NIO
🎯 Java NIO 基础
✏️ 写在前面的话: Netty本质是一个NIO框架,适用于服务器通讯相关的多种应用场景。 Netty作为一款基于Java开发的高性能网络框架,想要从认识到熟悉再到掌握最终理解,因此我们需要从最基础的NIO开始学习。如果你已经学习并掌握了NIO相关知识,那么可以直接进入Netty相关文章的学习;如果没有了解过也没有关系,那我们就从当前文章开始学习吧!🎉🎉🎉 这里我们先简单了解一下这一篇文章中我们将要学习的内容: 首先是NIO的基本介绍,了解NIO的三大组件 ByteBuffer 字节缓冲区的基本使用
爱吃糖的范同学
2023/02/11
8400
012. NIO 非阻塞网络编程
1. Java NIO ---- 始于 Java1.4,提供了新的 JAVA IO 操作非阻塞 API。用意是替代 Java IO 和 Java Networking 相关的 API。 三个核心组件 Buffer 缓冲区 Channel 通道 Selector 选择器 2. Buffer 缓冲区 ---- 1. 介绍 缓冲区本质上是一个可以写入数据的内存块(类似数组),然后可以再次读取。此内存块包含在 NIO Buffer 对象中,该对象提供了一组方法,可以更轻松地使用内存块。 相比较直接对数组的操
山海散人
2021/03/03
4190
012. NIO 非阻塞网络编程
45 张图深度解析 Netty 架构与原理
接下来我们会学习一个 Netty 系列教程,Netty 系列由「架构与原理」,「源码」,「架构」三部分组成,今天我们先来看看第一部分:Netty 架构与原理初探,大纲如下:
kunge
2020/11/27
25.9K4
手撕Netty-用Java NIO完成Netty Reactor思想,助你理解Netty模型事件驱动
这是Netty官网上的一段介绍。我们主要关注它的可维护,高性能,高可伸缩性,使用Netty可以简化网络编程,并且性能优秀!
行百里er
2020/12/02
5190
手撕Netty-用Java NIO完成Netty Reactor思想,助你理解Netty模型事件驱动
第十二节 netty前传-NIO 实现reactor模式
主要说下reactor模式:简单来说reactor模式用于同时处理一个或多个传递给服务端的请求的事件的处理模式。 然后,服务端处理程序解析输入别的请求,并将它们同步分派给与之关联的请求异步处理程序。不恰当可类比web页面事件,当点击某个按钮时,浏览器收到这个信号(监听),分派给相关的js处理程序处理(handler)。
用户1418372
2018/12/28
4450
02.Netty与NIO之前世今生
在 NIO 中有几个核心对象需要掌握:缓冲区(Buffer)、选择器(Selector)、通道(Channel)
云扬四海
2022/05/10
2800
02.Netty与NIO之前世今生
Reactor模式
Reactor模式是一种设计模式,它是基于事件驱动的,可以并发的处理多个服务请求,当请求抵达后,依据多路复用策略,同步的派发这些请求至相关的请求处理程序。
CodingDiray
2019/09/25
2K0
Reactor模式
Netty3学习笔记(一) --- 传统IO与NIO比较
  (4)使用传统的I/O程序读取文件内容, 并写入到另一个文件(或Socket), 如下程序:
挽风
2021/04/13
2700
Netty3学习笔记(一) --- 传统IO与NIO比较
并发设计模式实战系列(1):半同步/半异步模式
今天为大家带来的是并发设计模式实战系列,第一章半同步/半异步(Half-Sync/Half-Async)模式,废话不多说直接开始~
摘星.
2025/05/20
860
1 Netty 网络高并发框架
纯手打,总结! Netty是什么? Netty是当前非常流行的网络通讯开源框架,高并发和高可靠,底层就可以用Netty支撑。 Netty 官网:https://netty.io/ 学习视频:https
收心
2022/01/19
6990
1 Netty 网络高并发框架
『互联网架构』软件架构-io与nio线程模型reactor模型(上)(53)
PS:NIO不需要的代码里面根本没有多线程,实际上nio只有一个工作线程,一个线程可以为多个客人服务。
IT架构圈
2019/05/30
5500
Netty-nio
channel 有一点类似于 stream,它就是读写数据的双向通道,可以从 channel 将数据读入 buffer,也可以将 buffer 的数据写入 channel,而之前的 stream 要么是输入,要么是输出,channel 比 stream 更为底层
sgr997
2022/11/10
7200
Netty-nio
从BIO到NIO、AIO和零拷贝
在JAVA的网络编程方面,BIO、NIO、AIO和零拷贝是我们必须掌握的技术,它们分别代表着不同的网络编程实现方式。
青山师
2023/05/05
2090
Java进阶(五)Java I/O模型从BIO到NIO和Reactor模式
Java I/O模型 同步 vs. 异步 同步I/O 每个请求必须逐个地被处理,一个请求的处理会导致整个流程的暂时等待,这些事件无法并发地执行。用户线程发起I/O请求后需要等待或者轮询内核I/O操作完成后才能继续执行。 异步I/O 多个请求可以并发地执行,一个请求或者任务的执行不会导致整个流程的暂时等待。用户线程发起I/O请求后仍然继续执行,当内核I/O操作完成后会通知用户线程,或者调用用户线程注册的回调函数。 阻塞 vs. 非阻塞 阻塞 某个请求发出后,由于该请求操作需要的条件不满足,请求操作一直阻塞,
Jason Guo
2018/06/19
7240
漫谈Java IO之 NIO那些事儿
前面一篇中已经介绍了基本IO的使用以及最简单的阻塞服务器的例子,本篇就来介绍下NIO的相关内容,前面的分享可以参考目录: 网络IO的基本知识与概念 普通IO以及BIO服务器 NIO的使用与服务器Hello world Netty入门与服务器Hello world Netty深入浅出 NIO,也叫做new-IO或者non-blocking-IO,就暂且理解为非阻塞IO吧。 为什么选择NIO 那么NIO相对于IO来说,有什么优势呢?总结来说: IO是面向流的,数据只能从一端读取到另一端,不能随意读写。NI
用户1154259
2018/04/10
9130
漫谈Java IO之 NIO那些事儿
java的IO模型
本文主要是重新梳理了Java的IO模型,基于之前NIO的文章进行补充,为学习Netty做准备。
贪挽懒月
2020/07/14
7430
相关推荐
Netty基础—4.NIO的使用简介
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验