Line data Source code
1 : /**
2 : * @file thread_pool.hpp
3 : * @brief 支持连接绑定的线程池实现
4 : *
5 : * 提供高性能的线程池,支持将任务派发到指定线程执行,
6 : * 实现"连接绑定线程"模型,避免同一连接的操作产生锁竞争。
7 : */
8 :
9 : #pragma once
10 :
11 : #include <vector>
12 : #include <thread>
13 : #include <functional>
14 : #include <future>
15 : #include <type_traits>
16 : #include <atomic>
17 :
18 : #include "base/blockingconcurrentqueue.h"
19 :
20 : namespace fix40 {
21 :
22 : /**
23 : * @class ThreadPool
24 : * @brief 支持"连接绑定线程"的线程池
25 : *
26 : * 每个工作线程有独立的任务队列,可以指定任务在哪个线程执行。
27 : * 这样同一个连接的所有操作都在同一个线程中串行执行,避免锁竞争。
28 : *
29 : * @par 设计特点
30 : * - 每个线程独立的无锁任务队列(moodycamel::BlockingConcurrentQueue)
31 : * - 支持指定线程执行任务(enqueue_to)
32 : * - 支持轮询分配任务(enqueue)
33 : * - 优雅关闭:发送空任务通知线程退出
34 : *
35 : * @par 使用示例
36 : * @code
37 : * ThreadPool pool(4);
38 : *
39 : * // 将任务派发到指定线程(连接绑定场景)
40 : * pool.enqueue_to(connection_fd % pool.get_thread_count(), [&]() {
41 : * handle_connection(connection_fd);
42 : * });
43 : *
44 : * // 提交任务到任意线程
45 : * auto future = pool.enqueue([]() { return compute_result(); });
46 : * @endcode
47 : */
48 : class ThreadPool {
49 : public:
50 : /**
51 : * @brief 构造线程池
52 : * @param threads 工作线程数量
53 : *
54 : * 创建指定数量的工作线程,每个线程有独立的任务队列。
55 : */
56 : explicit ThreadPool(size_t threads);
57 :
58 : /**
59 : * @brief 析构线程池
60 : *
61 : * 向每个线程发送空任务以通知退出,然后等待所有线程结束。
62 : */
63 : ~ThreadPool();
64 :
65 : /**
66 : * @brief 提交任务到指定线程
67 : * @param thread_index 目标线程索引(会自动取模)
68 : * @param task 要执行的任务
69 : *
70 : * 用于连接绑定场景,确保同一连接的所有操作在同一线程串行执行。
71 : *
72 : * @note 如果 thread_index >= thread_count_,会自动取模
73 : * @note 线程池停止后调用此方法无效
74 : */
75 : void enqueue_to(size_t thread_index, std::function<void()> task);
76 :
77 : /**
78 : * @brief 提交任务到任意空闲线程
79 : * @tparam F 可调用对象类型
80 : * @tparam Args 参数类型
81 : * @param f 可调用对象
82 : * @param args 调用参数
83 : * @return std::future 用于获取任务返回值
84 : *
85 : * 使用轮询方式分配任务到各线程,实现简单的负载均衡。
86 : *
87 : * @throws std::runtime_error 如果线程池已停止
88 : */
89 : template<class F, class... Args>
90 : auto enqueue(F&& f, Args&&... args)
91 : -> std::future<std::invoke_result_t<F, Args...>>;
92 :
93 : /**
94 : * @brief 获取线程池中的线程数量
95 : * @return size_t 线程数量
96 : */
97 3 : size_t get_thread_count() const { return thread_count_; }
98 :
99 : private:
100 : std::vector<std::thread> workers_; ///< 工作线程数组
101 : /// 每个线程独立的任务队列
102 : std::vector<std::unique_ptr<moodycamel::BlockingConcurrentQueue<std::function<void()>>>> task_queues_;
103 : std::atomic<bool> stop_; ///< 停止标志
104 : size_t thread_count_; ///< 线程数量
105 : };
106 :
107 : // --- 实现 ---
108 :
109 12 : inline ThreadPool::ThreadPool(size_t threads) : stop_(false), thread_count_(threads) {
110 : // 为每个线程创建独立的任务队列
111 53 : for (size_t i = 0; i < threads; ++i) {
112 41 : task_queues_.push_back(
113 82 : std::make_unique<moodycamel::BlockingConcurrentQueue<std::function<void()>>>()
114 : );
115 : }
116 :
117 : // 创建工作线程,每个线程只从自己的队列取任务
118 53 : for (size_t i = 0; i < threads; ++i) {
119 41 : workers_.emplace_back([this, i] {
120 41 : auto& my_queue = *task_queues_[i];
121 : while (true) {
122 1172 : std::function<void()> task;
123 1172 : my_queue.wait_dequeue(task);
124 1172 : if (!task) break; // 空任务表示退出
125 1131 : task();
126 2303 : }
127 41 : });
128 : }
129 12 : }
130 :
131 26 : inline void ThreadPool::enqueue_to(size_t thread_index, std::function<void()> task) {
132 26 : if (stop_) return;
133 26 : if (thread_index >= thread_count_) {
134 1 : thread_index = thread_index % thread_count_;
135 : }
136 26 : task_queues_[thread_index]->enqueue(std::move(task));
137 : }
138 :
139 : template<class F, class... Args>
140 1105 : auto ThreadPool::enqueue(F&& f, Args&&... args)
141 : -> std::future<std::invoke_result_t<F, Args...>> {
142 :
143 : using return_type = std::invoke_result_t<F, Args...>;
144 :
145 1105 : auto task = std::make_shared<std::packaged_task<return_type()>>(
146 1104 : std::bind(std::forward<F>(f), std::forward<Args>(args)...)
147 : );
148 :
149 1105 : std::future<return_type> res = task->get_future();
150 :
151 1105 : if (stop_) {
152 0 : throw std::runtime_error("enqueue on stopped ThreadPool");
153 : }
154 :
155 : // 轮询分配到各个线程(简单的负载均衡)
156 : static std::atomic<size_t> next_thread{0};
157 1105 : size_t thread_index = next_thread.fetch_add(1) % thread_count_;
158 2210 : task_queues_[thread_index]->enqueue([task]() { (*task)(); });
159 :
160 2210 : return res;
161 1105 : }
162 :
163 12 : inline ThreadPool::~ThreadPool() {
164 12 : stop_ = true;
165 : // 向每个线程的队列发送空任务,唤醒并退出
166 53 : for (size_t i = 0; i < thread_count_; ++i) {
167 41 : task_queues_[i]->enqueue(nullptr);
168 : }
169 53 : for (std::thread& worker : workers_) {
170 41 : if (worker.joinable()) {
171 41 : worker.join();
172 : }
173 : }
174 12 : }
175 :
176 : } // namespace fix40
|