FIX 4.0 Demo 1.0
Loading...
Searching...
No Matches
timing_wheel.hpp
Go to the documentation of this file.
1
9#pragma once
10
#include <atomic>
#include <climits>
#include <cstdint>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <unordered_map>
#include <vector>
19
20namespace fix40 {
21
/// Callback type stored for each scheduled task; invoked with no arguments.
using TimerTask = std::function<void()>;
/// Identifier type returned when a task is scheduled (fixed-width 64-bit).
using TimerTaskId = std::uint64_t;

/// Largest delay, in milliseconds, that the scheduler accepts (INT_MAX / 1000);
/// longer delays are rejected so the int tick arithmetic stays in range.
constexpr int MAX_SAFE_DELAY_MS = INT_MAX / 1000;
31
68public:
69 TimingWheel(int wheel_size, int tick_interval_ms)
70 : wheel_size_(wheel_size),
71 tick_interval_ms_(tick_interval_ms),
72 wheel_(wheel_size),
73 next_task_id_(1) {}
74
83 TimerTaskId add_task(int delay_ms, TimerTask task) {
84 return add_task_internal(delay_ms, std::move(task), false);
85 }
86
95 TimerTaskId add_periodic_task(int interval_ms, TimerTask task) {
96 return add_task_internal(interval_ms, std::move(task), true);
97 }
98
107 if (id == INVALID_TIMER_ID) return;
108
109 std::lock_guard<std::mutex> lock(mutex_);
110 auto it = task_map_.find(id);
111 if (it != task_map_.end()) {
112 it->second->cancelled = true;
113 }
114 }
115
131 void tick() {
132 std::list<std::shared_ptr<TimerNode>> tasks_to_run;
133 std::list<std::shared_ptr<TimerNode>> tasks_to_reschedule;
134
135 {
136 std::lock_guard<std::mutex> lock(mutex_);
137
138 current_tick_ = (current_tick_ + 1) % wheel_size_;
139 auto& slot = wheel_[current_tick_];
140
141 for (auto it = slot.begin(); it != slot.end(); ) {
142 auto& node = *it;
143
144 // 已取消的任务,从映射中移除
145 if (node->cancelled) {
146 task_map_.erase(node->id);
147 it = slot.erase(it);
148 continue;
149 }
150
151 if (node->remaining_laps > 0) {
152 node->remaining_laps--;
153 ++it;
154 } else {
155 // 任务到期
156 tasks_to_run.push_back(node);
157
158 // 周期性任务需要重新调度
159 if (node->is_periodic) {
160 tasks_to_reschedule.push_back(node);
161 } else {
162 // 一次性任务,从映射中移除
163 task_map_.erase(node->id);
164 }
165
166 it = slot.erase(it);
167 }
168 }
169
170 // 重新调度周期性任务
171 for (auto& node : tasks_to_reschedule) {
172 if (!node->cancelled) {
173 int target_slot = (current_tick_ + node->interval_ticks) % wheel_size_;
174 node->remaining_laps = (node->interval_ticks - 1) / wheel_size_;
175 wheel_[target_slot].push_back(node);
176 }
177 }
178 }
179
180 // 在锁外执行任务,避免死锁
181 for (auto& node : tasks_to_run) {
182 if (!node->cancelled && node->task) {
183 node->task();
184 }
185 }
186 }
187
188private:
193 struct TimerNode {
194 TimerTaskId id;
195 int remaining_laps;
196 int interval_ticks;
197 bool is_periodic;
198 bool cancelled;
199 TimerTask task;
200
204 TimerNode(TimerTaskId id_, int laps, int interval, bool periodic, TimerTask t)
205 : id(id_), remaining_laps(laps), interval_ticks(interval),
206 is_periodic(periodic), cancelled(false), task(std::move(t)) {}
207 };
208
216 TimerTaskId add_task_internal(int delay_ms, TimerTask task, bool periodic) {
217 if (delay_ms <= 0 || !task) {
218 return INVALID_TIMER_ID;
219 }
220
221 if (delay_ms > MAX_SAFE_DELAY_MS) {
222 return INVALID_TIMER_ID;
223 }
224
225 int ticks_to_wait = (delay_ms + tick_interval_ms_ - 1) / tick_interval_ms_;
226
227 std::lock_guard<std::mutex> lock(mutex_);
228
229 TimerTaskId id = next_task_id_++;
230 int remaining_laps = (ticks_to_wait - 1) / wheel_size_;
231 int target_slot = (current_tick_ + ticks_to_wait) % wheel_size_;
232
233 auto node = std::make_shared<TimerNode>(
234 id, remaining_laps, ticks_to_wait, periodic, std::move(task)
235 );
236
237 wheel_[target_slot].push_back(node);
238 task_map_[id] = node;
239
240 return id;
241 }
242
243 const int wheel_size_;
244 const int tick_interval_ms_;
245 int current_tick_ = 0;
246
248 std::vector<std::list<std::shared_ptr<TimerNode>>> wheel_;
250 std::unordered_map<TimerTaskId, std::shared_ptr<TimerNode>> task_map_;
252 std::atomic<TimerTaskId> next_task_id_;
254 std::mutex mutex_;
255};
256
257} // namespace fix40
支持周期性任务的时间轮定时器
Definition timing_wheel.hpp:67
void cancel_task(TimerTaskId id)
取消任务
Definition timing_wheel.hpp:106
void tick()
时间轮前进一格,执行到期任务
Definition timing_wheel.hpp:131
TimerTaskId add_task(int delay_ms, TimerTask task)
添加一次性任务
Definition timing_wheel.hpp:83
TimerTaskId add_periodic_task(int interval_ms, TimerTask task)
添加周期性任务
Definition timing_wheel.hpp:95
TimingWheel(int wheel_size, int tick_interval_ms)
Definition timing_wheel.hpp:69
Definition matching_engine.hpp:23
uint64_t TimerTaskId
定时任务唯一标识符类型
Definition timing_wheel.hpp:25
constexpr int MAX_SAFE_DELAY_MS
最大安全延迟时间(毫秒),防止整数溢出
Definition timing_wheel.hpp:28
std::function< void()> TimerTask
定时任务回调函数类型
Definition timing_wheel.hpp:23
constexpr TimerTaskId INVALID_TIMER_ID
无效的定时任务 ID
Definition timing_wheel.hpp:30