1️⃣ 什么是 IOManager 模块

IOManager 模块可以理解为 Sylar 框架里的“协程版 I/O 事件循环 + 调度中枢”。它把 Linuxepoll 事件机制和协程调度器 Scheduler 融合在了一起。

当某个文件描述符 socketpipe 等变得可读/可写时,IOManager 不会像传统 Reactor 那样直接在 epoll 线程里执行回调,而是把对应的回调函数或协程 Fiber 重新投递到调度器队列中,由线程池里的工作线程去执行。这样一来,业务逻辑可以用“同步写法”的协程风格组织,却能获得“异步 I/O”的高并发能力。

更具体地说,IOManager 维护了一个 FdContext 表,用来记录每个 fd 关注的读/写事件以及事件触发后要执行的回调函数等待中的协程。它内部的 idle() 协程长期阻塞在 epoll_wait

一旦事件到来,就通过 triggerEvent() 将任务调度出去;同时它还继承 TimerManager,把定时器超时回调也纳入同一套调度体系,实现“IO 事件 + 定时任务”的统一驱动。为了在新增任务定时器更新时及时唤醒 epoll_waitIOManager 还使用 pipetickle 机制向 epoll 注入一个可读事件,让空闲线程立刻从阻塞中返回,继续处理新的调度工作。

2️⃣ 整体框架

IOManager

3️⃣ 嵌套类

enum Event

1
2
3
4
5
6
enum Event
{
NONE = 0x0,
READ = 0x1,
WRITE = 0x4
};

enum Event 用来表示 IOManager 关心的 I/O 事件类型,并且用位标志(bitmask)的方式存储/组合它们

  • NONE = 0x0:不关注任何事件
  • READ = 0x1:关注“可读事件”(比如 socket 有数据可读 / pipe 可读 / 接收缓冲区有数据)
  • WRITE = 0x4:关注“可写事件”(比如 socket 发送缓冲区有空间,可以继续发送)

strcut EventContext

1
2
3
4
5
6
struct EventContext
{
Scheduler* scheduler = nullptr;
std::function<void()> cb;
Fiber::ptr fiber;
};

EventContext 的作用就是:描述“某个 fd 的某个事件(READWRITE)触发后,该由哪个调度器 Scheduler 去执行什么回调函数或协程”。

  • scheduler
    • 记录事件触发后要把任务投递到哪个调度器 Scheduler 里执行(通常就是当前线程的 Scheduler::GetThis())。因为 IOManager 可能有多个调度线程,必须知道回到哪个调度器。
  • cb
    • 如果用户在 addEvent(fd, event, cb) 传了回调,那么事件就绪时执行这个回调函数。
  • fiber
    • 如果用户没有传回调,会把协程存进来,事件触发时就把这个协程重新 schedule(),从而实现“协程等待 IO → IO 就绪后继续跑”。

struct FdContext

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
struct FdContext
{
using MutexType = Mutex;

/**
* @brief 事件绑定的回调函数
*/
struct EventContext
{
Scheduler* scheduler = nullptr;
std::function<void()> cb;
Fiber::ptr fiber;
};

/**
* @brief 获取事件上下文类
* @param[in] event 事件类型
* @return 返回对应事件的上下文
*/
EventContext& getEventContext(Event event);

/**
* @brief 重置事件上下文
* @param[in, out] ctx 待重置的事件上下文对象
*/
void resetEventContext(EventContext &ctx);

/**
* @brief 触发事件
* @details 根据事件类型调用对应上下文结构中的调度器去调度回调协程或回调函数
* @param[in] event 事件类型
*/
void triggerEvent(Event event);

int fd = 0; // 对应的文件描述符
EventContext read; // 读事件
EventContext write; // 写事件
Event m_events = NONE; // 事件集(每一位对应读写事件)
MutexType m_mutex; // 互斥锁
};

FdContext 的作用是:把一个文件描述符 fd 的“事件注册状态 + 事件触发后的执行体”完整封装起来,让 IOManager 能把 epoll 返回的事件,准确转换成“调度器要执行的协程/回调”。

  • FdContext 中的 m_event 记录当前这个 fd 注册了哪些事件
    • m_event & Read 表示在 fd 上注册了读事件
    • m_event & Write 表示在 fd 上注册了写事件
  • FdContext 中的 EventContext 记录当前这个 fd 对应读写事件的回调函数 cb 和协程 fiber
  • getEventContext 用于根据事件 event 的类型,返回对应事件所注册的 EventContext
  • triggerEvent 用于根据事件 event 的类型,将这个事件所注册的回调函数 cb 以及协程 fiber 加入到 EventContext 所保存的调度器 scheduler 中进行调度。

4️⃣ 重要函数

构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
IOManager::IOManager(size_t threads, bool use_caller, const std::string &name)
: Scheduler(threads, use_caller, name)
{
// 创建一个epoll实例,返回对应的内核事件表
m_epfd = epoll_create(5000);
SYLAR_ASSERT(m_epfd > 0);

// 创建一个管道,返回对应的文件描述符(句柄)
int rt = pipe(m_tickleFds);
SYLAR_ASSERT(rt == 0);

// 创建一个epoll_event事件,用于联系 fd 和 fd 对应的读写事件
epoll_event event;
bzero(&event,sizeof(event));
event.events = EPOLLIN | EPOLLET;
event.data.fd = m_tickleFds[0];

// 修改m_tickleFds[0]的属性为非阻塞模式
rt = fcntl(m_tickleFds[0],F_SETFL, O_NONBLOCK);
SYLAR_ASSERT(!rt);

// 往内核事件表上注册fd和其对应的事件
rt = epoll_ctl(m_epfd, EPOLL_CTL_ADD, m_tickleFds[0], &event);
SYLAR_ASSERT(!rt);

// 默认设置64个大小
contextResize(32);

// 默认启动 协程调度器Scheduler
start();
}

IOManager 继承自 Scheduler,因此在构造 IOManager 对象时,会先执行基类 Scheduler 的构造函数,完成调度器的基础初始化工作(例如线程数配置、是否使用 caller 线程、调度器名称设置等)。Scheduler 的完整初始化与运行机制我已在协程调度模块中详细展开,本文不再重复啰嗦,只保留与 IOManager 相关的部分作为铺垫。

解析:

  • 首先,IOManager 的构造函数通过系统调用 epoll_create 建立内核事件表。
    • m_epfdepoll 的实例句柄,对应内核中的事件表。
    • 后续所有 fd 的读写事件监听,都会通过 epoll_ctl 注册到这个 epoll 实例中。
    • idle() 协程则会阻塞在 epoll_wait(m_epfd, ...) 上等待事件发生(IO 就绪 / tickle 唤醒 / 超时返回)。
  • 其次,建立 tickle 机制:用 pipe 唤醒阻塞的 epoll_waitidle 协程
    • IOManager 最关键的问题之一是:当 idle 协程正阻塞在 epoll_wait 时,如何让它立刻返回去重新检查调度队列?

    • Sylar 中的实现是使用一对 pipe 来作为唤醒管道

      1
      2
      int rt = pipe(m_tickleFds);
      SYLAR_ASSERT(rt == 0);
      • m_tickleFds[0]:读端(注册进 epoll
      • m_tickleFds[1]:写端(tickle 时写入一个字节)
  • 之后,会将读端注册到 epoll,这里监听的是 EPOLLIN(可读事件发生),并使用 EPOLLET(边缘触发)。
    • 由于执行 Scheduler::run() 的线程发现任务队列 m_task 没有任务的时候,会执行协程 idle(),陷入到 epoll_wait 中等待事件的发生。
    • 但是当其他线程执行 Scheduler::run() 发现除了自己执行的任务之外还有任务的时候,会调用 tickle() 往写端写入数据时,此时读端变得可读,idle 协程就会从 epoll_wait 立刻返回,从而唤醒 idleidle() 协程会在本轮完成事件分发:将 IO 就绪事件/超时定时器对应的 fiber/cb 重新 schedule() 投递回调度器队列;随后 yield() 让出执行权,由 Scheduler::run() 再去真正执行这些任务。
    • 为了避免读端阻塞,还需要把读端设为非阻塞模式。
  • 然后,初始化 fd 上下文表:建立 fdFdContext 的映射
    1
    contextResize(32);
    • m_fdcontexts 是一个 “fd 下标 → FdContext” 的数组结构。
    • 构造阶段先预分配一段容量,后续 addEvent(fd, ...) 时如果 fd 更大,再动态扩容。
  • 最后,启动调度器:让 IOManager 正式开始工作
    1
    start();
    • 这一步会启动 Scheduler 的工作线程,并进入调度循环。之后 IOManageridle() 协程会在空闲时阻塞在 epoll_wait,等待 IO 事件或 tickle 唤醒;一旦事件发生,就将对应的 fiber/cb 投递回调度器执行。

contextResize 函数

1
2
3
4
5
6
7
8
9
10
11
12
void IOManager::contextResize(size_t size)
{
m_fdcontexts.resize(size);
for (size_t i = 0; i < m_fdcontexts.size(); i++)
{
if (!m_fdcontexts[i])
{
m_fdcontexts[i] = new FdContext;
m_fdcontexts[i]->fd = i;
}
}
}

作用是:扩容并初始化 IOManagerfd 上下文表 m_fdcontexts,建立 “fd → FdContext” 的映射,保证后续对某个 fd 注册事件时,能快速拿到对应的 FdContext

tickle 函数

1
2
3
4
5
6
7
8
9
10
void IOManager::tickle()
{
SYLAR_LOG_DEBUG(g_logger) << "tickle";
if (!hasIdleThreads())
{
return;
}
int rt = write(m_tickleFds[1], "T", 1);
SYLAR_ASSERT(rt == 1);
}

IOManager::tickle() 的作用是:唤醒可能正在 epoll_wait 中阻塞的空闲线程(idle 协程),让调度器立刻重新检查任务队列/定时器,从而及时调度新任务。

它做法很经典:往 pipe 管道写一个字节制造“可读事件”。

addEvent 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
int IOManager::addEvent(int fd, Event event, std::function<void()> cb)
{
FdContext* fd_ctx = nullptr;
RWMutexType::ReadLock lock(m_mutex);
if ((int)m_fdcontexts.size() > fd)
{
fd_ctx = m_fdcontexts[fd];
lock.unlock();
}
else
{
lock.unlock();
RWMutexType::WriteLock lock2(m_mutex);
contextResize(fd * 1.5);
fd_ctx = m_fdcontexts[fd];
}

// 同一个fd不允许重复添加相同的事件
FdContext::MutexType::Lock lock3(fd_ctx->m_mutex);
if (SYLAR_UNLIKELY(fd_ctx->m_events & event)) // 这表示事件类型重复,比如都是读事件
{
SYLAR_LOG_ERROR(g_logger) << "addEvent assert fd=" << fd
<< " event=" << event
<< " fd_ctx.event=" << fd_ctx->m_events;
SYLAR_ASSERT(!(fd_ctx->m_events & event));
}

// 将新的事件加入epoll_wait,使用epoll_event的私有指针存储FdContext的位置
int op = fd_ctx->m_events ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
epoll_event epevent;
epevent.events = EPOLLET | static_cast<EPOLL_EVENTS>(fd_ctx->m_events) | static_cast<EPOLL_EVENTS>(event);
epevent.data.ptr = fd_ctx;

int rt = epoll_ctl(m_epfd, op, fd, &epevent);
if (rt)
{
SYLAR_LOG_ERROR(g_logger) << "epoll_ctl(" << m_epfd << ", "
<< op << ", " << fd << ", " << (EPOLL_EVENTS)epevent.events << "):"
<< rt << " (" << errno << ") (" << strerror(errno) << ") fd_ctx->events="
<< (EPOLL_EVENTS)fd_ctx->m_events;
return -1;
}

// 待执行IO事件数加1
++m_pendingEventCount;

// 找到这个fd的event事件对应的EventContext,对其的scheduler、fiber、cb进行赋值
fd_ctx->m_events = static_cast<Event>(fd_ctx->m_events | event);
FdContext::EventContext& event_ctx = fd_ctx->getEventContext(event);
SYLAR_ASSERT(!event_ctx.scheduler && !event_ctx.fiber && !event_ctx.cb);

// 赋值scheduler和回调函数,如果回调函数为空,则把当前协程当成回调执行体
event_ctx.scheduler = Scheduler::GetThis();
if (cb)
{
event_ctx.cb.swap(cb);
}
else
{
event_ctx.fiber = Fiber::GetThis();
SYLAR_ASSERT2(event_ctx.fiber->getState() == Fiber::RUNNING, "state=" << event_ctx.fiber->getState());
}

return 0;
}

IOManager::addEvent 的作用是:在指定 fd 上注册一个 IO 事件(READ/WRITE)到 epoll,同时把“事件触发后要执行的内容”绑定到该 fd 的上下文里。这样当 epoll 检测到该 fd 就绪时,IOManager 就能把对应的 cbfiber 投递回 Scheduler 去执行。

解析

  • 首先,创建或者定位 fd 对应的 FdContext
    • 如果 m_fdcontexts 的容量够用,直接返回 fd 对应的 FdContext
    • 否则,对 m_fdcontexts 进行扩容,再返回 fd 对应的 FdContext
  • 其次,保证不允许对同一 fd 重复注册同一种事件。
  • 然后,将对 fd 的事件注册到 epoll
    • 先决定操作类型 op
      • 若该 fd 之前没有注册过任何事件:EPOLL_CTL_ADD
      • 若已有 fd 之前注册过事件 event,比如之前已注册 READ,现在加 WRITEEPOLL_CTL_MOD
    • 构造 epoll_event
      • epevent.events = EPOLLET | 旧事件 | 新事件
      • epevent.data.ptr = fd_ctx,这里非常重要,因为当 epoll 检测到该 fd 就绪后,可以直接通过 epoll_event.data.ptr 找到注册到这个事件的协程/回调函数 fiber/cb
    • 调用 epoll_ctl 注册/修改内核事件表 m_epfd 监听该 fd 的所关注事件。
  • 之后,通过 m_pendingEventCount 记录当前 “已注册但尚未触发” 的 IO 事件数,用于 stopping() 判断(是否还有 IO 事件挂着,不能退出)。
  • 最后,绑定事件触发后的执行体(cbfiber)到 EventContext
    • 更新 fd_ctx 关注的事件:fd_ctx->m_events |= event
    • 找到事件 event 对应的协程/回调函数EventContext& event_ctx = getEventContext(event)
    • 绑定调度器:event_ctx.scheduler = Scheduler::GetThis();
    • 绑定执行体:
      • 如果传了 cb:保存回调函数,之后事件触发后就 schedule 这个 cb 函数
      • 如果没传 cb:保存当前协程 Fiber::GetThis(),表示“当前协程在等这个 IO 事件”。

delEvent 函数

同理于 addEvent 函数,只不过作用相反

cancelEvent 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
bool IOManager::cancelEvent(int fd, Event event)
{
// 找到fd对应的FdContext
RWMutexType::ReadLock lock(m_mutex);
if (m_fdcontexts.size() <= fd) {
return false;
}
FdContext *fd_ctx = m_fdcontexts[fd];
lock.unlock();

FdContext::MutexType::Lock lock2(fd_ctx->m_mutex);
if (!(fd_ctx->m_events & event)) {
return false;
}

// 删除事件
Event new_events = (Event)(fd_ctx->m_events & ~event);
int op = new_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
epoll_event epevent;
epevent.events = EPOLLET | static_cast<EPOLL_EVENTS>(new_events);
epevent.data.ptr = fd_ctx;

int rt = epoll_ctl(m_epfd, op, fd, &epevent);
if (rt) {
SYLAR_LOG_ERROR(g_logger) << "epoll_ctl(" << m_epfd << ", "
<< op << ", " << fd << ", " << (EPOLL_EVENTS)epevent.events << "):"
<< rt << " (" << errno << ") (" << strerror(errno) << ")";
return false;
}

// 删除之前触发一次事件
fd_ctx->triggerEvent(event);
// 活跃事件数减1
--m_pendingEventCount;
return true;
}

该函数主要用于取消某个 fd 上已经注册的指定事件(READ/WRITE),把它从 epoll 的监听集合里移除,同时“主动触发一次”该事件对应的 cb/fiber,让等待它的任务立刻被调度执行(通常用于唤醒正在等待 IO 的协程,避免永远挂起)

解析

1
fd_ctx->triggerEvent(event);

首先,addEvent 的本质:不是“监听” ,而是“挂起一个等待者

  • 当调用 addEvent(fd, READ) [不传 cb]时,典型使用方式是:
    1. addEvent 把当前协程保存到 `fd_ctx->read.fiber
    2. 然后当前协程 yield(),让出执行权
    3. READ 就绪时,IOManager 才会 schedule(fiber) 把它恢复。
      所以这个 fiber 现在的状态是:我在等 READ 才能继续。

其次,如果只“取消监听”,但不唤醒,会发生什么?

  • 这个协程已经 yield() 了,它不会自己醒。
  • 你又把 epoll 监听删了,未来也不会再触发 READ
  • 结果这个协程永远不会被恢复(逻辑死锁/资源泄漏)

cancelAll 函数

同理于 cancelEvent 函数,主要用于取消 fd 关注的所有事件。

idle 函数

IOManager 最重要的函数:主要描述了一个线程在无法在任务队列 m_task 取得任务 task 后,会将执行权给 idle 协程,idle 协程会给出线程在空闲时会做些什么!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
void IOManager::idle()
{
SYLAR_LOG_DEBUG(g_logger) << "idle";

// 一次epoll_wait 最多检测256个就绪事件,如果就绪事件超过了这个数,那么会在下轮epoll_wait继续处理
const uint64_t MAX_EVENTS = 256;
epoll_event* events = new epoll_event[MAX_EVENTS]();
std::shared_ptr<epoll_event> shared_events(events, [](epoll_event* ptr){
delete[] ptr;
});

while(true)
{
// 获取下一个定时器的超时时间,顺便判断调度器是否停止
uint64_t next_timeout = 0;
if (SYLAR_UNLIKELY(stopping(next_timeout)))
{
SYLAR_LOG_DEBUG(g_logger) << "name= " << getName() << " idle stopping exit";
break;
}

// 阻塞在epoll_wait上,等待事件发生或定时器超时
int rt = 0;
do
{
// 默认超时时间5秒,如果下一个定时器的超时时间大于5秒,仍以5秒来计算超时,避免定时器超时时间太大时,epoll_wait一直阻塞
static const int TIMEOUT = 5000;
if(next_timeout != ~0ull)
{
next_timeout = std::min((int)next_timeout, TIMEOUT);
}
else
{
next_timeout = TIMEOUT;
}

rt = epoll_wait(m_epfd, events, MAX_EVENTS, (int)next_timeout);
if (rt < 0 && errno == EINTR)
{
continue;
}
else
{
break;
}
} while (true);

// 收集所有已超时的定时器,执行回调函数
std::vector<std::function<void()>> cbs;
listExpireCb(cbs);
if(!cbs.empty())
{
for(const auto &cb : cbs)
{
schedule(cb);
}
cbs.clear();
}

// 遍历所有发生的事件,根据epoll_event的私有指针找到对应的FdContext,进行事件处理
for (size_t i = 0; i < rt; ++i)
{
epoll_event& event = events[i];
if (event.data.fd == m_tickleFds[0])
{
// ticklefd[0] 用于通知协程调度,这时只需要把管道里的内容读完即可
uint8_t dummy[256];
while (read(m_tickleFds[0], dummy, sizeof(dummy)) > 0)
;
continue;
}
FdContext* fd_ctx = (FdContext*)event.data.ptr;
FdContext::MutexType::Lock lock(fd_ctx->m_mutex);
/**
* EPOLLERR: 出错,比如写读端已经关闭的pipe
* EPOLLHUP: 套接字对端关闭
* 出现这两种事件,应该同时触发fd的读和写事件,否则有可能出现注册的事件永远执行不到的情况
*/
if (event.events & (EPOLLERR | EPOLLHUP))
{
event.events |= (EPOLLERR | EPOLLHUP) & fd_ctx->m_events;
}
int real_events = NONE;
if (event.events & EPOLLIN)
{
real_events |= READ;
}
if (event.events & EPOLLOUT)
{
real_events |= WRITE;
}
if ((fd_ctx->m_events & real_events) == NONE)
{
continue;
}

// 剔除已经发生的事件,将剩下的事件重新加入epoll_wait
int left_events = (fd_ctx->m_events & ~real_events);
int op = left_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
event.events = EPOLLET | left_events;

int rt2 = epoll_ctl(m_epfd, op, fd_ctx->fd, &event);
if (rt2)
{
SYLAR_LOG_ERROR(g_logger) << "epoll_ctl(" << m_epfd << ", "
<< op << ", " << fd_ctx->fd << ", " << (EPOLL_EVENTS)event.events << "):"
<< rt2 << " (" << errno << ") (" << strerror(errno) << ")";
continue;
}

// 处理已经发生的事件件,也就是让调度器调度指定的函数或协程
if (real_events & READ)
{
fd_ctx->triggerEvent(READ);
--m_pendingEventCount;
}
if (real_events & WRITE) {
fd_ctx->triggerEvent(WRITE);
--m_pendingEventCount;
}
}
/**
* 一旦处理完所有的事件,idle协程yield,这样可以让调度协程(Scheduler::run)重新检查是否有新任务要调度
* 上面triggerEvent实际也只是把对应的fiber重新加入调度,要执行的话还要等idle协程退出
*/
Fiber::ptr cur = Fiber::GetThis();
auto raw_ptr = cur.get();
cur.reset();

raw_ptr->yield();
}
}

IOManager::idle()作为IOManager 的“事件循环协程”。当调度器没有普通任务可跑时,线程会切换到 idle(),它负责:

  • 阻塞在 epoll_wait 上等待 IO 就绪或被 tickle 唤醒
  • 处理定时器超时,把超时回调投递到调度器队列
  • 把就绪的 READ/WRITE 事件对应的协程/回调重新 schedule() 回调度器执行
  • 最后 yield() 让出执行权回到 Scheduler::run(),让调度器去真正执行刚投递的任务

一句话:idle() 不执行业务逻辑,它只做 “等待 + 分发(投递)”。

解析

首先,准备 epoll 就绪事件数组 events

1
2
const uint64_t MAX_EVENTS = 256;
epoll_event* events = new epoll_event[MAX_EVENTS]();
  • 一次 epoll_wait 最多处理 256 个就绪事件,超过的留到下一轮。

其次,循环运行:直到满足停止条件

  • 通过 stopping(next_timeout) 获取下一个定时器时间作为 epoll_wait 的超时时间 timeout
  • 并且判断是否可以退出。

阻塞等待:epoll_wait + timeout

1
rt = epoll_wait(m_epfd, events, MAX_EVENTS, (int)next_timeout);
  • 如果有 IO 就绪 / tickle pipe 可读,立刻返回
  • 如果没有事件发生,但 timeout 到了,处理定时器
  • 被信号中断 EINTR 则重试
  • 最大阻塞时间为 5s,避免定时器很大导致线程长时间睡眠

处理所有已超时的定时器:只投递,不执行

1
2
3
4
5
6
7
8
9
10
std::vector<std::function<void()>> cbs;
listExpireCb(cbs);
if(!cbs.empty())
{
for(const auto &cb : cbs)
{
schedule(cb);
}
cbs.clear();
}
  • 主要是把定时器回调加入任务队列,真正执行要等 idle() yield 后回到 Scheduler::run()

遍历 epoll 返回的就绪事件:只分发,不执行

对于每个 epoll_event 事件

  • 如果是 tickle 管道读端:tickle 的目的只是唤醒 idle 协程让出执行权,所以读空 pipe 传来的字节即可。

    1
    2
    3
    4
    5
    6
    7
    8
    if (event.data.fd == m_tickleFds[0])
    {
    // ticklefd[0] 用于通知协程调度,这时只需要把管道里的内容读完即可
    uint8_t dummy[256];
    while (read(m_tickleFds[0], dummy, sizeof(dummy)) > 0)
    ;
    continue;
    }
  • 如果是业务 fd

    • 找到对应的 FdContext,因为 FdContext 保存了 fd 关注的事件就绪后需要进行的 cb/fiber
    • 判断事件 event 类型
      • 如果是异常事件 EPOLLERR | EPOLLHUP,则将异常事件映射成 read/write 都就绪事件,避免等待在 read/write 的协程永远睡死。

        1
        2
        3
        4
        if (event.events & (EPOLLERR | EPOLLHUP))
        {
        event.events |= (EPOLLERR | EPOLLHUP) & fd_ctx->m_events;
        }
      • 如果是常规事件,则判断是读写事件,并且判断是否是 fd_ctx 保存的 fd 所感兴趣的事件。

        1
        2
        3
        4
        5
        6
        7
        8
        9
        int real_events = NONE;
        if (event.events & EPOLLIN)
        {
        real_events |= READ;
        }
        if (event.events & EPOLLOUT)
        {
        real_events |= WRITE;
        }
      • 剔除已经发生的事件,将剩下的事件重新加入 epoll_wait

        1
        2
        3
        4
        5
        int left_events = (fd_ctx->m_events & ~real_events);
        int op = left_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
        event.events = EPOLLET | left_events;

        int rt2 = epoll_ctl(m_epfd, op, fd_ctx->fd, &event);
      • 处理已发生的事件,将 fd 感兴趣的事件 event 要进行的 fiber/cb 放入到任务队列中。

        1
        2
        3
        4
        5
        6
        7
        8
        9
        if (real_events & READ)
        {
        fd_ctx->triggerEvent(READ);
        --m_pendingEventCount;
        }
        if (real_events & WRITE) {
        fd_ctx->triggerEvent(WRITE);
        --m_pendingEventCount;
        }

释放执行权给 Scheduler::run()

一旦处理完所有的事件,idle 协程 yield,让出执行权,这样可以让调度协程 (Scheduler::run) 重新检查是否有新任务要执行。