今天为大家带来的是并发设计模式实战系列,第十八章反应器(Reactor),废话不多说直接开始~
while(true) { selector.select(); }
)Selector
实现I/O多路复用(Linux epoll/kqueue)ChannelHandler
接口处理具体业务non-blocking
ByteBuffer
直接读写内核缓冲区系统组件 | 现实类比 | 核心行为 |
---|---|---|
Reactor | 前台接待员 | 监听顾客举手信号 |
Selector | 座位呼叫器 | 哪个桌位需要服务就亮灯 |
EventHandler | 服务员 | 处理具体点餐、上菜请求 |
import java.nio.*;
import java.nio.channels.*;
import java.util.*;
public class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
// 启动方法
public Reactor(int port) throws Exception {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false);
SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
sk.attach(new Acceptor());
}
// 事件循环核心
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
Iterator<SelectionKey> it = selected.iterator();
while (it.hasNext()) {
dispatch(it.next());
}
selected.clear();
}
} catch (Exception e) {
e.printStackTrace();
}
}
void dispatch(SelectionKey key) {
Runnable r = (Runnable) key.attachment();
if (r != null) r.run();
}
// 连接处理器
class Acceptor implements Runnable {
public void run() {
try {
SocketChannel c = serverSocket.accept();
if (c != null) new Handler(selector, c);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
// 业务处理器
class Handler implements Runnable {
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(1024);
Handler(Selector sel, SocketChannel c) throws Exception {
socket = c;
c.configureBlocking(false);
sk = socket.register(sel, 0);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
sel.wakeup(); // 唤醒selector
}
public void run() {
try {
if (sk.isReadable()) read();
else if (sk.isWritable()) write();
} catch (Exception e) {
close();
}
}
void read() throws Exception {
socket.read(input);
if (inputIsComplete()) {
process();
sk.interestOps(SelectionKey.OP_WRITE);
}
}
void write() throws Exception {
if (outputIsComplete()) sk.cancel();
}
boolean inputIsComplete() { /* 解析协议头判断 */ return true; }
boolean outputIsComplete() { /* 判断写入完成 */ return true; }
void process() { /* 业务处理逻辑 */ }
void close() { /* 资源释放 */ }
}
// 启动类
class Main {
public static void main(String[] args) throws Exception {
new Thread(new Reactor(8080)).start();
}
}
// 重要参数调优
serverSocket.setOption(StandardSocketOptions.SO_REUSEADDR, true);
socket.setOption(StandardSocketOptions.TCP_NODELAY, true);
// Buffer分配策略
ByteBuffer.allocateDirect(1024); // 直接内存,减少拷贝
类型 | 线程模型 | 适用场景 | JDK实现案例 |
---|---|---|---|
单线程Reactor | 所有操作单线程 | 轻量级应用 | Redis单线程模型 |
多线程Reactor | I/O多路复用+线程池 | 计算密集型业务 | Netty主从线程组 |
多Reactor | 多Selector分级处理 | 超高并发连接 | Nginx事件处理 |
指标 | Thread-Per-Connection | Reactor模式 |
---|---|---|
连接数支持 | 数百级 | 百万级 |
上下文切换 | 频繁 | 极少 |
内存消耗 | 每个连接1MB栈 | 共享少量缓冲区 |
延迟敏感性 | 一般 | 极佳 |
// 主从Reactor配置
Selector[] selectors = new Selector[Runtime.getRuntime().availableProcessors()];
for (int i = 0; i < selectors.length; i++) {
selectors[i] = Selector.open();
new Thread(new SubReactor(selectors[i])).start();
}
/ FileChannel.transferTo实现零拷贝
fileChannel.transferTo(position, count, socketChannel);
// 使用Netty的ByteBuf内存池
ByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;
ByteBuf buffer = alloc.directBuffer(1024);
// 关键指标采集
selector.keys().size(); // 注册通道数
selector.selectNow(); // 就绪事件数
bufferPool.usedMemory(); // 内存使用量
// 主从Reactor线程组实现
class MasterSlaveReactor {
private final Selector masterSelector;
private final Selector[] slaveSelectors;
private final ExecutorService masterExecutor;
private final ExecutorService[] slaveExecutors;
public MasterSlaveReactor(int slaveCount) throws Exception {
// 主Reactor负责连接接入
masterSelector = Selector.open();
masterExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "Master-Reactor"));
// 从Reactor负责IO读写
slaveSelectors = new Selector[slaveCount];
slaveExecutors = new ExecutorService[slaveCount];
for (int i = 0; i < slaveCount; i++) {
slaveSelectors[i] = Selector.open();
slaveExecutors[i] = Executors.newSingleThreadExecutor(r -> new Thread(r, "Slave-Reactor-" + i));
}
}
public void start() {
// 主Reactor启动
masterExecutor.execute(() -> {
while (!Thread.interrupted()) {
try {
masterSelector.select();
Set<SelectionKey> keys = masterSelector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
if (key.isAcceptable()) {
// 分配连接到从Reactor
int slaveIndex = key.attachment().hashCode() % slaveSelectors.length;
dispatchToSlave(slaveIndex, key);
}
it.remove();
}
} catch (Exception e) { /* 处理异常 */ }
}
});
// 从Reactor启动
for (int i = 0; i < slaveSelectors.length; i++) {
final int index = i;
slaveExecutors[i].execute(() -> {
while (!Thread.interrupted()) {
try {
slaveSelectors[index].select();
Set<SelectionKey> keys = slaveSelectors[index].selectedKeys();
// 处理IO读写(同基础版Handler逻辑)
} catch (Exception e) { /* 处理异常 */ }
}
});
}
}
}
优化点说明:
// 基于FileChannel的零拷贝传输
void sendFile(SocketChannel channel, File file) throws Exception {
try (FileInputStream fis = new FileInputStream(file)) {
FileChannel fileChannel = fis.getChannel();
long position = 0;
long remaining = fileChannel.size();
while (remaining > 0) {
long transferred = fileChannel.transferTo(position, remaining, channel);
position += transferred;
remaining -= transferred;
}
}
}
// 内存映射解析大文件
ByteBuffer mapFile(String path) throws Exception {
try (RandomAccessFile raf = new RandomAccessFile(path, "r")) {
return raf.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, raf.length());
}
}
// 协议帧结构示例
class ProtocolFrame {
byte magic; // 魔数标识
int length; // 数据长度
byte type; // 协议类型
byte[] data; // 有效载荷
// 使用ByteBuffer解析
public static ProtocolFrame decode(ByteBuffer buffer) {
ProtocolFrame frame = new ProtocolFrame();
frame.magic = buffer.get();
frame.length = buffer.getInt();
frame.type = buffer.get();
frame.data = new byte[frame.length];
buffer.get(frame.data);
return frame;
}
}
问题现象 | 根本原因 | 解决方案 |
---|---|---|
CPU 100% | 空轮询Bug | 1. 升级JDK版本 2. 添加select()超时时间 3. 使用Netty等成熟框架 |
内存泄漏 | ByteBuffer未释放 | 1. 使用内存池技术 2. 实现引用计数 3. 添加JVM参数-XX:+DisableExplicitGC |
连接数不均衡 | 哈希分配不均匀 | 1. 改用一致性哈希 2. 动态监测负载调整分配策略 |
// 关键指标采集示例
class ReactorMetrics {
void collectMetrics(Selector selector) {
// 连接数监控
int connectionCount = selector.keys().size() - 1; // 排除ServerSocketChannel
// 事件处理耗时
long start = System.nanoTime();
selector.select(100);
long latency = System.nanoTime() - start;
// 内存使用监控
long directMemory = ((sun.misc.VM) Class.forName("sun.misc.VM")
.getMethod("maxDirectMemory").invoke(null));
}
}
场景 | 推荐模式 | 理由 |
---|---|---|
Linux平台 | Reactor | 原生支持epoll,社区方案成熟(Netty/libuv) |
Windows平台 | Proactor | IOCP是系统级实现,性能更优 |
混合业务 | 分层架构 | 底层用Reactor处理IO,上层用Proactor处理磁盘操作 |
// Netty线程模型配置示例
EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 主Reactor
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 从Reactor
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new CustomHandler());
}
});
Netty的优化:
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有