LCOV - code coverage report
Current view: top level - include/core - reactor.hpp (source / functions) Coverage Total Hit
Test: coverage.info Lines: 0.0 % 37 0
Test Date: 2025-12-19 03:13:09 Functions: 0.0 % 7 0

            Line data    Source code
       1              : /**
       2              :  * @file reactor.hpp
       3              :  * @brief Reactor 模式事件循环实现
       4              :  *
       5              :  * 提供跨平台的 I/O 多路复用封装,支持 Linux (epoll) 和 macOS (kqueue)。
       6              :  * 采用边缘触发(ET)模式,通过无锁队列实现线程安全的异步任务提交。
       7              :  */
       8              : 
       9              : #pragma once
      10              : 
      11              : #include <functional>
      12              : #include <vector>
      13              : #include <unordered_map>
      14              : #include <atomic>
      15              : #include <cerrno>
      16              : #include <cstring>
      17              : #include <iostream>
      18              : #include <unistd.h> // 为 read, write, close 提供 POSIX 函数声明
      19              : 
      20              : #include "base/concurrentqueue.h" // 使用 moodycamel 的无锁队列
      21              : 
      22              : // --- 面向各平台的 I/O 多路处理包含 ---
      23              : #ifdef __linux__
      24              : #include <sys/epoll.h>
      25              : #include <sys/timerfd.h>
      26              : #include <sys/eventfd.h> // 使用 eventfd 来唤醒
      27              : #elif __APPLE__
      28              : #include <sys/event.h>
      29              : #include <sys/time.h>
      30              : #else
      31              : #error "Unsupported platform for Reactor"
      32              : #endif
      33              : 
      34              : namespace fix40 {
      35              : 
      36              : /**
      37              :  * @enum EventType
      38              :  * @brief I/O 事件类型
      39              :  */
      40              : enum class EventType : uint32_t {
      41              :     READ = 1,   ///< 可读事件
      42              :     WRITE = 2   ///< 可写事件
      43              : };
      44              : 
      45              : /**
      46              :  * @class Reactor
      47              :  * @brief 基于 Reactor 模式的事件循环
      48              :  *
      49              :  * 封装 epoll (Linux) 或 kqueue (macOS) 实现高性能 I/O 多路复用。
      50              :  *
      51              :  * @par 设计特点
      52              :  * - 边缘触发(ET)模式,减少系统调用次数
      53              :  * - 无锁任务队列,支持跨线程安全提交任务
      54              :  * - 支持文件描述符事件和定时器事件
      55              :  * - 通过 eventfd/pipe 实现跨线程唤醒
      56              :  *
      57              :  * @par 线程模型
      58              :  * - run() 方法在单线程中执行事件循环
      59              :  * - add_fd/modify_fd/remove_fd/add_timer 可从任意线程调用
      60              :  * - 实际的 fd 操作通过任务队列在事件循环线程中执行
      61              :  *
      62              :  * @par 使用示例
      63              :  * @code
      64              :  * Reactor reactor;
      65              :  * 
      66              :  * // 注册读事件
      67              :  * reactor.add_fd(client_fd, [](int fd) {
      68              :  *     char buf[1024];
      69              :  *     read(fd, buf, sizeof(buf));
      70              :  * });
      71              :  * 
      72              :  * // 添加定时器
      73              :  * reactor.add_timer(1000, [](int) {
      74              :  *     std::cout << "Timer fired!" << std::endl;
      75              :  * });
      76              :  * 
      77              :  * // 启动事件循环(阻塞)
      78              :  * reactor.run();
      79              :  * @endcode
      80              :  */
      81              : class Reactor {
      82              : public:
      83              :     /// 文件描述符事件回调类型,参数为触发事件的 fd
      84              :     using FdCallback = std::function<void(int)>;
      85              :     /// 异步任务类型
      86              :     using Task = std::function<void()>;
      87              : 
      88              :     /**
      89              :      * @brief 构造 Reactor
      90              :      *
      91              :      * 创建 epoll/kqueue 实例和唤醒机制(eventfd/pipe)。
      92              :      *
      93              :      * @throws std::runtime_error 创建失败时抛出异常
      94              :      */
      95              :     Reactor();
      96              : 
      97              :     /**
      98              :      * @brief 析构 Reactor
      99              :      *
     100              :      * 关闭 epoll/kqueue 实例和所有定时器 fd。
     101              :      */
     102              :     ~Reactor();
     103              : 
     104              :     /**
     105              :      * @brief 注册文件描述符的读事件
     106              :      * @param fd 文件描述符(应已设置为非阻塞)
     107              :      * @param cb 读事件回调函数
     108              :      *
     109              :      * 可从任意线程调用,实际注册在事件循环线程中执行。
     110              :      */
     111              :     void add_fd(int fd, FdCallback cb);
     112              : 
     113              :     /**
     114              :      * @brief 修改文件描述符的事件监听
     115              :      * @param fd 文件描述符
     116              :      * @param event_mask 事件掩码(EventType::READ | EventType::WRITE)
     117              :      * @param write_cb 写事件回调函数(可为 nullptr)
     118              :      *
     119              :      * 用于动态添加/移除写事件监听。
     120              :      */
     121              :     void modify_fd(int fd, uint32_t event_mask, FdCallback write_cb);
     122              : 
     123              :     /**
     124              :      * @brief 添加周期性定时器
     125              :      * @param interval_ms 定时间隔(毫秒)
     126              :      * @param cb 定时器回调函数
     127              :      *
     128              :      * @note Linux 使用 timerfd,macOS 使用 kqueue EVFILT_TIMER
     129              :      */
     130              :     void add_timer(int interval_ms, FdCallback cb);
     131              : 
     132              :     /**
     133              :      * @brief 移除文件描述符
     134              :      * @param fd 要移除的文件描述符
     135              :      *
     136              :      * 从 epoll/kqueue 中移除 fd 并清理回调。
     137              :      */
     138              :     void remove_fd(int fd);
     139              : 
     140              :     /**
     141              :      * @brief 启动事件循环
     142              :      *
     143              :      * 阻塞当前线程,持续处理 I/O 事件直到调用 stop()。
     144              :      *
     145              :      * @par 事件循环流程
     146              :      * 1. 处理任务队列中的挂起任务
     147              :      * 2. 调用 epoll_wait/kevent 等待事件
     148              :      * 3. 分发事件到对应的回调函数
     149              :      * 4. 重复以上步骤
     150              :      */
     151              :     void run();
     152              : 
     153              :     /**
     154              :      * @brief 停止事件循环
     155              :      *
     156              :      * 设置停止标志并唤醒事件循环,使 run() 返回。
     157              :      * 可从任意线程调用。
     158              :      */
     159              :     void stop();
     160              : 
     161              :     /**
     162              :      * @brief 检查事件循环是否正在运行
     163              :      * @return true 正在运行
     164              :      * @return false 已停止或未启动
     165              :      */
     166              :     bool is_running() const;
     167              : 
     168              : private:
     169              :     /**
     170              :      * @brief 实际执行添加 fd 操作
     171              :      * @param fd 文件描述符
     172              :      * @param cb 回调函数
     173              :      */
     174              :     void do_add_fd(int fd, FdCallback cb);
     175              : 
     176              :     /**
     177              :      * @brief 实际执行修改 fd 操作
     178              :      */
     179              :     void do_modify_fd(int fd, uint32_t event_mask, FdCallback write_cb);
     180              : 
     181              :     /**
     182              :      * @brief 实际执行添加定时器操作
     183              :      */
     184              :     void do_add_timer(int interval_ms, FdCallback cb);
     185              : 
     186              :     /**
     187              :      * @brief 实际执行移除 fd 操作
     188              :      */
     189              :     void do_remove_fd(int fd);
     190              : 
     191              :     /**
     192              :      * @brief 处理任务队列中的所有挂起任务
     193              :      */
     194              :     void process_tasks();
     195              : 
     196              :     /**
     197              :      * @brief 唤醒阻塞在 epoll_wait/kevent 的事件循环
     198              :      */
     199              :     void wakeup();
     200              : 
     201              :     int io_fd_; ///< epoll 或 kqueue 的文件描述符
     202              : #ifdef __linux__
     203              :     int wakeup_fd_; ///< Linux 使用 eventfd 唤醒
     204              : #else
     205              :     int wakeup_pipe_[2]; ///< macOS 使用 pipe 唤醒
     206              : #endif
     207              :     std::atomic<bool> running_;  ///< 运行状态标志
     208              :     std::unordered_map<int, FdCallback> callbacks_;       ///< 读事件回调映射
     209              :     std::unordered_map<int, FdCallback> write_callbacks_; ///< 写事件回调映射
     210              :     std::vector<int> timer_fds_; ///< 定时器 fd 列表(仅 Linux)
     211              : 
     212              :     moodycamel::ConcurrentQueue<Task> tasks_; ///< 无锁任务队列
     213              : };
     214              : 
     215              : // --- 实现 ---
     216              : 
     217              : inline Reactor::Reactor() : running_(false) {
     218              : #ifdef __linux__
     219              :     io_fd_ = epoll_create1(0);
     220              :     if (io_fd_ == -1) throw std::runtime_error("Failed to create epoll instance");
     221              :     wakeup_fd_ = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
     222              :     if (wakeup_fd_ == -1) {
     223              :         close(io_fd_);
     224              :         throw std::runtime_error("Failed to create eventfd for wakeup");
     225              :     }
     226              :     do_add_fd(wakeup_fd_, [this](int fd){
     227              :         uint64_t one;
     228              :         ssize_t n = read(fd, &one, sizeof(one));
     229              :         if (n != sizeof(one)) {
     230              :             // 错误处理
     231              :         }
     232              :     });
     233              : #elif __APPLE__
     234              :     io_fd_ = kqueue();
     235              :     if (io_fd_ == -1) throw std::runtime_error("Failed to create kqueue instance");
     236              :     if (pipe(wakeup_pipe_) == -1) {
     237              :         close(io_fd_);
     238              :         throw std::runtime_error("Failed to create pipe for reactor wakeup");
     239              :     }
     240              :     do_add_fd(wakeup_pipe_[0], nullptr); // 不需要回调,只为了唤醒
     241              : #endif
     242              : }
     243              : 
     244              : inline Reactor::~Reactor() {
     245              :     close(io_fd_);
     246              : #ifdef __linux__
     247              :     close(wakeup_fd_);
     248              :     for (int tfd : timer_fds_) {
     249              :         close(tfd);
     250              :     }
     251              : #elif __APPLE__
     252              :     close(wakeup_pipe_[0]);
     253              :     close(wakeup_pipe_[1]);
     254              : #endif
     255              : }
     256              : 
     257            0 : inline void Reactor::wakeup() {
     258              : #ifdef __linux__
     259            0 :     uint64_t one = 1;
     260            0 :     ssize_t n = write(wakeup_fd_, &one, sizeof(one));
     261              :     if (n != sizeof(one)) {
     262              :          // 错误处理
     263              :     }
     264              : #elif __APPLE__
     265              :     char buf = 'w';
     266              :     if (write(wakeup_pipe_[1], &buf, 1) < 0) {
     267              :         // 错误处理
     268              :     }
     269              : #endif
     270            0 : }
     271              : 
     272              : inline void Reactor::add_fd(int fd, FdCallback cb) {
     273              :     tasks_.enqueue([this, fd, cb = std::move(cb)]() {
     274              :         do_add_fd(fd, cb);
     275              :     });
     276              :     wakeup();
     277              : }
     278              : 
     279            0 : inline void Reactor::modify_fd(int fd, uint32_t event_mask, FdCallback write_cb) {
     280            0 :     tasks_.enqueue([this, fd, event_mask, cb = std::move(write_cb)]() {
     281            0 :         do_modify_fd(fd, event_mask, cb);
     282            0 :     });
     283            0 :     wakeup();
     284            0 : }
     285              : 
     286              : inline void Reactor::add_timer(int interval_ms, FdCallback cb) {
     287              :     tasks_.enqueue([this, interval_ms, cb = std::move(cb)]() {
     288              :         do_add_timer(interval_ms, cb);
     289              :     });
     290              :     wakeup();
     291              : }
     292              : 
     293            0 : inline void Reactor::remove_fd(int fd) {
     294            0 :     tasks_.enqueue([this, fd]() {
     295            0 :         do_remove_fd(fd);
     296            0 :     });
     297            0 :     wakeup();
     298            0 : }
     299              : 
     300              : inline void Reactor::do_add_fd(int fd, FdCallback cb) {
     301              : #ifdef __linux__
     302              :     epoll_event event;
     303              :     event.events = EPOLLIN | EPOLLET;
     304              :     event.data.fd = fd;
     305              :     if (epoll_ctl(io_fd_, EPOLL_CTL_ADD, fd, &event) == -1) {
     306              :         perror("epoll_ctl(ADD) failed");
     307              :         return;
     308              :     }
     309              : #elif __APPLE__
     310              :     struct kevent change_event;
     311              :     EV_SET(&change_event, fd, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, nullptr);
     312              :     if (kevent(io_fd_, &change_event, 1, nullptr, 0, nullptr) == -1) {
     313              :         perror("kevent(ADD) failed");
     314              :         return;
     315              :     }
     316              : #endif
     317              :     if (cb) {
     318              :         callbacks_[fd] = std::move(cb);
     319              :     }
     320              : }
     321              : 
     322            0 : inline void Reactor::do_modify_fd(int fd, uint32_t event_mask, FdCallback write_cb) {
     323              : #ifdef __linux__
     324              :     epoll_event event;
     325            0 :     event.data.fd = fd;
     326            0 :     event.events = EPOLLET;
     327            0 :     if (static_cast<uint32_t>(event_mask) & static_cast<uint32_t>(EventType::READ)) {
     328            0 :         event.events |= EPOLLIN;
     329              :     }
     330            0 :     if (static_cast<uint32_t>(event_mask) & static_cast<uint32_t>(EventType::WRITE)) {
     331            0 :         event.events |= EPOLLOUT;
     332              :     }
     333            0 :     if (epoll_ctl(io_fd_, EPOLL_CTL_MOD, fd, &event) == -1) {
     334            0 :         if (errno != ENOENT) {
     335            0 :             perror("epoll_ctl(MOD) failed");
     336              :         }
     337            0 :         return;
     338              :     }
     339              : #elif __APPLE__
     340              :     struct kevent change;
     341              :     if (static_cast<uint32_t>(event_mask) & static_cast<uint32_t>(EventType::WRITE)) {
     342              :         EV_SET(&change, fd, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, nullptr);
     343              :     } else {
     344              :         EV_SET(&change, fd, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);
     345              :     }
     346              :     if (kevent(io_fd_, &change, 1, nullptr, 0, nullptr) == -1) {
     347              :         perror("kevent(MOD-WRITE) failed");
     348              :     }
     349              : #endif
     350            0 :     if (static_cast<uint32_t>(event_mask) & static_cast<uint32_t>(EventType::WRITE)) {
     351            0 :         write_callbacks_[fd] = std::move(write_cb);
     352              :     } else {
     353            0 :         write_callbacks_.erase(fd);
     354              :     }
     355              : }
     356              : 
     357              : inline void Reactor::do_add_timer(int interval_ms, FdCallback cb) {
     358              : #ifdef __linux__
     359              :     int tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
     360              :     if (tfd == -1) {
     361              :         perror("timerfd_create failed");
     362              :         return;
     363              :     }
     364              :     itimerspec ts;
     365              :     ts.it_value.tv_sec = interval_ms / 1000;
     366              :     ts.it_value.tv_nsec = (long)(interval_ms % 1000) * 1000000L;
     367              :     ts.it_interval = ts.it_value;
     368              :     if (timerfd_settime(tfd, 0, &ts, nullptr) == -1) {
     369              :         perror("timerfd_settime failed");
     370              :         close(tfd);
     371              :         return;
     372              :     }
     373              :     timer_fds_.push_back(tfd);
     374              :     do_add_fd(tfd, std::move(cb));
     375              : #elif __APPLE__
     376              :     struct kevent change_event;
     377              :     EV_SET(&change_event, interval_ms, EVFILT_TIMER, EV_ADD | EV_ENABLE, 0, interval_ms, nullptr);
     378              :     if (kevent(io_fd_, &change_event, 1, nullptr, 0, nullptr) == -1) {
     379              :         perror("kevent(ADD TIMER) failed");
     380              :         return;
     381              :     }
     382              :     callbacks_[-interval_ms] = std::move(cb);
     383              : #endif
     384              : }
     385              : 
     386            0 : inline void Reactor::do_remove_fd(int fd) {
     387              : #ifdef __linux__
     388            0 :     if (epoll_ctl(io_fd_, EPOLL_CTL_DEL, fd, nullptr) == -1) {
     389              :         // ENOENT: fd 不在 epoll 中
     390              :         // EBADF: fd 已关闭(Linux 会自动从 epoll 移除已关闭的 fd)
     391            0 :         if (errno != ENOENT && errno != EBADF) {
     392            0 :             perror("epoll_ctl(DEL) failed");
     393              :         }
     394              :     }
     395              : #elif __APPLE__
     396              :     struct kevent change_event;
     397              :     EV_SET(&change_event, fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
     398              :     if (kevent(io_fd_, &change_event, 1, nullptr, 0, nullptr) == -1 && errno != ENOENT && errno != EBADF) {
     399              :         perror("kevent(DEL READ) failed");
     400              :     }
     401              :     EV_SET(&change_event, fd, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);
     402              :     if (kevent(io_fd_, &change_event, 1, nullptr, 0, nullptr) == -1 && errno != ENOENT && errno != EBADF) {
     403              :         perror("kevent(DEL WRITE) failed");
     404              :     }
     405              : #endif
     406            0 :     callbacks_.erase(fd);
     407            0 :     write_callbacks_.erase(fd);
     408            0 : }
     409              : 
     410              : 
     411              : inline void Reactor::stop() {
     412              :     if (!running_.exchange(false)) {
     413              :         return;
     414              :     }
     415              :     wakeup(); // 唤醒 run() 循环以使其退出
     416              : }
     417              : 
     418              : inline bool Reactor::is_running() const {
     419              :     return running_.load(std::memory_order_acquire);
     420              : }
     421              : 
     422              : inline void Reactor::process_tasks() {
     423              :     Task task;
     424              :     while(tasks_.try_dequeue(task)) {
     425              :         task();
     426              :     }
     427              : }
     428              : 
     429              : inline void Reactor::run() {
     430              :     running_.store(true, std::memory_order_release);
     431              : #ifdef __linux__
     432              :     std::vector<epoll_event> events(128);
     433              : #elif __APPLE__
     434              :     std::vector<struct kevent> events(128);
     435              : #endif
     436              : 
     437              :     while (running_.load(std::memory_order_acquire)) {
     438              :         process_tasks(); // 每次循环前处理挂起的任务
     439              : 
     440              : #ifdef __linux__
     441              :         int n_events = epoll_wait(io_fd_, events.data(), events.size(), -1);
     442              : #elif __APPLE__
     443              :         int n_events = kevent(io_fd_, nullptr, 0, events.data(), events.size(), nullptr);
     444              : #endif
     445              :         if (!running_) break; // 捕获 stop() 信号后退出
     446              : 
     447              :         if (n_events < 0) {
     448              :             if (errno == EINTR) continue;
     449              :             std::cerr << "epoll_wait/kevent failed: " << strerror(errno) << std::endl;
     450              :             break;
     451              :         }
     452              : 
     453              :         for (int i = 0; i < n_events; ++i) {
     454              : #ifdef __linux__
     455              :             int fd = events[i].data.fd;
     456              :             if (fd == wakeup_fd_) {
     457              :                 uint64_t one;
     458              :                 read(wakeup_fd_, &one, sizeof(one)); // 清空 eventfd
     459              :                 continue;
     460              :             }
     461              :             uint32_t active_events = events[i].events;
     462              : #elif __APPLE__
     463              :             int fd = events[i].ident;
     464              :             if (fd == wakeup_pipe_[0]) {
     465              :                 char buf[1];
     466              :                 read(wakeup_pipe_[0], buf, 1);
     467              :                 continue;
     468              :             }
     469              :             uint32_t active_events = 0;
     470              :             if (events[i].filter == EVFILT_READ) active_events |= static_cast<uint32_t>(EventType::READ);
     471              :             if (events[i].filter == EVFILT_WRITE) active_events |= static_cast<uint32_t>(EventType::WRITE);
     472              :             if (events[i].filter == EVFILT_TIMER) {
     473              :                 fd = -static_cast<int>(events[i].ident);
     474              :                 active_events |= static_cast<uint32_t>(EventType::READ);
     475              :             }
     476              : #endif
     477              : 
     478              : #ifdef __linux__
     479              :             if (active_events & (EPOLLIN | EPOLLERR | EPOLLHUP)) {
     480              :                 auto it = callbacks_.find(fd);
     481              :                 if (it != callbacks_.end() && it->second) it->second(fd);
     482              :             }
     483              :             if (active_events & EPOLLOUT) {
     484              :                 auto it = write_callbacks_.find(fd);
     485              :                 if (it != write_callbacks_.end() && it->second) it->second(fd);
     486              :             }
     487              : #elif __APPLE__
     488              :             if (active_events & static_cast<uint32_t>(EventType::READ)) {
     489              :                 auto it = callbacks_.find(fd);
     490              :                 if (it != callbacks_.end() && it->second) it->second(fd);
     491              :             }
     492              :             if (active_events & static_cast<uint32_t>(EventType::WRITE)) {
     493              :                 auto it = write_callbacks_.find(fd);
     494              :                 if (it != write_callbacks_.end() && it->second) it->second(fd);
     495              :             }
     496              : #endif
     497              :         }
     498              :     }
     499              : }
     500              : } // fix40 名称空间结束
        

Generated by: LCOV version 2.0-1