day09-EventLoop
EventLoop 是 muduo 的核心,所有网络操作、所有事件、所有任务,最终都由 EventLoop 驱动。
1个EventLoop管理1个Poller+多个Channel
muduo设计原理:one loop per thread
EventLoop负责在单线程内处理两件事:
- 网络 IO 事件(内核通知的 socket 读写、连接)
- 用户提交的异步任务(其他线程发来的函数)
EventLoop.h
1 |
|
头文件依赖。
1 | class EventLoop : noncopyable |
- 禁止拷贝,禁止赋值。
- 通用回调函数类型。
私有成员变量
1 | std::atomic<bool> looping_; // 是否正在循环 |
threadId_:构造时记录线程 ID用来判断:
isInLoopThread()
1 | std::unique_ptr<Poller> poller_; |
多路分发器(epoll)。
1 | int wakeupFd_; |
唤醒机制
wakeupFd_:eventfd,用来唤醒 epollwakeupChannel_:包装 wakeupFd
1 | using ChannelList = std::vector<Channel*>; |
- 活跃 Channel 列表,epoll 返回的有事件的 channel
1 | std::atomic<bool> callingPendingFunctors_; |
跨线程任务队列,mainLoop把 “处理新连接” 包装成任务,放进 subLoop 的任务队列并唤醒它
pendingFunctors_:其他线程投递的任务mutex_:保护任务队列callingPendingFunctors_:标记是否正在执行任务
内部函数
1 | private: |
handleRead()读 wakeupFd,清空唤醒事件。 EventLoop 构造时,把handleRead函数绑定给了wakeupChannel_。别的线程调用wakeup()写了 8 字节,epoll被唤醒,自动调用handleRead()。doPendingFunctors执行其他线程丢过来的跨线程任务。
核心成员函数
1 | EventLoop(); |
构造和析构。
1 | //开始事件循环 |
loop是核心函数,开始死循环:epoll监听事件 + 处理事件 + 执行任务队列
1 | Timestamp pollReturnTime() const { return pollReturnTime_; } |
返回 事件刚发生的那一刻的时间。
1 | bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); } |
判断当前代码,是不是跑在 EventLoop 自己的线程里
threadId_:loop 自己的线程 IDCurrentThread::tid():当前正在执行代码的线程 ID
1 | void updateChannel(Channel* channel); |
EventLoop通过这三个函数,让 Poller 去操作底层。
1 | //唤醒loop所在线程 |
- 向唤醒 fd 写个数据,把阻塞在 epoll 的 loop 线程叫醒,去执行新任务
- 配合
handleRead使用,一读一写,实现闭环。
1 | //在当前线程执行回调cb |
void runInLoop(Functor cb)让这个函数在 loop 线程里运行void queueInLoop(Functor cb)把 cb 塞进任务队列
EventLoop.cc
1 |
|
kPollTimeMs 是epoll_wait 最大阻塞超时时间(10s)。
1 | int createEventfd() |
创建一个线程唤醒专用文件描述符,专门用来叫醒睡在 epoll 里的 EventLoop 线程
eventfd()是 Linux 系统提供的一个系统调用,专门用来创建一个内核级事件通知文件描述符。- 返回值:一个文件描述符 fd(和 socket、普通文件 fd 一样)
- 作用:内核帮你维护一个 8 字节的计数器,你可以读写这个计数器。
eventfd()参数分析- 0:内核计数器初始值 = 0
EFD_NONBLOCK:非阻塞,读不到数据不卡死EFD_CLOEXEC:fork( 复制进程,生成子进程) 后执行新程序自动关闭 fd,防泄漏
为什么用
eventfd()来唤醒线程?- 它内核自带计数器
- write → 计数器 + 1 → 触发读事件
- read → 清零 → 事件消失
- 轻量,只占一个文件描述符,系统开销极小
- 不用**
pipe(管道)**:要创建两个fd(读端 + 写端),麻烦、占资源,效率不如·eventfd - 不能用 **
socket**:太重,要绑定端口、监听,浪费资源
- 线程安全,多线程同时 write 都没问题
1 | EventLoop::EventLoop() |
构造函数
成员变量初始化
检查判断,一个线程 有且只有一个 EventLoop
绑定唤醒回调,给唤醒 Channel 设置回调:可读时调用
handleRead(),别的线程调用wakeup()→ 写数据 →wakeupChannel 可读 → 自动调用
handleRead()
1 | EventLoop::~EventLoop() |
析构函数
wakeupChannel_->disableAll()取消 Channel 上所有的事件监听(让 epoll 不再监听这个唤醒 fd)wakeupChannel_->remove()把 Channel 从 Poller(epoll)中删除- 关闭系统创建的 eventfd 文件描述符
- 把线程本地存储的 loop 指针清空
1 | void EventLoop::handleRead() |
- 读写一个
uint64_t的变量(正好8字节)。 wakeup写如8字节,wakeup读取8字节,一写一读,形成闭环。- 必须要读,不读的话,事件会一直触发,epoll 死循环。
1 | void EventLoop::loop() |
==核心函数==: 死循环监听 epoll → 有事件就处理 → 一直跑,直到 quit ()。
poller_->poll(...)- 没有事件 → 阻塞等待
- 有事件(连接、数据、
wakeup)→ 立刻返回 - 返回来的就是 activeChannels_(有事件的 Channel 列表)
channel->handleEvent(...)遍历所有Channel,调用它们的handleEventdoPendingFunctors();处理完所有socket IO事件(例如:客户端发来了一条消息。 内核 → Loop 线程),立刻调用一次doPendingFunctors(),执行任务队列里的任务(例如:业务线程算完结果,想把结果发给客户端)
1 | void EventLoop::quit() |
让 loop () 停止循环,如果是别的线程调用,必须 wakeup ()唤醒(loop 线程正在 epoll_wait 睡觉)。
1 | void EventLoop::runInLoop(Functor cb) |
runInLoop保证 cb 一定在 loop 线程执行。- 当前执行流就是 loop 线程 → 同步直接调用 cb。
- 当前执行流不是 loop 线程 → 将 cb 加入任务队列,由 loop 线程异步执行。
queueInLoop把任务放进队列,并且唤醒线程去执行。- 加锁,线程安全地将任务放进队列。
- 唤醒条件:
!isInLoopThread()别的线程来放任务或callingPendingFunctors_当前loop正在执行回调,但是loop又有了新的回调(callingPendingFunctors_在doPendingFunctors()内部会被置true)
- 所有跨线程、延迟、异步执行的回调,最终都走 queueInLoop。
1 | void EventLoop::doPendingFunctors() |
进入函数就修改
callingPendingFunctors_,让queueInLoop知道 “当前正在执行任务”,从而决定是否需要立即wakeup()。==核心设计:==用
swap,而不是直接遍历(减少锁的持有时间)。直接遍历会在执行
functor期间一直持有锁执行
functor可能很慢、耗时、不可控swap仅用很短时间,交换后立即释放锁其他线程可以继续入队,不被阻塞
流程简述:
- 加锁,保护队列
- 局部空 vector 和全局队列交换内容
- 全局队列变空,局部 vector 拿到所有任务
- 解锁
- 后续在局部 vector 上执行,不影响全局队列
1 | void EventLoop::updateChannel(Channel *channel) |
EventLoop 不直接操作 epoll,而是交给 Poller 实际执行。
提示:如果前面有暂时注释掉的代码,这里记得取消注释。
源码地址
EventLoop.h:https://gitee.com/lpzdinghai/lpzmuduo/blob/master/EventLoop.h
EventLoop.cc:https://gitee.com/lpzdinghai/lpzmuduo/blob/master/EventLoop.cc