Line data Source code
1 : /**
2 : * @file reactor.hpp
3 : * @brief Reactor 模式事件循环实现
4 : *
5 : * 提供跨平台的 I/O 多路复用封装,支持 Linux (epoll) 和 macOS (kqueue)。
6 : * 采用边缘触发(ET)模式,通过无锁队列实现线程安全的异步任务提交。
7 : */
8 :
9 : #pragma once
10 :
11 : #include <functional>
12 : #include <vector>
13 : #include <unordered_map>
14 : #include <atomic>
15 : #include <cerrno>
16 : #include <cstring>
17 : #include <iostream>
18 : #include <unistd.h> // 为 read, write, close 提供 POSIX 函数声明
19 :
20 : #include "base/concurrentqueue.h" // 使用 moodycamel 的无锁队列
21 :
22 : // --- 面向各平台的 I/O 多路处理包含 ---
23 : #ifdef __linux__
24 : #include <sys/epoll.h>
25 : #include <sys/timerfd.h>
26 : #include <sys/eventfd.h> // 使用 eventfd 来唤醒
27 : #elif __APPLE__
28 : #include <sys/event.h>
29 : #include <sys/time.h>
30 : #else
31 : #error "Unsupported platform for Reactor"
32 : #endif
33 :
34 : namespace fix40 {
35 :
36 : /**
37 : * @enum EventType
38 : * @brief I/O 事件类型
39 : */
40 : enum class EventType : uint32_t {
41 : READ = 1, ///< 可读事件
42 : WRITE = 2 ///< 可写事件
43 : };
44 :
45 : /**
46 : * @class Reactor
47 : * @brief 基于 Reactor 模式的事件循环
48 : *
49 : * 封装 epoll (Linux) 或 kqueue (macOS) 实现高性能 I/O 多路复用。
50 : *
51 : * @par 设计特点
52 : * - 边缘触发(ET)模式,减少系统调用次数
53 : * - 无锁任务队列,支持跨线程安全提交任务
54 : * - 支持文件描述符事件和定时器事件
55 : * - 通过 eventfd/pipe 实现跨线程唤醒
56 : *
57 : * @par 线程模型
58 : * - run() 方法在单线程中执行事件循环
59 : * - add_fd/modify_fd/remove_fd/add_timer 可从任意线程调用
60 : * - 实际的 fd 操作通过任务队列在事件循环线程中执行
61 : *
62 : * @par 使用示例
63 : * @code
64 : * Reactor reactor;
65 : *
66 : * // 注册读事件
67 : * reactor.add_fd(client_fd, [](int fd) {
68 : * char buf[1024];
69 : * read(fd, buf, sizeof(buf));
70 : * });
71 : *
72 : * // 添加定时器
73 : * reactor.add_timer(1000, [](int) {
74 : * std::cout << "Timer fired!" << std::endl;
75 : * });
76 : *
77 : * // 启动事件循环(阻塞)
78 : * reactor.run();
79 : * @endcode
80 : */
81 : class Reactor {
82 : public:
83 : /// 文件描述符事件回调类型,参数为触发事件的 fd
84 : using FdCallback = std::function<void(int)>;
85 : /// 异步任务类型
86 : using Task = std::function<void()>;
87 :
88 : /**
89 : * @brief 构造 Reactor
90 : *
91 : * 创建 epoll/kqueue 实例和唤醒机制(eventfd/pipe)。
92 : *
93 : * @throws std::runtime_error 创建失败时抛出异常
94 : */
95 : Reactor();
96 :
97 : /**
98 : * @brief 析构 Reactor
99 : *
100 : * 关闭 epoll/kqueue 实例和所有定时器 fd。
101 : */
102 : ~Reactor();
103 :
104 : /**
105 : * @brief 注册文件描述符的读事件
106 : * @param fd 文件描述符(应已设置为非阻塞)
107 : * @param cb 读事件回调函数
108 : *
109 : * 可从任意线程调用,实际注册在事件循环线程中执行。
110 : */
111 : void add_fd(int fd, FdCallback cb);
112 :
113 : /**
114 : * @brief 修改文件描述符的事件监听
115 : * @param fd 文件描述符
116 : * @param event_mask 事件掩码(EventType::READ | EventType::WRITE)
117 : * @param write_cb 写事件回调函数(可为 nullptr)
118 : *
119 : * 用于动态添加/移除写事件监听。
120 : */
121 : void modify_fd(int fd, uint32_t event_mask, FdCallback write_cb);
122 :
123 : /**
124 : * @brief 添加周期性定时器
125 : * @param interval_ms 定时间隔(毫秒)
126 : * @param cb 定时器回调函数
127 : *
128 : * @note Linux 使用 timerfd,macOS 使用 kqueue EVFILT_TIMER
129 : */
130 : void add_timer(int interval_ms, FdCallback cb);
131 :
132 : /**
133 : * @brief 移除文件描述符
134 : * @param fd 要移除的文件描述符
135 : *
136 : * 从 epoll/kqueue 中移除 fd 并清理回调。
137 : */
138 : void remove_fd(int fd);
139 :
140 : /**
141 : * @brief 启动事件循环
142 : *
143 : * 阻塞当前线程,持续处理 I/O 事件直到调用 stop()。
144 : *
145 : * @par 事件循环流程
146 : * 1. 处理任务队列中的挂起任务
147 : * 2. 调用 epoll_wait/kevent 等待事件
148 : * 3. 分发事件到对应的回调函数
149 : * 4. 重复以上步骤
150 : */
151 : void run();
152 :
153 : /**
154 : * @brief 停止事件循环
155 : *
156 : * 设置停止标志并唤醒事件循环,使 run() 返回。
157 : * 可从任意线程调用。
158 : */
159 : void stop();
160 :
161 : /**
162 : * @brief 检查事件循环是否正在运行
163 : * @return true 正在运行
164 : * @return false 已停止或未启动
165 : */
166 : bool is_running() const;
167 :
168 : private:
169 : /**
170 : * @brief 实际执行添加 fd 操作
171 : * @param fd 文件描述符
172 : * @param cb 回调函数
173 : */
174 : void do_add_fd(int fd, FdCallback cb);
175 :
176 : /**
177 : * @brief 实际执行修改 fd 操作
178 : */
179 : void do_modify_fd(int fd, uint32_t event_mask, FdCallback write_cb);
180 :
181 : /**
182 : * @brief 实际执行添加定时器操作
183 : */
184 : void do_add_timer(int interval_ms, FdCallback cb);
185 :
186 : /**
187 : * @brief 实际执行移除 fd 操作
188 : */
189 : void do_remove_fd(int fd);
190 :
191 : /**
192 : * @brief 处理任务队列中的所有挂起任务
193 : */
194 : void process_tasks();
195 :
196 : /**
197 : * @brief 唤醒阻塞在 epoll_wait/kevent 的事件循环
198 : */
199 : void wakeup();
200 :
201 : int io_fd_; ///< epoll 或 kqueue 的文件描述符
202 : #ifdef __linux__
203 : int wakeup_fd_; ///< Linux 使用 eventfd 唤醒
204 : #else
205 : int wakeup_pipe_[2]; ///< macOS 使用 pipe 唤醒
206 : #endif
207 : std::atomic<bool> running_; ///< 运行状态标志
208 : std::unordered_map<int, FdCallback> callbacks_; ///< 读事件回调映射
209 : std::unordered_map<int, FdCallback> write_callbacks_; ///< 写事件回调映射
210 : std::vector<int> timer_fds_; ///< 定时器 fd 列表(仅 Linux)
211 :
212 : moodycamel::ConcurrentQueue<Task> tasks_; ///< 无锁任务队列
213 : };
214 :
215 : // --- 实现 ---
216 :
217 : inline Reactor::Reactor() : running_(false) {
218 : #ifdef __linux__
219 : io_fd_ = epoll_create1(0);
220 : if (io_fd_ == -1) throw std::runtime_error("Failed to create epoll instance");
221 : wakeup_fd_ = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
222 : if (wakeup_fd_ == -1) {
223 : close(io_fd_);
224 : throw std::runtime_error("Failed to create eventfd for wakeup");
225 : }
226 : do_add_fd(wakeup_fd_, [this](int fd){
227 : uint64_t one;
228 : ssize_t n = read(fd, &one, sizeof(one));
229 : if (n != sizeof(one)) {
230 : // 错误处理
231 : }
232 : });
233 : #elif __APPLE__
234 : io_fd_ = kqueue();
235 : if (io_fd_ == -1) throw std::runtime_error("Failed to create kqueue instance");
236 : if (pipe(wakeup_pipe_) == -1) {
237 : close(io_fd_);
238 : throw std::runtime_error("Failed to create pipe for reactor wakeup");
239 : }
240 : do_add_fd(wakeup_pipe_[0], nullptr); // 不需要回调,只为了唤醒
241 : #endif
242 : }
243 :
244 : inline Reactor::~Reactor() {
245 : close(io_fd_);
246 : #ifdef __linux__
247 : close(wakeup_fd_);
248 : for (int tfd : timer_fds_) {
249 : close(tfd);
250 : }
251 : #elif __APPLE__
252 : close(wakeup_pipe_[0]);
253 : close(wakeup_pipe_[1]);
254 : #endif
255 : }
256 :
257 0 : inline void Reactor::wakeup() {
258 : #ifdef __linux__
259 0 : uint64_t one = 1;
260 0 : ssize_t n = write(wakeup_fd_, &one, sizeof(one));
261 : if (n != sizeof(one)) {
262 : // 错误处理
263 : }
264 : #elif __APPLE__
265 : char buf = 'w';
266 : if (write(wakeup_pipe_[1], &buf, 1) < 0) {
267 : // 错误处理
268 : }
269 : #endif
270 0 : }
271 :
272 : inline void Reactor::add_fd(int fd, FdCallback cb) {
273 : tasks_.enqueue([this, fd, cb = std::move(cb)]() {
274 : do_add_fd(fd, cb);
275 : });
276 : wakeup();
277 : }
278 :
279 0 : inline void Reactor::modify_fd(int fd, uint32_t event_mask, FdCallback write_cb) {
280 0 : tasks_.enqueue([this, fd, event_mask, cb = std::move(write_cb)]() {
281 0 : do_modify_fd(fd, event_mask, cb);
282 0 : });
283 0 : wakeup();
284 0 : }
285 :
286 : inline void Reactor::add_timer(int interval_ms, FdCallback cb) {
287 : tasks_.enqueue([this, interval_ms, cb = std::move(cb)]() {
288 : do_add_timer(interval_ms, cb);
289 : });
290 : wakeup();
291 : }
292 :
293 0 : inline void Reactor::remove_fd(int fd) {
294 0 : tasks_.enqueue([this, fd]() {
295 0 : do_remove_fd(fd);
296 0 : });
297 0 : wakeup();
298 0 : }
299 :
300 : inline void Reactor::do_add_fd(int fd, FdCallback cb) {
301 : #ifdef __linux__
302 : epoll_event event;
303 : event.events = EPOLLIN | EPOLLET;
304 : event.data.fd = fd;
305 : if (epoll_ctl(io_fd_, EPOLL_CTL_ADD, fd, &event) == -1) {
306 : perror("epoll_ctl(ADD) failed");
307 : return;
308 : }
309 : #elif __APPLE__
310 : struct kevent change_event;
311 : EV_SET(&change_event, fd, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, nullptr);
312 : if (kevent(io_fd_, &change_event, 1, nullptr, 0, nullptr) == -1) {
313 : perror("kevent(ADD) failed");
314 : return;
315 : }
316 : #endif
317 : if (cb) {
318 : callbacks_[fd] = std::move(cb);
319 : }
320 : }
321 :
322 0 : inline void Reactor::do_modify_fd(int fd, uint32_t event_mask, FdCallback write_cb) {
323 : #ifdef __linux__
324 : epoll_event event;
325 0 : event.data.fd = fd;
326 0 : event.events = EPOLLET;
327 0 : if (static_cast<uint32_t>(event_mask) & static_cast<uint32_t>(EventType::READ)) {
328 0 : event.events |= EPOLLIN;
329 : }
330 0 : if (static_cast<uint32_t>(event_mask) & static_cast<uint32_t>(EventType::WRITE)) {
331 0 : event.events |= EPOLLOUT;
332 : }
333 0 : if (epoll_ctl(io_fd_, EPOLL_CTL_MOD, fd, &event) == -1) {
334 0 : if (errno != ENOENT) {
335 0 : perror("epoll_ctl(MOD) failed");
336 : }
337 0 : return;
338 : }
339 : #elif __APPLE__
340 : struct kevent change;
341 : if (static_cast<uint32_t>(event_mask) & static_cast<uint32_t>(EventType::WRITE)) {
342 : EV_SET(&change, fd, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, nullptr);
343 : } else {
344 : EV_SET(&change, fd, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);
345 : }
346 : if (kevent(io_fd_, &change, 1, nullptr, 0, nullptr) == -1) {
347 : perror("kevent(MOD-WRITE) failed");
348 : }
349 : #endif
350 0 : if (static_cast<uint32_t>(event_mask) & static_cast<uint32_t>(EventType::WRITE)) {
351 0 : write_callbacks_[fd] = std::move(write_cb);
352 : } else {
353 0 : write_callbacks_.erase(fd);
354 : }
355 : }
356 :
357 : inline void Reactor::do_add_timer(int interval_ms, FdCallback cb) {
358 : #ifdef __linux__
359 : int tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
360 : if (tfd == -1) {
361 : perror("timerfd_create failed");
362 : return;
363 : }
364 : itimerspec ts;
365 : ts.it_value.tv_sec = interval_ms / 1000;
366 : ts.it_value.tv_nsec = (long)(interval_ms % 1000) * 1000000L;
367 : ts.it_interval = ts.it_value;
368 : if (timerfd_settime(tfd, 0, &ts, nullptr) == -1) {
369 : perror("timerfd_settime failed");
370 : close(tfd);
371 : return;
372 : }
373 : timer_fds_.push_back(tfd);
374 : do_add_fd(tfd, std::move(cb));
375 : #elif __APPLE__
376 : struct kevent change_event;
377 : EV_SET(&change_event, interval_ms, EVFILT_TIMER, EV_ADD | EV_ENABLE, 0, interval_ms, nullptr);
378 : if (kevent(io_fd_, &change_event, 1, nullptr, 0, nullptr) == -1) {
379 : perror("kevent(ADD TIMER) failed");
380 : return;
381 : }
382 : callbacks_[-interval_ms] = std::move(cb);
383 : #endif
384 : }
385 :
386 0 : inline void Reactor::do_remove_fd(int fd) {
387 : #ifdef __linux__
388 0 : if (epoll_ctl(io_fd_, EPOLL_CTL_DEL, fd, nullptr) == -1) {
389 : // ENOENT: fd 不在 epoll 中
390 : // EBADF: fd 已关闭(Linux 会自动从 epoll 移除已关闭的 fd)
391 0 : if (errno != ENOENT && errno != EBADF) {
392 0 : perror("epoll_ctl(DEL) failed");
393 : }
394 : }
395 : #elif __APPLE__
396 : struct kevent change_event;
397 : EV_SET(&change_event, fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
398 : if (kevent(io_fd_, &change_event, 1, nullptr, 0, nullptr) == -1 && errno != ENOENT && errno != EBADF) {
399 : perror("kevent(DEL READ) failed");
400 : }
401 : EV_SET(&change_event, fd, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);
402 : if (kevent(io_fd_, &change_event, 1, nullptr, 0, nullptr) == -1 && errno != ENOENT && errno != EBADF) {
403 : perror("kevent(DEL WRITE) failed");
404 : }
405 : #endif
406 0 : callbacks_.erase(fd);
407 0 : write_callbacks_.erase(fd);
408 0 : }
409 :
410 :
411 : inline void Reactor::stop() {
412 : if (!running_.exchange(false)) {
413 : return;
414 : }
415 : wakeup(); // 唤醒 run() 循环以使其退出
416 : }
417 :
418 : inline bool Reactor::is_running() const {
419 : return running_.load(std::memory_order_acquire);
420 : }
421 :
422 : inline void Reactor::process_tasks() {
423 : Task task;
424 : while(tasks_.try_dequeue(task)) {
425 : task();
426 : }
427 : }
428 :
429 : inline void Reactor::run() {
430 : running_.store(true, std::memory_order_release);
431 : #ifdef __linux__
432 : std::vector<epoll_event> events(128);
433 : #elif __APPLE__
434 : std::vector<struct kevent> events(128);
435 : #endif
436 :
437 : while (running_.load(std::memory_order_acquire)) {
438 : process_tasks(); // 每次循环前处理挂起的任务
439 :
440 : #ifdef __linux__
441 : int n_events = epoll_wait(io_fd_, events.data(), events.size(), -1);
442 : #elif __APPLE__
443 : int n_events = kevent(io_fd_, nullptr, 0, events.data(), events.size(), nullptr);
444 : #endif
445 : if (!running_) break; // 捕获 stop() 信号后退出
446 :
447 : if (n_events < 0) {
448 : if (errno == EINTR) continue;
449 : std::cerr << "epoll_wait/kevent failed: " << strerror(errno) << std::endl;
450 : break;
451 : }
452 :
453 : for (int i = 0; i < n_events; ++i) {
454 : #ifdef __linux__
455 : int fd = events[i].data.fd;
456 : if (fd == wakeup_fd_) {
457 : uint64_t one;
458 : read(wakeup_fd_, &one, sizeof(one)); // 清空 eventfd
459 : continue;
460 : }
461 : uint32_t active_events = events[i].events;
462 : #elif __APPLE__
463 : int fd = events[i].ident;
464 : if (fd == wakeup_pipe_[0]) {
465 : char buf[1];
466 : read(wakeup_pipe_[0], buf, 1);
467 : continue;
468 : }
469 : uint32_t active_events = 0;
470 : if (events[i].filter == EVFILT_READ) active_events |= static_cast<uint32_t>(EventType::READ);
471 : if (events[i].filter == EVFILT_WRITE) active_events |= static_cast<uint32_t>(EventType::WRITE);
472 : if (events[i].filter == EVFILT_TIMER) {
473 : fd = -static_cast<int>(events[i].ident);
474 : active_events |= static_cast<uint32_t>(EventType::READ);
475 : }
476 : #endif
477 :
478 : #ifdef __linux__
479 : if (active_events & (EPOLLIN | EPOLLERR | EPOLLHUP)) {
480 : auto it = callbacks_.find(fd);
481 : if (it != callbacks_.end() && it->second) it->second(fd);
482 : }
483 : if (active_events & EPOLLOUT) {
484 : auto it = write_callbacks_.find(fd);
485 : if (it != write_callbacks_.end() && it->second) it->second(fd);
486 : }
487 : #elif __APPLE__
488 : if (active_events & static_cast<uint32_t>(EventType::READ)) {
489 : auto it = callbacks_.find(fd);
490 : if (it != callbacks_.end() && it->second) it->second(fd);
491 : }
492 : if (active_events & static_cast<uint32_t>(EventType::WRITE)) {
493 : auto it = write_callbacks_.find(fd);
494 : if (it != write_callbacks_.end() && it->second) it->second(fd);
495 : }
496 : #endif
497 : }
498 : }
499 : }
500 : } // fix40 名称空间结束
|