LCOV - code coverage report
Current view: top level - src/core - connection.cpp (source / functions) Coverage Total Hit
Test: coverage.info Lines: 0.0 % 107 0
Test Date: 2025-12-19 03:13:09 Functions: 0.0 % 12 0

            Line data    Source code
       1              : /**
       2              :  * @file connection.cpp
       3              :  * @brief Connection 类实现
       4              :  */
       5              : 
       6              : #include "core/connection.hpp"
       7              : #include "fix/session.hpp"
       8              : #include "core/reactor.hpp"
       9              : #include "base/thread_pool.hpp"
      10              : #include "base/config.hpp"
      11              : #include "base/logger.hpp"
      12              : 
      13              : #include <unistd.h>
      14              : #include <cerrno>
      15              : #include <cstring>
      16              : #include <sys/socket.h>
      17              : #include <vector>
      18              : 
      19              : namespace fix40 {
      20              : 
      21            0 : Connection::Connection(int fd, Reactor* reactor, std::shared_ptr<Session> session,
      22            0 :                        ThreadPool* thread_pool, size_t thread_index)
      23            0 :     : fd_(fd),
      24            0 :       reactor_(reactor),
      25            0 :       session_(std::move(session)),
      26            0 :       thread_pool_(thread_pool),
      27            0 :       thread_index_(thread_index),
      28            0 :       frame_decoder_(
      29            0 :           Config::instance().get_int("protocol", "max_buffer_size", 1048576),
      30            0 :           Config::instance().get_int("protocol", "max_body_length", 4096)
      31            0 :       ) {
      32            0 :     LOG() << "Connection created for fd " << fd_ << ", bindded to thread " << thread_index_;
      33            0 : }
      34              : 
      35            0 : Connection::~Connection() {
      36            0 :     LOG() << "Connection destroyed for fd " << fd_;
      37            0 :     close_fd();
      38            0 : }
      39              : 
      40            0 : void Connection::handle_read() {
      41            0 :     if (is_closed_) return;
      42              : 
      43            0 :     std::vector<char> read_buf(Config::instance().get_int("protocol", "max_buffer_size", 4096));
      44            0 :     ssize_t bytes_read = 0;
      45            0 :     bool buffer_overflow = false;
      46              : 
      47              :     // ET 模式需要一直读到 EAGAIN
      48              :     while (true) {
      49            0 :         bytes_read = ::read(fd_, read_buf.data(), read_buf.size());
      50            0 :         if (bytes_read > 0) {
      51              :             // 在 append 前检查是否会溢出,避免抛异常
      52            0 :             if (!frame_decoder_.can_append(bytes_read)) {
      53            0 :                 buffer_overflow = true;
      54            0 :                 break;
      55              :             }
      56            0 :             frame_decoder_.append(read_buf.data(), bytes_read);
      57              :         } else {
      58            0 :             break;
      59              :         }
      60              :     }
      61              : 
      62            0 :     if (buffer_overflow) {
      63            0 :         session_->on_io_error("Buffer overflow: client sending too much data.");
      64            0 :         return;
      65              :     }
      66              : 
      67            0 :     if (bytes_read == 0) {
      68            0 :         session_->on_shutdown("Connection closed by peer.");
      69            0 :         return;
      70              :     }
      71              : 
      72            0 :     if (bytes_read < 0) {
      73            0 :         if (errno != EAGAIN && errno != EWOULDBLOCK) {
      74            0 :             session_->on_io_error("Socket read error.");
      75            0 :             return;
      76              :         }
      77              :     }
      78              : 
      79              :     try {
      80            0 :         std::string raw_msg;
      81            0 :         while (frame_decoder_.next_message(raw_msg)) {
      82            0 :             FixMessage fix_msg = session_->codec_.decode(raw_msg);
      83            0 :             LOG() << "<<< RECV (" << fd_ << "): " << raw_msg;
      84            0 :             session_->on_message_received(fix_msg);
      85            0 :         }
      86            0 :     } catch (const std::exception& e) {
      87            0 :         session_->on_io_error("Frame decoder or parser error: " + std::string(e.what()));
      88            0 :     }
      89            0 : }
      90              : 
      91            0 : void Connection::handle_write() {
      92              :     // ET 模式下需要循环发送直到 EAGAIN
      93            0 :     while (!write_buffer_.empty()) {
      94            0 :         ssize_t sent = ::send(fd_, write_buffer_.c_str(), write_buffer_.length(), 0);
      95              : 
      96            0 :         if (sent > 0) {
      97            0 :             write_buffer_.erase(0, sent);
      98            0 :         } else if (sent == 0) {
      99              :             // 发送了 0 字节,不太常见,退出循环等待下次事件
     100            0 :             break;
     101              :         } else {
     102              :             // sent < 0
     103            0 :             if (errno == EAGAIN || errno == EWOULDBLOCK) {
     104              :                 // 内核缓冲区满,等待下次写事件
     105              :                 break;
     106              :             } else {
     107            0 :                 session_->on_io_error("Socket write error.");
     108            0 :                 return;
     109              :             }
     110              :         }
     111              :     }
     112              : 
     113              :     // 如果缓冲区已清空,取消写事件监听
     114            0 :     if (write_buffer_.empty()) {
     115            0 :         reactor_->modify_fd(fd_, static_cast<uint32_t>(EventType::READ), nullptr);
     116              :     }
     117              : }
     118              : 
     119            0 : void Connection::send(std::string_view data) {
     120            0 :     if (is_closed_) return;
     121              : 
     122              :     // 将发送操作派发到绑定的线程执行
     123            0 :     std::string data_copy(data);
     124            0 :     dispatch([this, data_copy = std::move(data_copy)]() {
     125            0 :         do_send(data_copy);
     126            0 :     });
     127            0 : }
     128              : 
     129            0 : void Connection::do_send(const std::string& data) {
     130            0 :     if (is_closed_) return;
     131              : 
     132              :     // 现在在绑定的线程中,不需要锁
     133            0 :     if (write_buffer_.empty()) {
     134            0 :         ssize_t sent = ::send(fd_, data.data(), data.length(), 0);
     135            0 :         if (sent >= 0) {
     136            0 :             if (static_cast<size_t>(sent) < data.length()) {
     137            0 :                 write_buffer_.append(data.substr(sent));
     138              :             } else {
     139            0 :                 return;  // 全部发送完成
     140              :             }
     141              :         } else {
     142            0 :             if (errno == EAGAIN || errno == EWOULDBLOCK) {
     143            0 :                 write_buffer_.append(data);
     144              :             } else {
     145            0 :                 session_->on_io_error("Initial send error");
     146            0 :                 return;
     147              :             }
     148              :         }
     149              :     } else {
     150            0 :         write_buffer_.append(data);
     151              :     }
     152              : 
     153              :     // 有待发送数据,注册写事件
     154            0 :     if (!write_buffer_.empty()) {
     155              :         // 注意:这里需要捕获 weak_ptr 避免循环引用
     156            0 :         std::weak_ptr<Connection> weak_self = shared_from_this();
     157            0 :         reactor_->modify_fd(fd_, 
     158              :             static_cast<uint32_t>(EventType::READ) | static_cast<uint32_t>(EventType::WRITE),
     159            0 :             [weak_self](int) {
     160            0 :                 if (auto self = weak_self.lock()) {
     161              :                     // 写事件也派发到绑定线程
     162            0 :                     self->dispatch([self]() {
     163            0 :                         self->handle_write();
     164            0 :                     });
     165            0 :                 }
     166            0 :             });
     167            0 :     }
     168              : }
     169              : 
     170            0 : void Connection::dispatch(std::function<void()> task) {
     171            0 :     if (thread_pool_) {
     172            0 :         thread_pool_->enqueue_to(thread_index_, std::move(task));
     173              :     }
     174            0 : }
     175              : 
     176            0 : void Connection::shutdown() {
     177              :     // 确保只执行一次
     178            0 :     if (is_closed_.exchange(true)) {
     179            0 :         return;
     180              :     }
     181              : 
     182            0 :     LOG() << "Shutting down connection for fd " << fd_;
     183              : 
     184              :     // 先从 epoll 移除,再关闭 fd
     185              :     // 注意:remove_fd 是同步执行的(虽然通过任务队列,但我们需要等它完成)
     186              :     // 这里直接关闭 fd,epoll 会自动移除已关闭的 fd(Linux 特性)
     187            0 :     if (reactor_) {
     188            0 :         reactor_->remove_fd(fd_);
     189              :     }
     190              : 
     191            0 :     ::shutdown(fd_, SHUT_RDWR);
     192            0 :     ::close(fd_);
     193              : }
     194              : 
     195            0 : void Connection::close_fd() {
     196              :     // 统一由 shutdown() 处理关闭逻辑
     197            0 :     shutdown();
     198            0 : }
     199              : 
     200              : } // namespace fix40
        

Generated by: LCOV version 2.0-1