Line data Source code
1 : /**
2 : * @file session.cpp
3 : * @brief FIX 会话层实现
4 : *
5 : * 实现 Session 类和各状态处理器类。
6 : */
7 :
8 : #include "fix/session.hpp"
9 :
10 : #include <iomanip>
11 : #include <utility>
12 : #include <stdexcept>
13 : #include <unordered_map>
14 :
15 : #include "core/connection.hpp"
16 : #include "fix/fix_messages.hpp"
17 : #include "fix/application.hpp"
18 : #include "base/timing_wheel.hpp"
19 : #include "base/config.hpp"
20 : #include "base/logger.hpp"
21 : #include "storage/store.hpp"
22 :
23 :
24 : namespace fix40 {
25 :
26 : namespace {
27 :
28 : constexpr const char* kPendingTargetCompID = "PENDING";
29 :
30 : } // namespace
31 :
32 : // =================================================================================
33 : // 状态类声明
34 : // =================================================================================
35 :
36 : class DisconnectedState;
37 : class LogonSentState;
38 : class EstablishedState;
39 : class LogoutSentState;
40 :
41 : /**
42 : * @class DisconnectedState
43 : * @brief 断开连接状态
44 : *
45 : * 初始状态,等待会话启动或接收 Logon 消息。
46 : */
47 : class DisconnectedState : public IStateHandler {
48 : public:
49 : void onMessageReceived(Session& context, const FixMessage& msg) override;
50 : void onTimerCheck(Session& context) override;
51 : void onSessionStart(Session& context) override;
52 : void onLogoutRequest(Session& context, const std::string& reason) override;
53 160 : const char* getStateName() const override { return "Disconnected"; }
54 : };
55 :
56 : /**
57 : * @class LogonSentState
58 : * @brief 已发送 Logon 状态(客户端)
59 : *
60 : * 客户端发送 Logon 后进入此状态,等待服务端确认。
61 : */
62 : class LogonSentState : public IStateHandler {
63 : public:
64 : void onMessageReceived(Session& context, const FixMessage& msg) override;
65 : void onTimerCheck(Session& context) override;
66 0 : void onSessionStart([[maybe_unused]] Session& context) override {}
67 : void onLogoutRequest(Session& context, const std::string& reason) override;
68 267 : const char* getStateName() const override { return "LogonSent"; }
69 : };
70 :
71 : /**
72 : * @class EstablishedState
73 : * @brief 会话已建立状态
74 : *
75 : * 正常工作状态,处理心跳、TestRequest 和业务消息。
76 : */
77 : class EstablishedState : public IStateHandler {
78 : private:
79 : using MessageHandler = void (EstablishedState::*)(Session&, const FixMessage&);
80 : const std::unordered_map<std::string, MessageHandler> messageHandlers_; ///< 消息处理器映射
81 :
82 : std::string awaitingTestReqId_; ///< 等待响应的 TestReqID
83 : std::chrono::steady_clock::time_point logout_initiation_time_; ///< 登出发起时间
84 : bool logout_initiated_ = false; ///< 是否已发起登出
85 :
86 : /** @brief 处理 Heartbeat 消息 */
87 : void handleHeartbeat(Session& context, const FixMessage& msg);
88 : /** @brief 处理 TestRequest 消息 */
89 : void handleTestRequest(Session& context, const FixMessage& msg);
90 : /** @brief 处理 ResendRequest 消息 */
91 : void handleResendRequest(Session& context, const FixMessage& msg);
92 : /** @brief 处理 SequenceReset 消息 */
93 : void handleSequenceReset(Session& context, const FixMessage& msg);
94 : /** @brief 处理 Logout 消息 */
95 : void handleLogout(Session& context, const FixMessage& msg);
96 : /** @brief 处理 Logon 消息(异常情况) */
97 : void handleLogon(Session& context, const FixMessage& msg);
98 :
99 : public:
100 : EstablishedState();
101 : void onMessageReceived(Session& context, const FixMessage& msg) override;
102 : void onTimerCheck(Session& context) override;
103 0 : void onSessionStart([[maybe_unused]] Session& context) override {}
104 : void onLogoutRequest(Session& context, const std::string& reason) override;
105 149 : const char* getStateName() const override { return "Established"; }
106 : };
107 :
108 : /**
109 : * @class LogoutSentState
110 : * @brief 已发送 Logout 状态
111 : *
112 : * 发送 Logout 后进入此状态,等待对方确认或超时。
113 : */
114 : class LogoutSentState : public IStateHandler {
115 : private:
116 : std::string reason_; ///< 登出原因
117 : std::chrono::steady_clock::time_point initiation_time_; ///< 发起时间
118 : public:
119 : /**
120 : * @brief 构造 LogoutSentState
121 : * @param reason 登出原因
122 : */
123 : explicit LogoutSentState(std::string reason);
124 : void onMessageReceived(Session& context, const FixMessage& msg) override;
125 : void onTimerCheck(Session& context) override;
126 0 : void onSessionStart([[maybe_unused]] Session& context) override {}
127 : void onLogoutRequest(Session& context, const std::string& reason) override;
128 6 : const char* getStateName() const override { return "LogoutSent"; }
129 : };
130 :
131 : // =================================================================================
132 : // Session 类实现
133 : // =================================================================================
134 :
135 280 : Session::Session(const std::string& sender,
136 : const std::string& target,
137 : int hb,
138 : ShutdownCallback shutdown_cb,
139 280 : IStore* store)
140 280 : : senderCompID(sender),
141 280 : targetCompID(target),
142 280 : heartBtInt(hb),
143 1120 : minHeartBtInt_(Config::instance().get_int("fix_session", "min_heartbeat_interval", 5)),
144 1120 : maxHeartBtInt_(Config::instance().get_int("fix_session", "max_heartbeat_interval", 120)),
145 280 : shutdown_callback_(std::move(shutdown_cb)),
146 840 : store_(store) {
147 :
148 : // 初始状态为断开
149 280 : currentState_ = std::make_unique<DisconnectedState>();
150 280 : update_last_recv_time();
151 280 : update_last_send_time();
152 :
153 : // 如果有存储接口,尝试恢复会话状态
154 280 : if (store_) {
155 : // 服务端 accept 阶段使用占位 TargetCompID(例如 "PENDING")创建 Session,
156 : // 此时真实客户端 CompID 尚未知晓,不能用占位 key 恢复序列号。
157 : // 待收到 Logon 并完成身份绑定后再恢复,避免错误读取或写入 session_states。
158 206 : if (targetCompID != kPendingTargetCompID) {
159 204 : restore_session_state();
160 : }
161 : }
162 280 : }
163 :
164 560 : Session::~Session() {
165 : // 取消定时任务,避免访问已销毁的对象
166 280 : if (timing_wheel_ && timer_task_id_ != INVALID_TIMER_ID) {
167 0 : timing_wheel_->cancel_task(timer_task_id_);
168 : }
169 280 : LOG() << "Session (" << senderCompID << " -> " << targetCompID << ") destroyed.";
170 280 : }
171 :
172 0 : void Session::set_connection(std::weak_ptr<Connection> conn) {
173 0 : connection_ = std::move(conn);
174 0 : }
175 :
176 15 : void Session::set_application(Application* app) {
177 15 : application_ = app;
178 15 : }
179 :
180 155 : Application* Session::get_application() const {
181 155 : return application_;
182 : }
183 :
184 46 : SessionID Session::get_session_id() const {
185 46 : return SessionID(senderCompID, targetCompID);
186 : }
187 :
188 1 : void Session::set_established_callback(EstablishedCallback cb) {
189 1 : std::lock_guard<std::recursive_mutex> lock(state_mutex_);
190 1 : established_callback_ = std::move(cb);
191 1 : }
192 :
193 140 : void Session::notify_established() {
194 : // Fast path:多数情况下 notify 只会调用一次;若已触发过则直接返回,避免进入锁竞争。
195 140 : if (established_notified_.load(std::memory_order_acquire)) {
196 1 : return;
197 : }
198 :
199 139 : EstablishedCallback cb;
200 : {
201 139 : std::lock_guard<std::recursive_mutex> lock(state_mutex_);
202 139 : if (established_notified_.exchange(true, std::memory_order_acq_rel)) {
203 0 : return;
204 : }
205 139 : cb = established_callback_;
206 139 : }
207 139 : if (cb) {
208 1 : cb(shared_from_this());
209 : }
210 139 : }
211 :
212 11 : void Session::set_target_comp_id(const std::string& target) {
213 11 : std::lock_guard<std::recursive_mutex> lock(state_mutex_);
214 11 : targetCompID = target;
215 11 : }
216 :
217 4 : void Session::send_app_message(FixMessage& msg) {
218 4 : std::lock_guard<std::recursive_mutex> lock(state_mutex_);
219 :
220 : // 调用应用层的 toApp 回调
221 4 : if (application_) {
222 : try {
223 2 : application_->toApp(msg, get_session_id());
224 0 : } catch (const std::exception& e) {
225 0 : LOG() << "Application::toApp threw exception: " << e.what();
226 0 : } catch (...) {
227 0 : LOG() << "Application::toApp threw unknown exception";
228 0 : }
229 : }
230 :
231 : // 使用标准发送流程
232 4 : send(msg);
233 4 : }
234 :
235 151 : void Session::start() {
236 151 : std::lock_guard<std::recursive_mutex> lock(state_mutex_);
237 151 : running_ = true;
238 151 : lastRecv = std::chrono::steady_clock::now();
239 151 : lastSend = std::chrono::steady_clock::now();
240 151 : currentState_->onSessionStart(*this);
241 151 : }
242 :
243 14 : void Session::stop() {
244 14 : running_ = false;
245 14 : }
246 :
247 291 : void Session::changeState(std::unique_ptr<IStateHandler> newState) {
248 291 : std::lock_guard<std::recursive_mutex> lock(state_mutex_);
249 291 : if (!newState) return;
250 291 : LOG() << "Session (" << senderCompID << "): State changing from <"
251 291 : << (currentState_ ? currentState_->getStateName() : "null") << "> to <"
252 291 : << newState->getStateName() << ">";
253 291 : currentState_ = std::move(newState);
254 291 : }
255 :
256 0 : void Session::send_buffered_data() {
257 0 : if (auto conn = connection_.lock()) {
258 0 : conn->handle_write();
259 0 : }
260 0 : }
261 :
262 0 : void Session::handle_write_ready() {
263 0 : if (auto conn = connection_.lock()) {
264 0 : conn->handle_write();
265 0 : }
266 0 : }
267 :
268 : // 事件处理委托
269 160 : void Session::on_message_received(const FixMessage& msg) {
270 160 : std::lock_guard<std::recursive_mutex> lock(state_mutex_);
271 160 : if (!running_) return;
272 :
273 160 : update_last_recv_time();
274 :
275 160 : const int msg_seq_num = msg.get_int(tags::MsgSeqNum);
276 160 : const std::string msg_type = msg.get_string(tags::MsgType);
277 :
278 : // Logon 属于会话层握手消息:
279 : // - Disconnected:服务端需要先从 Logon 解析出真实 clientCompID,再从 store 恢复/对齐序列号;
280 : // - LogonSent:客户端等待 LogonAck,此时服务端可能按历史会话继续序列号。
281 : // 因此在握手阶段不做严格的序列号校验,由状态机负责对齐 recvSeqNum。
282 434 : if (msg_type == "A" &&
283 274 : (dynamic_cast<DisconnectedState*>(currentState_.get()) != nullptr ||
284 131 : dynamic_cast<LogonSentState*>(currentState_.get()) != nullptr)) {
285 142 : currentState_->onMessageReceived(*this, msg);
286 142 : drain_pending_inbound_locked();
287 142 : return;
288 : }
289 :
290 : // 检查是否是 SequenceReset 消息(特殊处理)
291 18 : if (msg_type == "4") {
292 : // SequenceReset 消息需要特殊处理,不检查序列号
293 0 : currentState_->onMessageReceived(*this, msg);
294 0 : drain_pending_inbound_locked();
295 0 : return;
296 : }
297 :
298 : // 检查序列号
299 18 : if (msg_seq_num > recvSeqNum) {
300 : // 检测到序列号 gap:
301 : // - 发送 ResendRequest 请求补齐缺失消息;
302 : // - 暂存“未来序列号”的消息,等待缺失补齐或 GapFill 推进序列号后再按序投递。
303 4 : LOG() << "Sequence number gap detected. Expected: " << recvSeqNum
304 2 : << ", Got: " << msg_seq_num << ". Sending ResendRequest and buffering message.";
305 :
306 2 : const int end_seq_no = msg_seq_num - 1;
307 2 : const int begin_seq_no = std::max(recvSeqNum, last_resend_request_end_ + 1);
308 2 : if (end_seq_no >= begin_seq_no) {
309 2 : send_resend_request(begin_seq_no, end_seq_no);
310 2 : last_resend_request_end_ = std::max(last_resend_request_end_, end_seq_no);
311 : }
312 :
313 : // 暂存当前消息(避免覆盖同 seq 的重复到达)
314 2 : constexpr size_t kMaxPendingInbound = 10000;
315 2 : if (pending_inbound_.size() >= kMaxPendingInbound) {
316 0 : perform_shutdown("Too many buffered inbound messages (" +
317 0 : std::to_string(pending_inbound_.size()) + ").");
318 0 : return;
319 : }
320 2 : pending_inbound_.try_emplace(msg_seq_num, msg);
321 2 : return;
322 16 : } else if (msg_seq_num < recvSeqNum) {
323 : // 收到的序列号小于期望值
324 : // 检查是否是重复消息(PossDupFlag = Y)
325 0 : if (msg.has(tags::PossDupFlag) && msg.get_string(tags::PossDupFlag) == "Y") {
326 0 : LOG() << "Ignoring duplicate message with SeqNum=" << msg_seq_num;
327 0 : return;
328 : }
329 :
330 : // 非重复消息但序列号过低,这是严重错误
331 0 : perform_shutdown("Received message with sequence number lower than expected. Expected: " +
332 0 : std::to_string(recvSeqNum) + " Got: " + std::to_string(msg_seq_num));
333 0 : return;
334 : }
335 :
336 : // 序列号正确,正常处理
337 16 : currentState_->onMessageReceived(*this, msg);
338 16 : drain_pending_inbound_locked();
339 304 : }
340 :
341 158 : void Session::drain_pending_inbound_locked() {
342 : // 仅在会话正常运行时处理缓冲消息
343 158 : if (!running_) return;
344 :
345 : // 若 recvSeqNum 被 SequenceReset-GapFill 等推进,之前缓存的“更低 seq”消息已不再可用,
346 : // 必须丢弃,否则会造成缓存泄漏或错误重放。
347 151 : if (!pending_inbound_.empty()) {
348 2 : pending_inbound_.erase(pending_inbound_.begin(), pending_inbound_.lower_bound(recvSeqNum));
349 : }
350 :
351 : while (true) {
352 152 : auto it = pending_inbound_.find(recvSeqNum);
353 152 : if (it == pending_inbound_.end()) {
354 151 : break;
355 : }
356 :
357 1 : FixMessage next = it->second;
358 1 : pending_inbound_.erase(it);
359 :
360 : // 递归进入状态机处理(会话层会根据 MsgType 自行递增 recvSeqNum)
361 1 : currentState_->onMessageReceived(*this, next);
362 1 : if (!running_) {
363 0 : pending_inbound_.clear();
364 0 : break;
365 : }
366 2 : }
367 :
368 : // gap 已被补齐(或通过 GapFill 推进到更高序列),清理 resend 追踪状态
369 151 : if (pending_inbound_.empty() && last_resend_request_end_ > 0 && recvSeqNum > last_resend_request_end_) {
370 1 : last_resend_request_end_ = 0;
371 : }
372 : }
373 :
374 3 : void Session::on_timer_check() {
375 3 : std::lock_guard<std::recursive_mutex> lock(state_mutex_);
376 3 : if (!running_) return;
377 2 : currentState_->onTimerCheck(*this);
378 3 : }
379 :
380 1 : void Session::on_io_error(const std::string& reason) {
381 1 : perform_shutdown("I/O Error: " + reason);
382 1 : }
383 :
384 2 : void Session::on_shutdown(const std::string& reason) {
385 2 : perform_shutdown(reason);
386 2 : }
387 :
388 7 : void Session::initiate_logout(const std::string& reason) {
389 7 : std::lock_guard<std::recursive_mutex> lock(state_mutex_);
390 7 : if (!running_) return;
391 7 : currentState_->onLogoutRequest(*this, reason);
392 7 : }
393 :
394 265 : void Session::send(FixMessage& msg) {
395 265 : std::lock_guard<std::recursive_mutex> lock(state_mutex_);
396 265 : int seq_num = sendSeqNum++;
397 265 : msg.set(tags::MsgSeqNum, seq_num);
398 :
399 265 : std::string raw_msg = codec_.encode(msg);
400 :
401 : // 持久化消息(用于断线恢复时重传)
402 265 : if (store_) {
403 208 : StoredMessage stored_msg;
404 208 : stored_msg.seqNum = seq_num;
405 208 : stored_msg.senderCompID = senderCompID;
406 208 : stored_msg.targetCompID = targetCompID;
407 208 : stored_msg.msgType = msg.get_string(tags::MsgType);
408 208 : stored_msg.rawMessage = raw_msg;
409 208 : stored_msg.timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(
410 416 : std::chrono::system_clock::now().time_since_epoch()).count();
411 :
412 208 : if (!store_->saveMessage(stored_msg)) {
413 0 : LOG() << "Warning: Failed to persist message with SeqNum=" << seq_num;
414 : }
415 :
416 : // 保存会话状态
417 208 : save_session_state();
418 208 : }
419 :
420 265 : internal_send(raw_msg);
421 265 : }
422 :
423 265 : void Session::internal_send(const std::string& raw_msg) {
424 795 : LOG() << ">>> SEND (" << (connection_.lock() ? std::to_string(connection_.lock()->fd()) : "N/A") << "): " << raw_msg;
425 :
426 265 : if (auto conn = connection_.lock()) {
427 0 : conn->send(raw_msg);
428 0 : update_last_send_time();
429 265 : }
430 265 : }
431 :
432 15 : void Session::perform_shutdown(const std::string& reason) {
433 15 : if (shutting_down_.exchange(true)) return;
434 :
435 13 : LOG() << "Session shutting down. Reason: " << reason;
436 :
437 : // 通知应用层会话即将断开
438 13 : if (application_) {
439 : try {
440 1 : application_->onLogout(get_session_id());
441 0 : } catch (const std::exception& e) {
442 0 : LOG() << "Application::onLogout threw exception: " << e.what();
443 0 : } catch (...) {
444 0 : LOG() << "Application::onLogout threw unknown exception";
445 0 : }
446 : }
447 :
448 13 : stop();
449 :
450 13 : changeState(std::make_unique<DisconnectedState>());
451 :
452 13 : if (auto conn = connection_.lock()) {
453 0 : conn->shutdown();
454 13 : }
455 :
456 13 : if (shutdown_callback_) {
457 13 : shutdown_callback_();
458 : }
459 : }
460 :
461 : // 时间管理
462 441 : void Session::update_last_recv_time() { lastRecv = std::chrono::steady_clock::now(); }
463 281 : void Session::update_last_send_time() { lastSend = std::chrono::steady_clock::now(); }
464 2 : std::chrono::steady_clock::time_point Session::get_last_recv_time() const { return lastRecv; }
465 2 : std::chrono::steady_clock::time_point Session::get_last_send_time() const { return lastSend; }
466 149 : int Session::get_heart_bt_int() const { return heartBtInt; }
467 10 : void Session::set_heart_bt_int(int new_hb) { heartBtInt = new_hb; }
468 12 : int Session::get_min_heart_bt_int() const { return minHeartBtInt_; }
469 12 : int Session::get_max_heart_bt_int() const { return maxHeartBtInt_; }
470 :
471 :
472 : // ... [其它辅助函数的实现]
473 :
474 0 : void Session::schedule_timer_tasks(TimingWheel* wheel) {
475 0 : if (!wheel || !running_) return;
476 :
477 0 : timing_wheel_ = wheel;
478 0 : std::weak_ptr<Session> weak_self = shared_from_this();
479 :
480 : // 使用周期性任务,一次注册永久生效,直到被取消
481 0 : timer_task_id_ = wheel->add_periodic_task(1000, [weak_self]() {
482 0 : if (auto self = weak_self.lock()) {
483 0 : if (auto conn = self->connection_.lock()) {
484 : // 将定时任务派发到连接绑定的工作线程执行
485 0 : conn->dispatch([self]() {
486 0 : if (self->is_running()) {
487 0 : self->on_timer_check();
488 : }
489 0 : });
490 0 : }
491 0 : }
492 : // 不再需要重新调度,周期性任务会自动重复执行
493 0 : });
494 0 : }
495 :
496 6 : void Session::send_logout(const std::string& reason) {
497 6 : auto logout_msg = create_logout_message(senderCompID, targetCompID, 0, reason);
498 6 : send(logout_msg);
499 6 : }
500 :
501 3 : void Session::send_heartbeat(const std::string& test_req_id) {
502 3 : auto hb_msg = create_heartbeat_message(senderCompID, targetCompID, 0, test_req_id);
503 3 : send(hb_msg);
504 3 : }
505 :
506 1 : void Session::send_test_request(const std::string& id) {
507 1 : auto tr_msg = create_test_request_message(senderCompID, targetCompID, 0, id);
508 1 : send(tr_msg);
509 1 : }
510 :
511 :
512 : // =================================================================================
513 : // 状态类实现
514 : // =================================================================================
515 :
516 : // --- 断开状态 ---
517 13 : void DisconnectedState::onMessageReceived(Session& context, const FixMessage& msg) {
518 : // 此逻辑主要用于服务器端。
519 13 : if (msg.get_string(tags::MsgType) == "A") {
520 12 : if (context.senderCompID == "SERVER") {
521 13 : const bool reset_seq = msg.has(tags::ResetSeqNumFlag) &&
522 13 : msg.get_string(tags::ResetSeqNumFlag) == "Y";
523 : // 从 Logon 消息中提取客户端标识
524 : // FIX 协议中,客户端发送的 Logon 消息的 SenderCompID 是客户端的标识
525 12 : if (!msg.has(tags::SenderCompID)) {
526 0 : LOG() << "Logon rejected: missing SenderCompID";
527 0 : auto logout_msg = create_logout_message(context.senderCompID, context.targetCompID, 1, "Missing SenderCompID");
528 0 : context.send(logout_msg);
529 0 : context.perform_shutdown("Missing SenderCompID in Logon");
530 0 : return;
531 0 : }
532 : // FIX 协议中,客户端 Logon 的 TargetCompID 应该是服务端的 CompID
533 12 : if (!msg.has(tags::TargetCompID) || msg.get_string(tags::TargetCompID) != context.senderCompID) {
534 1 : LOG() << "Logon rejected: invalid TargetCompID";
535 : // 目标不匹配属于协议错误,直接断开即可
536 1 : context.perform_shutdown("Invalid TargetCompID in Logon");
537 1 : return;
538 : }
539 11 : std::string clientCompID = msg.get_string(tags::SenderCompID);
540 11 : if (clientCompID.empty()) {
541 0 : LOG() << "Logon rejected: empty SenderCompID";
542 0 : auto logout_msg = create_logout_message(context.senderCompID, context.targetCompID, 1, "Empty SenderCompID");
543 0 : context.send(logout_msg);
544 0 : context.perform_shutdown("Empty SenderCompID in Logon");
545 0 : return;
546 0 : }
547 11 : context.set_client_comp_id(clientCompID);
548 11 : LOG() << "Client CompID extracted from Logon: " << clientCompID;
549 :
550 : // 将会话的 TargetCompID 更新为真实客户端 CompID,使后续发包 TargetCompID(56) 正确
551 11 : context.set_target_comp_id(clientCompID);
552 :
553 : // 现在已具备稳定的 (senderCompID, targetCompID) key,可以从 store 恢复序列号。
554 11 : if (auto* store = context.get_store()) {
555 2 : if (reset_seq) {
556 : // 若客户端请求重置序列号,需要清理该方向已持久化的历史消息,
557 : // 否则序列号重置后会出现重复 seq_num 的旧消息干扰重传。
558 1 : store->deleteMessagesForSession(context.senderCompID, clientCompID);
559 1 : context.set_send_seq_num(1);
560 1 : context.set_recv_seq_num(1);
561 : } else {
562 1 : context.restore_session_state();
563 : }
564 : }
565 :
566 : // Logon 消息本身已经被消费:将期望接收序列号推进到当前 MsgSeqNum + 1。
567 : // 若已从 store 恢复 recvSeqNum,这里也确保与对端 Logon 继续序列对齐。
568 11 : context.set_recv_seq_num(msg.get_int(tags::MsgSeqNum) + 1);
569 :
570 11 : const int client_hb_interval = msg.get_int(tags::HeartBtInt);
571 : // 注意:真实实现应当从会话上下文获取最小和最大值,
572 : // 此处为了示例假设构造函数已保存这些值。
573 : // 假设 Session 已保存了这些值,
574 : // 但当前没有相应的 getter 可用。
575 : // 为简单起见暂时使用固定范围,更好的方式是提供 getter。
576 11 : const int server_min_hb = context.get_min_heart_bt_int();
577 11 : const int server_max_hb = context.get_max_heart_bt_int();
578 :
579 11 : if (client_hb_interval >= server_min_hb && client_hb_interval <= server_max_hb) {
580 18 : LOG() << "Client requested HeartBtInt=" << client_hb_interval
581 9 : << ". Accepted. Establishing session.";
582 :
583 9 : context.set_heart_bt_int(client_hb_interval); // 采用客户端的心跳值
584 :
585 : auto logon_ack = create_logon_message(
586 9 : context.senderCompID,
587 9 : context.targetCompID,
588 : 1,
589 : context.get_heart_bt_int(),
590 9 : reset_seq);
591 9 : context.send(logon_ack);
592 9 : context.changeState(std::make_unique<EstablishedState>());
593 :
594 : // 会话已建立:在同一调用栈中同步执行“先回调、再通知应用层”的顺序。
595 : // 这样做的目的,是确保应用层收到 onLogon 事件之前,SessionManager 已完成注册,
596 : // 从而避免应用层在 onLogon 内部查找会话时出现“找不到 session”的竞态问题。
597 9 : context.notify_established();
598 :
599 : // 通知应用层会话已建立
600 9 : if (Application* app = context.get_application()) {
601 : try {
602 2 : app->onLogon(context.get_session_id());
603 0 : } catch (const std::exception& e) {
604 0 : LOG() << "Application::onLogon threw exception: " << e.what();
605 0 : } catch (...) {
606 0 : LOG() << "Application::onLogon threw unknown exception";
607 0 : }
608 : }
609 9 : } else {
610 2 : std::string reason = "HeartBtInt=" + std::to_string(client_hb_interval) + " is out of acceptable range [" + std::to_string(server_min_hb) + ", " + std::to_string(server_max_hb) + "].";
611 2 : LOG() << reason;
612 2 : auto logout_msg = create_logout_message(context.senderCompID, context.targetCompID, 1, reason);
613 2 : context.send(logout_msg);
614 2 : context.perform_shutdown(reason);
615 2 : }
616 11 : }
617 : } else {
618 2 : context.perform_shutdown("Received non-Logon message in disconnected state.");
619 : }
620 : }
621 1 : void DisconnectedState::onTimerCheck([[maybe_unused]] Session& context) {} // 无操作
622 151 : void DisconnectedState::onSessionStart(Session& context) {
623 : // 判断是否为客户端:targetCompID 为 "SERVER" 表示这是客户端 Session
624 : // 服务端 Session 的 senderCompID 为 "SERVER"
625 151 : bool isClient = (context.targetCompID == "SERVER");
626 :
627 151 : if (isClient) {
628 : // 客户端发起 Logon
629 : // 若客户端未启用持久化(store 为空),则请求重置序列号,避免服务端按旧会话序列号发送导致握手失败。
630 134 : const bool reset_seq = (context.get_store() == nullptr);
631 : auto logon_msg = create_logon_message(
632 134 : context.senderCompID,
633 134 : context.targetCompID,
634 : 1,
635 : context.get_heart_bt_int(),
636 134 : reset_seq);
637 134 : context.send(logon_msg);
638 134 : context.changeState(std::make_unique<LogonSentState>());
639 134 : LOG() << "Client session started, sending Logon to SERVER.";
640 134 : } else {
641 : // 服务器端仅等待
642 17 : LOG() << "Server session started, waiting for client Logon.";
643 : }
644 151 : }
645 1 : void DisconnectedState::onLogoutRequest([[maybe_unused]] Session& context, [[maybe_unused]] const std::string& reason) {} // 无操作
646 :
647 : // --- 已发送 Logon 状态 ---
648 131 : void LogonSentState::onMessageReceived(Session& context, const FixMessage& msg) {
649 131 : if (msg.get_string(tags::MsgType) == "A") {
650 : // LogonAck 的 MsgSeqNum 可能来自历史会话(双方启用持久化时会继续序列号),
651 : // 此处以对端的 MsgSeqNum 为准对齐期望接收序列号。
652 130 : context.set_recv_seq_num(msg.get_int(tags::MsgSeqNum) + 1);
653 130 : LOG() << "Logon confirmation received. Session established.";
654 130 : context.changeState(std::make_unique<EstablishedState>());
655 :
656 : // 会话已建立:在同一调用栈中同步执行“先回调、再通知应用层”的顺序。
657 : // 这样做的目的,是确保应用层收到 onLogon 事件之前,SessionManager 已完成注册。
658 130 : context.notify_established();
659 :
660 : // 通知应用层会话已建立
661 130 : if (Application* app = context.get_application()) {
662 : try {
663 8 : app->onLogon(context.get_session_id());
664 1 : } catch (const std::exception& e) {
665 1 : LOG() << "Application::onLogon threw exception: " << e.what();
666 1 : } catch (...) {
667 0 : LOG() << "Application::onLogon threw unknown exception";
668 0 : }
669 : }
670 : } else {
671 2 : context.perform_shutdown("Received non-Logon message while waiting for Logon confirmation.");
672 : }
673 131 : }
674 0 : void LogonSentState::onTimerCheck([[maybe_unused]] Session& context) { /* 这里可以添加登录超时逻辑 */ }
675 1 : void LogonSentState::onLogoutRequest(Session& context, const std::string& reason) {
676 1 : context.perform_shutdown("Logout requested during logon process: " + reason);
677 1 : }
678 :
679 : // --- 已建立状态 ---
680 973 : EstablishedState::EstablishedState() : messageHandlers_({
681 0 : {"0", &EstablishedState::handleHeartbeat},
682 0 : {"1", &EstablishedState::handleTestRequest},
683 0 : {"2", &EstablishedState::handleResendRequest},
684 0 : {"4", &EstablishedState::handleSequenceReset},
685 0 : {"5", &EstablishedState::handleLogout},
686 139 : {"A", &EstablishedState::handleLogon}
687 417 : }) {}
688 :
689 13 : void EstablishedState::onMessageReceived(Session& context, const FixMessage& msg) {
690 13 : const std::string msg_type = msg.get_string(tags::MsgType);
691 :
692 : // SequenceReset 消息不递增 recvSeqNum,由 handleSequenceReset 处理序列号更新
693 13 : if (msg_type != "4") {
694 13 : context.increment_recv_seq_num();
695 : }
696 :
697 13 : auto it = messageHandlers_.find(msg_type);
698 13 : if (it != messageHandlers_.end()) {
699 : // 会话层消息,由状态机处理
700 6 : (this->*(it->second))(context, msg);
701 :
702 : // 通知应用层收到管理消息(可选回调)
703 6 : if (Application* app = context.get_application()) {
704 : try {
705 1 : app->fromAdmin(msg, context.get_session_id());
706 0 : } catch (const std::exception& e) {
707 0 : LOG() << "Application::fromAdmin threw exception: " << e.what();
708 0 : } catch (...) {
709 0 : LOG() << "Application::fromAdmin threw unknown exception";
710 0 : }
711 : }
712 : } else {
713 : // 业务消息,委托给应用层处理
714 7 : Application* app = context.get_application();
715 7 : if (app) {
716 : try {
717 6 : app->fromApp(msg, context.get_session_id());
718 1 : } catch (const std::exception& e) {
719 1 : LOG() << "Application::fromApp threw exception: " << e.what();
720 : // 可选:发送 BusinessMessageReject
721 1 : } catch (...) {
722 0 : LOG() << "Application::fromApp threw unknown exception";
723 0 : }
724 : } else {
725 4 : LOG() << "Received business message (MsgType=" << msg_type
726 2 : << ") but no Application is set. Message ignored.";
727 : }
728 : }
729 13 : }
730 :
731 1 : void EstablishedState::onTimerCheck(Session& context) {
732 1 : auto now = std::chrono::steady_clock::now();
733 1 : auto seconds_since_recv = std::chrono::duration_cast<std::chrono::seconds>(now - context.get_last_recv_time()).count();
734 1 : auto seconds_since_send = std::chrono::duration_cast<std::chrono::seconds>(now - context.get_last_send_time()).count();
735 1 : const int hb_interval = context.get_heart_bt_int();
736 :
737 1 : if (logout_initiated_ &&
738 0 : std::chrono::duration_cast<std::chrono::seconds>(now - logout_initiation_time_).count() >=
739 1 : Config::instance().get_int("fix_session", "logout_confirm_timeout_sec", 10)) {
740 0 : context.perform_shutdown("Logout confirmation not received within timeout.");
741 0 : return;
742 : }
743 :
744 : // --- TestRequest 超时检查 ---
745 1 : if (!awaitingTestReqId_.empty() && seconds_since_recv >=
746 1 : static_cast<long>(hb_interval * Config::instance().get_double("fix_session", "test_request_timeout_multiplier", 1.5))) {
747 0 : context.perform_shutdown("TestRequest timeout. No response from peer.");
748 0 : return;
749 : }
750 :
751 : // --- 独立检查 1:是否需要向对端发送心跳 ---
752 : // 如果在一个心跳间隔内没有发送任何数据,则立刻发送心跳
753 1 : if (seconds_since_send >= hb_interval) {
754 0 : context.send_heartbeat();
755 : }
756 :
757 : // --- 独立检查 2:是否需要测试对端连接 (TestRequest) ---
758 : // 如果长时间未收到任何消息,且当前没有等待 TestReq 响应,
759 : // 发送一个 TestRequest。
760 1 : if (seconds_since_recv >= static_cast<long>(hb_interval * 1.2) && awaitingTestReqId_.empty()) {
761 0 : awaitingTestReqId_ = "TestReq_" + std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count());
762 0 : context.send_test_request(awaitingTestReqId_);
763 : }
764 : }
765 :
766 4 : void EstablishedState::onLogoutRequest(Session& context, const std::string& reason) {
767 4 : logout_initiated_ = true;
768 4 : logout_initiation_time_ = std::chrono::steady_clock::now();
769 4 : context.send_logout(reason);
770 4 : context.changeState(std::make_unique<LogoutSentState>(reason));
771 4 : }
772 :
773 3 : void EstablishedState::handleHeartbeat(Session&, const FixMessage& msg) {
774 3 : if (msg.has(tags::TestReqID)) {
775 1 : if (msg.get_string(tags::TestReqID) == awaitingTestReqId_) {
776 0 : awaitingTestReqId_.clear();
777 : }
778 : }
779 3 : }
780 :
781 1 : void EstablishedState::handleTestRequest(Session& context, const FixMessage& msg) {
782 1 : context.send_heartbeat(msg.get_string(tags::TestReqID));
783 1 : }
784 :
785 1 : void EstablishedState::handleLogout(Session& context, [[maybe_unused]] const FixMessage& msg) {
786 1 : if (logout_initiated_) {
787 : // 我方发起了登出,这是确认。
788 0 : context.perform_shutdown("Logout confirmation received.");
789 : } else {
790 : // 对方发起了登出,我们确认后等待最终确认或超时。
791 1 : context.send_logout("Confirming peer's logout request");
792 1 : context.changeState(std::make_unique<LogoutSentState>("Peer initiated logout"));
793 : }
794 1 : }
795 :
796 1 : void EstablishedState::handleLogon(Session& context, [[maybe_unused]] const FixMessage& msg) {
797 1 : context.perform_shutdown("Logon not expected after session is established.");
798 1 : }
799 :
800 0 : void EstablishedState::handleResendRequest(Session& context, const FixMessage& msg) {
801 0 : int begin_seq_no = msg.get_int(tags::BeginSeqNo);
802 0 : int end_seq_no = msg.get_int(tags::EndSeqNo);
803 :
804 0 : LOG() << "Received ResendRequest: BeginSeqNo=" << begin_seq_no << ", EndSeqNo=" << end_seq_no;
805 :
806 0 : IStore* store = context.get_store();
807 0 : if (!store) {
808 : // 没有存储,无法重传,发送 SequenceReset-GapFill 跳过所有请求的消息
809 0 : LOG() << "No store available for message resend. Sending SequenceReset-GapFill.";
810 0 : context.send_sequence_reset_gap_fill(begin_seq_no, context.get_send_seq_num());
811 0 : return;
812 : }
813 :
814 : // 如果 end_seq_no 为 0,表示请求到最新
815 0 : if (end_seq_no == 0) {
816 0 : end_seq_no = context.get_send_seq_num() - 1;
817 : }
818 :
819 : // 从存储加载消息
820 0 : auto messages = store->loadMessages(context.senderCompID, context.targetCompID,
821 0 : begin_seq_no, end_seq_no);
822 :
823 0 : if (messages.empty()) {
824 : // 没有找到消息,发送 SequenceReset-GapFill
825 0 : LOG() << "No messages found for resend. Sending SequenceReset-GapFill.";
826 0 : context.send_sequence_reset_gap_fill(begin_seq_no, end_seq_no + 1);
827 0 : return;
828 : }
829 :
830 : // 重传消息
831 0 : context.set_processing_resend(true);
832 :
833 0 : int gap_fill_start = -1;
834 0 : int last_seq = begin_seq_no - 1;
835 :
836 0 : for (const auto& stored_msg : messages) {
837 : // 检查是否有序列号 gap(消息丢失)
838 0 : if (stored_msg.seqNum > last_seq + 1) {
839 : // 发送 GapFill 跳过缺失的消息
840 0 : if (gap_fill_start == -1) {
841 0 : gap_fill_start = last_seq + 1;
842 : }
843 : }
844 :
845 : // 检查是否是管理消息
846 0 : if (is_admin_message(stored_msg.msgType)) {
847 : // 管理消息用 GapFill 跳过
848 0 : if (gap_fill_start == -1) {
849 0 : gap_fill_start = stored_msg.seqNum;
850 : }
851 : } else {
852 : // 业务消息需要重传
853 : // 先发送之前累积的 GapFill
854 0 : if (gap_fill_start != -1) {
855 0 : context.send_sequence_reset_gap_fill(gap_fill_start, stored_msg.seqNum);
856 0 : gap_fill_start = -1;
857 : }
858 :
859 : // 重传业务消息(添加 PossDupFlag)
860 0 : LOG() << "Resending message SeqNum=" << stored_msg.seqNum;
861 :
862 : // 解码原始消息,添加 PossDupFlag,重新编码发送
863 0 : FixMessage resend_msg = context.codec_.decode(stored_msg.rawMessage);
864 0 : resend_msg.set(tags::PossDupFlag, "Y");
865 0 : resend_msg.set(tags::OrigSendingTime, resend_msg.get_string(tags::SendingTime));
866 :
867 0 : std::string raw_msg = context.codec_.encode(resend_msg);
868 : // 直接发送,不更新序列号
869 0 : if (auto conn = context.get_connection().lock()) {
870 0 : conn->send(raw_msg);
871 0 : }
872 0 : }
873 :
874 0 : last_seq = stored_msg.seqNum;
875 : }
876 :
877 : // 发送最后的 GapFill(如果有)
878 0 : if (gap_fill_start != -1) {
879 0 : context.send_sequence_reset_gap_fill(gap_fill_start, end_seq_no + 1);
880 : }
881 :
882 : // 检查是否还有未覆盖的序列号
883 0 : if (last_seq < end_seq_no) {
884 0 : context.send_sequence_reset_gap_fill(last_seq + 1, end_seq_no + 1);
885 : }
886 :
887 0 : context.set_processing_resend(false);
888 0 : }
889 :
890 0 : void EstablishedState::handleSequenceReset(Session& context, const FixMessage& msg) {
891 0 : int new_seq_no = msg.get_int(tags::NewSeqNo);
892 0 : int msg_seq_num = msg.get_int(tags::MsgSeqNum);
893 0 : bool is_gap_fill = msg.has(tags::GapFillFlag) && msg.get_string(tags::GapFillFlag) == "Y";
894 :
895 0 : LOG() << "Received SequenceReset: MsgSeqNum=" << msg_seq_num
896 0 : << ", NewSeqNo=" << new_seq_no
897 0 : << ", GapFill=" << (is_gap_fill ? "Y" : "N");
898 :
899 : // 基本验证:NewSeqNo 必须大于 0
900 0 : if (new_seq_no <= 0) {
901 0 : LOG() << "Error: Invalid NewSeqNo=" << new_seq_no << ". Must be positive.";
902 0 : context.perform_shutdown("Invalid SequenceReset: NewSeqNo must be positive");
903 0 : return;
904 : }
905 :
906 0 : if (is_gap_fill) {
907 : // GapFill 模式:只能向前移动序列号
908 0 : int current_recv_seq = context.get_recv_seq_num();
909 0 : if (new_seq_no > current_recv_seq) {
910 0 : context.set_recv_seq_num(new_seq_no);
911 0 : LOG() << "Updated expected receive sequence number to " << new_seq_no;
912 0 : } else if (new_seq_no < current_recv_seq) {
913 0 : LOG() << "Warning: SequenceReset-GapFill with NewSeqNo=" << new_seq_no
914 0 : << " is less than expected " << current_recv_seq << ". Ignoring.";
915 : }
916 : } else {
917 : // Reset 模式:可以重置到任意值(但仍需大于 0,已在上面验证)
918 0 : context.set_recv_seq_num(new_seq_no);
919 0 : LOG() << "Reset expected receive sequence number to " << new_seq_no;
920 : }
921 : }
922 :
923 : // --- 已发送 Logout 状态 ---
924 5 : LogoutSentState::LogoutSentState(std::string reason)
925 5 : : reason_(std::move(reason)), initiation_time_(std::chrono::steady_clock::now()) {}
926 :
927 2 : void LogoutSentState::onMessageReceived(Session& context, const FixMessage& msg) {
928 2 : if (msg.get_string(tags::MsgType) == "5") {
929 1 : context.increment_recv_seq_num();
930 2 : context.perform_shutdown("Logout confirmation received.");
931 : } else {
932 1 : LOG() << "Warning: Received non-Logout message while waiting for Logout confirmation.";
933 : // 根据规范忽略其他消息
934 : }
935 2 : }
936 :
937 0 : void LogoutSentState::onTimerCheck(Session& context) {
938 0 : auto now = std::chrono::steady_clock::now();
939 0 : if (std::chrono::duration_cast<std::chrono::seconds>(now - initiation_time_).count() >=
940 0 : Config::instance().get_int("fix_session", "logout_confirm_timeout_sec", 10)) {
941 0 : context.perform_shutdown("Logout confirmation not received within timeout. Reason: " + reason_);
942 : }
943 0 : }
944 :
945 1 : void LogoutSentState::onLogoutRequest([[maybe_unused]] Session& context, [[maybe_unused]] const std::string& reason) {
946 : // 已在登出过程中,不执行任何操作。
947 1 : }
948 :
949 : // =================================================================================
950 : // 断线恢复相关方法实现
951 : // =================================================================================
952 :
953 2 : void Session::send_resend_request(int begin_seq_no, int end_seq_no) {
954 2 : auto rr_msg = create_resend_request_message(senderCompID, targetCompID, 0, begin_seq_no, end_seq_no);
955 2 : send(rr_msg);
956 2 : LOG() << "Sent ResendRequest: BeginSeqNo=" << begin_seq_no << ", EndSeqNo=" << end_seq_no;
957 2 : }
958 :
959 0 : void Session::send_sequence_reset_gap_fill(int seq_num, int new_seq_no) {
960 0 : auto sr_msg = create_sequence_reset_message(senderCompID, targetCompID, seq_num, new_seq_no, true);
961 :
962 : // 注意:根据 FIX 协议,SequenceReset-GapFill 消息不应设置 PossDupFlag="Y"
963 : // GapFill 是一种特殊消息,用于跳过序列号,而不是重传
964 :
965 : // 直接编码发送,不递增序列号(因为这是重传流程的一部分)
966 0 : std::string raw_msg = codec_.encode(sr_msg);
967 0 : internal_send(raw_msg);
968 :
969 0 : LOG() << "Sent SequenceReset-GapFill: SeqNum=" << seq_num << ", NewSeqNo=" << new_seq_no;
970 0 : }
971 :
972 208 : void Session::save_session_state() {
973 208 : if (!store_) return;
974 :
975 208 : SessionState state;
976 208 : state.senderCompID = senderCompID;
977 208 : state.targetCompID = targetCompID;
978 208 : state.sendSeqNum = sendSeqNum;
979 208 : state.recvSeqNum = recvSeqNum;
980 208 : state.lastUpdateTime = std::chrono::duration_cast<std::chrono::milliseconds>(
981 416 : std::chrono::system_clock::now().time_since_epoch()).count();
982 :
983 208 : if (!store_->saveSessionState(state)) {
984 0 : LOG() << "Warning: Failed to save session state";
985 : }
986 208 : }
987 :
988 205 : bool Session::restore_session_state() {
989 205 : if (!store_) return false;
990 :
991 205 : auto state = store_->loadSessionState(senderCompID, targetCompID);
992 205 : if (!state) {
993 103 : LOG() << "No saved session state found for " << senderCompID << " -> " << targetCompID;
994 103 : return false;
995 : }
996 :
997 102 : sendSeqNum = state->sendSeqNum;
998 102 : recvSeqNum = state->recvSeqNum;
999 :
1000 204 : LOG() << "Restored session state: SendSeqNum=" << sendSeqNum
1001 102 : << ", RecvSeqNum=" << recvSeqNum;
1002 :
1003 102 : return true;
1004 205 : }
1005 :
1006 : } // fix40 名称空间结束
|