FIX 4.0 Demo 1.0
Loading...
Searching...
No Matches
session.hpp
Go to the documentation of this file.
1
12#pragma once
13
14#include <chrono>
15#include <string>
16#include <thread>
17#include <atomic>
18#include <functional>
19#include <memory>
20#include <map>
21
22#include "fix/fix_codec.hpp"
23#include "fix/application.hpp"
25#include "base/timing_wheel.hpp"
26
27namespace fix40 {
28
// Forward declarations of types that this header only uses through
// pointers, references or smart-pointer template arguments.
29class Connection;
30class Session;
31class IStore; // forward declaration
32
// Session state handler interface (State pattern).
// NOTE(review): the `class IStateHandler {` header line is absent from this
// listing; the members below are the public part of that class.
42public:
/// Virtual destructor so implementations can be destroyed through the interface.
43 virtual ~IStateHandler() = default;
44
/// Handle a received message.
/// @param context Session handed to the handler.
/// @param msg     The received FIX message.
50 virtual void onMessageReceived(Session& context, const FixMessage& msg) = 0;
51
/// Handle a timer-check event.
/// @param context Session handed to the handler.
58 virtual void onTimerCheck(Session& context) = 0;
59
/// Handle the session-start event.
/// @param context Session handed to the handler.
64 virtual void onSessionStart(Session& context) = 0;
65
/// Handle a logout request.
/// @param context Session handed to the handler.
/// @param reason  Text describing why logout was requested.
71 virtual void onLogoutRequest(Session& context, const std::string& reason) = 0;
72
/// Get the state name.
/// @return C string naming the state.
77 virtual const char* getStateName() const = 0;
78};
79
80
/// FIX session manager.
///
/// Derives from std::enable_shared_from_this, so instances are expected to be
/// owned by std::shared_ptr. The current behaviour is delegated to an
/// IStateHandler held in currentState_ and replaced through changeState().
///
/// NOTE(review): this listing omits several declarations (the number-only
/// lines below). The cross-reference index of the same page still mentions
/// get_application(), get_session_id(), set_established_callback(),
/// schedule_timer_tasks(), restore_session_state(), update_last_recv_time(),
/// update_last_send_time(), is_outbound_queue_empty() and the member codec_.
120 class Session : public std::enable_shared_from_this<Session> {
121 public:
/// Callback type invoked on session shutdown.
123 using ShutdownCallback = std::function<void()>;
124
/// Callback type invoked when the session is established; receives the session.
133 using EstablishedCallback = std::function<void(std::shared_ptr<Session>)>;
134
/// Construct a session.
/// @param sender      Sender CompID — presumably stored in senderCompID; confirm in session.cpp.
/// @param target      Target CompID — presumably stored in targetCompID; confirm in session.cpp.
/// @param hb          Heartbeat interval — presumably seconds, matching get_heart_bt_int(); confirm.
/// @param shutdown_cb Shutdown callback.
/// @param store       Optional store interface; defaults to nullptr.
143 Session(const std::string& sender,
144 const std::string& target,
145 int hb,
146 ShutdownCallback shutdown_cb,
147 IStore* store = nullptr);
148
/// Destructor.
154 ~Session();
155
/// Associate a Connection object with this session (held as a weak_ptr).
160 void set_connection(std::weak_ptr<Connection> conn);
161
/// Set the application-layer handler.
172 void set_application(Application* app);
173
179
185
195
/// Notify that the session has been established.
203 void notify_established();
204
/// Update the peer CompID.
213 void set_target_comp_id(const std::string& target);
214
// NOTE(review): no brief is available for this function in the listing;
// by name it sends an application-level message — confirm in session.cpp.
222 void send_app_message(FixMessage& msg);
223
/// Start the session.
230 void start();
231
/// Stop the session.
235 void stop();
236
/// Check whether the session is running.
/// @return Current value of the atomic running_ flag.
242 bool is_running() const { return running_; }
243
/// Send a FIX message.
248 void send(FixMessage& msg);
249
/// Send the data held in the buffer.
253 void send_buffered_data();
254
/// Handle a write-ready event.
258 void handle_write_ready();
259
/// Add a raw message to the send queue.
/// @param raw_msg Raw message text, taken by rvalue reference.
264 void enqueue_raw_for_send(std::string&& raw_msg);
265
272
/// Handle a received message.
277 void on_message_received(const FixMessage& msg);
278
/// Handle a timer check.
284 void on_timer_check();
285
/// Handle an I/O error.
/// @param reason Text describing the error.
290 void on_io_error(const std::string& reason);
291
/// Handle the connection being closed.
/// @param reason Text describing why it closed.
296 void on_shutdown(const std::string& reason);
297
/// Initiate the graceful logout flow.
/// @param reason Text describing why logout is initiated.
302 void initiate_logout(const std::string& reason);
303
309
/// Switch the session state.
/// @param newState Handler that takes over; ownership is transferred in.
314 void changeState(std::unique_ptr<IStateHandler> newState);
315
/// Sender CompID (immutable after construction).
316 const std::string senderCompID;
/// Receiver CompID; the server side may update it to the real client CompID after Logon.
317 std::string targetCompID;
319
320 // --- Public helpers (called by the state classes) ---
321
/// Send a Logout message.
326 void send_logout(const std::string& reason);
327
/// Send a Heartbeat message.
/// @param test_req_id Optional id; defaults to an empty string.
332 void send_heartbeat(const std::string& test_req_id = "");
333
/// Send a TestRequest message.
338 void send_test_request(const std::string& id);
339
/// Perform the session shutdown.
344 void perform_shutdown(const std::string& reason);
345
348
351
/// Get the last receive time.
353 std::chrono::steady_clock::time_point get_last_recv_time() const;
354
/// Get the last send time.
356 std::chrono::steady_clock::time_point get_last_send_time() const;
357
/// Get the heartbeat interval (seconds).
359 int get_heart_bt_int() const;
360
/// Set the heartbeat interval.
362 void set_heart_bt_int(int new_hb);
363
/// Get the minimum allowed heartbeat interval.
365 int get_min_heart_bt_int() const;
366
/// Get the maximum allowed heartbeat interval.
368 int get_max_heart_bt_int() const;
369
370 // --- Sequence number management ---
371
/// Get the send sequence number.
// NOTE(review): this getter does not modify the object but is not declared
// const, unlike the other getters in this class — consider adding const.
373 int get_send_seq_num() { return sendSeqNum; }
374
/// Get the receive sequence number.
// NOTE(review): same as above — not declared const.
376 int get_recv_seq_num() { return recvSeqNum; }
377
/// Increment the send sequence number by one.
379 void increment_send_seq_num() { sendSeqNum++; }
380
/// Increment the receive sequence number by one.
382 void increment_recv_seq_num() { recvSeqNum++; }
383
/// Set the receive sequence number.
385 void set_recv_seq_num(int seq) { recvSeqNum = seq; }
386
/// Set the send sequence number.
395 void set_send_seq_num(int seq) { sendSeqNum = seq; }
396
397 // --- Reconnect / recovery ---
398
/// Send a ResendRequest message.
/// @param begin_seq_no First sequence number of the requested range.
/// @param end_seq_no   Last sequence number of the requested range.
404 void send_resend_request(int begin_seq_no, int end_seq_no);
405
/// Send a SequenceReset-GapFill message.
411 void send_sequence_reset_gap_fill(int seq_num, int new_seq_no);
412
/// Save the session state to the store.
416 void save_session_state();
417
423
/// Get the store interface.
/// @return The stored IStore pointer; may be nullptr (its default value).
428 IStore* get_store() const { return store_; }
429
/// Get the connection object.
/// @return Copy of the weak_ptr held in connection_.
434 std::weak_ptr<Connection> get_connection() const { return connection_; }
435
/// Check whether a resend request is currently being processed.
439 bool is_processing_resend() const { return processingResend_; }
440
/// Set the resend-processing state.
444 void set_processing_resend(bool processing) { processingResend_ = processing; }
445
446 // --- Client identity ---
447
/// Get the client identifier.
455 const std::string& get_client_comp_id() const { return clientCompID_; }
456
/// Set the client identifier.
463 void set_client_comp_id(const std::string& clientId) { clientCompID_ = clientId; }
464
465private:
// Atomic flag, initially false; by name it marks a shutdown in progress — confirm in session.cpp.
466 std::atomic<bool> shutting_down_{false};
// NOTE(review): std::recursive_mutex is declared in <mutex>, which is not in
// this header's visible include list — confirm it is not only arriving transitively.
467 std::recursive_mutex state_mutex_;
// Currently active state handler (owned).
468 std::unique_ptr<IStateHandler> currentState_;
469
// Internal send helper taking the raw message text; no brief is available in this listing.
474 void internal_send(const std::string& raw_msg);
475
// Presumably drains pending_inbound_; the "_locked" suffix suggests the caller
// must already hold state_mutex_ — TODO confirm in session.cpp.
485 void drain_pending_inbound_locked();
486
// Heartbeat interval and its fixed lower/upper bounds.
487 int heartBtInt;
488 const int minHeartBtInt_;
489 const int maxHeartBtInt_;
490 ShutdownCallback shutdown_callback_;
// Non-owning reference to the connection.
491 std::weak_ptr<Connection> connection_;
492
494
// Running flag read by is_running(); initially false.
495 std::atomic<bool> running_{false};
496
// Send and receive sequence numbers, both starting at 1.
497 int sendSeqNum = 1;
498 int recvSeqNum = 1;
499 std::chrono::steady_clock::time_point lastRecv;
500 std::chrono::steady_clock::time_point lastSend;
501
// Ordered map of inbound messages — presumably keyed by sequence number; TODO confirm.
503 std::map<int, FixMessage> pending_inbound_;
505 int last_resend_request_end_ = 0;
506
// Non-owning timing wheel pointer and the id of the task scheduled on it.
507 TimingWheel* timing_wheel_ = nullptr;
508 TimerTaskId timer_task_id_ = INVALID_TIMER_ID;
509
// Raw, non-owning pointers; both default to nullptr.
510 Application* application_ = nullptr;
511 IStore* store_ = nullptr;
512 bool processingResend_ = false;
513 std::string clientCompID_;
514 EstablishedCallback established_callback_;
// Atomic flag, initially false; presumably guards against notifying "established" twice — confirm.
515 std::atomic<bool> established_notified_{false};
516 };
517
518 } // namespace fix40
FIX 应用层接口定义
FIX 应用层抽象接口
Definition application.hpp:116
FIX 消息编解码器
Definition fix_codec.hpp:133
FIX 消息的面向对象封装
Definition fix_codec.hpp:46
会话状态处理器接口(状态模式)
Definition session.hpp:41
virtual void onSessionStart(Session &context)=0
处理会话启动事件
virtual void onLogoutRequest(Session &context, const std::string &reason)=0
处理登出请求
virtual const char * getStateName() const =0
获取状态名称
virtual ~IStateHandler()=default
virtual void onMessageReceived(Session &context, const FixMessage &msg)=0
处理收到的消息
virtual void onTimerCheck(Session &context)=0
处理定时器检查事件
存储接口
Definition store.hpp:61
FIX 会话管理器
Definition session.hpp:120
void send_heartbeat(const std::string &test_req_id="")
发送 Heartbeat 消息
Definition session.cpp:501
void set_processing_resend(bool processing)
设置重传处理状态
Definition session.hpp:444
void start()
启动会话
Definition session.cpp:235
bool is_outbound_queue_empty() const
检查发送队列是否为空
int get_recv_seq_num()
获取接收序列号
Definition session.hpp:376
void on_message_received(const FixMessage &msg)
处理收到的消息
Definition session.cpp:269
void send_resend_request(int begin_seq_no, int end_seq_no)
发送 ResendRequest 消息
Definition session.cpp:953
void set_connection(std::weak_ptr< Connection > conn)
关联 Connection 对象
Definition session.cpp:172
const std::string & get_client_comp_id() const
获取客户端标识
Definition session.hpp:455
Application * get_application() const
获取应用层处理器
Definition session.cpp:180
void save_session_state()
保存会话状态到存储
Definition session.cpp:972
void schedule_timer_tasks(TimingWheel *wheel)
调度周期性定时任务
Definition session.cpp:474
std::function< void()> ShutdownCallback
会话关闭回调类型
Definition session.hpp:123
bool is_processing_resend() const
检查是否正在处理重传请求
Definition session.hpp:439
bool restore_session_state()
从存储恢复会话状态
Definition session.cpp:988
std::string targetCompID
接收方 CompID(服务端可在 Logon 后更新为真实客户端 CompID)
Definition session.hpp:317
void set_client_comp_id(const std::string &clientId)
设置客户端标识
Definition session.hpp:463
void set_target_comp_id(const std::string &target)
更新对端 CompID
Definition session.cpp:212
void send_buffered_data()
发送缓冲区中的数据
Definition session.cpp:256
void send_sequence_reset_gap_fill(int seq_num, int new_seq_no)
发送 SequenceReset-GapFill 消息
Definition session.cpp:959
void set_heart_bt_int(int new_hb)
设置心跳间隔
Definition session.cpp:467
void perform_shutdown(const std::string &reason)
执行会话关闭
Definition session.cpp:432
const std::string senderCompID
发送方 CompID
Definition session.hpp:316
void send_test_request(const std::string &id)
发送 TestRequest 消息
Definition session.cpp:506
void send_app_message(FixMessage &msg)
Definition session.cpp:217
bool is_running() const
检查会话是否正在运行
Definition session.hpp:242
void set_application(Application *app)
设置应用层处理器
Definition session.cpp:176
void stop()
停止会话
Definition session.cpp:243
void send(FixMessage &msg)
发送 FIX 消息
Definition session.cpp:394
SessionID get_session_id() const
获取会话标识符
Definition session.cpp:184
std::chrono::steady_clock::time_point get_last_send_time() const
获取最后发送时间
Definition session.cpp:465
IStore * get_store() const
获取存储接口
Definition session.hpp:428
FixCodec codec_
FIX 编解码器
Definition session.hpp:318
std::function< void(std::shared_ptr< Session >)> EstablishedCallback
会话建立回调类型
Definition session.hpp:133
void enqueue_raw_for_send(std::string &&raw_msg)
将原始消息加入发送队列
void update_last_recv_time()
更新最后接收时间
Definition session.cpp:462
void on_shutdown(const std::string &reason)
处理连接关闭
Definition session.cpp:384
~Session()
析构函数
Definition session.cpp:164
void notify_established()
通知会话已建立
Definition session.cpp:193
std::weak_ptr< Connection > get_connection() const
获取连接对象
Definition session.hpp:434
int get_max_heart_bt_int() const
获取最大允许心跳间隔
Definition session.cpp:469
void set_recv_seq_num(int seq)
设置接收序列号
Definition session.hpp:385
std::chrono::steady_clock::time_point get_last_recv_time() const
获取最后接收时间
Definition session.cpp:464
void increment_recv_seq_num()
递增接收序列号
Definition session.hpp:382
void handle_write_ready()
处理写就绪事件
Definition session.cpp:262
void send_logout(const std::string &reason)
发送 Logout 消息
Definition session.cpp:496
int get_min_heart_bt_int() const
获取最小允许心跳间隔
Definition session.cpp:468
void on_timer_check()
处理定时器检查
Definition session.cpp:374
void increment_send_seq_num()
递增发送序列号
Definition session.hpp:379
void set_send_seq_num(int seq)
设置发送序列号
Definition session.hpp:395
void initiate_logout(const std::string &reason)
发起优雅登出流程
Definition session.cpp:388
void update_last_send_time()
更新最后发送时间
Definition session.cpp:463
void on_io_error(const std::string &reason)
处理 I/O 错误
Definition session.cpp:380
int get_send_seq_num()
获取发送序列号
Definition session.hpp:373
void set_established_callback(EstablishedCallback cb)
设置会话建立回调
Definition session.cpp:188
int get_heart_bt_int() const
获取心跳间隔(秒)
Definition session.cpp:466
void changeState(std::unique_ptr< IStateHandler > newState)
切换会话状态
Definition session.cpp:247
支持周期性任务的时间轮定时器
Definition timing_wheel.hpp:67
Definition concurrentqueue.h:768
FIX 消息编解码器
Definition matching_engine.hpp:23
uint64_t TimerTaskId
定时任务唯一标识符类型
Definition timing_wheel.hpp:25
constexpr TimerTaskId INVALID_TIMER_ID
无效的定时任务 ID
Definition timing_wheel.hpp:30
FIX 会话标识符
Definition application.hpp:28
时间轮定时器实现