13#include <unordered_map>
25#include <sys/timerfd.h>
26#include <sys/eventfd.h>
31#error "Unsupported platform for Reactor"
86 using Task = std::function<void()>;
179 void do_modify_fd(
int fd, uint32_t event_mask,
FdCallback write_cb);
184 void do_add_timer(
int interval_ms,
FdCallback cb);
189 void do_remove_fd(
int fd);
194 void process_tasks();
207 std::atomic<bool> running_;
208 std::unordered_map<int, FdCallback> callbacks_;
209 std::unordered_map<int, FdCallback> write_callbacks_;
210 std::vector<int> timer_fds_;
219 io_fd_ = epoll_create1(0);
220 if (io_fd_ == -1)
throw std::runtime_error(
"Failed to create epoll instance");
221 wakeup_fd_ = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
222 if (wakeup_fd_ == -1) {
224 throw std::runtime_error(
"Failed to create eventfd for wakeup");
226 do_add_fd(wakeup_fd_, [
this](
int fd){
228 ssize_t n = read(fd, &one,
sizeof(one));
229 if (n !=
sizeof(one)) {
235 if (io_fd_ == -1)
throw std::runtime_error(
"Failed to create kqueue instance");
236 if (pipe(wakeup_pipe_) == -1) {
238 throw std::runtime_error(
"Failed to create pipe for reactor wakeup");
240 do_add_fd(wakeup_pipe_[0],
nullptr);
248 for (
int tfd : timer_fds_) {
252 close(wakeup_pipe_[0]);
253 close(wakeup_pipe_[1]);
257inline void Reactor::wakeup() {
260 ssize_t n = write(wakeup_fd_, &one,
sizeof(one));
261 if (n !=
sizeof(one)) {
266 if (write(wakeup_pipe_[1], &buf, 1) < 0) {
273 tasks_.
enqueue([
this, fd, cb = std::move(cb)]() {
280 tasks_.
enqueue([
this, fd, event_mask, cb = std::move(write_cb)]() {
281 do_modify_fd(fd, event_mask, cb);
287 tasks_.
enqueue([
this, interval_ms, cb = std::move(cb)]() {
288 do_add_timer(interval_ms, cb);
300inline void Reactor::do_add_fd(
int fd, FdCallback cb) {
303 event.events = EPOLLIN | EPOLLET;
305 if (epoll_ctl(io_fd_, EPOLL_CTL_ADD, fd, &event) == -1) {
306 perror(
"epoll_ctl(ADD) failed");
310 struct kevent change_event;
311 EV_SET(&change_event, fd, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0,
nullptr);
312 if (kevent(io_fd_, &change_event, 1,
nullptr, 0,
nullptr) == -1) {
313 perror(
"kevent(ADD) failed");
318 callbacks_[fd] = std::move(cb);
322inline void Reactor::do_modify_fd(
int fd, uint32_t event_mask, FdCallback write_cb) {
326 event.events = EPOLLET;
327 if (
static_cast<uint32_t
>(event_mask) &
static_cast<uint32_t
>(
EventType::READ)) {
328 event.events |= EPOLLIN;
330 if (
static_cast<uint32_t
>(event_mask) &
static_cast<uint32_t
>(
EventType::WRITE)) {
331 event.events |= EPOLLOUT;
333 if (epoll_ctl(io_fd_, EPOLL_CTL_MOD, fd, &event) == -1) {
334 if (errno != ENOENT) {
335 perror(
"epoll_ctl(MOD) failed");
340 struct kevent change;
341 if (
static_cast<uint32_t
>(event_mask) &
static_cast<uint32_t
>(
EventType::WRITE)) {
342 EV_SET(&change, fd, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0,
nullptr);
344 EV_SET(&change, fd, EVFILT_WRITE, EV_DELETE, 0, 0,
nullptr);
346 if (kevent(io_fd_, &change, 1,
nullptr, 0,
nullptr) == -1) {
347 perror(
"kevent(MOD-WRITE) failed");
350 if (
static_cast<uint32_t
>(event_mask) &
static_cast<uint32_t
>(
EventType::WRITE)) {
351 write_callbacks_[fd] = std::move(write_cb);
353 write_callbacks_.erase(fd);
357inline void Reactor::do_add_timer(
int interval_ms, FdCallback cb) {
359 int tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
361 perror(
"timerfd_create failed");
365 ts.it_value.tv_sec = interval_ms / 1000;
366 ts.it_value.tv_nsec = (long)(interval_ms % 1000) * 1000000L;
367 ts.it_interval = ts.it_value;
368 if (timerfd_settime(tfd, 0, &ts,
nullptr) == -1) {
369 perror(
"timerfd_settime failed");
373 timer_fds_.push_back(tfd);
374 do_add_fd(tfd, std::move(cb));
376 struct kevent change_event;
377 EV_SET(&change_event, interval_ms, EVFILT_TIMER, EV_ADD | EV_ENABLE, 0, interval_ms,
nullptr);
378 if (kevent(io_fd_, &change_event, 1,
nullptr, 0,
nullptr) == -1) {
379 perror(
"kevent(ADD TIMER) failed");
382 callbacks_[-interval_ms] = std::move(cb);
386inline void Reactor::do_remove_fd(
int fd) {
388 if (epoll_ctl(io_fd_, EPOLL_CTL_DEL, fd,
nullptr) == -1) {
391 if (errno != ENOENT && errno != EBADF) {
392 perror(
"epoll_ctl(DEL) failed");
396 struct kevent change_event;
397 EV_SET(&change_event, fd, EVFILT_READ, EV_DELETE, 0, 0,
nullptr);
398 if (kevent(io_fd_, &change_event, 1,
nullptr, 0,
nullptr) == -1 && errno != ENOENT && errno != EBADF) {
399 perror(
"kevent(DEL READ) failed");
401 EV_SET(&change_event, fd, EVFILT_WRITE, EV_DELETE, 0, 0,
nullptr);
402 if (kevent(io_fd_, &change_event, 1,
nullptr, 0,
nullptr) == -1 && errno != ENOENT && errno != EBADF) {
403 perror(
"kevent(DEL WRITE) failed");
406 callbacks_.erase(fd);
407 write_callbacks_.erase(fd);
412 if (!running_.exchange(
false)) {
419 return running_.load(std::memory_order_acquire);
422inline void Reactor::process_tasks() {
430 running_.store(
true, std::memory_order_release);
432 std::vector<epoll_event> events(128);
434 std::vector<struct kevent> events(128);
437 while (running_.load(std::memory_order_acquire)) {
441 int n_events = epoll_wait(io_fd_, events.data(), events.size(), -1);
443 int n_events = kevent(io_fd_,
nullptr, 0, events.data(), events.size(),
nullptr);
445 if (!running_)
break;
448 if (errno == EINTR)
continue;
449 std::cerr <<
"epoll_wait/kevent failed: " << strerror(errno) << std::endl;
453 for (
int i = 0; i < n_events; ++i) {
455 int fd = events[i].data.fd;
456 if (fd == wakeup_fd_) {
458 read(wakeup_fd_, &one,
sizeof(one));
461 uint32_t active_events = events[i].events;
463 int fd = events[i].ident;
464 if (fd == wakeup_pipe_[0]) {
466 read(wakeup_pipe_[0], buf, 1);
469 uint32_t active_events = 0;
470 if (events[i].filter == EVFILT_READ) active_events |=
static_cast<uint32_t
>(
EventType::READ);
471 if (events[i].filter == EVFILT_WRITE) active_events |=
static_cast<uint32_t
>(
EventType::WRITE);
472 if (events[i].filter == EVFILT_TIMER) {
473 fd = -
static_cast<int>(events[i].ident);
479 if (active_events & (EPOLLIN | EPOLLERR | EPOLLHUP)) {
480 auto it = callbacks_.find(fd);
481 if (it != callbacks_.end() && it->second) it->second(fd);
483 if (active_events & EPOLLOUT) {
484 auto it = write_callbacks_.find(fd);
485 if (it != write_callbacks_.end() && it->second) it->second(fd);
489 auto it = callbacks_.find(fd);
490 if (it != callbacks_.end() && it->second) it->second(fd);
493 auto it = write_callbacks_.find(fd);
494 if (it != write_callbacks_.end() && it->second) it->second(fd);
基于 Reactor 模式的事件循环
Definition reactor.hpp:81
~Reactor()
析构 Reactor
Definition reactor.hpp:244
void run()
启动事件循环
Definition reactor.hpp:429
void stop()
停止事件循环
Definition reactor.hpp:411
void add_fd(int fd, FdCallback cb)
注册文件描述符的读事件
Definition reactor.hpp:272
void modify_fd(int fd, uint32_t event_mask, FdCallback write_cb)
修改文件描述符的事件监听
Definition reactor.hpp:279
Reactor()
构造 Reactor
Definition reactor.hpp:217
void remove_fd(int fd)
移除文件描述符
Definition reactor.hpp:293
std::function< void()> Task
异步任务类型
Definition reactor.hpp:86
void add_timer(int interval_ms, FdCallback cb)
添加周期性定时器
Definition reactor.hpp:286
bool is_running() const
检查事件循环是否正在运行
Definition reactor.hpp:418
std::function< void(int)> FdCallback
文件描述符事件回调类型,参数为触发事件的 fd
Definition reactor.hpp:84
Definition concurrentqueue.h:768
bool try_dequeue(U &item)
Definition concurrentqueue.h:1125
bool enqueue(T const &item)
Definition concurrentqueue.h:995
Definition matching_engine.hpp:23
EventType
I/O 事件类型
Definition reactor.hpp:40