Line data Source code
1 : /**
2 : * @file timing_wheel.hpp
3 : * @brief 时间轮定时器实现
4 : *
5 : * 提供高效的定时任务管理,支持一次性和周期性任务,
6 : * 适用于心跳检测、超时管理等场景。
7 : */
8 :
9 : #pragma once
10 :
11 : #include <vector>
12 : #include <list>
13 : #include <functional>
14 : #include <memory>
15 : #include <mutex>
16 : #include <unordered_map>
17 : #include <atomic>
18 : #include <climits>
19 :
20 : namespace fix40 {
21 :
22 : /// 定时任务回调函数类型
23 : using TimerTask = std::function<void()>;
24 : /// 定时任务唯一标识符类型
25 : using TimerTaskId = uint64_t;
26 :
27 : /// 最大安全延迟时间(毫秒),防止整数溢出
28 : constexpr int MAX_SAFE_DELAY_MS = INT_MAX / 1000;
29 : /// 无效的定时任务 ID
30 : constexpr TimerTaskId INVALID_TIMER_ID = 0;
31 :
32 : /**
33 : * @class TimingWheel
34 : * @brief 支持周期性任务的时间轮定时器
35 : *
36 : * 时间轮是一种高效的定时器实现,将时间划分为固定数量的槽位,
37 : * 每个槽位存储在该时刻到期的任务列表。
38 : *
39 : * @par 特性
40 : * - O(1) 时间复杂度添加任务
41 : * - 支持一次性任务和周期性任务
42 : * - 支持取消任务
43 : * - 线程安全
44 : *
45 : * @par 工作原理
46 : * 1. 时间轮由 N 个槽位组成,每个槽位代表一个时间间隔
47 : * 2. 指针每次 tick() 前进一格,执行当前槽位的到期任务
48 : * 3. 对于延迟超过一圈的任务,使用 remaining_laps 记录剩余圈数
49 : *
50 : * @par 使用示例
51 : * @code
52 : * TimingWheel wheel(60, 1000); // 60 个槽,每槽 1 秒
53 : *
54 : * // 添加一次性任务,5 秒后执行
55 : * auto id = wheel.add_task(5000, []() { std::cout << "Timeout!" << std::endl; });
56 : *
57 : * // 添加周期性任务,每 30 秒执行一次
58 : * wheel.add_periodic_task(30000, []() { send_heartbeat(); });
59 : *
60 : * // 取消任务
61 : * wheel.cancel_task(id);
62 : *
63 : * // 在定时器回调中驱动时间轮
64 : * reactor.add_timer(1000, [&wheel](int) { wheel.tick(); });
65 : * @endcode
66 : */
67 : class TimingWheel {
68 : public:
69 14 : TimingWheel(int wheel_size, int tick_interval_ms)
70 14 : : wheel_size_(wheel_size),
71 14 : tick_interval_ms_(tick_interval_ms),
72 14 : wheel_(wheel_size),
73 28 : next_task_id_(1) {}
74 :
75 : /**
76 : * @brief 添加一次性任务
77 : * @param delay_ms 延迟时间(毫秒),必须大于 0 且不超过 MAX_SAFE_DELAY_MS
78 : * @param task 任务回调函数
79 : * @return TimerTaskId 任务 ID,可用于取消任务;失败返回 INVALID_TIMER_ID
80 : *
81 : * @note 任务将在约 delay_ms 毫秒后执行一次,然后自动移除
82 : */
83 112 : TimerTaskId add_task(int delay_ms, TimerTask task) {
84 112 : return add_task_internal(delay_ms, std::move(task), false);
85 : }
86 :
87 : /**
88 : * @brief 添加周期性任务
89 : * @param interval_ms 执行间隔(毫秒),必须大于 0 且不超过 MAX_SAFE_DELAY_MS
90 : * @param task 任务回调函数
91 : * @return TimerTaskId 任务 ID,可用于取消任务;失败返回 INVALID_TIMER_ID
92 : *
93 : * @note 任务将每隔 interval_ms 毫秒执行一次,直到被取消
94 : */
95 4 : TimerTaskId add_periodic_task(int interval_ms, TimerTask task) {
96 4 : return add_task_internal(interval_ms, std::move(task), true);
97 : }
98 :
99 : /**
100 : * @brief 取消任务
101 : * @param id 要取消的任务 ID
102 : *
103 : * 标记任务为已取消状态,任务将在下次 tick() 时被清理。
104 : * 如果 id 为 INVALID_TIMER_ID 或任务不存在,则无操作。
105 : */
106 5 : void cancel_task(TimerTaskId id) {
107 5 : if (id == INVALID_TIMER_ID) return;
108 :
109 4 : std::lock_guard<std::mutex> lock(mutex_);
110 4 : auto it = task_map_.find(id);
111 4 : if (it != task_map_.end()) {
112 3 : it->second->cancelled = true;
113 : }
114 4 : }
115 :
116 : /**
117 : * @brief 时间轮前进一格,执行到期任务
118 : *
119 : * 该方法应由外部定时器周期性调用,调用间隔应等于 tick_interval_ms。
120 : *
121 : * @par 执行流程
122 : * 1. 指针前进一格
123 : * 2. 遍历当前槽位的任务列表
124 : * 3. 跳过已取消的任务
125 : * 4. 对于 remaining_laps > 0 的任务,减少圈数
126 : * 5. 执行到期任务
127 : * 6. 周期性任务重新调度到下一周期
128 : *
129 : * @note 任务回调在锁外执行,避免死锁
130 : */
131 95 : void tick() {
132 95 : std::list<std::shared_ptr<TimerNode>> tasks_to_run;
133 95 : std::list<std::shared_ptr<TimerNode>> tasks_to_reschedule;
134 :
135 : {
136 95 : std::lock_guard<std::mutex> lock(mutex_);
137 :
138 95 : current_tick_ = (current_tick_ + 1) % wheel_size_;
139 95 : auto& slot = wheel_[current_tick_];
140 :
141 123 : for (auto it = slot.begin(); it != slot.end(); ) {
142 28 : auto& node = *it;
143 :
144 : // 已取消的任务,从映射中移除
145 28 : if (node->cancelled) {
146 3 : task_map_.erase(node->id);
147 3 : it = slot.erase(it);
148 3 : continue;
149 : }
150 :
151 25 : if (node->remaining_laps > 0) {
152 5 : node->remaining_laps--;
153 5 : ++it;
154 : } else {
155 : // 任务到期
156 20 : tasks_to_run.push_back(node);
157 :
158 : // 周期性任务需要重新调度
159 20 : if (node->is_periodic) {
160 12 : tasks_to_reschedule.push_back(node);
161 : } else {
162 : // 一次性任务,从映射中移除
163 8 : task_map_.erase(node->id);
164 : }
165 :
166 20 : it = slot.erase(it);
167 : }
168 : }
169 :
170 : // 重新调度周期性任务
171 107 : for (auto& node : tasks_to_reschedule) {
172 12 : if (!node->cancelled) {
173 12 : int target_slot = (current_tick_ + node->interval_ticks) % wheel_size_;
174 12 : node->remaining_laps = (node->interval_ticks - 1) / wheel_size_;
175 12 : wheel_[target_slot].push_back(node);
176 : }
177 : }
178 95 : }
179 :
180 : // 在锁外执行任务,避免死锁
181 115 : for (auto& node : tasks_to_run) {
182 20 : if (!node->cancelled && node->task) {
183 20 : node->task();
184 : }
185 : }
186 95 : }
187 :
188 : private:
189 : /**
190 : * @struct TimerNode
191 : * @brief 定时任务节点
192 : */
193 : struct TimerNode {
194 : TimerTaskId id; ///< 任务唯一标识符
195 : int remaining_laps; ///< 剩余圈数(用于延迟超过一圈的任务)
196 : int interval_ticks; ///< 周期间隔(tick 数),用于周期性任务重新调度
197 : bool is_periodic; ///< 是否为周期性任务
198 : bool cancelled; ///< 是否已取消
199 : TimerTask task; ///< 任务回调函数
200 :
201 : /**
202 : * @brief 构造定时任务节点
203 : */
204 113 : TimerNode(TimerTaskId id_, int laps, int interval, bool periodic, TimerTask t)
205 113 : : id(id_), remaining_laps(laps), interval_ticks(interval),
206 113 : is_periodic(periodic), cancelled(false), task(std::move(t)) {}
207 : };
208 :
209 : /**
210 : * @brief 内部添加任务实现
211 : * @param delay_ms 延迟/间隔时间(毫秒)
212 : * @param task 任务回调
213 : * @param periodic 是否为周期性任务
214 : * @return TimerTaskId 任务 ID
215 : */
216 116 : TimerTaskId add_task_internal(int delay_ms, TimerTask task, bool periodic) {
217 116 : if (delay_ms <= 0 || !task) {
218 3 : return INVALID_TIMER_ID;
219 : }
220 :
221 113 : if (delay_ms > MAX_SAFE_DELAY_MS) {
222 0 : return INVALID_TIMER_ID;
223 : }
224 :
225 113 : int ticks_to_wait = (delay_ms + tick_interval_ms_ - 1) / tick_interval_ms_;
226 :
227 113 : std::lock_guard<std::mutex> lock(mutex_);
228 :
229 113 : TimerTaskId id = next_task_id_++;
230 113 : int remaining_laps = (ticks_to_wait - 1) / wheel_size_;
231 113 : int target_slot = (current_tick_ + ticks_to_wait) % wheel_size_;
232 :
233 : auto node = std::make_shared<TimerNode>(
234 113 : id, remaining_laps, ticks_to_wait, periodic, std::move(task)
235 113 : );
236 :
237 113 : wheel_[target_slot].push_back(node);
238 113 : task_map_[id] = node;
239 :
240 113 : return id;
241 113 : }
242 :
243 : const int wheel_size_; ///< 时间轮槽位数量
244 : const int tick_interval_ms_; ///< 每个槽位代表的时间间隔(毫秒)
245 : int current_tick_ = 0; ///< 当前指针位置
246 :
247 : /// 时间轮槽位数组,每个槽位是一个任务链表
248 : std::vector<std::list<std::shared_ptr<TimerNode>>> wheel_;
249 : /// 任务 ID 到任务节点的映射,用于快速查找和取消
250 : std::unordered_map<TimerTaskId, std::shared_ptr<TimerNode>> task_map_;
251 : /// 下一个可用的任务 ID(原子递增)
252 : std::atomic<TimerTaskId> next_task_id_;
253 : /// 保护时间轮数据结构的互斥锁
254 : std::mutex mutex_;
255 : };
256 :
257 : } // namespace fix40
|