Line data Source code
1 : /**
2 : * @file connection.hpp
3 : * @brief TCP 连接管理类
4 : *
5 : * 封装单个 TCP 连接的 I/O 操作,实现连接绑定线程模型,
6 : * 确保同一连接的所有操作在同一线程串行执行。
7 : */
8 :
9 : #pragma once
10 :
11 : #include <memory>
12 : #include <string>
13 : #include <string_view>
14 : #include <functional>
15 : #include <atomic>
16 : #include "fix/fix_frame_decoder.hpp"
17 :
18 : namespace fix40 {
19 :
20 : // 前置声明
21 : class Session;
22 : class Reactor;
23 : class ThreadPool;
24 :
25 : /**
26 : * @class Connection
27 : * @brief TCP 连接管理类
28 : *
29 : * 负责管理单个客户端连接的 I/O 操作,包括:
30 : * - 非阻塞读取(ET 模式)
31 : * - 带缓冲的非阻塞写入
32 : * - FIX 消息帧解码
33 : * - 连接生命周期管理
34 : *
35 : * @par 线程模型
36 : * 每个连接绑定到一个固定的工作线程,所有操作(读、写、定时任务)
37 : * 都在该线程中串行执行,避免锁竞争。
38 : *
39 : * @par 数据流
40 : * @code
41 : * 接收: socket -> handle_read() -> FixFrameDecoder -> Session::on_message_received()
42 : * 发送: Session::send() -> Connection::send() -> do_send() -> socket
43 : * @endcode
44 : *
45 : * @note 该类继承 std::enable_shared_from_this,必须通过 std::shared_ptr 管理
46 : */
47 : class Connection : public std::enable_shared_from_this<Connection> {
48 : public:
49 : /// 读缓冲区最大大小(1 MB)
50 : static constexpr size_t kMaxReadBufferSize = 1 * 1024 * 1024;
51 :
52 : /**
53 : * @brief 构造连接对象
54 : * @param fd 已连接的 socket 文件描述符(应已设置为非阻塞)
55 : * @param reactor Reactor 指针,用于注册/修改 I/O 事件
56 : * @param session 关联的 FIX 会话对象
57 : * @param thread_pool 线程池指针,用于派发任务
58 : * @param thread_index 绑定的工作线程索引
59 : */
60 : Connection(int fd, Reactor* reactor, std::shared_ptr<Session> session,
61 : ThreadPool* thread_pool, size_t thread_index);
62 :
63 : /**
64 : * @brief 析构函数
65 : *
66 : * 自动关闭连接并释放资源。
67 : */
68 : ~Connection();
69 :
70 : /**
71 : * @brief 处理读事件
72 : *
73 : * 在 ET(边缘触发)模式下循环读取数据直到 EAGAIN,
74 : * 将数据送入帧解码器,解析出完整的 FIX 消息后交给 Session 处理。
75 : *
76 : * @note 必须在绑定的工作线程中调用
77 : *
78 : * @par 错误处理
79 : * - 缓冲区溢出:通知 Session I/O 错误
80 : * - 对端关闭:通知 Session 连接关闭
81 : * - 解码错误:通知 Session I/O 错误
82 : */
83 : void handle_read();
84 :
85 : /**
86 : * @brief 处理写事件
87 : *
88 : * 在 ET 模式下循环发送写缓冲区中的数据直到 EAGAIN 或发送完毕。
89 : * 发送完毕后取消写事件监听。
90 : *
91 : * @note 必须在绑定的工作线程中调用
92 : */
93 : void handle_write();
94 :
95 : /**
96 : * @brief 发送数据
97 : * @param data 要发送的数据
98 : *
99 : * 可以从任意线程调用,内部会将发送操作派发到绑定的工作线程执行。
100 : * 如果连接已关闭,调用无效。
101 : */
102 : void send(std::string_view data);
103 :
104 : /**
105 : * @brief 派发任务到绑定的工作线程执行
106 : * @param task 要执行的任务
107 : *
108 : * 确保任务在连接绑定的线程中串行执行,避免竞态条件。
109 : */
110 : void dispatch(std::function<void()> task);
111 :
112 : /**
113 : * @brief 关闭连接
114 : *
115 : * 从 Reactor 移除 fd,关闭 socket。
116 : * 该方法是幂等的,多次调用安全。
117 : */
118 : void shutdown();
119 :
120 : /**
121 : * @brief 关闭文件描述符
122 : *
123 : * 内部调用 shutdown(),保持接口一致性。
124 : */
125 : void close_fd();
126 :
127 : /**
128 : * @brief 获取 socket 文件描述符
129 : * @return int 文件描述符
130 : */
131 0 : int fd() const { return fd_; }
132 :
133 : /**
134 : * @brief 获取绑定的线程索引
135 : * @return size_t 线程索引
136 : */
137 : size_t thread_index() const { return thread_index_; }
138 :
139 : /**
140 : * @brief 获取关联的 Session 对象
141 : * @return std::shared_ptr<Session> Session 指针
142 : */
143 : std::shared_ptr<Session> session() const { return session_; }
144 :
145 : private:
146 : /**
147 : * @brief 内部发送实现
148 : * @param data 要发送的数据
149 : *
150 : * 在绑定的工作线程中执行,尝试直接发送,
151 : * 发送不完则缓存并注册写事件。
152 : */
153 : void do_send(const std::string& data);
154 :
155 : const int fd_; ///< socket 文件描述符
156 : Reactor* reactor_; ///< Reactor 指针
157 : std::shared_ptr<Session> session_; ///< 关联的 FIX 会话
158 : ThreadPool* thread_pool_; ///< 线程池指针
159 : const size_t thread_index_; ///< 绑定的工作线程索引
160 : std::atomic<bool> is_closed_{false}; ///< 连接关闭标志
161 :
162 : FixFrameDecoder frame_decoder_; ///< FIX 消息帧解码器
163 : std::string write_buffer_; ///< 写缓冲区
164 : // 注意:移除了 write_mutex_,因为同一连接的所有操作都在同一线程
165 : };
166 :
167 : } // namespace fix40
|