LCOV - code coverage report
Current view: top level - include/fix - session.hpp (source / functions) Coverage Total Hit
Test: coverage.info Lines: 84.6 % 13 11
Test Date: 2025-12-19 03:13:09 Functions: 78.6 % 14 11

            Line data    Source code
       1              : /**
       2              :  * @file session.hpp
       3              :  * @brief FIX 会话层实现
       4              :  *
       5              :  * 实现 FIX 协议的会话层状态机,管理会话生命周期、
       6              :  * 心跳检测、消息序列号等。
       7              :  * 
       8              :  * 会话层仅处理 FIX 协议的会话消息(Logon、Logout、Heartbeat、TestRequest),
       9              :  * 业务消息通过 Application 接口委托给应用层处理。
      10              :  */
      11              : 
      12              : #pragma once
      13              : 
      14              : #include <chrono>
      15              : #include <string>
      16              : #include <thread>
      17              : #include <atomic>
      18              : #include <functional>
      19              : #include <memory>
      20              : #include <map>
      21              : 
      22              : #include "fix/fix_codec.hpp"
      23              : #include "fix/application.hpp"
      24              : #include "base/concurrentqueue.h"
      25              : #include "base/timing_wheel.hpp"
      26              : 
      27              : namespace fix40 {
      28              : 
      29              : class Connection;
      30              : class Session;
      31              : class IStore;  // 前向声明
      32              : 
      33              : /**
      34              :  * @class IStateHandler
      35              :  * @brief 会话状态处理器接口(状态模式)
      36              :  *
      37              :  * 定义会话在不同状态下的行为接口。
      38              :  * 具体状态类(DisconnectedState、LogonSentState、EstablishedState、LogoutSentState)
      39              :  * 实现此接口以处理各状态下的事件。
      40              :  */
      41              : class IStateHandler {
      42              : public:
      43          571 :     virtual ~IStateHandler() = default;
      44              : 
      45              :     /**
      46              :      * @brief 处理收到的消息
      47              :      * @param context 会话上下文
      48              :      * @param msg 收到的 FIX 消息
      49              :      */
      50              :     virtual void onMessageReceived(Session& context, const FixMessage& msg) = 0;
      51              : 
      52              :     /**
      53              :      * @brief 处理定时器检查事件
      54              :      * @param context 会话上下文
      55              :      *
      56              :      * 用于心跳发送、超时检测等周期性任务。
      57              :      */
      58              :     virtual void onTimerCheck(Session& context) = 0;
      59              : 
      60              :     /**
      61              :      * @brief 处理会话启动事件
      62              :      * @param context 会话上下文
      63              :      */
      64              :     virtual void onSessionStart(Session& context) = 0;
      65              : 
      66              :     /**
      67              :      * @brief 处理登出请求
      68              :      * @param context 会话上下文
      69              :      * @param reason 登出原因
      70              :      */
      71              :     virtual void onLogoutRequest(Session& context, const std::string& reason) = 0;
      72              : 
      73              :     /**
      74              :      * @brief 获取状态名称
      75              :      * @return const char* 状态名称字符串
      76              :      */
      77              :     virtual const char* getStateName() const = 0;
      78              : };
      79              : 
      80              : 
      81              : /**
      82              :  * @class Session
      83              :  * @brief FIX 会话管理器
      84              :  *
      85              :  * 实现 FIX 协议的会话层,管理:
      86              :  * - 会话状态机(Disconnected -> LogonSent -> Established -> LogoutSent)
      87              :  * - 消息序列号
      88              :  * - 心跳检测和 TestRequest
      89              :  * - 优雅登出流程
      90              :  *
      91              :  * @par 状态转换图
      92              :  * @code
      93              :  *                     ┌──────────────┐
      94              :  *                     │ Disconnected │
      95              :  *                     └──────┬───────┘
      96              :  *                            │ start() [客户端发送 Logon]
      97              :  *                            ▼
      98              :  *                     ┌──────────────┐
      99              :  *                     │  LogonSent   │
     100              :  *                     └──────┬───────┘
     101              :  *                            │ 收到 Logon 确认
     102              :  *                            ▼
     103              :  *                     ┌──────────────┐
     104              :  *                     │ Established  │
     105              :  *                     └──────┬───────┘
     106              :  *                            │ 发起 Logout
     107              :  *                            ▼
     108              :  *                     ┌──────────────┐
     109              :  *                     │  LogoutSent  │
     110              :  *                     └──────┬───────┘
     111              :  *                            │ 收到 Logout 确认
     112              :  *                            ▼
     113              :  *                     ┌──────────────┐
     114              :  *                     │ Disconnected │
     115              :  *                     └──────────────┘
     116              :  * @endcode
     117              :  *
     118              :  * @note 该类继承 std::enable_shared_from_this,必须通过 std::shared_ptr 管理
     119              :  */
     120              :         class Session : public std::enable_shared_from_this<Session> {
     121              :         public:
     122              :             /// 会话关闭回调类型
     123              :             using ShutdownCallback = std::function<void()>;
     124              : 
     125              :             /**
     126              :              * @brief 会话建立回调类型
     127              :              *
     128              :              * 在完成 Logon 握手并进入 Established 状态后触发。
     129              :              * 常用于:服务端在此时将 Session 注册到 SessionManager。
     130              :              *
     131              :              * @note 回调可能在连接绑定的工作线程中执行,需保证线程安全。
     132              :              */
     133              :             using EstablishedCallback = std::function<void(std::shared_ptr<Session>)>;
     134              : 
     135              :             /**
     136              :              * @brief 构造会话对象
     137              :              * @param sender 发送方 CompID
     138              :              * @param target 接收方 CompID
     139              :      * @param hb 心跳间隔(秒)
     140              :      * @param shutdown_cb 会话关闭时的回调函数
     141              :      * @param store 存储接口指针(可选,用于消息持久化和断线恢复)
     142              :      */
     143              :     Session(const std::string& sender,
     144              :             const std::string& target,
     145              :             int hb,
     146              :             ShutdownCallback shutdown_cb,
     147              :             IStore* store = nullptr);
     148              : 
     149              :     /**
     150              :      * @brief 析构函数
     151              :      *
     152              :      * 取消定时任务,释放资源。
     153              :      */
     154              :     ~Session();
     155              : 
     156              :     /**
     157              :      * @brief 关联 Connection 对象
     158              :      * @param conn Connection 的弱引用
     159              :      */
     160              :     void set_connection(std::weak_ptr<Connection> conn);
     161              : 
     162              :     /**
     163              :      * @brief 设置应用层处理器
     164              :      * @param app Application 实现类指针
     165              :      *
     166              :      * 设置后,收到业务消息时会调用 app->fromApp(),
     167              :      * 会话建立/断开时会调用 app->onLogon()/onLogout()。
     168              :      *
     169              :      * @note Application 的生命周期必须比 Session 长
     170              :      * @note 可传入 nullptr 取消关联
     171              :      */
     172              :     void set_application(Application* app);
     173              : 
     174              :     /**
     175              :      * @brief 获取应用层处理器
     176              :      * @return Application* 当前关联的 Application 指针,可能为 nullptr
     177              :      */
     178              :     Application* get_application() const;
     179              : 
     180              :             /**
     181              :              * @brief 获取会话标识符
     182              :              * @return SessionID 包含 senderCompID 和 targetCompID
     183              :              */
     184              :             SessionID get_session_id() const;
     185              : 
     186              :             /**
     187              :              * @brief 设置会话建立回调
     188              :              * @param cb 会话建立回调
     189              :              *
     190              :              * 当会话完成 Logon 握手并进入 Established 状态时调用。
     191              :              *
     192              :              * @note 回调只会触发一次(幂等)。
     193              :              */
     194              :             void set_established_callback(EstablishedCallback cb);
     195              : 
     196              :             /**
     197              :              * @brief 通知会话已建立
     198              :              *
     199              :              * 仅供内部状态机在完成 Logon 握手后调用,用于触发 EstablishedCallback。
     200              :              *
     201              :              * @note 该方法是幂等的:多次调用只会触发一次回调。
     202              :              */
     203              :             void notify_established();
     204              : 
     205              :             /**
     206              :              * @brief 更新对端 CompID
     207              :              * @param target 对端的真实 CompID(通常来自 Logon SenderCompID)
     208              :              *
     209              :              * 服务端在 accept 新连接时往往无法立即知道客户端 CompID。
     210              :              * 收到客户端 Logon 后,应将 targetCompID 更新为真实客户端 CompID,
     211              :              * 以便后续发送消息时能正确设置 TargetCompID(56),并形成稳定的 SessionID。
     212              :              */
     213              :             void set_target_comp_id(const std::string& target);
     214              : 
     215              :             /**
     216              :              * @brief 发送业务消息
     217              :              * @param msg 要发送的业务消息
     218              :      *
     219              :      * 与 send() 的区别是会先调用 Application::toApp() 回调,
     220              :      * 允许应用层在发送前拦截或修改消息。
     221              :      */
     222              :     void send_app_message(FixMessage& msg);
     223              : 
     224              :     /**
     225              :      * @brief 启动会话
     226              :      *
     227              :      * 客户端:发送 Logon 消息
     228              :      * 服务端:等待客户端 Logon
     229              :      */
     230              :     void start();
     231              : 
     232              :     /**
     233              :      * @brief 停止会话
     234              :      */
     235              :     void stop();
     236              : 
     237              :     /**
     238              :      * @brief 检查会话是否正在运行
     239              :      * @return true 正在运行
     240              :      * @return false 已停止
     241              :      */
     242           50 :     bool is_running() const { return running_; }
     243              : 
     244              :     /**
     245              :      * @brief 发送 FIX 消息
     246              :      * @param msg 要发送的消息(会自动设置序列号)
     247              :      */
     248              :     void send(FixMessage& msg);
     249              : 
     250              :     /**
     251              :      * @brief 发送缓冲区中的数据
     252              :      */
     253              :     void send_buffered_data();
     254              : 
     255              :     /**
     256              :      * @brief 处理写就绪事件
     257              :      */
     258              :     void handle_write_ready();
     259              : 
     260              :     /**
     261              :      * @brief 将原始消息加入发送队列
     262              :      * @param raw_msg 原始消息字符串
     263              :      */
     264              :     void enqueue_raw_for_send(std::string&& raw_msg);
     265              : 
     266              :     /**
     267              :      * @brief 检查发送队列是否为空
     268              :      * @return true 队列为空
     269              :      * @return false 队列非空
     270              :      */
     271              :     bool is_outbound_queue_empty() const;
     272              : 
     273              :     /**
     274              :      * @brief 处理收到的消息
     275              :      * @param msg 解码后的 FIX 消息
     276              :      */
     277              :     void on_message_received(const FixMessage& msg);
     278              : 
     279              :     /**
     280              :      * @brief 处理定时器检查
     281              :      *
     282              :      * 由时间轮周期性调用,用于心跳发送和超时检测。
     283              :      */
     284              :     void on_timer_check();
     285              : 
     286              :     /**
     287              :      * @brief 处理 I/O 错误
     288              :      * @param reason 错误原因
     289              :      */
     290              :     void on_io_error(const std::string& reason);
     291              : 
     292              :     /**
     293              :      * @brief 处理连接关闭
     294              :      * @param reason 关闭原因
     295              :      */
     296              :     void on_shutdown(const std::string& reason);
     297              : 
     298              :     /**
     299              :      * @brief 发起优雅登出流程
     300              :      * @param reason 登出原因
     301              :      */
     302              :     void initiate_logout(const std::string& reason);
     303              : 
     304              :     /**
     305              :      * @brief 调度周期性定时任务
     306              :      * @param wheel 时间轮指针
     307              :      */
     308              :     void schedule_timer_tasks(TimingWheel* wheel);
     309              : 
     310              :             /**
     311              :              * @brief 切换会话状态
     312              :              * @param newState 新状态对象
     313              :              */
     314              :             void changeState(std::unique_ptr<IStateHandler> newState);
     315              : 
     316              :             const std::string senderCompID;  ///< 发送方 CompID
     317              :             std::string targetCompID;  ///< 接收方 CompID(服务端可在 Logon 后更新为真实客户端 CompID)
     318              :             FixCodec codec_;                 ///< FIX 编解码器
     319              : 
     320              :             // --- 公共辅助函数(供状态类调用)---
     321              : 
     322              :     /**
     323              :      * @brief 发送 Logout 消息
     324              :      * @param reason 登出原因
     325              :      */
     326              :     void send_logout(const std::string& reason);
     327              : 
     328              :     /**
     329              :      * @brief 发送 Heartbeat 消息
     330              :      * @param test_req_id TestReqID(响应 TestRequest 时填写)
     331              :      */
     332              :     void send_heartbeat(const std::string& test_req_id = "");
     333              : 
     334              :     /**
     335              :      * @brief 发送 TestRequest 消息
     336              :      * @param id 测试请求标识符
     337              :      */
     338              :     void send_test_request(const std::string& id);
     339              : 
     340              :     /**
     341              :      * @brief 执行会话关闭
     342              :      * @param reason 关闭原因
     343              :      */
     344              :     void perform_shutdown(const std::string& reason);
     345              : 
     346              :     /** @brief 更新最后接收时间 */
     347              :     void update_last_recv_time();
     348              : 
     349              :     /** @brief 更新最后发送时间 */
     350              :     void update_last_send_time();
     351              : 
     352              :     /** @brief 获取最后接收时间 */
     353              :     std::chrono::steady_clock::time_point get_last_recv_time() const;
     354              : 
     355              :     /** @brief 获取最后发送时间 */
     356              :     std::chrono::steady_clock::time_point get_last_send_time() const;
     357              : 
     358              :     /** @brief 获取心跳间隔(秒) */
     359              :     int get_heart_bt_int() const;
     360              : 
     361              :     /** @brief 设置心跳间隔 */
     362              :     void set_heart_bt_int(int new_hb);
     363              : 
     364              :     /** @brief 获取最小允许心跳间隔 */
     365              :     int get_min_heart_bt_int() const;
     366              : 
     367              :     /** @brief 获取最大允许心跳间隔 */
     368              :     int get_max_heart_bt_int() const;
     369              : 
     370              :     // --- 序列号管理 ---
     371              : 
     372              :     /** @brief 获取发送序列号 */
     373          115 :     int get_send_seq_num() { return sendSeqNum; }
     374              : 
     375              :     /** @brief 获取接收序列号 */
     376          118 :     int get_recv_seq_num() { return recvSeqNum; }
     377              : 
     378              :     /** @brief 递增发送序列号 */
     379            1 :     void increment_send_seq_num() { sendSeqNum++; }
     380              : 
     381              :     /** @brief 递增接收序列号 */
     382           15 :     void increment_recv_seq_num() { recvSeqNum++; }
     383              : 
     384              :     /** @brief 设置接收序列号 */
     385          143 :     void set_recv_seq_num(int seq) { recvSeqNum = seq; }
     386              : 
     387              :     /**
     388              :      * @brief 设置发送序列号
     389              :      * @param seq 新的发送序列号
     390              :      *
     391              :      * 用于断线恢复时从存储中恢复序列号状态。
     392              :      * 注意:正常发送流程中不应直接调用此方法,
     393              :      * 序列号由 send() 方法自动管理。
     394              :      */
     395            1 :     void set_send_seq_num(int seq) { sendSeqNum = seq; }
     396              : 
     397              :     // --- 断线恢复相关 ---
     398              : 
     399              :     /**
     400              :      * @brief 发送 ResendRequest 消息
     401              :      * @param begin_seq_no 请求重传的起始序列号
     402              :      * @param end_seq_no 请求重传的结束序列号(0 表示到最新)
     403              :      */
     404              :     void send_resend_request(int begin_seq_no, int end_seq_no);
     405              : 
     406              :     /**
     407              :      * @brief 发送 SequenceReset-GapFill 消息
     408              :      * @param seq_num 消息序列号
     409              :      * @param new_seq_no 新的序列号
     410              :      */
     411              :     void send_sequence_reset_gap_fill(int seq_num, int new_seq_no);
     412              : 
     413              :     /**
     414              :      * @brief 保存会话状态到存储
     415              :      */
     416              :     void save_session_state();
     417              : 
     418              :     /**
     419              :      * @brief 从存储恢复会话状态
     420              :      * @return true 如果成功恢复,false 如果没有保存的状态
     421              :      */
     422              :     bool restore_session_state();
     423              : 
     424              :     /**
     425              :      * @brief 获取存储接口
     426              :      * @return IStore* 存储接口指针,可能为 nullptr
     427              :      */
     428          145 :     IStore* get_store() const { return store_; }
     429              : 
     430              :     /**
     431              :      * @brief 获取连接对象
     432              :      * @return std::weak_ptr<Connection> 连接的弱引用
     433              :      */
     434            0 :     std::weak_ptr<Connection> get_connection() const { return connection_; }
     435              : 
     436              :     /**
     437              :      * @brief 检查是否正在处理重传请求
     438              :      */
     439              :     bool is_processing_resend() const { return processingResend_; }
     440              : 
     441              :     /**
     442              :      * @brief 设置重传处理状态
     443              :      */
     444            0 :     void set_processing_resend(bool processing) { processingResend_ = processing; }
     445              : 
     446              :     // --- 客户端身份标识 ---
     447              : 
     448              :     /**
     449              :      * @brief 获取客户端标识
     450              :      * @return 客户端的 CompID(从 Logon 消息中提取)
     451              :      * 
     452              :      * 对于 Server 端 Session,返回连接客户端的真实标识。
     453              :      * 对于 Client 端 Session,返回自身的 senderCompID。
     454              :      */
     455            8 :     const std::string& get_client_comp_id() const { return clientCompID_; }
     456              : 
     457              :     /**
     458              :      * @brief 设置客户端标识
     459              :      * @param clientId 客户端的 CompID
     460              :      * 
     461              :      * 由 DisconnectedState::onMessageReceived 在收到 Logon 消息时调用。
     462              :      */
     463           15 :     void set_client_comp_id(const std::string& clientId) { clientCompID_ = clientId; }
     464              : 
     465              : private:
     466              :     std::atomic<bool> shutting_down_{false};   ///< 关闭中标志
     467              :     std::recursive_mutex state_mutex_;          ///< 状态保护锁
     468              :     std::unique_ptr<IStateHandler> currentState_; ///< 当前状态对象
     469              : 
     470              :     /**
     471              :      * @brief 内部发送实现
     472              :      * @param raw_msg 原始消息字符串
     473              :      */
     474              :     void internal_send(const std::string& raw_msg);
     475              : 
     476              :     /**
     477              :      * @brief 按序处理暂存的入站消息
     478              :      *
     479              :      * 当检测到 MsgSeqNum gap(收到未来序列号的消息)时,会先发送 ResendRequest 并暂存消息。
     480              :      * 当缺失消息补齐、或收到 SequenceReset-GapFill 推进 recvSeqNum 后,调用此方法即可继续
     481              :      * 投递暂存消息,保证回调到状态机/应用层的顺序是单调递增的。
     482              :      *
     483              :      * @note 必须在持有 state_mutex_ 的情况下调用。
     484              :      */
     485              :     void drain_pending_inbound_locked();
     486              : 
     487              :     int heartBtInt;                  ///< 心跳间隔(秒)
     488              :     const int minHeartBtInt_;        ///< 最小心跳间隔
     489              :     const int maxHeartBtInt_;        ///< 最大心跳间隔
     490              :     ShutdownCallback shutdown_callback_; ///< 关闭回调
     491              :     std::weak_ptr<Connection> connection_; ///< 关联的连接
     492              : 
     493              :     moodycamel::ConcurrentQueue<std::string> outbound_q_; ///< 发送队列
     494              : 
     495              :     std::atomic<bool> running_{false}; ///< 运行状态
     496              : 
     497              :     int sendSeqNum = 1;  ///< 发送序列号
     498              :     int recvSeqNum = 1;  ///< 期望接收的序列号
     499              :     std::chrono::steady_clock::time_point lastRecv; ///< 最后接收时间
     500              :     std::chrono::steady_clock::time_point lastSend; ///< 最后发送时间
     501              : 
     502              :     /// 暂存的入站消息(用于处理 MsgSeqNum gap),按序列号排序
     503              :     std::map<int, FixMessage> pending_inbound_;
     504              :     /// 最近一次发送的 ResendRequest 的 EndSeqNo(0 表示未发起/已完成)
     505              :     int last_resend_request_end_ = 0;
     506              : 
     507              :     TimingWheel* timing_wheel_ = nullptr;  ///< 时间轮指针
     508              :     TimerTaskId timer_task_id_ = INVALID_TIMER_ID; ///< 定时任务 ID
     509              : 
     510              :             Application* application_ = nullptr;  ///< 应用层处理器指针
     511              :             IStore* store_ = nullptr;             ///< 存储接口指针(用于消息持久化)
     512              :             bool processingResend_ = false;       ///< 是否正在处理重传请求
     513              :             std::string clientCompID_;            ///< 客户端标识(从 Logon 消息提取)
     514              :             EstablishedCallback established_callback_; ///< 会话建立回调
     515              :             std::atomic<bool> established_notified_{false}; ///< 会话建立回调触发标志(幂等)
     516              :         };
     517              : 
     518              :         } // namespace fix40
        

Generated by: LCOV version 2.0-1