EventLoopThread = 封装Thread + 自身 EventLoop的线程类,核心实现 one loop per thread(一个线程只跑一个 EventLoop)。

EventLoopThread.h

1
2
3
4
5
6
7
8
9
10
11
12
13
#pragma once

#include "noncopyable.h"
#include "Thread.h"

#include <mutex>
#include <condition_variable>
#include <functional>
#include <string>

class EventLoop;

class EventLoopThread : noncopyable

私有成员变量

1
2
3
4
5
6
7
8
private:
void ThreadFunc();

std::mutex mutex_;
std::condition_variable cond_;
EventLoop* loop_;
ThreadInitCallback callback_;
Thread thread_;

ThreadFunc真正在线程里跑的函数,创建 EventLoop

成员函数

1
2
3
4
5
6
7
8
public:
using ThreadInitCallback = std::function<void(EventLoop*)>;

EventLoopThread(const ThreadInitCallback& cb = ThreadInitCallback(),
const std::string& name = std::string());
~EventLoopThread();

EventLoop* startLoop();

EventLoop* startLoop();真正启动线程,并返回线程里的 EventLoop

EventLoopThread.cc

1
2
#include "EventLoopThread.h"
#include "EventLoop.h"
1
2
3
4
5
6
7
8
9
10
11
12
EventLoopThread::EventLoopThread(const ThreadInitCallback& cb, 
const std::string& name)
: loop_(nullptr)
, cond_()
, mutex_()
, callback_(cb)
, thread_([this](){
ThreadFunc();
})
{

}
  • loop_ 初始为 nullptr,是因为子线程未启动、未创建 EventLoop,只有子线程执行 threadFunc() 时,才会创建 EventLoop 并赋值给 loop_。EventLoop 必须由所属线程自己创建,绝对不能由外部(主线程)创建后传递。
  • thread_在构造函数仅绑定线程函数,不创建线程、不调用 threadFunc()。只有调用 thread_.start() 时,才会创建操作系统级线程,子线程才会执行 threadFunc()。
1
2
3
4
5
6
7
8
EventLoopThread::~EventLoopThread()
{
if(loop_ != nullptr)
{
loop_->quit();
thread_.join();
}
}
  • loop_->quit() 让 EventLoop 停止运行
  • thread_.join()主线程在这里等着子线程彻底结束,防止线程还在跑,对象就没了导致崩溃
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
EventLoop* EventLoopThread::startLoop()
{
thread_.start();

EventLoop* loop = nullptr;

{
std::unique_lock<std::mutex> lock(mutex_);
while(loop_ == nullptr)
{
cond_.wait(lock);
}
}

loop = loop_;
return loop;
}

启动子线程 → 等待子线程创建 EventLoop → 返回 loop 指针(主线程控制子线程的唯一入口)

  1. thread_.start():启动子线程,子线程开始执行 threadFunc()
  2. 主线程加锁等待:用 mutex + condition_variable 阻塞,直到子线程创建好 EventLoop 并通知。
  3. 返回loop_指针:主线程拿到指针后,可用于控制子线程(如停止 loop、分配任务)。

==关键==:while(loop_ == nullptr) 是为了防止虚假唤醒,确保 loop 真正创建完成。(前面Logger日志类有相似的技术实现)

==注意==:loop = loop_指针赋值,不是对象拷贝,与 EventLoop 是否禁止拷贝无关。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void EventLoopThread::ThreadFunc()
{
EventLoop loop;

//允许用户在 EventLoop 启动前做自定义初始化(比如设置定时器、注册回调)
if(callback_)
{
callback_(&loop);
}

{
std::unique_lock<std::mutex> lock(mutex_);
loop_ = &loop;
cond_.notify_one();
}

loop.loop();

std::unique_lock<std::mutex> lock(mutex_);
loop_ = nullptr;
}
  • EventLoop loop;

    • 在子线程上创建一个 EventLoop 对象

    • 这就是真正跑事件循环的那个 loop

    • 生命周期和子线程一样

  • loop_ 只是一个指针,指向子线程栈上的这个 EventLoop 实体,并非实体本身。

  • loop_ 赋值(loop_ = &loop)时加锁原因:loop_ 被两个线程同时访问(主线程读、子线程写),不加锁会出现数据竞争。

    • 子线程:写 loop_(loop_ = &loop)。
    • 主线程:读 loop_(while(loop_ == nullptr))。
    • ==tip:==这里条件变量实现线程同步,现代C++可以用future+promise重构实现,但主包太懒了,这里就不实现了。

Thread和EventLoopThread流程图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
【主线程】                              【子线程】
↓ ↓
EventLoopThread thread; │
↓ ↓
startLoop() │
↓ ↓
thread_.start() ───────────────────────→ ThreadFunc()
↓ ↓
wait(cond_) 【主线程阻塞】 创建 EventLoop loop
│ ↓
│ 执行 callback_(&loop)
│ ↓
│ 加锁 → loop_ = &loop
│ cond_.notify_one()
←──────────────────────────────────────┘
主线程被唤醒

返回 loop_


【主线程继续做别的事】 【子线程】

loop.loop() 【死循环处理事件】
↓(quit()被调用)
loop_ = nullptr

线程结束

主线程、子线程具体指什么?

  • 主线程:调用 startLoop() 的线程(通常是 main 函数所在线程),负责启动子线程、等待 loop 创建、控制子线程。
  • 子线程:由 thread_.start() 创建的新线程,负责执行 threadFunc()、创建 EventLoop、运行 loop 循环、处理 IO 事件。

源码地址

EventLoopThread.h:https://gitee.com/lpzdinghai/lpzmuduo/blob/master/EventLoopThread.h

EventLoopThread.cc:https://gitee.com/lpzdinghai/lpzmuduo/blob/master/EventLoopThread.cc