TcpServer是用户使用 muduo 的唯一入口,负责启动服务器、接收新连接、分配 IO 线程、管理所有客户端连接。所有底层细节全部被 TcpServer 封装隐藏,用户只操作 TcpServer

Callbacks.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#pragma once

#include <memory>
#include <functional>

class Timestamp;
class Buffer;
class TcpConnection;

using TcpConnectionPtr = std::shared_ptr<TcpConnection>;
using ConnectionCallback = std::function<void(const TcpConnectionPtr&)>;
using CloseCallback = std::function<void(const TcpConnectionPtr&)>;
using WriteCompleteCallback = std::function<void (const TcpConnectionPtr&)>;
using MessageCallback = std::function<void (const TcpConnectionPtr&,
Buffer*,
Timestamp)>;
using HighWaterMarkCallback = std::function<void (const TcpConnectionPtr&, size_t)>;
  • 给所有网络事件起别名、定义类型
  • Buffer类和TcpConnection类还未实现。

TcpServer.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include "noncopyable.h"
#include "Callbacks.h"
#include "EventLoopThreadPool.h"
#include "Acceptor.h"
#include "InetAddress.h"
#include "EventLoop.h"
#include "TcpConnection.h"

#include <unordered_map>
#include <memory>
#include <string>
#include <atomic>

class EventLoop;
class InetAddress;

1
2
3
4
5
6
7
8
9
10
class TcpServer : noncopyable
{
public:
using ThreadInitCallback = std::function<void(EventLoop*)>;

enum class Option
{
kNoReusePort,
kReusePort,
};
  • noncopyacble服务器只有一个,不能复制
  • ThreadInitCallback 线程初始化回调
  • Option端口复用选项,用 enum class强枚举类型:类型安全、不会隐式转换

私有成员变量

1
2
3
EventLoop* loop_; //baseLoop 用户设置的loop
const std::string name_;
const std::string ipPort_;
  • loop_:负责接收监听的主线程循环(mainLoop/baseLoop
  • name_:服务器名字
  • ipPort_:监听地址(ip + 端口)
1
2
3
4
ThreadInitCallback threadInitCallback_; // loop线程初始化的回调
ConnectionCallback connectionCallback_; // 有新连接时的回调
MessageCallback messageCallback_; // 有读写消息时的回调
WriteCompleteCallback writeCompleteCallback_; // 消息发送完成以后的回调

四个回调函数是网络库与用户业务代码的桥梁—— 网络库只管底层 IO,业务逻辑(你要干嘛)全靠这些回调来实现

  • 回调调用时机
1
2
3
4
5
6
7
8
9
10
11
12
13
服务器启动

客户端连上来
→ ConnectionCallback(连接建立)

客户端发消息
→ MessageCallback(处理消息)

你回复消息
→ WriteCompleteCallback(发送完成)

客户端断开
→ ConnectionCallback(连接断开)
1
2
std::unique_ptr<Acceptor> acceptor_;
std::shared_ptr<EventLoopThreadPool> threadPool_;
  • unique_ptr<Acceptor>独占所有权,一个 TcpServer 只能有一个接收器,绝不共享

  • shared_ptr<EventLoopThreadPool>共享所有权,线程池需要在多处、多线程被安全持有,必须共享

1
2
3
4
5
std::atomic_int started_;
int nextConnId_;

using ConnectionMap = std::unordered_map<std::string, TcpConnectionPtr>;
ConnectionMap connections_;
  • started_:保证服务器只启动一次,线程安全,用原子变量是为了**防止用户在多线程里乱调用 start ()**(atomic_intatomic<int>的别名,作用完全一样)
  • nextConnId_给每一个新连接分配唯一 ID,从 1 开始递增,与 name_ + nextConnId_ 拼接成连接唯一名称
  • connecions_:服务器管理所有客户端连接的哈希表

内部函数

1
2
3
4
private:
void newConnection(int sockfd, const InetAddress &peerAddr);
void removeConnection(const TcpConnectionPtr &conn);
void removeConnectionInLoop(const TcpConnectionPtr &conn);
  • newConnection处理新客户端连接 → 创建 TcpConnection → 加入管理,Acceptor 监听到新客户端,调用这个函数
  • removeConnection(关闭连接的安全入口)和removeConnectionInLoop(真正删除连接)配合关闭连接

成员函数

1
2
3
4
5
TcpServer(EventLoop* loop, 
const InetAddress& listenAddr,
const std::string& nameArg,
Option option = Option::kNoReusePort);
~TcpServer();

构造函数:创建一个 TCP 服务器对象,但还不启动监听

1
2
3
4
void setThreadInitcallback(const ThreadInitCallback &cb) { threadInitCallback_ = cb; }
void setConnectionCallback(const ConnectionCallback &cb) { connectionCallback_ = cb; }
void setMessageCallback(const MessageCallback &cb) { messageCallback_ = cb; }
void setWriteCompleteCallback(const WriteCompleteCallback &cb) { writeCompleteCallback_ = cb; }

设置四类回调的接口

  • setThreadInitCallback:每个 IO 线程刚启动时调用,初始化线程(绑定 CPU、初始化日志、配置参数),很少用
  • setConnectionCallback:客户端连接和断开时调用
    • 打印日志
    • 统计在线人数
    • 初始化连接资源
  • setMessageCallback:收到客户端数据(socket 有数据可读)时调用,写业务逻辑时常用
    • 解析协议
    • 执行业务逻辑
    • 发送回复
  • setWriteCompleteCallback:数据发送完毕调用,发送缓冲区清空
    • 大文件分片发送
    • 流量控制
    • 发送完成日志
1
2
3
4
5
//设置底层subloop的个数
void setThreadNum(int numThreads);

//开启服务器监听
void start();

TcpServer.cc

1
2
#include "TcpServer.h"
#include "Logger.h"
1
2
3
4
5
6
7
8
static EventLoop* checkLoopNotNull(EventLoop* loop)
{
if(loop == nullptr)
{
LOG_FATAL("{}:{}:{} mainLoop is null", __FILE__, __FUNCTION__, __LINE__);
}
return loop;
}
  • 防止用户传入空指针(nullptr)导致程序崩溃

  • static:只在当前源文件内可见

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
TcpServer::TcpServer(EventLoop* loop, 
const InetAddress& listenAddr,
const std::string& nameArg,
Option option)
: loop_(loop)
, name_(nameArg)
, ipPort_(listenAddr.toIpPort())
, acceptor_(new Acceptor(loop, listenAddr, option == Option::kReusePort))
, threadPool_(new EventLoopThreadPool(loop, nameArg))
, nextConnId_(1)
, started_(0)
{
acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this,
std::placeholders::_1, std::placeholders::_2));
}
  • TcpServer::newConnection函数绑定给 Acceptor,监听到新连接后,调用TcpServer 的 newConnection 函数

  • Acceptor 只负责 accept 连接,拿到 sockfd,Acceptor 不知道怎么处理连接,所以把处理逻辑通过回调交给 TcpServer

  • 因为 Acceptor 将来调用回调时,会传进来 2 个参数(sockfd 和 peerAddr),现在我们不知道这两个值是什么,所以先用 _1_2 占个位置

1
2
3
4
5
6
7
8
9
10
11
12
13
TcpServer::~TcpServer()
{
for(auto& item : connections_)
{
TcpConnectionPtr conn(item.second);
item.second.reset();

//销毁链接
conn->getLoop()->runInLoop(
std::bind(&TcpConnection::connectDestroyed, conn)
);
}
}
  • 析构逻辑:

    • 遍历所有还活着的客户端连接,取出value:TcpConnectionPtr(也就是 shared_ptr<TcpConnection>

    • 把连接从 TcpServer 的管理 map 中移除

    • 把每个连接丢回它自己的 IO 线程安全销毁(TcpConnection相关函数还没实现)

  • TcpConnectionPtr conn(item.second);把 map 里的 shared_ptr 引用计数 +1,让连接对象暂时不会被释放

  • item.second.reset();这会让 map 里的 shared_ptr 断开引用。如果不先复制给局部 conn连接可能在这里直接被释放掉,后面就没法用了

  • 销毁链接:

    • conn->getLoop()

      每个 TcpConnection 都有 自己的 IO 线程(EventLoop)连接的读写、销毁必须在自己的 IO线程执行

    • runInLoop()

      跨线程安全调用
      作用:把要执行的函数,丢到连接所属的 IO 线程里去执行,保证线程安全、不发生并发冲突。

    • std::bind(&TcpConnection::connectDestroyed, conn)
      绑定要执行的销毁函数
      conn(shared_ptr)绑定进去
      → 引用计数再次 +1
      → 确保函数执行完之前,连接对象不会死

1
2
3
4
5
//设置底层subloop的个数
void TcpServer::setThreadNum(int numThreads)
{
threadPool_->setNumThreads(numThreads);
}

线程池才是管理线程的人,通过线程池设置线程数。

1
2
3
4
5
6
7
8
9
//开启服务器监听
void TcpServer::start()
{
if(started_++ == 0)
{
threadPool_->start(threadInitCallback_);
loop_->runInLoop(std::bind(&Acceptor::listen, acceptor_.get()));
}
}
  • threadPool_->start(threadInitCallback_);启动工作线程池,创建并启动所有 subLoop 线程(IO 工作线程)

  • loop_->runInLoop()让主线程安全执行监听,让 Acceptor 在 baseLoop 线程中调用 listen() 开始监听端口

  • get():获取被智能指针包装的对象的裸指针

为什么要loop_->runInLoop ()?不能直接调用 acceptor_->listen ()

  • 所有 IO 操作(listen、read、write)必须在创建它的 EventLoop 线程中执行

  • acceptor_ 是在 loop_(baseLoop 主线程) 中创建的,所以 acceptor_->listen() 必须在主线程中调用

  • runInLoop()会保证把回调函数丢到loop_ 所属的主线程去执行

==接下来还剩三个内部函数没有实现,因为这三个函数会涉及TcpConnection相关内容,所以等实现完TcpConnection后再实现这三个函数。==

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void TcpServer::removeConnection(const TcpConnectionPtr &conn)
{
loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}

void TcpServer::removeConnectionInLoop(const TcpConnectionPtr &conn)
{
LOG_INFO("TcpServer::removeConnectionInLoop [{}] - connection {}",
name_, conn->name());

connections_.erase(conn->name());
EventLoop* ioLoop = conn->getLoop();
ioLoop->queueInLoop(
std::bind(&TcpConnection::connectDestroy, conn)
);
}
  • TcpServer::removeConnection:接口函数,把连接销毁函数扔到对应正确线程执行

  • TcpServer::removeConnectionInLoop

    • 从 TcpServer 管理的连接列表中删掉这个连接,TcpServer 不再持有这个 TcpConnection 的 shared_ptr
    • 找到并取出这个连接所属的 IO 线程
    • 把真正的销毁操作,丢到连接所属的 IO 线程去执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
{
EventLoop* ioLoop = threadPool_->getNextLoop();

char buf[64] = { 0 };
snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
nextConnId_++;
std::string connName = name_ + buf;

LOG_INFO("TcpServer::newConnection [{}] - new connection [{}] from {} ",
name_, connName, peerAddr.toIpPort());

sockaddr_in localAddr;
::bzero(&localAddr, sizeof localAddr);
socklen_t addrlen = sizeof(localAddr);
if(::getsockname(sockfd, (sockaddr*)&localAddr, &addrlen) < 0)
{
LOG_ERROR("sockets:getsockname");
}
InetAddress localInetAddr(localAddr);

TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localInetAddr, peerAddr));
connections_[connName] = conn;

// 下面的回调都是用户设置给TcpServer=>TcpConnection=>Channel=>Poller=>notify channel调用回调
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);

// 设置了如何关闭连接的回调 conn->shutDown()
conn->setCloseCallback(
std::bind(&TcpServer::removeConnection, this, std::placeholders::_1)
);

// 直接调用TcpConnection::connectEstablished
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablish, conn, conn));
}

接收客户端新连接 → 分配 IO 线程 → 创建 TcpConnection → 设置所有回调 → 启动连接

  • 轮询算法从线程池获取下一个 IO 线程

  • 生成唯一连接名

  • 获取本地地址

  • 创建 TcpConnection 并加入管理

  • 设置用户回调

  • 设置关闭回调(连接断开时自动调用)

  • 调用TcpConnection::connectEstablished,后面这个函数改了一次,所以多了个参数

源码地址

Callbacks.h:https://gitee.com/lpzdinghai/lpzmuduo/blob/master/Callbacks.h

TcpServer.h:https://gitee.com/lpzdinghai/lpzmuduo/blob/master/TcpServer.h

TcpServer.cc:https://gitee.com/lpzdinghai/lpzmuduo/blob/master/TcpServer.cc