LCOV - code coverage report
Current view: top level - include/base - thread_pool.hpp (source / functions) Coverage Total Hit
Test: coverage.info Lines: 97.4 % 38 37
Test Date: 2025-12-19 03:13:09 Functions: 100.0 % 19 19

            Line data    Source code
       1              : /**
       2              :  * @file thread_pool.hpp
       3              :  * @brief 支持连接绑定的线程池实现
       4              :  *
       5              :  * 提供高性能的线程池,支持将任务派发到指定线程执行,
       6              :  * 实现"连接绑定线程"模型,避免同一连接的操作产生锁竞争。
       7              :  */
       8              : 
       9              : #pragma once
      10              : 
      11              : #include <vector>
      12              : #include <thread>
      13              : #include <functional>
      14              : #include <future>
      15              : #include <type_traits>
      16              : #include <atomic>
      17              : 
      18              : #include "base/blockingconcurrentqueue.h"
      19              : 
      20              : namespace fix40 {
      21              : 
      22              : /**
      23              :  * @class ThreadPool
      24              :  * @brief 支持"连接绑定线程"的线程池
      25              :  *
      26              :  * 每个工作线程有独立的任务队列,可以指定任务在哪个线程执行。
      27              :  * 这样同一个连接的所有操作都在同一个线程中串行执行,避免锁竞争。
      28              :  *
      29              :  * @par 设计特点
      30              :  * - 每个线程独立的无锁任务队列(moodycamel::BlockingConcurrentQueue)
      31              :  * - 支持指定线程执行任务(enqueue_to)
      32              :  * - 支持轮询分配任务(enqueue)
      33              :  * - 优雅关闭:发送空任务通知线程退出
      34              :  *
      35              :  * @par 使用示例
      36              :  * @code
      37              :  * ThreadPool pool(4);
      38              :  * 
      39              :  * // 将任务派发到指定线程(连接绑定场景)
      40              :  * pool.enqueue_to(connection_fd % pool.get_thread_count(), [&]() {
      41              :  *     handle_connection(connection_fd);
      42              :  * });
      43              :  * 
      44              :  * // 提交任务到任意线程
      45              :  * auto future = pool.enqueue([]() { return compute_result(); });
      46              :  * @endcode
      47              :  */
      48              : class ThreadPool {
      49              : public:
      50              :     /**
      51              :      * @brief 构造线程池
      52              :      * @param threads 工作线程数量
      53              :      *
      54              :      * 创建指定数量的工作线程,每个线程有独立的任务队列。
      55              :      */
      56              :     explicit ThreadPool(size_t threads);
      57              : 
      58              :     /**
      59              :      * @brief 析构线程池
      60              :      *
      61              :      * 向每个线程发送空任务以通知退出,然后等待所有线程结束。
      62              :      */
      63              :     ~ThreadPool();
      64              : 
      65              :     /**
      66              :      * @brief 提交任务到指定线程
      67              :      * @param thread_index 目标线程索引(会自动取模)
      68              :      * @param task 要执行的任务
      69              :      *
      70              :      * 用于连接绑定场景,确保同一连接的所有操作在同一线程串行执行。
      71              :      *
      72              :      * @note 如果 thread_index >= thread_count_,会自动取模
      73              :      * @note 线程池停止后调用此方法无效
      74              :      */
      75              :     void enqueue_to(size_t thread_index, std::function<void()> task);
      76              : 
      77              :     /**
      78              :      * @brief 提交任务到任意空闲线程
      79              :      * @tparam F 可调用对象类型
      80              :      * @tparam Args 参数类型
      81              :      * @param f 可调用对象
      82              :      * @param args 调用参数
      83              :      * @return std::future 用于获取任务返回值
      84              :      *
      85              :      * 使用轮询方式分配任务到各线程,实现简单的负载均衡。
      86              :      *
      87              :      * @throws std::runtime_error 如果线程池已停止
      88              :      */
      89              :     template<class F, class... Args>
      90              :     auto enqueue(F&& f, Args&&... args)
      91              :         -> std::future<std::invoke_result_t<F, Args...>>;
      92              : 
      93              :     /**
      94              :      * @brief 获取线程池中的线程数量
      95              :      * @return size_t 线程数量
      96              :      */
      97            3 :     size_t get_thread_count() const { return thread_count_; }
      98              : 
      99              : private:
     100              :     std::vector<std::thread> workers_; ///< 工作线程数组
     101              :     /// 每个线程独立的任务队列
     102              :     std::vector<std::unique_ptr<moodycamel::BlockingConcurrentQueue<std::function<void()>>>> task_queues_;
     103              :     std::atomic<bool> stop_;   ///< 停止标志
     104              :     size_t thread_count_;      ///< 线程数量
     105              : };
     106              : 
     107              : // --- 实现 ---
     108              : 
     109           12 : inline ThreadPool::ThreadPool(size_t threads) : stop_(false), thread_count_(threads) {
     110              :     // 为每个线程创建独立的任务队列
     111           53 :     for (size_t i = 0; i < threads; ++i) {
     112           41 :         task_queues_.push_back(
     113           82 :             std::make_unique<moodycamel::BlockingConcurrentQueue<std::function<void()>>>()
     114              :         );
     115              :     }
     116              : 
     117              :     // 创建工作线程,每个线程只从自己的队列取任务
     118           53 :     for (size_t i = 0; i < threads; ++i) {
     119           41 :         workers_.emplace_back([this, i] {
     120           41 :             auto& my_queue = *task_queues_[i];
     121              :             while (true) {
     122         1172 :                 std::function<void()> task;
     123         1172 :                 my_queue.wait_dequeue(task);
     124         1172 :                 if (!task) break;  // 空任务表示退出
     125         1131 :                 task();
     126         2303 :             }
     127           41 :         });
     128              :     }
     129           12 : }
     130              : 
     131           26 : inline void ThreadPool::enqueue_to(size_t thread_index, std::function<void()> task) {
     132           26 :     if (stop_) return;
     133           26 :     if (thread_index >= thread_count_) {
     134            1 :         thread_index = thread_index % thread_count_;
     135              :     }
     136           26 :     task_queues_[thread_index]->enqueue(std::move(task));
     137              : }
     138              : 
     139              : template<class F, class... Args>
     140         1105 : auto ThreadPool::enqueue(F&& f, Args&&... args)
     141              :     -> std::future<std::invoke_result_t<F, Args...>> {
     142              : 
     143              :     using return_type = std::invoke_result_t<F, Args...>;
     144              : 
     145         1105 :     auto task = std::make_shared<std::packaged_task<return_type()>>(
     146         1104 :         std::bind(std::forward<F>(f), std::forward<Args>(args)...)
     147              :     );
     148              : 
     149         1105 :     std::future<return_type> res = task->get_future();
     150              : 
     151         1105 :     if (stop_) {
     152            0 :         throw std::runtime_error("enqueue on stopped ThreadPool");
     153              :     }
     154              : 
     155              :     // 轮询分配到各个线程(简单的负载均衡)
     156              :     static std::atomic<size_t> next_thread{0};
     157         1105 :     size_t thread_index = next_thread.fetch_add(1) % thread_count_;
     158         2210 :     task_queues_[thread_index]->enqueue([task]() { (*task)(); });
     159              : 
     160         2210 :     return res;
     161         1105 : }
     162              : 
     163           12 : inline ThreadPool::~ThreadPool() {
     164           12 :     stop_ = true;
     165              :     // 向每个线程的队列发送空任务,唤醒并退出
     166           53 :     for (size_t i = 0; i < thread_count_; ++i) {
     167           41 :         task_queues_[i]->enqueue(nullptr);
     168              :     }
     169           53 :     for (std::thread& worker : workers_) {
     170           41 :         if (worker.joinable()) {
     171           41 :             worker.join();
     172              :         }
     173              :     }
     174           12 : }
     175              : 
     176              : } // namespace fix40
        

Generated by: LCOV version 2.0-1