FIX 4.0 Demo 1.0
Loading...
Searching...
No Matches
reactor.hpp
Go to the documentation of this file.
1
9#pragma once
10
11#include <functional>
12#include <vector>
13#include <unordered_map>
14#include <atomic>
15#include <cerrno>
16#include <cstring>
17#include <iostream>
18#include <unistd.h> // 为 read, write, close 提供 POSIX 函数声明
19
20#include "base/concurrentqueue.h" // 使用 moodycamel 的无锁队列
21
22// --- 面向各平台的 I/O 多路处理包含 ---
23#ifdef __linux__
24#include <sys/epoll.h>
25#include <sys/timerfd.h>
26#include <sys/eventfd.h> // 使用 eventfd 来唤醒
27#elif __APPLE__
28#include <sys/event.h>
29#include <sys/time.h>
30#else
31#error "Unsupported platform for Reactor"
32#endif
33
34namespace fix40 {
35
40enum class EventType : uint32_t {
41 READ = 1,
42 WRITE = 2
43};
44
81class Reactor {
82public:
84 using FdCallback = std::function<void(int)>;
86 using Task = std::function<void()>;
87
95 Reactor();
96
102 ~Reactor();
103
111 void add_fd(int fd, FdCallback cb);
112
121 void modify_fd(int fd, uint32_t event_mask, FdCallback write_cb);
122
130 void add_timer(int interval_ms, FdCallback cb);
131
138 void remove_fd(int fd);
139
151 void run();
152
159 void stop();
160
166 bool is_running() const;
167
168private:
174 void do_add_fd(int fd, FdCallback cb);
175
179 void do_modify_fd(int fd, uint32_t event_mask, FdCallback write_cb);
180
184 void do_add_timer(int interval_ms, FdCallback cb);
185
189 void do_remove_fd(int fd);
190
194 void process_tasks();
195
199 void wakeup();
200
201 int io_fd_;
202#ifdef __linux__
203 int wakeup_fd_;
204#else
205 int wakeup_pipe_[2];
206#endif
207 std::atomic<bool> running_;
208 std::unordered_map<int, FdCallback> callbacks_;
209 std::unordered_map<int, FdCallback> write_callbacks_;
210 std::vector<int> timer_fds_;
211
213};
214
215// --- 实现 ---
216
217inline Reactor::Reactor() : running_(false) {
218#ifdef __linux__
219 io_fd_ = epoll_create1(0);
220 if (io_fd_ == -1) throw std::runtime_error("Failed to create epoll instance");
221 wakeup_fd_ = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
222 if (wakeup_fd_ == -1) {
223 close(io_fd_);
224 throw std::runtime_error("Failed to create eventfd for wakeup");
225 }
226 do_add_fd(wakeup_fd_, [this](int fd){
227 uint64_t one;
228 ssize_t n = read(fd, &one, sizeof(one));
229 if (n != sizeof(one)) {
230 // 错误处理
231 }
232 });
233#elif __APPLE__
234 io_fd_ = kqueue();
235 if (io_fd_ == -1) throw std::runtime_error("Failed to create kqueue instance");
236 if (pipe(wakeup_pipe_) == -1) {
237 close(io_fd_);
238 throw std::runtime_error("Failed to create pipe for reactor wakeup");
239 }
240 do_add_fd(wakeup_pipe_[0], nullptr); // 不需要回调,只为了唤醒
241#endif
242}
243
245 close(io_fd_);
246#ifdef __linux__
247 close(wakeup_fd_);
248 for (int tfd : timer_fds_) {
249 close(tfd);
250 }
251#elif __APPLE__
252 close(wakeup_pipe_[0]);
253 close(wakeup_pipe_[1]);
254#endif
255}
256
257inline void Reactor::wakeup() {
258#ifdef __linux__
259 uint64_t one = 1;
260 ssize_t n = write(wakeup_fd_, &one, sizeof(one));
261 if (n != sizeof(one)) {
262 // 错误处理
263 }
264#elif __APPLE__
265 char buf = 'w';
266 if (write(wakeup_pipe_[1], &buf, 1) < 0) {
267 // 错误处理
268 }
269#endif
270}
271
272inline void Reactor::add_fd(int fd, FdCallback cb) {
273 tasks_.enqueue([this, fd, cb = std::move(cb)]() {
274 do_add_fd(fd, cb);
275 });
276 wakeup();
277}
278
279inline void Reactor::modify_fd(int fd, uint32_t event_mask, FdCallback write_cb) {
280 tasks_.enqueue([this, fd, event_mask, cb = std::move(write_cb)]() {
281 do_modify_fd(fd, event_mask, cb);
282 });
283 wakeup();
284}
285
286inline void Reactor::add_timer(int interval_ms, FdCallback cb) {
287 tasks_.enqueue([this, interval_ms, cb = std::move(cb)]() {
288 do_add_timer(interval_ms, cb);
289 });
290 wakeup();
291}
292
293inline void Reactor::remove_fd(int fd) {
294 tasks_.enqueue([this, fd]() {
295 do_remove_fd(fd);
296 });
297 wakeup();
298}
299
300inline void Reactor::do_add_fd(int fd, FdCallback cb) {
301#ifdef __linux__
302 epoll_event event;
303 event.events = EPOLLIN | EPOLLET;
304 event.data.fd = fd;
305 if (epoll_ctl(io_fd_, EPOLL_CTL_ADD, fd, &event) == -1) {
306 perror("epoll_ctl(ADD) failed");
307 return;
308 }
309#elif __APPLE__
310 struct kevent change_event;
311 EV_SET(&change_event, fd, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, nullptr);
312 if (kevent(io_fd_, &change_event, 1, nullptr, 0, nullptr) == -1) {
313 perror("kevent(ADD) failed");
314 return;
315 }
316#endif
317 if (cb) {
318 callbacks_[fd] = std::move(cb);
319 }
320}
321
322inline void Reactor::do_modify_fd(int fd, uint32_t event_mask, FdCallback write_cb) {
323#ifdef __linux__
324 epoll_event event;
325 event.data.fd = fd;
326 event.events = EPOLLET;
327 if (static_cast<uint32_t>(event_mask) & static_cast<uint32_t>(EventType::READ)) {
328 event.events |= EPOLLIN;
329 }
330 if (static_cast<uint32_t>(event_mask) & static_cast<uint32_t>(EventType::WRITE)) {
331 event.events |= EPOLLOUT;
332 }
333 if (epoll_ctl(io_fd_, EPOLL_CTL_MOD, fd, &event) == -1) {
334 if (errno != ENOENT) {
335 perror("epoll_ctl(MOD) failed");
336 }
337 return;
338 }
339#elif __APPLE__
340 struct kevent change;
341 if (static_cast<uint32_t>(event_mask) & static_cast<uint32_t>(EventType::WRITE)) {
342 EV_SET(&change, fd, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, nullptr);
343 } else {
344 EV_SET(&change, fd, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);
345 }
346 if (kevent(io_fd_, &change, 1, nullptr, 0, nullptr) == -1) {
347 perror("kevent(MOD-WRITE) failed");
348 }
349#endif
350 if (static_cast<uint32_t>(event_mask) & static_cast<uint32_t>(EventType::WRITE)) {
351 write_callbacks_[fd] = std::move(write_cb);
352 } else {
353 write_callbacks_.erase(fd);
354 }
355}
356
357inline void Reactor::do_add_timer(int interval_ms, FdCallback cb) {
358#ifdef __linux__
359 int tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
360 if (tfd == -1) {
361 perror("timerfd_create failed");
362 return;
363 }
364 itimerspec ts;
365 ts.it_value.tv_sec = interval_ms / 1000;
366 ts.it_value.tv_nsec = (long)(interval_ms % 1000) * 1000000L;
367 ts.it_interval = ts.it_value;
368 if (timerfd_settime(tfd, 0, &ts, nullptr) == -1) {
369 perror("timerfd_settime failed");
370 close(tfd);
371 return;
372 }
373 timer_fds_.push_back(tfd);
374 do_add_fd(tfd, std::move(cb));
375#elif __APPLE__
376 struct kevent change_event;
377 EV_SET(&change_event, interval_ms, EVFILT_TIMER, EV_ADD | EV_ENABLE, 0, interval_ms, nullptr);
378 if (kevent(io_fd_, &change_event, 1, nullptr, 0, nullptr) == -1) {
379 perror("kevent(ADD TIMER) failed");
380 return;
381 }
382 callbacks_[-interval_ms] = std::move(cb);
383#endif
384}
385
386inline void Reactor::do_remove_fd(int fd) {
387#ifdef __linux__
388 if (epoll_ctl(io_fd_, EPOLL_CTL_DEL, fd, nullptr) == -1) {
389 // ENOENT: fd 不在 epoll 中
390 // EBADF: fd 已关闭(Linux 会自动从 epoll 移除已关闭的 fd)
391 if (errno != ENOENT && errno != EBADF) {
392 perror("epoll_ctl(DEL) failed");
393 }
394 }
395#elif __APPLE__
396 struct kevent change_event;
397 EV_SET(&change_event, fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
398 if (kevent(io_fd_, &change_event, 1, nullptr, 0, nullptr) == -1 && errno != ENOENT && errno != EBADF) {
399 perror("kevent(DEL READ) failed");
400 }
401 EV_SET(&change_event, fd, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);
402 if (kevent(io_fd_, &change_event, 1, nullptr, 0, nullptr) == -1 && errno != ENOENT && errno != EBADF) {
403 perror("kevent(DEL WRITE) failed");
404 }
405#endif
406 callbacks_.erase(fd);
407 write_callbacks_.erase(fd);
408}
409
410
411inline void Reactor::stop() {
412 if (!running_.exchange(false)) {
413 return;
414 }
415 wakeup(); // 唤醒 run() 循环以使其退出
416}
417
418inline bool Reactor::is_running() const {
419 return running_.load(std::memory_order_acquire);
420}
421
422inline void Reactor::process_tasks() {
423 Task task;
424 while(tasks_.try_dequeue(task)) {
425 task();
426 }
427}
428
429inline void Reactor::run() {
430 running_.store(true, std::memory_order_release);
431#ifdef __linux__
432 std::vector<epoll_event> events(128);
433#elif __APPLE__
434 std::vector<struct kevent> events(128);
435#endif
436
437 while (running_.load(std::memory_order_acquire)) {
438 process_tasks(); // 每次循环前处理挂起的任务
439
440#ifdef __linux__
441 int n_events = epoll_wait(io_fd_, events.data(), events.size(), -1);
442#elif __APPLE__
443 int n_events = kevent(io_fd_, nullptr, 0, events.data(), events.size(), nullptr);
444#endif
445 if (!running_) break; // 捕获 stop() 信号后退出
446
447 if (n_events < 0) {
448 if (errno == EINTR) continue;
449 std::cerr << "epoll_wait/kevent failed: " << strerror(errno) << std::endl;
450 break;
451 }
452
453 for (int i = 0; i < n_events; ++i) {
454#ifdef __linux__
455 int fd = events[i].data.fd;
456 if (fd == wakeup_fd_) {
457 uint64_t one;
458 read(wakeup_fd_, &one, sizeof(one)); // 清空 eventfd
459 continue;
460 }
461 uint32_t active_events = events[i].events;
462#elif __APPLE__
463 int fd = events[i].ident;
464 if (fd == wakeup_pipe_[0]) {
465 char buf[1];
466 read(wakeup_pipe_[0], buf, 1);
467 continue;
468 }
469 uint32_t active_events = 0;
470 if (events[i].filter == EVFILT_READ) active_events |= static_cast<uint32_t>(EventType::READ);
471 if (events[i].filter == EVFILT_WRITE) active_events |= static_cast<uint32_t>(EventType::WRITE);
472 if (events[i].filter == EVFILT_TIMER) {
473 fd = -static_cast<int>(events[i].ident);
474 active_events |= static_cast<uint32_t>(EventType::READ);
475 }
476#endif
477
478#ifdef __linux__
479 if (active_events & (EPOLLIN | EPOLLERR | EPOLLHUP)) {
480 auto it = callbacks_.find(fd);
481 if (it != callbacks_.end() && it->second) it->second(fd);
482 }
483 if (active_events & EPOLLOUT) {
484 auto it = write_callbacks_.find(fd);
485 if (it != write_callbacks_.end() && it->second) it->second(fd);
486 }
487#elif __APPLE__
488 if (active_events & static_cast<uint32_t>(EventType::READ)) {
489 auto it = callbacks_.find(fd);
490 if (it != callbacks_.end() && it->second) it->second(fd);
491 }
492 if (active_events & static_cast<uint32_t>(EventType::WRITE)) {
493 auto it = write_callbacks_.find(fd);
494 if (it != write_callbacks_.end() && it->second) it->second(fd);
495 }
496#endif
497 }
498 }
499}
500} // fix40 名称空间结束
基于 Reactor 模式的事件循环
Definition reactor.hpp:81
~Reactor()
析构 Reactor
Definition reactor.hpp:244
void run()
启动事件循环
Definition reactor.hpp:429
void stop()
停止事件循环
Definition reactor.hpp:411
void add_fd(int fd, FdCallback cb)
注册文件描述符的读事件
Definition reactor.hpp:272
void modify_fd(int fd, uint32_t event_mask, FdCallback write_cb)
修改文件描述符的事件监听
Definition reactor.hpp:279
Reactor()
构造 Reactor
Definition reactor.hpp:217
void remove_fd(int fd)
移除文件描述符
Definition reactor.hpp:293
std::function< void()> Task
异步任务类型
Definition reactor.hpp:86
void add_timer(int interval_ms, FdCallback cb)
添加周期性定时器
Definition reactor.hpp:286
bool is_running() const
检查事件循环是否正在运行
Definition reactor.hpp:418
std::function< void(int)> FdCallback
文件描述符事件回调类型,参数为触发事件的 fd
Definition reactor.hpp:84
Definition concurrentqueue.h:768
bool try_dequeue(U &item)
Definition concurrentqueue.h:1125
bool enqueue(T const &item)
Definition concurrentqueue.h:995
Definition matching_engine.hpp:23
EventType
I/O 事件类型
Definition reactor.hpp:40
@ READ
可读事件
@ WRITE
可写事件