TcpConnection是muduo 网络库核心的 TCP 连接类,封装了一个完整的 TCP 客户端 - 服务端连接,是网络通信中处理数据读写、连接状态、事件回调的核心组件,我会逐行、分模块给你讲透。
核心定位
- 对应一个已建立的 TCP socket 连接,生命周期由智能指针管理
- 运行在 subLoop(IO 线程) 中,不与主线程耦合
- 封装 socket、channel、缓冲区,对外提供简洁的连接操作接口
- 处理读、写、关闭、错误四种 IO 事件
TcpConnection.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| #pragma once
#include "Buffer.h" #include "InetAddress.h" #include "Callbacks.h" #include "Timestamp.h" #include "noncopyable.h"
#include <string> #include <atomic> #include <memory>
class EventLoop; class Socket; class Channel;
|
1
| class TcpConnection : noncopyable, public std::enable_shared_from_this<TcpConnection>
|
私有成员
1 2 3 4 5 6 7 8
| enum stateE { kDisconnected, kDisconnecting, kConnected, kConnecting }; std::atomic<int> state_;
|
用原子类型保证多线程下状态修改不冲突
状态机管理连接生命周期,避免非法操作
1 2 3 4 5 6 7 8 9
| EventLoop* loop_; std::string name_; bool reading_;
std::unique_ptr<Socket> socket_; std::unique_ptr<Channel> channel_;
const InetAddress localAddr_; const InetAddress peerAddr_;
|
- 一个连接持有唯一
socket和channel,所以使用unique_ptr
- 存储本地和对端的地址
1 2 3 4 5 6
| ConnectionCallback connectionCallback_; MessageCallback messageCallback_; WriteCompleteCallback writeCompleteCallback_; HighWaterMarkCallback highWaterMarkCallback_; CloseCallback closeCallback_; size_t highWaterMark_;
|
网络层只负责 IO,业务逻辑通过回调函数注入,实现解耦
1 2
| Buffer inputBuffer_; Buffer outputBuffer_;
|
读写双缓冲区
内部函数
1 2 3 4
| void handleRead(Timestamp receiveTime); void handleWrite(); void handleClose(); void handleError();
|
事件处理函数,channel绑定的函数,channel 监听到事件后,自动调用这些函数
1
| void setState(stateE state) { state_ = state; }
|
设置状态
1 2
| void sendInLoop(const void* message, size_t len); void shutdownInLoop();
|
外部调用send/shutdown,最终会执行这两个函数
公有函数(对外接口)
1 2 3 4 5 6
| TcpConnection(EventLoop* loop, const std::string& nameArg, int sockfd, const InetAddress& localAddr, const InetAddress& peerAddr); ~TcpConnection();
|
1 2 3 4 5
| EventLoop* getLoop() const { return loop_; } const std::string& name() const { return name_; } const InetAddress& localAddr() const { return localAddr_; } const InetAddress& peerAddr() const { return peerAddr_; } bool connected() const { return state_ == kConnected; }
|
只读获取信息
1 2 3 4
| void send(const std::string& buf); void shutdown();
|
分别配合sendInLoop、shutdownInLoop实现跨线程安全的操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| void setConnectionCallback(const ConnectionCallback& cb) { connectionCallback_ = cb; }
void setMessageCallback(const MessageCallback& cb) { messageCallback_ = cb; }
void setWriteCompleteCallback(const WriteCompleteCallback& cb) { writeCompleteCallback_ = cb; }
void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark) { highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }
void setCloseCallback(const CloseCallback& cb) { closeCallback_ = cb; }
|
业务层通过这些接口注册自己的处理逻辑
1 2 3 4
| void connectEstablish();
void connectDestroy();
|
TcpConnection对象创建后,要立即调用connectEstablish,在这个函数里会进行一系列操作让连接 “活过来” ,如:保证连接的生命周期
TcpConnection.cc
1 2 3 4 5
| #include "Logger.h" #include "TcpConnection.h" #include "Channel.h" #include "Socket.h" #include "EventLoop.h"
|
1 2 3 4 5 6 7 8
| static EventLoop* CheckLoopNotNull(EventLoop *loop) { if (loop == nullptr) { LOG_FATAL("{}:{}:{} TcpConnection Loop is null! \n", __FILE__, __FUNCTION__, __LINE__); } return loop; }
|
空指针安全检查
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| TcpConnection::TcpConnection(EventLoop* loop, const std::string& nameArg, int sockfd, const InetAddress& localAddr, const InetAddress& peerAddr) : loop_(CheckLoopNotNull(loop)) , name_(nameArg) , state_(kConnecting) , reading_(true) , socket_(new Socket(sockfd)) , channel_(new Channel(loop, sockfd)) , localAddr_(localAddr) , peerAddr_(peerAddr) { channel_->setReadCallback( std::bind(&TcpConnection::handleRead, this, std::placeholders::_1) ); channel_->setWriteCallback( std::bind(&TcpConnection::handleWrite, this) ); channel_->setCloseCallback( std::bind(&TcpConnection::handleClose, this) ); channel_->setErrorCallback( std::bind(&TcpConnection::handleError, this) ); LOG_INFO("TcpConnection::ctor[{}] at fd={}", name_, sockfd); socket_->setKeepAlive(true); }
|
- 设置回调,this->指向当前
TcpConnection 对象
- 打印连接创建信息,开启 TCP-Alive 保活机制
1 2 3 4 5
| TcpConnection::~TcpConnection() { LOG_INFO("TcpConnection::dtor[{}] at fd={} state={}", name_, channel_->fd(), (int)state_); }
|
析构函数,打印信息,dtor= destructor 的缩写
1 2 3 4 5 6 7 8 9 10
| void TcpConnection::connectEstablished() { setState(kConnected); channel_->tie(shared_from_this()); channel_->enableReading();
connectionCallback_(shared_from_this()); }
|
channel_->tie(shared_from_this());:把 TcpConnection 自身的 shared_ptr 绑定到 Channel,让 Channel 持有一个 weak_ptr 指向当前连接。事件触发时:Channel 先 lock() 一下,如果连接还活着 → 处理事件,如果连接已经销毁 → 直接跳过,不访问野指针
1 2 3 4 5 6 7 8 9 10
| void TcpConnection::connectDestroy() { if(state_ == kConnected) { setState(kDisconnected); channel_->disableAll(); connectionCallback_(shared_from_this()); } channel_->remove(); }
|
- 让这个 TCP 连接彻底 “停止工作”,不再接收任何事件
- 这个函数执行完,连接并没有真正销毁,这个函数只是:关闭事件、修改状态、取消监听
- 真正的销毁时机是:当最后一个
shared_ptr<TcpConnection> 被释放时
- 整个关闭流程的最后一环
connectDestroyed 执行
- 即使还有旧回调在队列里
- 调用 lock()
- 发现连接已销毁 → 返回
nullptr
- 不执行,直接跳过,不崩溃
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| void TcpConnection::shutdown() { if(state_ == kConnected) { setState(kDisconnecting); loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this)); } }
void TcpConnection::shutdownInLoop() { if(!channel_->isWriting()) { socket_->shutdownWrite(); } }
|
shutdown:设置正在关闭连接的状态,用 runInLoop 将真正关闭函数(shutdownInLoop)跑到 IO 线程执行。
shutdownInLoop:判断 !isWriting(),如果还在发送数据(outputBuffer 还有内容),不能关闭,要等发送完成后,才能关闭
正在发数据时会因isWriting()为true先跳过此次函数(shutdownInLoop),handleWrite逻辑中,发完数据后会检查 state_ == kDisconnecting,将自动调用shutdownInLoop。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| void TcpConnection::send(const std::string& buf) { if(state_ == kConnected) { if(loop_->isInLoopThread()) { sendInLoop(buf.c_str(), buf.size()); } else { loop_->queueInLoop(std::bind(&TcpConnection::sendInLoop, this, buf.c_str(), buf.size())); } } }
|
state_ == kConnected只有已连接状态才能发送数据
loop_->isInLoopThread()判断当前调用 send 的线程是不是这个连接所属的 IO 线程
- 是 IO 线程 → 直接
sendInLoop
- 不是 IO 线程 →
queueInLoop,把发送任务打包丢给 IO 线程,保证 sendInLoop 一定在正确的线程运行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| void TcpConnection::sendInLoop(const void* message, size_t len) { size_t nwrote = 0; size_t remaining = len; bool faultError = false;
if(state_ == kDisconnected) { LOG_ERROR("Disconnected, stop writing"); return; }
if(!channel_->isWriting() && outputBuffer_.readableBytes() == 0) { nwrote = ::write(channel_->fd(), message, len); if(nwrote >= 0) { remaining = len - nwrote; if(remaining == 0 && writeCompleteCallback_) { loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this())); } } else { nwrote = 0; if (errno != EWOULDBLOCK) { LOG_ERROR("TcpConnection::sendInLoop"); if (errno == EPIPE || errno == ECONNRESET) { faultError = true; } } } }
if (!faultError && remaining > 0) { size_t oldLen = outputBuffer_.readableBytes(); if(oldLen + remaining >= highWaterMark_ && oldLen < highWaterMark_ && highWaterMarkCallback_) { loop_->queueInLoop( std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining) ); } outputBuffer_.append((char*)message + nwrote, remaining); if(!channel_->isWriting()) { channel_->enableWriting(); } } }
|
核心逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| void TcpConnection::handleRead(Timestamp receiveTime) { int savedErrno = 0; ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno); if(n > 0) { messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); } else if(n == 0) { handleClose(); } else { errno = savedErrno; LOG_ERROR("TcpConnection::handleRead"); handleError(); } }
|
客户端发消息过来时真正处理读操作的函数
调用缓冲区类实现的读函数,将数据读到接收缓冲区
读取成功(n > 0):把数据交给上层业务(messageCallback_回调)
读到 0:对端关闭连接,调用handleClose()
读取出错:把错误码恢复回去,打印日志,调用 handleError() 处理异常关闭
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| void TcpConnection::handleWrite() { if(channel_->isWriting()) { int savedErrno = 0; ssize_t n = outputBuffer_.writeFd(channel_->fd(), &savedErrno); if(n > 0) { outputBuffer_.retrieve(n); if(outputBuffer_.readableBytes() == 0) { channel_->disableWriting(); if(writeCompleteCallback_) { loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this())); }
if(state_ == kDisconnecting) { shutdownInLoop(); } } } else { LOG_ERROR("TcpConnection::handleWrite"); } } else { LOG_ERROR("TcpConnection fd={} is down, no more writing ", channel_->fd()); } }
|
先检查当前 channel 是否注册了写事件(isWriting)
调用缓冲区的writeFd(),从 outputBuffer_ 拿数据
读取成功(n > 0):
retrieve(n):缓冲区里前 n 字节已经发走了,把这部分数据扔掉,剩下的留在缓冲区下次发
readableBytes() == 0:数据全部发完了
channel_->disableWriting()发送缓冲区空了,不需要再监听写事件了
writeCompleteCallback_:通知上层业务,数据发完了,业务层可以继续发下一条消息
- 如果用户之前调用了 shutdown(),但当时缓冲区还有数据没发完,现在发完了,可以真正关闭连接了,调用
shutdownInLoop()关闭socket,因为handleWrite 已经在 IO 线程,所以直接调用shutdownInLoop,而不是shutdown
- 发送数据失败(
n <= 0):打印错误信息
channel 并没有开启写事件,但 epoll却触发了 write 回调,属于逻辑错误,打印日志,帮助排查网络库内部 bug
1 2 3 4 5 6 7 8 9 10
| void TcpConnection::handleClose() { LOG_INFO("TcpConnection::handleClose fd={} state={}", channel_->fd(), (int)state_); setState(kDisconnected); channel_->disableAll();
TcpConnectionPtr connPtr(shared_from_this()); connectionCallback_(connPtr); closeCallback_(connPtr); }
|
这是 TCP 连接关闭的总入口,所有关闭流程最终都会走到这里
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| void TcpConnection::handleError() { int optval; socklen_t optlen = sizeof optval; int err = 0; if(::getsockopt(channel_->fd(), SOL_SOCKET, SO_ERROR, &optval, &optlen) < 0) { err = errno; } else { err = optval; } LOG_ERROR("TcpConnection::handleError name:{} - SO_ERROR:{}", name_, err); }
|
TCP 连接出错时的错误处理函数
为什么要调用 getsockopt(...SO_ERROR...)?
handleWrite 和 send 有什么区别?
send():主动发数据,用户调用,数据进缓冲区
handleWrite():内核可写事件触发,自动发送缓冲区剩余数据
TcpConnection流程图
一、总流程(主线)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| 客户端发起连接 ↓ TcpServer::newConnection 创建 TcpConnection ↓ TcpConnection::connectEstablished 【连接建立】 ↓ 注册 Channel 事件 + weak_ptr 安全回调 触发 connectionCallback_(已连接) ↓ 【事件循环】 可读 → handleRead 可写 → handleWrite 出错 → handleError 对端关闭 → handleClose ↓ 【连接关闭】 handleRead 返回 0 / 主动 shutdown ↓ handleClose 【统一关闭入口】 ↓ 关闭所有事件 → 回调上层 → 通知 TcpServer 回收 ↓ TcpConnection::connectDestroyed ↓ 从 epoll 移除 Channel,清理资源 ↓ 最后一个 shared_ptr 释放 ↓ TcpConnection 析构 → Socket 关闭 → 资源释放
|
二、分阶段详细流程图
1)连接建立阶段
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| accept 得到 connfd ↓ new TcpConnection(loop, name, sockfd, local, peer) ↓ 创建 Socket、Channel ↓ connectEstablished() ↓ setState(kConnected) ↓ Channel 注册读/写/关闭/错误回调 ↓ 每个回调:weak_ptr + lock() 保护 ↓ channel_->enableReading() 开始监听读 ↓ connectionCallback_(conn) 通知上层:已连接
|
2)读数据流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| epoll 触发 EPOLLIN ↓ Channel::handleEvent ↓ readCallback 被调用 ↓ weak_self.lock() ↓ 成功 → self->handleRead(receiveTime) ↓ inputBuffer_.readFd(fd) 从内核读数据 ↓ n > 0 → messageCallback_(业务处理数据) n = 0 → handleClose() 对端关闭 n < 0 → handleError()
|
3)写数据流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| 上层调用 TcpConnection::send ↓ 数据放入 outputBuffer_ ↓ 注册写事件 channel_->enableWriting() ↓ epoll 触发 EPOLLOUT ↓ writeCallback → handleWrite() ↓ outputBuffer_.writeFd(fd) 发送数据 ↓ retrieve(n) 扔掉已发送数据 ↓ 缓冲区空: channel_->disableWriting() writeCompleteCallback_ 发送完成 若 state==kDisconnecting → shutdownInLoop()
|
4)错误处理流程
1 2 3 4 5 6 7 8 9 10 11
| socket 发生错误 ↓ epoll 触发 EPOLLERR ↓ errorCallback → handleError() ↓ getsockopt(SO_ERROR) 获取真实错误 ↓ 打印错误日志(104/110/111等) ↓ 通常随后触发 handleClose 关闭连接
|
5)关闭流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| 触发关闭: ① handleRead n=0 ② 主动 shutdown ③ 出错 ↓ handleClose() ↓ setState(kDisconnected) channel_->disableAll() 关闭所有事件 ↓ connectionCallback_(断开) closeCallback_(通知TcpServer) ↓ TcpServer 从 map 移除连接 ↓ connectDestroyed() ↓ channel_->remove() 从 epoll 移除 ↓ Channel 回调被清空 ↓ weak_ptr 失效 ↓ 引用计数归零 ↓ ~TcpConnection() 析构 ↓ socket 关闭,fd 释放
|
源码地址
Callbacks.h:https://gitee.com/lpzdinghai/lpzmuduo/blob/master/Callbacks.h
TcpConnection.h:https://gitee.com/lpzdinghai/lpzmuduo/blob/master/TcpConnection.h
TcpConnecthttps://gitee.com/lpzdinghai/lpzmuduo/blob/master/TcpConnection.cc