day13-EventLoopThreadPool
EventLoopThreadPool是muduo 网络库核心的 IO 线程池类,专门用于多 Reactor 网络模型:一个主线程(baseLoop)处理连接,多个子 IO 线程处理读写事件,实现高并发网络编程。
EventLoopThreadPool = 事件循环线程池(预先创建固定数量的长期运行线程,每个线程绑定一个EventLoop,通过轮询分配网络连接)
- 管理一组
EventLoopThread(每个线程自带一个EventLoop) - 对外提供轮询分配IO 线程的能力
- 保证线程安全、不可拷贝(网络库通用设计)
EventLoopThreadPool.h
1 |
|
1 | class EventLoopThreadPool : noncopyable |
- 线程池是全局唯一资源,禁止拷贝
ThreadInitCallback线程初始化回调
1 | private: |
baseLoop_主线程的 EventLoop,Acceptor(接收连接) 依附它numThreads_子 IO 线程的数量next_轮询计数器,原子变量std::vector<std::unique_ptr<EventLoopThread>> threads_存储子线程对象,用unique_ptr自动管理,避免内存泄漏loops_每个子线程启动后,会创建一个EventLoop,统一存在这里,供getNextLoop()快速取用
为什么 next_ 设为原子变量?
因为 getNextLoop() 这个函数,可能被多个不同的线程同时调用。只要有人调用 getNextLoop(),就会读写 next_。用原子变量,不使用 mutex,性能更高
例子:
主线程(Acceptor 线程)
来新连接 → 调用
getNextLoop()某个子 IO 线程
内部业务逻辑需要分配新任务 → 也调用
getNextLoop()
1 | EventLoopThreadPool(EventLoop* baseLoop, const std::string& nameArg); |
baseLoop:主线程的 EventLoopnameArg:线程池名字
1 | void start(const ThreadInitCallback& cb = ThreadInitCallback()); |
核心接口,启动线程池
1 | void setNumThreads(int numThreads) { numThreads_ = numThreads; } |
set()/get()函数
1 | EventLoop* getNextLoop(); |
getNextLoop通过轮询,负载均衡选择子 IO 线程getAllLoops返回存有所有子EventLoop的数组
EventLoopThreadPool.cc
1 |
|
1 | void EventLoopThreadPool::start(const ThreadInitCallback& cb) |
std::string threadName = name_ + "-" + std::to_string(i);给线程起名(例:第 1 个线程 → IOThreadPool-1)。auto t = std::make_unique<EventLoopThread>(cb, threadName);- 创建一个IO 线程对象:EventLoopThread
- 传入两个参数:
cb:线程初始化回调(线程启动时执行)threadName:线程名字
- 使用 unique_ptr 管理:自动释放,不内存泄漏
loops_.push_back(t->startLoop());这是核心t->startLoop()- 真正启动线程
- 线程内部创建
EventLoop - 启动事件循环
- 返回该线程的 EventLoop* 指针
loops_.push_back(...)- 把每个子线程的
EventLoop保存到数组 - 后面
getNextLoop()就是从这里取
- 把每个子线程的
threads_.push_back(std::move(t));std::move(t):转移所有权- 把线程对象交给
threads_数组统一管理 - 作用:保证线程生命周期和线程池一致,线程池销毁,线程自动释放
兼容单线程:
用户设置
线程数=0(不创建任何子线程),所有网络工作都在主线程执行,但用户传入了初始化回调cb,需要执行,所以直接在主线程执行:cb(baseLoop_)。
1 | EventLoop* EventLoopThreadPool::getNextLoop() |
轮询选出一个 IO 线程 EventLoop 并返回
- 如果没有创建任何子线程(
numThreads=0)返回默认设置的主线程 loop(baseLoop_) loop = loops_[next_++ % loops_.size()]是核心轮询算法next_++原子变量自增,线程安全。原子变量里的++是原子操作,里面重载了++操作符,将三步操作(读→改→写)捆绑为一个整体,不被打断。如果要取出自增前的值时用fetch_add。%取模过程安全,%只是纯数学计算,不碰数据,%不操作共享变量,它只是拿算好的数字算一下下标。**% 取模 读的是:next_++ 已经算好的 “结果值”不是去读正在被别的线程修改的next_!next_++是原子操作,它会返回一个固定的、不会变的数值**。(这里读到都是next,不是next+1,因为是前置自增)
❌错误写法
1 | loop = loops_[next_]; // 读 |
1 | std::vector<EventLoop*> EventLoopThreadPool::getAllLoops() |
返回线程池中所有的EventLoop
- 单线程 → 返回
[baseLoop_] - 多线程 → 返回所有子线程的 loop
目的:统一接口,上层代码不用区分单 / 多线程
源码地址
EventLoopThreadPool.h:https://gitee.com/lpzdinghai/lpzmuduo/blob/master/EventLoopThreadPool.h
EventLoopThreadPool.cc:https://gitee.com/lpzdinghai/lpzmuduo/blob/master/EventLoopThreadPool.cc