/// Submits `task` to the queue of the worker selected by `thread_index`.
/// Declaration only; the body is defined further down in the same header.
75 void enqueue_to(
size_t thread_index, std::function<
void()> task);
/// Submits a callable together with its arguments and returns a std::future
/// for the call's result type, std::invoke_result_t<F, Args...>.
/// Declaration only; the body is defined further down in the same header.
89 template<
class F,
class... Args>
90 auto enqueue(F&& f, Args&&... args)
91 -> std::future<std::invoke_result_t<F, Args...>>;
// Worker threads, one started per queue index by the constructor and visited
// again by the destructor.
100 std::vector<std::thread> workers_;
// Stop flag. NOTE(review): no read or write of it appears in the visible
// lines; presumably it guards enqueue() against a stopped pool - confirm.
103 std::atomic<bool> stop_;
// Number of workers/queues; used as the modulus when picking a target queue
// and as the loop bound in the destructor.
104 size_t thread_count_;
// Constructor fragment: append one entry to task_queues_ per requested
// thread. The argument of push_back is cut off in this listing.
111 for (
size_t i = 0; i < threads; ++i) {
112 task_queues_.push_back(
// Start one worker per queue; each lambda captures `this` and its own index.
118 for (
size_t i = 0; i < threads; ++i) {
119 workers_.emplace_back([
this, i] {
// Worker i takes its tasks from the queue stored at index i.
120 auto& my_queue = *task_queues_[i];
122 std::function<void()> task;
// Fetch the next item into `task`. NOTE(review): wait_dequeue presumably
// blocks until an item is available (the queue type is declared elsewhere) -
// confirm. What the worker does with `task` is not in the visible lines.
123 my_queue.wait_dequeue(task);
// enqueue_to() body fragment: an index past the last worker is wrapped with
// modulo instead of being rejected, then the task is moved into the selected
// worker's queue.
// NOTE(review): if thread_count_ were 0 the condition is always true and the
// modulo divides by zero; whether the constructor rules that out is not
// visible here.
133 if (thread_index >= thread_count_) {
134 thread_index = thread_index % thread_count_;
136 task_queues_[thread_index]->enqueue(std::move(task));
// Definition of enqueue(). This listing omits several original lines
// (including the one carrying the function name), so the comments below cover
// only the statements that are visible.
139template<
class F,
class... Args>
141 -> std::future<std::invoke_result_t<F, Args...>> {
143 using return_type = std::invoke_result_t<F, Args...>;
// Bind the callable to its arguments and wrap the result in a packaged_task so
// the outcome can be retrieved through a future. The task is held by
// shared_ptr so the lambda queued below can carry it by copy.
145 auto task = std::make_shared<std::packaged_task<return_type()>>(
146 std::bind(std::forward<F>(f), std::forward<Args>(args)...)
// Obtain the future before the task is handed to a queue.
149 std::future<return_type> res = task->get_future();
// Submissions to a stopped pool are rejected with std::runtime_error. The
// guarding condition is not in the visible lines (presumably a check of
// stop_ - confirm).
152 throw std::runtime_error(
"enqueue on stopped ThreadPool");
// Round-robin target selection: take the next counter value modulo the number
// of workers.
// NOTE(review): `next_thread` is a function-local static inside a function
// template, so every distinct <F, Args...> instantiation has its own counter
// starting at 0, and each counter is shared by all ThreadPool instances.
// Tasks submitted with many different callable types can therefore pile up on
// the low-numbered queues. A per-pool member counter would avoid this.
156 static std::atomic<size_t> next_thread{0};
157 size_t thread_index = next_thread.fetch_add(1) % thread_count_;
// Queue a small copyable wrapper that runs the packaged_task.
158 task_queues_[thread_index]->enqueue([task]() { (*task)(); });
// Destructor fragment: push one nullptr entry onto every worker's queue.
// NOTE(review): presumably an empty task is the shutdown signal for the worker
// loop; the worker-side check is not in the visible lines - confirm.
166 for (
size_t i = 0; i < thread_count_; ++i) {
167 task_queues_[i]->enqueue(
nullptr);
// Visit every worker that is still joinable. The statement inside this branch
// is not shown in this listing (presumably worker.join() - confirm).
169 for (std::thread& worker : workers_) {
170 if (worker.joinable()) {
支持"连接绑定线程"的线程池
Definition thread_pool.hpp:48
auto enqueue(F &&f, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > >
提交任务到线程池，由线程池按轮询方式选择工作线程（并非挑选空闲线程）
Definition thread_pool.hpp:140
ThreadPool(size_t threads)
构造线程池
Definition thread_pool.hpp:109
size_t get_thread_count() const
获取线程池中的线程数量
Definition thread_pool.hpp:97
void enqueue_to(size_t thread_index, std::function< void()> task)
提交任务到指定线程
Definition thread_pool.hpp:131
~ThreadPool()
析构线程池
Definition thread_pool.hpp:163
Definition blockingconcurrentqueue.h:26
Definition matching_engine.hpp:23