Line data Source code
1 : /**
2 : * @file connection.cpp
3 : * @brief Connection 类实现
4 : */
5 :
6 : #include "core/connection.hpp"
7 : #include "fix/session.hpp"
8 : #include "core/reactor.hpp"
9 : #include "base/thread_pool.hpp"
10 : #include "base/config.hpp"
11 : #include "base/logger.hpp"
12 :
13 : #include <unistd.h>
14 : #include <cerrno>
15 : #include <cstring>
16 : #include <sys/socket.h>
17 : #include <vector>
18 :
19 : namespace fix40 {
20 :
21 0 : Connection::Connection(int fd, Reactor* reactor, std::shared_ptr<Session> session,
22 0 : ThreadPool* thread_pool, size_t thread_index)
23 0 : : fd_(fd),
24 0 : reactor_(reactor),
25 0 : session_(std::move(session)),
26 0 : thread_pool_(thread_pool),
27 0 : thread_index_(thread_index),
28 0 : frame_decoder_(
29 0 : Config::instance().get_int("protocol", "max_buffer_size", 1048576),
30 0 : Config::instance().get_int("protocol", "max_body_length", 4096)
31 0 : ) {
32 0 : LOG() << "Connection created for fd " << fd_ << ", bindded to thread " << thread_index_;
33 0 : }
34 :
35 0 : Connection::~Connection() {
36 0 : LOG() << "Connection destroyed for fd " << fd_;
37 0 : close_fd();
38 0 : }
39 :
40 0 : void Connection::handle_read() {
41 0 : if (is_closed_) return;
42 :
43 0 : std::vector<char> read_buf(Config::instance().get_int("protocol", "max_buffer_size", 4096));
44 0 : ssize_t bytes_read = 0;
45 0 : bool buffer_overflow = false;
46 :
47 : // ET 模式需要一直读到 EAGAIN
48 : while (true) {
49 0 : bytes_read = ::read(fd_, read_buf.data(), read_buf.size());
50 0 : if (bytes_read > 0) {
51 : // 在 append 前检查是否会溢出,避免抛异常
52 0 : if (!frame_decoder_.can_append(bytes_read)) {
53 0 : buffer_overflow = true;
54 0 : break;
55 : }
56 0 : frame_decoder_.append(read_buf.data(), bytes_read);
57 : } else {
58 0 : break;
59 : }
60 : }
61 :
62 0 : if (buffer_overflow) {
63 0 : session_->on_io_error("Buffer overflow: client sending too much data.");
64 0 : return;
65 : }
66 :
67 0 : if (bytes_read == 0) {
68 0 : session_->on_shutdown("Connection closed by peer.");
69 0 : return;
70 : }
71 :
72 0 : if (bytes_read < 0) {
73 0 : if (errno != EAGAIN && errno != EWOULDBLOCK) {
74 0 : session_->on_io_error("Socket read error.");
75 0 : return;
76 : }
77 : }
78 :
79 : try {
80 0 : std::string raw_msg;
81 0 : while (frame_decoder_.next_message(raw_msg)) {
82 0 : FixMessage fix_msg = session_->codec_.decode(raw_msg);
83 0 : LOG() << "<<< RECV (" << fd_ << "): " << raw_msg;
84 0 : session_->on_message_received(fix_msg);
85 0 : }
86 0 : } catch (const std::exception& e) {
87 0 : session_->on_io_error("Frame decoder or parser error: " + std::string(e.what()));
88 0 : }
89 0 : }
90 :
91 0 : void Connection::handle_write() {
92 : // ET 模式下需要循环发送直到 EAGAIN
93 0 : while (!write_buffer_.empty()) {
94 0 : ssize_t sent = ::send(fd_, write_buffer_.c_str(), write_buffer_.length(), 0);
95 :
96 0 : if (sent > 0) {
97 0 : write_buffer_.erase(0, sent);
98 0 : } else if (sent == 0) {
99 : // 发送了 0 字节,不太常见,退出循环等待下次事件
100 0 : break;
101 : } else {
102 : // sent < 0
103 0 : if (errno == EAGAIN || errno == EWOULDBLOCK) {
104 : // 内核缓冲区满,等待下次写事件
105 : break;
106 : } else {
107 0 : session_->on_io_error("Socket write error.");
108 0 : return;
109 : }
110 : }
111 : }
112 :
113 : // 如果缓冲区已清空,取消写事件监听
114 0 : if (write_buffer_.empty()) {
115 0 : reactor_->modify_fd(fd_, static_cast<uint32_t>(EventType::READ), nullptr);
116 : }
117 : }
118 :
119 0 : void Connection::send(std::string_view data) {
120 0 : if (is_closed_) return;
121 :
122 : // 将发送操作派发到绑定的线程执行
123 0 : std::string data_copy(data);
124 0 : dispatch([this, data_copy = std::move(data_copy)]() {
125 0 : do_send(data_copy);
126 0 : });
127 0 : }
128 :
129 0 : void Connection::do_send(const std::string& data) {
130 0 : if (is_closed_) return;
131 :
132 : // 现在在绑定的线程中,不需要锁
133 0 : if (write_buffer_.empty()) {
134 0 : ssize_t sent = ::send(fd_, data.data(), data.length(), 0);
135 0 : if (sent >= 0) {
136 0 : if (static_cast<size_t>(sent) < data.length()) {
137 0 : write_buffer_.append(data.substr(sent));
138 : } else {
139 0 : return; // 全部发送完成
140 : }
141 : } else {
142 0 : if (errno == EAGAIN || errno == EWOULDBLOCK) {
143 0 : write_buffer_.append(data);
144 : } else {
145 0 : session_->on_io_error("Initial send error");
146 0 : return;
147 : }
148 : }
149 : } else {
150 0 : write_buffer_.append(data);
151 : }
152 :
153 : // 有待发送数据,注册写事件
154 0 : if (!write_buffer_.empty()) {
155 : // 注意:这里需要捕获 weak_ptr 避免循环引用
156 0 : std::weak_ptr<Connection> weak_self = shared_from_this();
157 0 : reactor_->modify_fd(fd_,
158 : static_cast<uint32_t>(EventType::READ) | static_cast<uint32_t>(EventType::WRITE),
159 0 : [weak_self](int) {
160 0 : if (auto self = weak_self.lock()) {
161 : // 写事件也派发到绑定线程
162 0 : self->dispatch([self]() {
163 0 : self->handle_write();
164 0 : });
165 0 : }
166 0 : });
167 0 : }
168 : }
169 :
170 0 : void Connection::dispatch(std::function<void()> task) {
171 0 : if (thread_pool_) {
172 0 : thread_pool_->enqueue_to(thread_index_, std::move(task));
173 : }
174 0 : }
175 :
176 0 : void Connection::shutdown() {
177 : // 确保只执行一次
178 0 : if (is_closed_.exchange(true)) {
179 0 : return;
180 : }
181 :
182 0 : LOG() << "Shutting down connection for fd " << fd_;
183 :
184 : // 先从 epoll 移除,再关闭 fd
185 : // 注意:remove_fd 是同步执行的(虽然通过任务队列,但我们需要等它完成)
186 : // 这里直接关闭 fd,epoll 会自动移除已关闭的 fd(Linux 特性)
187 0 : if (reactor_) {
188 0 : reactor_->remove_fd(fd_);
189 : }
190 :
191 0 : ::shutdown(fd_, SHUT_RDWR);
192 0 : ::close(fd_);
193 : }
194 :
195 0 : void Connection::close_fd() {
196 : // 统一由 shutdown() 处理关闭逻辑
197 0 : shutdown();
198 0 : }
199 :
200 : } // namespace fix40
|