Reactor

一、Reactor 解决的核心问题

在高并发场景下,传统的 “一个连接一个线程” 模型会因为线程上下文切换和资源耗尽而崩溃。

Reactor 的核心思路是:

用单线程(或少量线程)监听所有 IO 事件,事件就绪后再分发给业务逻辑处理,从而避免无效的线程切换和资源浪费。

它的本质是事件驱动 + IO 多路复用的结合。

二、Reactor 的核心组件

一个完整的 Reactor 模式包含 5 个核心组件,协同工作:

组件 职责 实现方式
Reactor 事件循环的核心,负责管理事件注册、监听和分发 封装 epoll/kqueue 等多路复用器
多路复用器(Demultiplexer) 等待 IO 事件就绪,是 Reactor 的底层依赖 Linux 用 epoll,BSD 用 kqueue,Windows 用 IOCP
事件处理器(Handler) 绑定到特定的 IO 事件,执行具体的业务逻辑 ReadHandlerWriteHandler
Acceptor 专门处理新连接事件的 Handler,接受连接后为新 socket 注册读写事件 基于 listenfd 实现
事件队列(可选) 当事件处理耗时较长时,将事件放入队列,由 Worker 池异步处理 解耦 IO 线程和业务线程,避免阻塞 Reactor

三、Reactor 的完整工作流程

我们以一个 TCP 服务器为例,完整走一遍 Reactor 的生命周期:

  1. 初始化阶段

    • Reactor 初始化 epoll 实例,并创建 listenfd 监听端口。
    • listenfd读事件(EPOLLIN) 注册到 epoll,并绑定 Acceptor 作为事件处理器。
  2. 事件循环阶段

    • Reactor 调用 epoll_wait() 阻塞,等待事件就绪。
    • 当有新连接到达时,listenfd 的读事件就绪,epoll_wait() 返回。
    • Reactor 触发 Acceptor 执行,调用 accept() 获取新的 connfd
    • connfd 设置非阻塞模式,并将其读事件注册到 epoll,绑定 ReadHandler
  3. 事件处理阶段

    • 当客户端发送数据时,connfd 的读事件就绪。
    • Reactor 触发 ReadHandler 执行,调用 read() 读取数据。
    • 业务逻辑处理完成后,若需要回包,则将 connfd写事件注册到 epoll,绑定 WriteHandler
    • 当内核缓冲区可写时,WriteHandler 被触发,调用 write() 发送响应。
  4. 连接关闭阶段

    • 当客户端断开连接时,connfd 的读事件就绪,read() 返回 0
    • ReadHandler 触发关闭逻辑,从 epoll 中注销该 connfd 并关闭。

四、Reactor 的三种典型实现

根据线程模型的不同,Reactor 有三种主流实现方式,各有适用场景:

1. 单 Reactor 单线程

  • 结构:一个 Reactor 线程负责监听所有事件、处理连接和业务逻辑。
  • 优点:实现最简单,无线程安全问题。
  • 缺点:业务逻辑阻塞会导致整个服务停滞,仅适用于 IO 密集型、业务逻辑简单的场景(如 Redis)。

2. 单 Reactor 多线程

  • 结构:一个 Reactor 线程负责监听和分发事件,业务逻辑由 Worker 线程池处理。
  • 优点:IO 处理和业务逻辑分离,避免阻塞 Reactor 线程。
  • 缺点:单 Reactor 线程仍是性能瓶颈,无法充分利用多核 CPU。

3. 主从 Reactor 多线程(最常用)

  • 结构

    • 主 Reactor:负责监听新连接,接受后分发给从 Reactor。
    • 从 Reactor:负责处理已连接 socket 的读写事件。
    • Worker 线程池:处理具体的业务逻辑。
  • 优点:充分利用多核 CPU,支持海量并发,是 Netty 等框架的默认实现。

  • 缺点:实现复杂度较高,需要处理线程间通信和事件分发。

五、Reactor 模式的代码实现(C++ 版)

下面是一个基于 epoll 的单 Reactor 单线程实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <unistd.h>
#include <cstring>
#include <iostream>
#include <unordered_map>
#include <functional>

class Reactor {
public:
Reactor() : epoll_fd_(epoll_create1(0)) {}

void add_handler(int fd, std::function<void()> handler, int events) {
epoll_event ev{};
ev.data.fd = fd;
ev.events = events;
epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev);
handlers_[fd] = std::move(handler);
}

void run() {
std::vector<epoll_event> events(1024);
while (true) {
int n = epoll_wait(epoll_fd_, events.data(), events.size(), -1);
for (int i = 0; i < n; ++i) {
int fd = events[i].data.fd;
if (handlers_.count(fd)) {
handlers_[fd]();
}
}
}
}

private:
int epoll_fd_;
std::unordered_map<int, std::function<void()>> handlers_;
};

int main() {
// 1. 创建 listenfd
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
fcntl(listenfd, F_SETFL, fcntl(listenfd, F_GETFL) | O_NONBLOCK);

// 2. 绑定端口
sockaddr_in addr{};
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = INADDR_ANY;
addr.sin_port = htons(8080);
bind(listenfd, (sockaddr*)&addr, sizeof(addr));
listen(listenfd, 1024);

// 3. 初始化 Reactor
Reactor reactor;

// 4. 注册 Acceptor 处理新连接
reactor.add_handler(listenfd, [&]() {
int connfd = accept(listenfd, nullptr, nullptr);
fcntl(connfd, F_SETFL, fcntl(connfd, F_GETFL) | O_NONBLOCK);
// 为新连接注册读事件处理器
reactor.add_handler(connfd, [connfd]() {
char buf[1024];
ssize_t n = read(connfd, buf, sizeof(buf));
if (n > 0) {
buf[n] = '\0';
std::cout << "收到数据:" << buf << std::endl;
write(connfd, "Hello from Reactor", 18);
} else {
close(connfd);
}
}, EPOLLIN);
}, EPOLLIN);

// 5. 启动事件循环
reactor.run();

close(listenfd);
return 0;
}

六、Reactor 模式的优化点

  1. 水平触发(LT) vs 边缘触发(ET)

    • 默认是水平触发,事件就绪后会持续通知,直到被处理。
    • 边缘触发只通知一次,需要一次性读完所有数据,性能更高,但实现更复杂。
  2. 事件优先级

    • epoll 支持 EPOLLET(边缘触发)和 EPOLLONESHOT(事件只触发一次),可以优化事件处理效率。
  3. 避免阻塞 Reactor

    • 耗时的业务逻辑必须异步化,放入 Worker 线程池处理。
    • 禁止在 Handler 中调用 sleep()recv() 等阻塞函数。
  4. 连接数优化

    • 修改系统参数(如 /etc/security/limits.conf)提升最大文件描述符限制。