FIX 4.0 Demo 1.0
Loading...
Searching...
No Matches
thread_pool.hpp
Go to the documentation of this file.
1
9#pragma once
10
#include <atomic>
#include <functional>
#include <future>
#include <memory>     // std::unique_ptr, std::make_unique, std::make_shared
#include <stdexcept>  // std::runtime_error (thrown by enqueue)
#include <thread>
#include <type_traits>
#include <vector>
17
19
20namespace fix40 {
21
49public:
56 explicit ThreadPool(size_t threads);
57
64
75 void enqueue_to(size_t thread_index, std::function<void()> task);
76
89 template<class F, class... Args>
90 auto enqueue(F&& f, Args&&... args)
91 -> std::future<std::invoke_result_t<F, Args...>>;
92
97 size_t get_thread_count() const { return thread_count_; }
98
99private:
100 std::vector<std::thread> workers_;
102 std::vector<std::unique_ptr<moodycamel::BlockingConcurrentQueue<std::function<void()>>>> task_queues_;
103 std::atomic<bool> stop_;
104 size_t thread_count_;
105};
106
// --- Implementation (inline definitions) ---
108
109inline ThreadPool::ThreadPool(size_t threads) : stop_(false), thread_count_(threads) {
110 // 为每个线程创建独立的任务队列
111 for (size_t i = 0; i < threads; ++i) {
112 task_queues_.push_back(
113 std::make_unique<moodycamel::BlockingConcurrentQueue<std::function<void()>>>()
114 );
115 }
116
117 // 创建工作线程,每个线程只从自己的队列取任务
118 for (size_t i = 0; i < threads; ++i) {
119 workers_.emplace_back([this, i] {
120 auto& my_queue = *task_queues_[i];
121 while (true) {
122 std::function<void()> task;
123 my_queue.wait_dequeue(task);
124 if (!task) break; // 空任务表示退出
125 task();
126 }
127 });
128 }
129}
130
131inline void ThreadPool::enqueue_to(size_t thread_index, std::function<void()> task) {
132 if (stop_) return;
133 if (thread_index >= thread_count_) {
134 thread_index = thread_index % thread_count_;
135 }
136 task_queues_[thread_index]->enqueue(std::move(task));
137}
138
139template<class F, class... Args>
140auto ThreadPool::enqueue(F&& f, Args&&... args)
141 -> std::future<std::invoke_result_t<F, Args...>> {
142
143 using return_type = std::invoke_result_t<F, Args...>;
144
145 auto task = std::make_shared<std::packaged_task<return_type()>>(
146 std::bind(std::forward<F>(f), std::forward<Args>(args)...)
147 );
148
149 std::future<return_type> res = task->get_future();
150
151 if (stop_) {
152 throw std::runtime_error("enqueue on stopped ThreadPool");
153 }
154
155 // 轮询分配到各个线程(简单的负载均衡)
156 static std::atomic<size_t> next_thread{0};
157 size_t thread_index = next_thread.fetch_add(1) % thread_count_;
158 task_queues_[thread_index]->enqueue([task]() { (*task)(); });
159
160 return res;
161}
162
164 stop_ = true;
165 // 向每个线程的队列发送空任务,唤醒并退出
166 for (size_t i = 0; i < thread_count_; ++i) {
167 task_queues_[i]->enqueue(nullptr);
168 }
169 for (std::thread& worker : workers_) {
170 if (worker.joinable()) {
171 worker.join();
172 }
173 }
174}
175
176} // namespace fix40
Thread pool that supports binding connections to specific worker threads
Definition thread_pool.hpp:48
auto enqueue(F &&f, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > >
Submit a task to any worker thread (round-robin)
Definition thread_pool.hpp:140
ThreadPool(size_t threads)
Construct the thread pool
Definition thread_pool.hpp:109
size_t get_thread_count() const
Get the number of threads in the pool
Definition thread_pool.hpp:97
void enqueue_to(size_t thread_index, std::function< void()> task)
Submit a task to a specific worker thread
Definition thread_pool.hpp:131
~ThreadPool()
Destroy the thread pool
Definition thread_pool.hpp:163
Definition blockingconcurrentqueue.h:26
Definition matching_engine.hpp:23