TcpConnectionmuduo 网络库核心的 TCP 连接类,封装了一个完整的 TCP 客户端 - 服务端连接,是网络通信中处理数据读写、连接状态、事件回调的核心组件,我会逐行、分模块给你讲透。

核心定位

  1. 对应一个已建立的 TCP socket 连接,生命周期由智能指针管理
  2. 运行在 subLoop(IO 线程) 中,不与主线程耦合
  3. 封装 socket、channel、缓冲区,对外提供简洁的连接操作接口
  4. 处理读、写、关闭、错误四种 IO 事件

TcpConnection.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#pragma once

#include "Buffer.h"
#include "InetAddress.h"
#include "Callbacks.h"
#include "Timestamp.h"
#include "noncopyable.h"

#include <string>
#include <atomic>
#include <memory>

class EventLoop;
class Socket;
class Channel;
1
class TcpConnection : noncopyable, public std::enable_shared_from_this<TcpConnection>
  • noncopyable:禁止拷贝 / 赋值,一个 TCP 连接只能有一个实例

  • enable_shared_from_this:安全获取自身的shared_ptr,解决异步回调中对象生命周期失效问题

私有成员

1
2
3
4
5
6
7
8
enum stateE
{
kDisconnected, //已断开连接
kDisconnecting, //正在断开连接
kConnected, //已连接
kConnecting //正在连接
};
std::atomic<int> state_;
  • 原子类型保证多线程下状态修改不冲突

  • 状态机管理连接生命周期,避免非法操作

1
2
3
4
5
6
7
8
9
EventLoop* loop_; //这里不是baseloop;TcpConnection是在subloop中管理的
std::string name_;
bool reading_; // 是否正在监听读事件

std::unique_ptr<Socket> socket_;
std::unique_ptr<Channel> channel_;

const InetAddress localAddr_;
const InetAddress peerAddr_;
  • 一个连接持有唯一socketchannel,所以使用unique_ptr
  • 存储本地和对端的地址
1
2
3
4
5
6
ConnectionCallback connectionCallback_; // 有新连接时的回调
MessageCallback messageCallback_; // 有读写消息时的回调
WriteCompleteCallback writeCompleteCallback_; // 消息发送完成以后的回调
HighWaterMarkCallback highWaterMarkCallback_;
CloseCallback closeCallback_;
size_t highWaterMark_;

网络层只负责 IO,业务逻辑通过回调函数注入,实现解耦

1
2
Buffer inputBuffer_;  //接收数据的缓冲区
Buffer outputBuffer_; //发送数据的缓冲区

读写双缓冲区

内部函数

1
2
3
4
void handleRead(Timestamp receiveTime);   // 处理读事件
void handleWrite(); // 处理写事件
void handleClose(); // 对端关闭连接
void handleError(); // 处理socket错误

事件处理函数,channel绑定的函数,channel 监听到事件后,自动调用这些函数

1
void setState(stateE state) { state_ = state; }

设置状态

1
2
void sendInLoop(const void* message, size_t len);
void shutdownInLoop();

外部调用send/shutdown,最终会执行这两个函数

公有函数(对外接口)

1
2
3
4
5
6
TcpConnection(EventLoop* loop,
const std::string& nameArg,
int sockfd,
const InetAddress& localAddr,
const InetAddress& peerAddr);
~TcpConnection();
1
2
3
4
5
EventLoop* getLoop() const { return loop_; }
const std::string& name() const { return name_; }
const InetAddress& localAddr() const { return localAddr_; }
const InetAddress& peerAddr() const { return peerAddr_; }
bool connected() const { return state_ == kConnected; }

只读获取信息

1
2
3
4
//发送数据
void send(const std::string& buf);
//关闭连接
void shutdown();

分别配合sendInLoopshutdownInLoop实现跨线程安全的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void setConnectionCallback(const ConnectionCallback& cb)
{ connectionCallback_ = cb; }

void setMessageCallback(const MessageCallback& cb)
{ messageCallback_ = cb; }

void setWriteCompleteCallback(const WriteCompleteCallback& cb)
{ writeCompleteCallback_ = cb; }

void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark)
{ highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }

void setCloseCallback(const CloseCallback& cb)
{ closeCallback_ = cb; }

业务层通过这些接口注册自己的处理逻辑

1
2
3
4
//连接建立
void connectEstablish();
//连接销毁
void connectDestroy();

TcpConnection对象创建后,要立即调用connectEstablish,在这个函数里会进行一系列操作让连接 “活过来” ,如:保证连接的生命周期

TcpConnection.cc

1
2
3
4
5
#include "Logger.h"
#include "TcpConnection.h"
#include "Channel.h"
#include "Socket.h"
#include "EventLoop.h"
1
2
3
4
5
6
7
8
static EventLoop* CheckLoopNotNull(EventLoop *loop)
{
if (loop == nullptr)
{
LOG_FATAL("{}:{}:{} TcpConnection Loop is null! \n", __FILE__, __FUNCTION__, __LINE__);
}
return loop;
}

空指针安全检查

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
TcpConnection::TcpConnection(EventLoop* loop,
const std::string& nameArg,
int sockfd,
const InetAddress& localAddr,
const InetAddress& peerAddr)
: loop_(CheckLoopNotNull(loop))
, name_(nameArg)
, state_(kConnecting)
, reading_(true)
, socket_(new Socket(sockfd))
, channel_(new Channel(loop, sockfd))
, localAddr_(localAddr)
, peerAddr_(peerAddr)
{
// 下面给channel设置相应的回调函数,poller给channel通知感兴趣的事件发生了,channel会回调相应的操作函数
channel_->setReadCallback(
std::bind(&TcpConnection::handleRead, this, std::placeholders::_1)
);
channel_->setWriteCallback(
std::bind(&TcpConnection::handleWrite, this)
);
channel_->setCloseCallback(
std::bind(&TcpConnection::handleClose, this)
);
channel_->setErrorCallback(
std::bind(&TcpConnection::handleError, this)
);

LOG_INFO("TcpConnection::ctor[{}] at fd={}", name_, sockfd);
socket_->setKeepAlive(true);
}
  • 设置回调,this->指向当前 TcpConnection 对象
  • 打印连接创建信息,开启 TCP-Alive 保活机制
1
2
3
4
5
TcpConnection::~TcpConnection()
{
LOG_INFO("TcpConnection::dtor[{}] at fd={} state={}",
name_, channel_->fd(), (int)state_);
}

析构函数,打印信息,dtor= destructor 的缩写

1
2
3
4
5
6
7
8
9
10
// 连接建立
void TcpConnection::connectEstablished()
{
setState(kConnected);
channel_->tie(shared_from_this());
channel_->enableReading(); // 向poller注册channel的epollin事件

// 新连接建立,执行回调
connectionCallback_(shared_from_this());
}

channel_->tie(shared_from_this());把 TcpConnection 自身的 shared_ptr 绑定到 Channel,让 Channel 持有一个 weak_ptr 指向当前连接。事件触发时:Channel 先 lock() 一下,如果连接还活着 → 处理事件,如果连接已经销毁 → 直接跳过,不访问野指针

1
2
3
4
5
6
7
8
9
10
void TcpConnection::connectDestroy()
{
if(state_ == kConnected)
{
setState(kDisconnected);
channel_->disableAll();
connectionCallback_(shared_from_this());
}
channel_->remove();
}
  • 让这个 TCP 连接彻底 “停止工作”,不再接收任何事件
  • 这个函数执行完,连接并没有真正销毁,这个函数只是:关闭事件修改状态取消监听
  • 真正的销毁时机是:当最后一个 shared_ptr<TcpConnection> 被释放时
  • 整个关闭流程的最后一环
    • connectDestroyed 执行
      • 关闭事件
      • 不再触发回调
    • 即使还有旧回调在队列里
      • 调用 lock()
      • 发现连接已销毁 → 返回 nullptr
      • 不执行,直接跳过,不崩溃
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void TcpConnection::shutdown()
{
if(state_ == kConnected)
{
setState(kDisconnecting);
loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));
}
}

void TcpConnection::shutdownInLoop()
{
if(!channel_->isWriting())
{
socket_->shutdownWrite();
}
}
  • shutdown:设置正在关闭连接的状态,用 runInLoop 将真正关闭函数(shutdownInLoop)跑到 IO 线程执行。

  • shutdownInLoop:判断 !isWriting(),如果还在发送数据(outputBuffer 还有内容),不能关闭,要等发送完成后,才能关闭

  • 正在发数据时会因isWriting()true先跳过此次函数(shutdownInLoop),handleWrite逻辑中,发完数据后会检查 state_ == kDisconnecting,将自动调用shutdownInLoop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void TcpConnection::send(const std::string& buf)
{
if(state_ == kConnected)
{
if(loop_->isInLoopThread())
{
sendInLoop(buf.c_str(), buf.size());
}
else
{
loop_->queueInLoop(std::bind(&TcpConnection::sendInLoop, this, buf.c_str(), buf.size()));
}
}
}
  • state_ == kConnected只有已连接状态才能发送数据
  • loop_->isInLoopThread()判断当前调用 send 的线程是不是这个连接所属的 IO 线程
    • 是 IO 线程 → 直接 sendInLoop
    • 不是 IO 线程 → queueInLoop,把发送任务打包丢给 IO 线程,保证 sendInLoop 一定在正确的线程运行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
void TcpConnection::sendInLoop(const void* message, size_t len)
{
size_t nwrote = 0; //已发送字节数
size_t remaining = len; //剩余没发送字节数
bool faultError = false;

// 之前调用过该connection的shutdown,不能再进行发送了
if(state_ == kDisconnected)
{
LOG_ERROR("Disconnected, stop writing");
return;
}

if(!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
{
nwrote = ::write(channel_->fd(), message, len);
if(nwrote >= 0)
{
remaining = len - nwrote;
if(remaining == 0 && writeCompleteCallback_)
{
loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
}
}
else
{
nwrote = 0;
if (errno != EWOULDBLOCK)
{
LOG_ERROR("TcpConnection::sendInLoop");
if (errno == EPIPE || errno == ECONNRESET) // SIGPIPE RESET
{
faultError = true;
}
}
}
}

// 说明当前这一次write,并没有把数据全部发送出去,剩余的数据需要保存到缓冲区当中,然后给channel
// 注册epollout事件,poller发现tcp的发送缓冲区有空间,会通知相应的sock-channel,调用writeCallback_回调方法
// 也就是调用TcpConnection::handleWrite方法,把发送缓冲区中的数据全部发送完成
if (!faultError && remaining > 0)
{
size_t oldLen = outputBuffer_.readableBytes();
if(oldLen + remaining >= highWaterMark_
&& oldLen < highWaterMark_
&& highWaterMarkCallback_)
{
loop_->queueInLoop(
std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining)
);
}
outputBuffer_.append((char*)message + nwrote, remaining);
if(!channel_->isWriting())
{
channel_->enableWriting();
}
}
}

核心逻辑:

  • 先尝试直接发送数据:channel 没在写 + 发送缓冲区为空 → 可以直接 write 发送

    • 发送成功:数据一次性发完,触发发送完成回调
    • 发送失败:
      • EWOULDBLOCK = 发送缓冲区满,正常,不是错误
      • EPIPE / ECONNRESET = 连接断开,错误
  • 没发完的数据存入缓冲区(没发生致命错误 + 还有数据没发完)

    • 高水位判断(缓冲区太大了):没发完的数据(remaining)放入缓冲区之前没到高水位线,放入后超过高水位线,调用高水位回调(作用:发送缓冲区太大,通知用户限流)
    • 剩余数据追加到缓冲区,并注册写事件,让 epoll 通知可写
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void TcpConnection::handleRead(Timestamp receiveTime)
{
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if(n > 0)
{
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
}
else if(n == 0)
{
handleClose();
}
else
{
errno = savedErrno;
LOG_ERROR("TcpConnection::handleRead");
handleError();
}
}

客户端发消息过来时真正处理读操作的函数

  • 调用缓冲区类实现的读函数,将数据读到接收缓冲区

  • 读取成功(n > 0):把数据交给上层业务(messageCallback_回调)

  • 读到 0:对端关闭连接,调用handleClose()

  • 读取出错:把错误码恢复回去,打印日志,调用 handleError() 处理异常关闭

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
void TcpConnection::handleWrite()
{
if(channel_->isWriting())
{
int savedErrno = 0;
ssize_t n = outputBuffer_.writeFd(channel_->fd(), &savedErrno);
if(n > 0)
{
outputBuffer_.retrieve(n);
if(outputBuffer_.readableBytes() == 0)
{
channel_->disableWriting();
if(writeCompleteCallback_)
{
loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
}

if(state_ == kDisconnecting)
{
shutdownInLoop();
}
}
}
else
{
LOG_ERROR("TcpConnection::handleWrite");
}
}
else
{
LOG_ERROR("TcpConnection fd={} is down, no more writing ", channel_->fd());
}
}
  • 先检查当前 channel 是否注册了写事件isWriting

  • 调用缓冲区的writeFd(),从 outputBuffer_ 拿数据

  • 读取成功(n > 0):

    • retrieve(n):缓冲区里前 n 字节已经发走了,把这部分数据扔掉,剩下的留在缓冲区下次发
    • readableBytes() == 0:数据全部发完了
      • channel_->disableWriting()发送缓冲区空了,不需要再监听写事件了
      • writeCompleteCallback_:通知上层业务,数据发完了,业务层可以继续发下一条消息
      • 如果用户之前调用了 shutdown(),但当时缓冲区还有数据没发完,现在发完了,可以真正关闭连接了,调用shutdownInLoop()关闭socket,因为handleWrite 已经在 IO 线程,所以直接调用shutdownInLoop,而不是shutdown
    • 发送数据失败(n <= 0):打印错误信息
  • channel 并没有开启写事件,但 epoll却触发了 write 回调,属于逻辑错误,打印日志,帮助排查网络库内部 bug

1
2
3
4
5
6
7
8
9
10
void TcpConnection::handleClose()
{
LOG_INFO("TcpConnection::handleClose fd={} state={}", channel_->fd(), (int)state_);
setState(kDisconnected);
channel_->disableAll();

TcpConnectionPtr connPtr(shared_from_this());
connectionCallback_(connPtr);
closeCallback_(connPtr);
}

这是 TCP 连接关闭的总入口,所有关闭流程最终都会走到这里

  • 先打印日志,再把连接状态标记为已断开

  • 关闭所有事件,epoll 再也不会管这个连接

  • connPtr:拿到自己的强引用

  • connectionCallback_(connPtr);通知上层业务:连接断开了!上层在这里可以做:用户下线、清理会话、关闭资源、记录日志

  • closeCallback_(connPtr);通知 TcpServer把这个连接删掉,这个回调是给网络库自己用的:TcpServer 从 map 中移除这个连接,调用 connectDestroyed(),最终释放连接对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void TcpConnection::handleError()
{
int optval;
socklen_t optlen = sizeof optval;
int err = 0;
if(::getsockopt(channel_->fd(), SOL_SOCKET, SO_ERROR, &optval, &optlen) < 0)
{
err = errno;
}
else
{
err = optval;
}
LOG_ERROR("TcpConnection::handleError name:{} - SO_ERROR:{}", name_, err);
}

TCP 连接出错时的错误处理函数

  • getsockopt(...):从 socket 中获取真正的错误信息

    • err = errno;:如果 getsockopt 自己失败了,取 errno
    • err = optval;:成功获取,错误码存在 optval 里
  • 打印错误日志

为什么要调用 getsockopt(...SO_ERROR...)

  • 连接出错时,epoll只告诉你出错了,epoll通知的错误,不会把 errno 直接给你

  • 必须主动调用 getsockopt 才能拿到真正的 socket 错误

handleWritesend 有什么区别?

  • send()主动发数据,用户调用,数据进缓冲区
  • handleWrite()内核可写事件触发,自动发送缓冲区剩余数据

TcpConnection流程图

一、总流程(主线)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
客户端发起连接

TcpServer::newConnection 创建 TcpConnection

TcpConnection::connectEstablished 【连接建立】

注册 Channel 事件 + weak_ptr 安全回调
触发 connectionCallback_(已连接)

【事件循环】
可读 → handleRead
可写 → handleWrite
出错 → handleError
对端关闭 → handleClose

【连接关闭】
handleRead 返回 0 / 主动 shutdown

handleClose 【统一关闭入口】

关闭所有事件 → 回调上层 → 通知 TcpServer 回收

TcpConnection::connectDestroyed

从 epoll 移除 Channel,清理资源

最后一个 shared_ptr 释放

TcpConnection 析构 → Socket 关闭 → 资源释放

二、分阶段详细流程图

1)连接建立阶段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
accept 得到 connfd

new TcpConnection(loop, name, sockfd, local, peer)

创建 Socket、Channel

connectEstablished()

setState(kConnected)

Channel 注册读/写/关闭/错误回调

每个回调:weak_ptr + lock() 保护

channel_->enableReading() 开始监听读

connectionCallback_(conn) 通知上层:已连接

2)读数据流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
epoll 触发 EPOLLIN

Channel::handleEvent

readCallback 被调用

weak_self.lock()

成功 → self->handleRead(receiveTime)

inputBuffer_.readFd(fd) 从内核读数据

n > 0 → messageCallback_(业务处理数据)
n = 0 → handleClose() 对端关闭
n < 0 → handleError()

3)写数据流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
上层调用 TcpConnection::send

数据放入 outputBuffer_

注册写事件 channel_->enableWriting()

epoll 触发 EPOLLOUT

writeCallback → handleWrite()

outputBuffer_.writeFd(fd) 发送数据

retrieve(n) 扔掉已发送数据

缓冲区空:
channel_->disableWriting()
writeCompleteCallback_ 发送完成
若 state==kDisconnecting → shutdownInLoop()

4)错误处理流程

1
2
3
4
5
6
7
8
9
10
11
socket 发生错误

epoll 触发 EPOLLERR

errorCallback → handleError()

getsockopt(SO_ERROR) 获取真实错误

打印错误日志(104/110/111等)

通常随后触发 handleClose 关闭连接

5)关闭流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
触发关闭:
① handleRead n=0
② 主动 shutdown
③ 出错

handleClose()

setState(kDisconnected)
channel_->disableAll() 关闭所有事件

connectionCallback_(断开)
closeCallback_(通知TcpServer)

TcpServer 从 map 移除连接

connectDestroyed()

channel_->remove() 从 epoll 移除

Channel 回调被清空

weak_ptr 失效

引用计数归零

~TcpConnection() 析构

socket 关闭,fd 释放

源码地址

Callbacks.h:https://gitee.com/lpzdinghai/lpzmuduo/blob/master/Callbacks.h

TcpConnection.h:https://gitee.com/lpzdinghai/lpzmuduo/blob/master/TcpConnection.h

TcpConnecthttps://gitee.com/lpzdinghai/lpzmuduo/blob/master/TcpConnection.cc