Line data Source code
1 : /**
2 : * @file session.hpp
3 : * @brief FIX 会话层实现
4 : *
5 : * 实现 FIX 协议的会话层状态机,管理会话生命周期、
6 : * 心跳检测、消息序列号等。
7 : *
8 : * 会话层仅处理 FIX 协议的会话消息(Logon、Logout、Heartbeat、TestRequest),
9 : * 业务消息通过 Application 接口委托给应用层处理。
10 : */
11 :
12 : #pragma once
13 :
14 : #include <chrono>
15 : #include <string>
16 : #include <thread>
17 : #include <atomic>
18 : #include <functional>
19 : #include <memory>
20 : #include <map>
21 :
22 : #include "fix/fix_codec.hpp"
23 : #include "fix/application.hpp"
24 : #include "base/concurrentqueue.h"
25 : #include "base/timing_wheel.hpp"
26 :
27 : namespace fix40 {
28 :
29 : class Connection;
30 : class Session;
31 : class IStore; // 前向声明
32 :
33 : /**
34 : * @class IStateHandler
35 : * @brief 会话状态处理器接口(状态模式)
36 : *
37 : * 定义会话在不同状态下的行为接口。
38 : * 具体状态类(DisconnectedState、LogonSentState、EstablishedState、LogoutSentState)
39 : * 实现此接口以处理各状态下的事件。
40 : */
41 : class IStateHandler {
42 : public:
43 571 : virtual ~IStateHandler() = default;
44 :
45 : /**
46 : * @brief 处理收到的消息
47 : * @param context 会话上下文
48 : * @param msg 收到的 FIX 消息
49 : */
50 : virtual void onMessageReceived(Session& context, const FixMessage& msg) = 0;
51 :
52 : /**
53 : * @brief 处理定时器检查事件
54 : * @param context 会话上下文
55 : *
56 : * 用于心跳发送、超时检测等周期性任务。
57 : */
58 : virtual void onTimerCheck(Session& context) = 0;
59 :
60 : /**
61 : * @brief 处理会话启动事件
62 : * @param context 会话上下文
63 : */
64 : virtual void onSessionStart(Session& context) = 0;
65 :
66 : /**
67 : * @brief 处理登出请求
68 : * @param context 会话上下文
69 : * @param reason 登出原因
70 : */
71 : virtual void onLogoutRequest(Session& context, const std::string& reason) = 0;
72 :
73 : /**
74 : * @brief 获取状态名称
75 : * @return const char* 状态名称字符串
76 : */
77 : virtual const char* getStateName() const = 0;
78 : };
79 :
80 :
81 : /**
82 : * @class Session
83 : * @brief FIX 会话管理器
84 : *
85 : * 实现 FIX 协议的会话层,管理:
86 : * - 会话状态机(Disconnected -> LogonSent -> Established -> LogoutSent)
87 : * - 消息序列号
88 : * - 心跳检测和 TestRequest
89 : * - 优雅登出流程
90 : *
91 : * @par 状态转换图
92 : * @code
93 : * ┌──────────────┐
94 : * │ Disconnected │
95 : * └──────┬───────┘
96 : * │ start() [客户端发送 Logon]
97 : * ▼
98 : * ┌──────────────┐
99 : * │ LogonSent │
100 : * └──────┬───────┘
101 : * │ 收到 Logon 确认
102 : * ▼
103 : * ┌──────────────┐
104 : * │ Established │
105 : * └──────┬───────┘
106 : * │ 发起 Logout
107 : * ▼
108 : * ┌──────────────┐
109 : * │ LogoutSent │
110 : * └──────┬───────┘
111 : * │ 收到 Logout 确认
112 : * ▼
113 : * ┌──────────────┐
114 : * │ Disconnected │
115 : * └──────────────┘
116 : * @endcode
117 : *
118 : * @note 该类继承 std::enable_shared_from_this,必须通过 std::shared_ptr 管理
119 : */
120 : class Session : public std::enable_shared_from_this<Session> {
121 : public:
122 : /// 会话关闭回调类型
123 : using ShutdownCallback = std::function<void()>;
124 :
125 : /**
126 : * @brief 会话建立回调类型
127 : *
128 : * 在完成 Logon 握手并进入 Established 状态后触发。
129 : * 常用于:服务端在此时将 Session 注册到 SessionManager。
130 : *
131 : * @note 回调可能在连接绑定的工作线程中执行,需保证线程安全。
132 : */
133 : using EstablishedCallback = std::function<void(std::shared_ptr<Session>)>;
134 :
135 : /**
136 : * @brief 构造会话对象
137 : * @param sender 发送方 CompID
138 : * @param target 接收方 CompID
139 : * @param hb 心跳间隔(秒)
140 : * @param shutdown_cb 会话关闭时的回调函数
141 : * @param store 存储接口指针(可选,用于消息持久化和断线恢复)
142 : */
143 : Session(const std::string& sender,
144 : const std::string& target,
145 : int hb,
146 : ShutdownCallback shutdown_cb,
147 : IStore* store = nullptr);
148 :
149 : /**
150 : * @brief 析构函数
151 : *
152 : * 取消定时任务,释放资源。
153 : */
154 : ~Session();
155 :
156 : /**
157 : * @brief 关联 Connection 对象
158 : * @param conn Connection 的弱引用
159 : */
160 : void set_connection(std::weak_ptr<Connection> conn);
161 :
162 : /**
163 : * @brief 设置应用层处理器
164 : * @param app Application 实现类指针
165 : *
166 : * 设置后,收到业务消息时会调用 app->fromApp(),
167 : * 会话建立/断开时会调用 app->onLogon()/onLogout()。
168 : *
169 : * @note Application 的生命周期必须比 Session 长
170 : * @note 可传入 nullptr 取消关联
171 : */
172 : void set_application(Application* app);
173 :
174 : /**
175 : * @brief 获取应用层处理器
176 : * @return Application* 当前关联的 Application 指针,可能为 nullptr
177 : */
178 : Application* get_application() const;
179 :
180 : /**
181 : * @brief 获取会话标识符
182 : * @return SessionID 包含 senderCompID 和 targetCompID
183 : */
184 : SessionID get_session_id() const;
185 :
186 : /**
187 : * @brief 设置会话建立回调
188 : * @param cb 会话建立回调
189 : *
190 : * 当会话完成 Logon 握手并进入 Established 状态时调用。
191 : *
192 : * @note 回调只会触发一次(幂等)。
193 : */
194 : void set_established_callback(EstablishedCallback cb);
195 :
196 : /**
197 : * @brief 通知会话已建立
198 : *
199 : * 仅供内部状态机在完成 Logon 握手后调用,用于触发 EstablishedCallback。
200 : *
201 : * @note 该方法是幂等的:多次调用只会触发一次回调。
202 : */
203 : void notify_established();
204 :
205 : /**
206 : * @brief 更新对端 CompID
207 : * @param target 对端的真实 CompID(通常来自 Logon SenderCompID)
208 : *
209 : * 服务端在 accept 新连接时往往无法立即知道客户端 CompID。
210 : * 收到客户端 Logon 后,应将 targetCompID 更新为真实客户端 CompID,
211 : * 以便后续发送消息时能正确设置 TargetCompID(56),并形成稳定的 SessionID。
212 : */
213 : void set_target_comp_id(const std::string& target);
214 :
215 : /**
216 : * @brief 发送业务消息
217 : * @param msg 要发送的业务消息
218 : *
219 : * 与 send() 的区别是会先调用 Application::toApp() 回调,
220 : * 允许应用层在发送前拦截或修改消息。
221 : */
222 : void send_app_message(FixMessage& msg);
223 :
224 : /**
225 : * @brief 启动会话
226 : *
227 : * 客户端:发送 Logon 消息
228 : * 服务端:等待客户端 Logon
229 : */
230 : void start();
231 :
232 : /**
233 : * @brief 停止会话
234 : */
235 : void stop();
236 :
237 : /**
238 : * @brief 检查会话是否正在运行
239 : * @return true 正在运行
240 : * @return false 已停止
241 : */
242 50 : bool is_running() const { return running_; }
243 :
244 : /**
245 : * @brief 发送 FIX 消息
246 : * @param msg 要发送的消息(会自动设置序列号)
247 : */
248 : void send(FixMessage& msg);
249 :
250 : /**
251 : * @brief 发送缓冲区中的数据
252 : */
253 : void send_buffered_data();
254 :
255 : /**
256 : * @brief 处理写就绪事件
257 : */
258 : void handle_write_ready();
259 :
260 : /**
261 : * @brief 将原始消息加入发送队列
262 : * @param raw_msg 原始消息字符串
263 : */
264 : void enqueue_raw_for_send(std::string&& raw_msg);
265 :
266 : /**
267 : * @brief 检查发送队列是否为空
268 : * @return true 队列为空
269 : * @return false 队列非空
270 : */
271 : bool is_outbound_queue_empty() const;
272 :
273 : /**
274 : * @brief 处理收到的消息
275 : * @param msg 解码后的 FIX 消息
276 : */
277 : void on_message_received(const FixMessage& msg);
278 :
279 : /**
280 : * @brief 处理定时器检查
281 : *
282 : * 由时间轮周期性调用,用于心跳发送和超时检测。
283 : */
284 : void on_timer_check();
285 :
286 : /**
287 : * @brief 处理 I/O 错误
288 : * @param reason 错误原因
289 : */
290 : void on_io_error(const std::string& reason);
291 :
292 : /**
293 : * @brief 处理连接关闭
294 : * @param reason 关闭原因
295 : */
296 : void on_shutdown(const std::string& reason);
297 :
298 : /**
299 : * @brief 发起优雅登出流程
300 : * @param reason 登出原因
301 : */
302 : void initiate_logout(const std::string& reason);
303 :
304 : /**
305 : * @brief 调度周期性定时任务
306 : * @param wheel 时间轮指针
307 : */
308 : void schedule_timer_tasks(TimingWheel* wheel);
309 :
310 : /**
311 : * @brief 切换会话状态
312 : * @param newState 新状态对象
313 : */
314 : void changeState(std::unique_ptr<IStateHandler> newState);
315 :
316 : const std::string senderCompID; ///< 发送方 CompID
317 : std::string targetCompID; ///< 接收方 CompID(服务端可在 Logon 后更新为真实客户端 CompID)
318 : FixCodec codec_; ///< FIX 编解码器
319 :
320 : // --- 公共辅助函数(供状态类调用)---
321 :
322 : /**
323 : * @brief 发送 Logout 消息
324 : * @param reason 登出原因
325 : */
326 : void send_logout(const std::string& reason);
327 :
328 : /**
329 : * @brief 发送 Heartbeat 消息
330 : * @param test_req_id TestReqID(响应 TestRequest 时填写)
331 : */
332 : void send_heartbeat(const std::string& test_req_id = "");
333 :
334 : /**
335 : * @brief 发送 TestRequest 消息
336 : * @param id 测试请求标识符
337 : */
338 : void send_test_request(const std::string& id);
339 :
340 : /**
341 : * @brief 执行会话关闭
342 : * @param reason 关闭原因
343 : */
344 : void perform_shutdown(const std::string& reason);
345 :
346 : /** @brief 更新最后接收时间 */
347 : void update_last_recv_time();
348 :
349 : /** @brief 更新最后发送时间 */
350 : void update_last_send_time();
351 :
352 : /** @brief 获取最后接收时间 */
353 : std::chrono::steady_clock::time_point get_last_recv_time() const;
354 :
355 : /** @brief 获取最后发送时间 */
356 : std::chrono::steady_clock::time_point get_last_send_time() const;
357 :
358 : /** @brief 获取心跳间隔(秒) */
359 : int get_heart_bt_int() const;
360 :
361 : /** @brief 设置心跳间隔 */
362 : void set_heart_bt_int(int new_hb);
363 :
364 : /** @brief 获取最小允许心跳间隔 */
365 : int get_min_heart_bt_int() const;
366 :
367 : /** @brief 获取最大允许心跳间隔 */
368 : int get_max_heart_bt_int() const;
369 :
370 : // --- 序列号管理 ---
371 :
372 : /** @brief 获取发送序列号 */
373 115 : int get_send_seq_num() { return sendSeqNum; }
374 :
375 : /** @brief 获取接收序列号 */
376 118 : int get_recv_seq_num() { return recvSeqNum; }
377 :
378 : /** @brief 递增发送序列号 */
379 1 : void increment_send_seq_num() { sendSeqNum++; }
380 :
381 : /** @brief 递增接收序列号 */
382 15 : void increment_recv_seq_num() { recvSeqNum++; }
383 :
384 : /** @brief 设置接收序列号 */
385 143 : void set_recv_seq_num(int seq) { recvSeqNum = seq; }
386 :
387 : /**
388 : * @brief 设置发送序列号
389 : * @param seq 新的发送序列号
390 : *
391 : * 用于断线恢复时从存储中恢复序列号状态。
392 : * 注意:正常发送流程中不应直接调用此方法,
393 : * 序列号由 send() 方法自动管理。
394 : */
395 1 : void set_send_seq_num(int seq) { sendSeqNum = seq; }
396 :
397 : // --- 断线恢复相关 ---
398 :
399 : /**
400 : * @brief 发送 ResendRequest 消息
401 : * @param begin_seq_no 请求重传的起始序列号
402 : * @param end_seq_no 请求重传的结束序列号(0 表示到最新)
403 : */
404 : void send_resend_request(int begin_seq_no, int end_seq_no);
405 :
406 : /**
407 : * @brief 发送 SequenceReset-GapFill 消息
408 : * @param seq_num 消息序列号
409 : * @param new_seq_no 新的序列号
410 : */
411 : void send_sequence_reset_gap_fill(int seq_num, int new_seq_no);
412 :
413 : /**
414 : * @brief 保存会话状态到存储
415 : */
416 : void save_session_state();
417 :
418 : /**
419 : * @brief 从存储恢复会话状态
420 : * @return true 如果成功恢复,false 如果没有保存的状态
421 : */
422 : bool restore_session_state();
423 :
424 : /**
425 : * @brief 获取存储接口
426 : * @return IStore* 存储接口指针,可能为 nullptr
427 : */
428 145 : IStore* get_store() const { return store_; }
429 :
430 : /**
431 : * @brief 获取连接对象
432 : * @return std::weak_ptr<Connection> 连接的弱引用
433 : */
434 0 : std::weak_ptr<Connection> get_connection() const { return connection_; }
435 :
436 : /**
437 : * @brief 检查是否正在处理重传请求
438 : */
439 : bool is_processing_resend() const { return processingResend_; }
440 :
441 : /**
442 : * @brief 设置重传处理状态
443 : */
444 0 : void set_processing_resend(bool processing) { processingResend_ = processing; }
445 :
446 : // --- 客户端身份标识 ---
447 :
448 : /**
449 : * @brief 获取客户端标识
450 : * @return 客户端的 CompID(从 Logon 消息中提取)
451 : *
452 : * 对于 Server 端 Session,返回连接客户端的真实标识。
453 : * 对于 Client 端 Session,返回自身的 senderCompID。
454 : */
455 8 : const std::string& get_client_comp_id() const { return clientCompID_; }
456 :
457 : /**
458 : * @brief 设置客户端标识
459 : * @param clientId 客户端的 CompID
460 : *
461 : * 由 DisconnectedState::onMessageReceived 在收到 Logon 消息时调用。
462 : */
463 15 : void set_client_comp_id(const std::string& clientId) { clientCompID_ = clientId; }
464 :
465 : private:
466 : std::atomic<bool> shutting_down_{false}; ///< 关闭中标志
467 : std::recursive_mutex state_mutex_; ///< 状态保护锁
468 : std::unique_ptr<IStateHandler> currentState_; ///< 当前状态对象
469 :
470 : /**
471 : * @brief 内部发送实现
472 : * @param raw_msg 原始消息字符串
473 : */
474 : void internal_send(const std::string& raw_msg);
475 :
476 : /**
477 : * @brief 按序处理暂存的入站消息
478 : *
479 : * 当检测到 MsgSeqNum gap(收到未来序列号的消息)时,会先发送 ResendRequest 并暂存消息。
480 : * 当缺失消息补齐、或收到 SequenceReset-GapFill 推进 recvSeqNum 后,调用此方法即可继续
481 : * 投递暂存消息,保证回调到状态机/应用层的顺序是单调递增的。
482 : *
483 : * @note 必须在持有 state_mutex_ 的情况下调用。
484 : */
485 : void drain_pending_inbound_locked();
486 :
487 : int heartBtInt; ///< 心跳间隔(秒)
488 : const int minHeartBtInt_; ///< 最小心跳间隔
489 : const int maxHeartBtInt_; ///< 最大心跳间隔
490 : ShutdownCallback shutdown_callback_; ///< 关闭回调
491 : std::weak_ptr<Connection> connection_; ///< 关联的连接
492 :
493 : moodycamel::ConcurrentQueue<std::string> outbound_q_; ///< 发送队列
494 :
495 : std::atomic<bool> running_{false}; ///< 运行状态
496 :
497 : int sendSeqNum = 1; ///< 发送序列号
498 : int recvSeqNum = 1; ///< 期望接收的序列号
499 : std::chrono::steady_clock::time_point lastRecv; ///< 最后接收时间
500 : std::chrono::steady_clock::time_point lastSend; ///< 最后发送时间
501 :
502 : /// 暂存的入站消息(用于处理 MsgSeqNum gap),按序列号排序
503 : std::map<int, FixMessage> pending_inbound_;
504 : /// 最近一次发送的 ResendRequest 的 EndSeqNo(0 表示未发起/已完成)
505 : int last_resend_request_end_ = 0;
506 :
507 : TimingWheel* timing_wheel_ = nullptr; ///< 时间轮指针
508 : TimerTaskId timer_task_id_ = INVALID_TIMER_ID; ///< 定时任务 ID
509 :
510 : Application* application_ = nullptr; ///< 应用层处理器指针
511 : IStore* store_ = nullptr; ///< 存储接口指针(用于消息持久化)
512 : bool processingResend_ = false; ///< 是否正在处理重传请求
513 : std::string clientCompID_; ///< 客户端标识(从 Logon 消息提取)
514 : EstablishedCallback established_callback_; ///< 会话建立回调
515 : std::atomic<bool> established_notified_{false}; ///< 会话建立回调触发标志(幂等)
516 : };
517 :
518 : } // namespace fix40
|