EventLoopThreadPoolmuduo 网络库核心的 IO 线程池类,专门用于多 Reactor 网络模型:一个主线程(baseLoop)处理连接,多个子 IO 线程处理读写事件,实现高并发网络编程。

EventLoopThreadPool = 事件循环线程池(预先创建固定数量的长期运行线程,每个线程绑定一个EventLoop,通过轮询分配网络连接)

  • 管理一组 EventLoopThread(每个线程自带一个EventLoop
  • 对外提供轮询分配IO 线程的能力
  • 保证线程安全、不可拷贝(网络库通用设计)

EventLoopThreadPool.h

1
2
3
4
5
6
7
8
9
10
11
12
#pragma once

#include "noncopyable.h"

#include <string>
#include <functional>
#include <atomic>
#include <vector>
#include <memory>

class EventLoop;
class EventLoopThread;
1
2
3
4
class EventLoopThreadPool : noncopyable
{
public:
using ThreadInitCallback = std::function<void(EventLoop*)>;
  • 线程池是全局唯一资源,禁止拷贝
  • ThreadInitCallback 线程初始化回调
1
2
3
4
5
6
7
8
private:
bool started_;
EventLoop* baseLoop_;
std::string name_;
int numThreads_;
std::atomic<int> next_; //轮询计数器
std::vector<std::unique_ptr<EventLoopThread>> threads_;
std::vector<EventLoop*> loops_;
  • baseLoop_主线程的 EventLoop,Acceptor(接收连接) 依附它
  • numThreads_子 IO 线程的数量
  • next_轮询计数器,原子变量
  • std::vector<std::unique_ptr<EventLoopThread>> threads_存储子线程对象,用unique_ptr自动管理,避免内存泄漏
  • loops_每个子线程启动后,会创建一个EventLoop,统一存在这里,供getNextLoop()快速取用

为什么 next_ 设为原子变量?

因为 getNextLoop() 这个函数,可能被多个不同的线程同时调用。只要有人调用 getNextLoop(),就会读写 next_。用原子变量,不使用 mutex,性能更高

例子:

  1. 主线程(Acceptor 线程)

    来新连接 → 调用 getNextLoop()

  2. 某个子 IO 线程

    内部业务逻辑需要分配新任务 → 也调用 getNextLoop()

1
2
EventLoopThreadPool(EventLoop* baseLoop, const std::string& nameArg);
~EventLoopThreadPool();
  • baseLoop主线程的 EventLoop

  • nameArg:线程池名字

1
void start(const ThreadInitCallback& cb = ThreadInitCallback());

核心接口,启动线程池

1
2
3
void setNumThreads(int numThreads) { numThreads_ = numThreads; }
bool started() const { return started_; }
std::string name() const { return name_; }

set()/get()函数

1
2
EventLoop* getNextLoop();
std::vector<EventLoop*> getAllLoops();
  • getNextLoop通过轮询,负载均衡选择子 IO 线程
  • getAllLoops返回存有所有子EventLoop的数组

EventLoopThreadPool.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include "EventLoopThreadPool.h"
#include "EventLoopThread.h"
#include "assert.h" //断言

EventLoopThreadPool::EventLoopThreadPool(EventLoop* baseLoop, const std::string& nameArg)
: started_(false)
, baseLoop_(baseLoop)
, name_(nameArg)
, numThreads_(0)
, next_(0)
{

}

EventLoopThreadPool::~EventLoopThreadPool()
{

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void EventLoopThreadPool::start(const ThreadInitCallback& cb)
{
assert(!started_);

for(int i = 0; i < numThreads_; ++i)
{
std::string threadName = name_ + "-" + std::to_string(i);

auto t = std::make_unique<EventLoopThread>(cb, threadName);
loops_.push_back(t->startLoop());
threads_.push_back(std::move(t));
}

//兼容单线程
if(numThreads_ == 0 && cb)
{
cb(baseLoop_);
}

started_ = true;
}
  • std::string threadName = name_ + "-" + std::to_string(i);给线程起名(例:第 1 个线程 → IOThreadPool-1)。

  • auto t = std::make_unique<EventLoopThread>(cb, threadName);

    • 创建一个IO 线程对象:EventLoopThread
    • 传入两个参数:
      1. cb:线程初始化回调(线程启动时执行)
      2. threadName:线程名字
    • 使用 unique_ptr 管理:自动释放,不内存泄漏
  • loops_.push_back(t->startLoop());这是核心

    1. t->startLoop()

      • 真正启动线程
      • 线程内部创建 EventLoop
      • 启动事件循环
      • 返回该线程的 EventLoop* 指针
    2. loops_.push_back(...)

      • 把每个子线程的 EventLoop 保存到数组
      • 后面 getNextLoop() 就是从这里取
  • threads_.push_back(std::move(t));

    • std::move(t):转移所有权
    • 把线程对象交给 threads_ 数组统一管理
    • 作用:保证线程生命周期和线程池一致,线程池销毁,线程自动释放
  • 兼容单线程:

    用户设置 线程数=0(不创建任何子线程),所有网络工作都在主线程执行,但用户传入了初始化回调 cb,需要执行,所以直接在主线程执行:cb(baseLoop_)

1
2
3
4
5
6
7
8
9
10
11
EventLoop* EventLoopThreadPool::getNextLoop()
{
EventLoop* loop = baseLoop_;

if(!loops_.empty())
{
loop = loops_[next_++ % loops_.size()];
}

return loop;
}

轮询选出一个 IO 线程 EventLoop 并返回

  • 如果没有创建任何子线程numThreads=0)返回默认设置的主线程 loop(baseLoop_)
  • loop = loops_[next_++ % loops_.size()]是核心轮询算法
    • next_++原子变量自增,线程安全。原子变量里的++是原子操作,里面重载了++操作符,将三步操作(读→改→写)捆绑为一个整体,不被打断。如果要取出自增前的值时用fetch_add
    • % 取模过程安全,%只是纯数学计算,不碰数据,%不操作共享变量,它只是拿算好的数字算一下下标。**% 取模 读的是:next_++ 已经算好的 “结果值”不是去读正在被别的线程修改的 next_next_++ 是原子操作,它会返回一个固定的、不会变的数值**。(这里读到都是next,不是next+1,因为是前置自增)

错误写法

1
2
3
loop = loops_[next_];  // 读
++next_; // 写
next_ = 0; // 又写
1
2
3
4
5
6
7
8
9
10
11
12
std::vector<EventLoop*> EventLoopThreadPool::getAllLoops()
{
assert(started_);
if(loops_.empty())
{
return std::vector<EventLoop*>(1, baseLoop_);
}
else
{
return loops_;
}
}

返回线程池中所有的EventLoop

  • 单线程 → 返回[baseLoop_]
  • 多线程 → 返回所有子线程的 loop

目的:统一接口,上层代码不用区分单 / 多线程

源码地址

EventLoopThreadPool.h:https://gitee.com/lpzdinghai/lpzmuduo/blob/master/EventLoopThreadPool.h

EventLoopThreadPool.cc:https://gitee.com/lpzdinghai/lpzmuduo/blob/master/EventLoopThreadPool.cc