Channel是干什么的?

Channel 是对 Linux epoll 模型的 C++ 面向对象封装。

它把:

  • 文件描述符 (fd)
  • fd 感兴趣的事件 (events)
  • 事件发生后的回调函数 (callback)
  • 实际发生的事件 (revents)

这四样东西封装成一个类,交给 EventLoop 统一管理。

Channel只做事件分发,不做任何 I/O 操作、不处理业务逻辑,真正读数据、处理连接、处理业务的是:TcpConnection而 Channel 只负责 通知 TcpConnection

它的工作只有三件事:

  1. 向 epoll 注册 / 修改 / 删除 fd 的监听事件
  2. 保存用户设置的回调函数
  3. epoll 检测到事件后,调用对应的回调函数

Channel和别的模块的关联

Channel 就是 TcpConnection 和 EventLoop 之间的桥梁:

  • TcpConnection 给 Channel 注册回调
  • Channel 给 EventLoop 提供 fd 和事件
  • EventLoop 让 Poller 监听
  • 事件来了,Channel 调用回调通知 TcpConnection

启动 / 注册监听流程

1
2
3
4
5
6
7
8
9
10
11
12
TcpConnection::connectEstablished()

channel_->setReadCallback( std::bind( &TcpConnection::handleRead, this ) )
channel_->enableReading()

Channel::update()

EventLoop::updateChannel( Channel* )

Poller::updateChannel( Channel* )

epoll_ctl( EPOLL_CTL_ADD/MOD ) // 内核注册 fd 监听

事件发生后的完整调用链

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1. 客户端发送数据 → 内核产生事件
2. epoll_wait() 返回活跃事件
3. Poller::poll() → 把事件填入 Channel::revents_
4. EventLoop::loop() 拿到 Channel
5. Channel::handleEvent(Timestamp)

内部判断 revents_
↳ EPOLLIN → readCallback_()
↳ EPOLLOUT → writeCallback_()
↳ EPOLLHUP → closeCallback_()
↳ EPOLLERR → errorCallback_()

6. TcpConnection::handleRead() / handleWrite() / handleClose()

7. 执行业务逻辑

Channel.h

1
2
3
4
5
6
7
8
9
#pragma once

#include <functional>
#include <memory>

#include "noncopyable.h"
#include "Timestamp.h"

class EventLoop;

class EventLoop前置声明,但不包含头文件,避免循环依赖

1
2
3
4
5
6
class Channel :noncopyable
{
public:
using EventCallback = std::function<void()>;
using ReadEventCallback = std::function<void(Timestamp)>;
}
  • Channel 继承 noncopyable,表示这个类 不能被拷贝、不能被赋值。如果允许拷贝,会出现两个 Channel 管理同一个 fd,导致崩溃

  • noncopyable前不写如何继承,默认private继承。

  • 定义两个回调,EventCallback ReadEventCallback

成员变量

1
2
3
static const int kNoneEvent; //不监听任何事件
static const int kReadEvent; //监听可读事件
static const int kWriteEvent;//监听可写事件

静态常量(所有 Channel 共享)

1
2
3
4
5
EventLoop* loop_;
const int fd_;
int events_; //我感兴趣什么事件(告诉epoll)
int revents_; //实际上发生了什么事件(epoll告诉我)
int index_;
  • EventLoop* loop_这个 Channel 属于哪一个事件循环,一个 Channel 只属于一个 EventLoop

  • 一个 fd 永远只对应一个 Channel,epoll 真正监听的就是它。

  • events_是业务逻辑里”我“想让 epoll 监听什么事件。

  • revents_是epoll 内核实际返回的事件。

    • 谁来填?Poller(epoll 封装)调用 channel->set_revents(...) 填入。

    • 谁使用?handleEvent() 根据它判断该调用哪个回调。

  • int index_;给 Poller(EpollPoller)内部使用的状态标记。Poller 用它来判断调用 epoll_ctl 时用 ADD / MOD / DEL

1
2
3
4
ReadEventCallback readCallback_;
EventCallback writeCallback_;
EventCallback closeCallback_;
EventCallback errorCallback_;

它们是 4 个可调用对象,Channel 只负责分发调用,业务逻辑由上层(TcpConnection)实现。**handleEvent**:根据 revents_ 分发事件,调用对应回调。

1
2
std::weak_ptr<void> tie_;
bool tied_;
  • std::weak_ptr<void> tie_弱引用,不持有所有权、不影响对象销毁;
  • bool tied_绑定状态标记,判断是否已经绑定过

内部函数

1
2
3
private:
void update();
void handleEventWithGuard(Timestamp reveiveTime);
  • update():把当前 Channel 最新的 events_(感兴趣的事件)同步给 EventLoop,最终让 epoll 内核更新对这个 fd 的监听。
  • handleEventWithGuard()handleEvent()内部调用,真正处理事件的函数。

成员函数

1
2
Channel(EventLoop* loop, int fd);
~Channel();

构造函数和析构函数。

1
2
3
4
5
6
int fd() const { return fd_; }
int events() const { return events_; }
void set_revents(int revents) { revents_ = revents; }
int index() const { return index_; }
void set_index(int index){ index_ = index; }
EventLoop* ownerLoop() const { return loop_; }

get()/set()函数

1
2
3
4
void setReadCallback(ReadEventCallback cb) { readCallback_ = std::move(cb); }
void setWriteCallback(EventCallback cb) { writeCallback_ = std::move(cb); }
void setCloseCallback(EventCallback cb) { closeCallback_ = std::move(cb); }
void setErrorCallback(EventCallback cb) { errorCallback_ = std::move(cb); }

设置回调函数。

1
void remove();

将当前 Channel 从它所属的 EventLoop 和 Poller (epoll) 中彻底移除,不再监听任何事件。调用后,fd 不再被 epoll 管理,Channel 失效。

1
2
3
4
//返回当前fd的事件状态
bool isNoneEvent() const { return events_ == kNoneEvent; }
bool isReading() const { return events_ & kReadEvent; }
bool isWriting() const {return events_ & kWriteEvent; }

判断当前 Channel 监听读、写,还是都不监听。

==为啥用&不用==?==

events_位标志,可以同时监听多个事件

1
events_ = 读 | 写;  // 二进制:0011
  • 读 = 0001

  • 写 = 0010

  • 读 + 写 = 0011

==判断完全相等&按位与运算判断是否包含某一个标志位,用&哪怕同时监听了读 + 写,也能正确检测出 包含读

1
2
3
4
5
void enableReading() { events_ |= kReadEvent; update(); }
void disableReading() { events_ &= ~kReadEvent; update(); }
void enableWriting() { events_ |= kWriteEvent; update(); }
void disableWriting() { events_ &= ~kWriteEvent; update(); }
void disableAll() { events_ = kNoneEvent; update(); }

events_多位状态,必须用位运算添加 / 删除某一位,不能影响其他位。

|= 用来添加事件,不影响其他事件。

例子:

1
2
3
4
5
events_ 原来 = 0010 (只监听写)
kReadEvent = 0001

按位或后
0010 | 0001 = 0011 (现在 读+写 都监听)

&= ~用来删事件。

  • ~ :按位取反(把 0 变 1,1 变 0)

例子:

1
2
kReadEvent   = 0001
~kReadEvent = 1110
  • &全是1才是1

例子:

1
2
3
4
5
events_ 原来 = 0011 (读+写)
~kReadEvent = 1110

按位与后
0011 & 1110 = 0010 (只保留写)
1
void handleEvent(Timestamp receiveTime);

这是核心函数,由EventLoop主动调用,配合handleEventWithGuard,根据 revents_ 事件类型,分发调用 4 大回调。

1
2
// 防止当channel被手动remove掉,channel还在执行回调操作
void tie(const std::shared_ptr<void>&);

绑定后,回调执行前必须检查连接(TcpConnection)是否存活,防止连接已销毁,Channel 还在执行回调导致崩溃

Channel.cc

1
2
3
4
5
6
7
8
9
#include "Logger.h"
#include "Channel.h"
#include "EventLoop.h"

#include <sys/epoll.h>

const int Channel::kNoneEvent = 0;
const int Channel::kReadEvent = EPOLLIN | EPOLLPRI; //EPOLLPRI是带外数据(紧急数据)可读
const int Channel::kWriteEvent = EPOLLOUT;

初始化三个Channel 固定事件常量,给 events_ / revents_ 使用。

这里建议使用**const常量,不建议使用#define,因为宏定义可以随便改,随便覆盖,且没有作用域,污染全局**,在C++中不建议使用#define宏定义。

1
2
3
4
5
6
7
8
9
10
Channel::Channel(EventLoop* loop, int fd)
: loop_(loop), fd_(fd), events_(0), revents_(0), index_(-1)
{

}
`
Channel::~Channel()
{

}

析构函数空实现,不关闭 fd,不释放 loop(fd 由外部 TcpConnection/Acceptor 管理)。

1
2
3
4
5
6
7
8
9
void Channel::update()
{
loop_->updateChannel(this);
}

void Channel::remove()
{
loop_->removeChannel(this);
}

**update ()**:通知 loop 更新本 Channel 的监听事件(执行 epoll_ctl)

**remove ()**:通知 loop 从 epoll 中删除本 Channel

因为Channel只封装,不操作epoll,所以通过Channel成员中的loop_,让 Channel所属的EventLoop 把当前 Channel 从 epoll 监听中删除,具体函数在后续EventLoop中实现。

==这里会报错因为EventLoop还没实现这两个函数,可以先注释掉。==

1
2
3
4
5
void Channel::tie(const std::shared_ptr<void> &obj)
{
tie_ = obj;
tied_ = true;
}
  • 弱引用绑定宿主对象

  • 标记已绑定

  • 这个函数主要是解决循环引用问题:

    • muduo 里有两个核心对象:

      1. TcpConnection(连接)
      2. Channel(事件通道)
    • 它们的关系:

      • TcpConnection 持有 Channel 的 unique_ptr(强拥有)
      • Channel 要执行回调,必须访问 TcpConnection
    • 如果 Channel 用 shared_ptr 持有 TcpConnection:他们将相互持有对方的强引用,双方将永远无法销毁

    • Channel 不持有 shared_ptr,只持有 weak_ptr,弱引用,不增加引用计数,不拥有对象,不会造成循环引用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
void Channel::handleEvent(Timestamp receiveTime)
{
if(tied_)
{
std::shared_ptr<void> guard = tie_.lock();
if(guard)
{
handleEventWithGuard(receiveTime);
}
}
else
{
handleEventWithGuard(receiveTime);
}
}

void Channel::handleEventWithGuard(Timestamp receiveTime)
{


if((revents_ & EPOLLHUP) && !(revents_ & EPOLLIN))
{
if(closeCallback_)
{
closeCallback_();
}
}

if(revents_ & EPOLLERR)
{
if(errorCallback_)
{
errorCallback_();
}
}

if(revents_ & (EPOLLIN | EPOLLPRI))
{
if(readCallback_)
{
readCallback_(receiveTime);
}
}

if(revents_ & EPOLLOUT)
{
if(writeCallback_)
{
writeCallback_();
}
}
}
  • handleEvent只有宿主对象活着,才执行回调;死了就直接跳过。

    • tied_ == true:说明这个 Channel 绑定了一个宿主(比如 TcpConnection)

    • tie_.lock():尝试把弱引用升级为强引用

      • 成功 → 对象活着
      • 失败 → 对象已销毁
    • guard局部强智能指针:确保在执行回调期间,宿主对象绝对不会被销毁

  • handleEventWithGuard:根据 epoll 触发的事件,调用对应的回调函数,是真正处理 IO 事件的地方

  • EPOLLHUP 连接挂断,加 !(revents_ & EPOLLIN) 是为了避免未读完数据就关闭

  • 内层if判断回调是否为空,是为了避免空指针调用,防止程序崩溃。

源码地址

Channel.h:https://gitee.com/lpzdinghai/lpzmuduo/blob/master/Channel.h

Channel.cc:https://gitee.com/lpzdinghai/lpzmuduo/blob/master/Channel.cc