EventLoop 是 muduo 的核心,所有网络操作、所有事件、所有任务,最终都由 EventLoop 驱动

1个EventLoop管理1个Poller+多个Channel

muduo设计原理:one loop per thread

EventLoop负责在单线程内处理两件事:

  1. 网络 IO 事件(内核通知的 socket 读写、连接)
  2. 用户提交的异步任务(其他线程发来的函数)

EventLoop.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#pragma once

#include <vector>
#include <atomic>
#include <functional>
#include <mutex>
#include <memory>

#include "noncopyable.h"
#include "CurrentThread.h"
#include "Timestamp.h"

class Channel;
class Poller;

头文件依赖。

1
2
3
4
class EventLoop : noncopyable
{
public:
using Functor = std::function<void()>;
  • 禁止拷贝,禁止赋值。
  • 通用回调函数类型。

私有成员变量

1
2
3
std::atomic<bool> looping_;  // 是否正在循环
std::atomic<bool> quit_; // 是否退出循环
const pid_t threadId_; // 绑定当前线程ID
  • threadId_构造时记录线程 ID

  • 用来判断:isInLoopThread()

1
2
std::unique_ptr<Poller> poller_;
Timestamp pollReturnTime_;

多路分发器(epoll)。

1
2
int wakeupFd_;
std::unique_ptr<Channel> wakeupChannel_;

唤醒机制

  • wakeupFd_:eventfd,用来唤醒 epoll

  • wakeupChannel_:包装 wakeupFd

1
2
using ChannelList = std::vector<Channel*>;
ChannelList activeChannels_;
  • 活跃 Channel 列表,epoll 返回的有事件的 channel
1
2
3
std::atomic<bool> callingPendingFunctors_;
std::vector<Functor> pendingFunctors_;
std::mutex mutex_;

跨线程任务队列,mainLoop把 “处理新连接” 包装成任务,放进 subLoop 的任务队列并唤醒它

  • pendingFunctors_其他线程投递的任务

  • mutex_:保护任务队列

  • callingPendingFunctors_:标记是否正在执行任务

内部函数

1
2
3
private:
void handleRead(); //wake up
void doPendingFunctors(); //执行回调
  • handleRead()读 wakeupFd,清空唤醒事件。 EventLoop 构造时,把 handleRead 函数绑定给了 wakeupChannel_。别的线程调用 wakeup() 写了 8 字节,epoll被唤醒,自动调用 handleRead()
  • doPendingFunctors执行其他线程丢过来的跨线程任务。

核心成员函数

1
2
EventLoop();
~EventLoop();

构造和析构。

1
2
3
4
//开始事件循环
void loop();
//退出事件循环
void quit();

loop是核心函数,开始死循环:epoll监听事件 + 处理事件 + 执行任务队列

1
Timestamp pollReturnTime() const { return pollReturnTime_; }

返回 事件刚发生的那一刻的时间

1
bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }

判断当前代码,是不是跑在 EventLoop 自己的线程里

  • threadId_loop 自己的线程 ID

  • CurrentThread::tid()当前正在执行代码的线程 ID

1
2
3
void updateChannel(Channel* channel);
void removeChannel(Channel* channel);
bool hasChannel(Channel* channel);

EventLoop通过这三个函数,让 Poller 去操作底层。

1
2
//唤醒loop所在线程
void wakeup();
  • 向唤醒 fd 写个数据,把阻塞在 epoll 的 loop 线程叫醒,去执行新任务
  • 配合handleRead使用,一读一写,实现闭环。
1
2
3
4
//在当前线程执行回调cb
void runInLoop(Functor cb);
//把回调cb放入队列,唤醒loop所在线程,执行cb
void queueInLoop(Functor cb);
  • void runInLoop(Functor cb)让这个函数在 loop 线程里运行
  • void queueInLoop(Functor cb)把 cb 塞进任务队列

EventLoop.cc

1
2
3
4
5
6
7
8
9
10
#include "EventLoop.h"
#include "Poller.h"
#include "Logger.h"
#include "EpollPoller.h"
#include "Channel.h"

#include <sys/eventfd.h>
#include <functional>

const int kPollTimeMs = 10000;

kPollTimeMsepoll_wait 最大阻塞超时时间(10s)。

1
2
3
4
5
6
7
8
9
int createEventfd()
{
int eventfd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if(eventfd_ < 0)
{
LOG_FATAL("eventfd create failed, errno : {}", errno);
}
return eventfd_;
}

创建一个线程唤醒专用文件描述符,专门用来叫醒睡在 epoll 里的 EventLoop 线程

  • eventfd() 是 Linux 系统提供的一个系统调用,专门用来创建一个内核级事件通知文件描述符

    • 返回值:一个文件描述符 fd(和 socket、普通文件 fd 一样)
    • 作用:内核帮你维护一个 8 字节的计数器,你可以读写这个计数器。
  • eventfd()参数分析

    • 0:内核计数器初始值 = 0
    • EFD_NONBLOCK:非阻塞,读不到数据不卡死
    • EFD_CLOEXEC:fork( 复制进程,生成子进程) 后执行新程序自动关闭 fd,防泄漏
  • 为什么用eventfd()来唤醒线程?

    1. 内核自带计数器
    • write → 计数器 + 1 → 触发读事件
    • read → 清零 → 事件消失
    1. 轻量,只占一个文件描述符,系统开销极小
    • 不用**pipe(管道)**:要创建两个 fd(读端 + 写端),麻烦、占资源,效率不如·eventfd
    • 不能用 **socket**:太重,要绑定端口、监听,浪费资源
    1. 线程安全,多线程同时 write 都没问题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
EventLoop::EventLoop()
: looping_(false)
, quit_(false)
, callingPendingFunctors_(false)
, poller_(Poller::newDefaultPoller(this))
, threadId_(CurrentThread::tid())
, wakeupFd_(createEventfd())
, wakeupChannel_(new Channel(this, wakeupFd_))
{
LOG_DEBUG("EventLoop create {} in thread {}", this, threadId_);
if(CurrentThread::t_loopInThisThread)
{
LOG_FATAL("Another EventLoop {} exists in this thread {}", CurrentThread::t_loopInThisThread, threadId_);
}
else
{
CurrentThread::t_loopInThisThread = this;
}

wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
wakeupChannel_->enableReading();
}

构造函数

  • 成员变量初始化

  • 检查判断,一个线程 有且只有一个 EventLoop

  • 绑定唤醒回调,给唤醒 Channel 设置回调:可读时调用 handleRead(),别的线程调用 wakeup() → 写数据 →

    wakeupChannel 可读 → 自动调用 handleRead()

1
2
3
4
5
6
7
EventLoop::~EventLoop()
{
wakeupChannel_->disableAll();
wakeupChannel_->remove();
::close(wakeupFd_);
CurrentThread::t_loopInThisThread = nullptr;
}

析构函数

  • wakeupChannel_->disableAll()取消 Channel 上所有的事件监听(让 epoll 不再监听这个唤醒 fd)
  • wakeupChannel_->remove()把 Channel 从 Poller(epoll)中删除
  • 关闭系统创建的 eventfd 文件描述符
  • 把线程本地存储的 loop 指针清空
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void EventLoop::handleRead()
{
uint64_t one;
ssize_t n = read(wakeupFd_, &one, sizeof one);
if(n != sizeof one)
{
LOG_ERROR("EventLoop::handleRead() reads {} bytes instead of 8", n);
}
}

void EventLoop::wakeup()
{
uint64_t one = 1;
ssize_t n = write(wakeupFd_, &one, sizeof one);
if (n != sizeof one)
{
LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8 \n", n);
}
}
  • 读写一个uint64_t的变量(正好8字节)。
  • wakeup写如8字节,wakeup读取8字节,一写一读,形成闭环。
  • 必须要读,不读的话,事件会一直触发,epoll 死循环。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void EventLoop::loop()
{
looping_ = true;
quit_ = false;

LOG_INFO("EventLoop {} start looping", this);

while(!quit_)
{
activeChannels_.clear();

Timestamp pollReturnTIme = poller_->poll(kPollTimeMs, &activeChannels_);

for(Channel* channel : activeChannels_)
{
channel->handleEvent(pollReturnTIme);
}

doPendingFunctors();
}

LOG_INFO("EventLoop {} stop looping", this);
looping_ = false;
}

==核心函数==: 死循环监听 epoll → 有事件就处理 → 一直跑,直到 quit ()

  • poller_->poll(...)

    • 没有事件 → 阻塞等待
    • 有事件(连接、数据、wakeup)→ 立刻返回
    • 返回来的就是 activeChannels_(有事件的 Channel 列表)
  • channel->handleEvent(...)遍历所有 Channel,调用它们的 handleEvent

  • doPendingFunctors();处理完所有 socket IO 事件(例如:客户端发来了一条消息。 内核 → Loop 线程),立刻调用一次 doPendingFunctors(),执行任务队列里的任务(例如:业务线程算完结果,想把结果发给客户端)

1
2
3
4
5
6
7
8
9
void EventLoop::quit()
{
quit_ = true;

if(!isInLoopThread())
{
wakeup();
}
}

让 loop () 停止循环,如果是别的线程调用,必须 wakeup ()唤醒(loop 线程正在 epoll_wait 睡觉)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void EventLoop::runInLoop(Functor cb)
{
if(isInLoopThread())
{
cb();
}
else
{
queueInLoop(cb);
}
}

void EventLoop::queueInLoop(Functor cb)
{
{
std::unique_lock<std::mutex> lock(mutex_);
pendingFunctors_.emplace_back(cb);
}

if(!isInLoopThread() || callingPendingFunctors_)
{
wakeup();
}
}
  • runInLoop保证 cb 一定在 loop 线程执行。
    • 当前执行流就是 loop 线程 → 同步直接调用 cb。
    • 当前执行流不是 loop 线程 → 将 cb 加入任务队列,由 loop 线程异步执行。
  • queueInLoop把任务放进队列,并且唤醒线程去执行。
    • 加锁,线程安全地将任务放进队列。
    • 唤醒条件:!isInLoopThread()别的线程来放任务callingPendingFunctors_当前loop正在执行回调,但是loop又有了新的回调callingPendingFunctors_doPendingFunctors() 内部会被置true
  • 所有跨线程、延迟、异步执行的回调,最终都走 queueInLoop。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;

{
std::unique_lock<std::mutex> lock(mutex_);
functors.swap(pendingFunctors_);
}

for(auto functor : functors)
{
functor();
}

callingPendingFunctors_ = false;
}
  • 进入函数就修改callingPendingFunctors_,让 queueInLoop 知道 “当前正在执行任务”,从而决定是否需要立即 wakeup()

  • ==核心设计:==swap,而不是直接遍历(减少锁的持有时间)。

    • 直接遍历会在执行 functor 期间一直持有锁

    • 执行 functor 可能很慢、耗时、不可控

    • swap 仅用很短时间,交换后立即释放锁

    • 其他线程可以继续入队,不被阻塞

  • 流程简述:

    1. 加锁,保护队列
    2. 局部空 vector 和全局队列交换内容
    3. 全局队列变空,局部 vector 拿到所有任务
    4. 解锁
    5. 后续在局部 vector 上执行,不影响全局队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
void EventLoop::updateChannel(Channel *channel)
{
poller_->updateChannel(channel);
}

void EventLoop::removeChannel(Channel *channel)
{
poller_->removeChannel(channel);
}

bool EventLoop::hasChannel(Channel *channel)
{
return poller_->hasChannel(channel);
}

EventLoop 不直接操作 epoll,而是交给 Poller 实际执行。

提示:如果前面有暂时注释掉的代码,这里记得取消注释。

源码地址

EventLoop.h:https://gitee.com/lpzdinghai/lpzmuduo/blob/master/EventLoop.h

EventLoop.cc:https://gitee.com/lpzdinghai/lpzmuduo/blob/master/EventLoop.cc