IOManager 模块
1️⃣ 什么是 IOManager 模块
IOManager 模块可以理解为 Sylar 框架里的“协程版 I/O 事件循环 + 调度中枢”。它把 Linux 的 epoll 事件机制和协程调度器 Scheduler 融合在了一起。
当某个文件描述符
socket、pipe等变得可读/可写时,IOManager不会像传统Reactor那样直接在epoll线程里执行回调,而是把对应的回调函数或协程Fiber重新投递到调度器队列中,由线程池里的工作线程去执行。这样一来,业务逻辑可以用“同步写法”的协程风格组织,却能获得“异步 I/O”的高并发能力。
更具体地说,IOManager 维护了一个 FdContext 表,用来记录每个 fd 关注的读/写事件以及事件触发后要执行的回调函数或等待中的协程。它内部的 idle() 协程长期阻塞在 epoll_wait 上
一旦事件到来,就通过
triggerEvent()将任务调度出去;同时它还继承TimerManager,把定时器超时回调也纳入同一套调度体系,实现“IO事件 + 定时任务”的统一驱动。为了在新增任务或定时器更新时及时唤醒epoll_wait,IOManager还使用pipe的tickle机制向epoll注入一个可读事件,让空闲线程立刻从阻塞中返回,继续处理新的调度工作。
2️⃣ 整体框架

3️⃣ 嵌套类
enum Event
1 | enum Event |
enum Event 用来表示 IOManager 关心的 I/O 事件类型,并且用位标志(bitmask)的方式存储/组合它们
NONE = 0x0:不关注任何事件READ = 0x1:关注“可读事件”(比如socket有数据可读 /pipe可读 / 接收缓冲区有数据)WRITE = 0x4:关注“可写事件”(比如socket发送缓冲区有空间,可以继续发送)
strcut EventContext
1 | struct EventContext |
EventContext 的作用就是:描述“某个 fd 的某个事件(READ 或 WRITE)触发后,该由哪个调度器 Scheduler 去执行什么回调函数或协程”。
scheduler- 记录事件触发后要把任务投递到哪个调度器
Scheduler里执行(通常就是当前线程的Scheduler::GetThis())。因为IOManager可能有多个调度线程,必须知道回到哪个调度器。
- 记录事件触发后要把任务投递到哪个调度器
cb- 如果用户在
addEvent(fd, event, cb)传了回调,那么事件就绪时执行这个回调函数。
- 如果用户在
fiber- 如果用户没有传回调,会把协程存进来,事件触发时就把这个协程重新
schedule(),从而实现“协程等待 IO → IO 就绪后继续跑”。
- 如果用户没有传回调,会把协程存进来,事件触发时就把这个协程重新
struct FdContext
1 | struct FdContext |
FdContext 的作用是:把一个文件描述符 fd 的“事件注册状态 + 事件触发后的执行体”完整封装起来,让 IOManager 能把 epoll 返回的事件,准确转换成“调度器要执行的协程/回调”。
FdContext中的m_event记录当前这个fd注册了哪些事件m_event & Read表示在fd上注册了读事件m_event & Write表示在fd上注册了写事件
FdContext中的EventContext记录当前这个fd对应读写事件的回调函数cb和协程fibergetEventContext用于根据事件event的类型,返回对应事件所注册的EventContext。triggerEvent用于根据事件event的类型,将这个事件所注册的回调函数cb以及协程fiber加入到EventContext所保存的调度器scheduler中进行调度。
4️⃣ 重要函数
构造函数
1 | IOManager::IOManager(size_t threads, bool use_caller, const std::string &name) |
IOManager 继承自 Scheduler,因此在构造 IOManager 对象时,会先执行基类 Scheduler 的构造函数,完成调度器的基础初始化工作(例如线程数配置、是否使用 caller 线程、调度器名称设置等)。Scheduler 的完整初始化与运行机制我已在协程调度模块中详细展开,本文不再重复啰嗦,只保留与 IOManager 相关的部分作为铺垫。
解析:
- 首先,
IOManager的构造函数通过系统调用epoll_create建立内核事件表。m_epfd是epoll的实例句柄,对应内核中的事件表。- 后续所有
fd的读写事件监听,都会通过epoll_ctl注册到这个epoll实例中。 idle()协程则会阻塞在epoll_wait(m_epfd, ...)上等待事件发生(IO就绪 /tickle唤醒 / 超时返回)。
- 其次,建立
tickle机制:用pipe唤醒阻塞的epoll_wait的idle协程IOManager最关键的问题之一是:当idle协程正阻塞在epoll_wait时,如何让它立刻返回去重新检查调度队列?Sylar中的实现是使用一对pipe来作为唤醒管道1
2int rt = pipe(m_tickleFds);
SYLAR_ASSERT(rt == 0);m_tickleFds[0]:读端(注册进epoll)m_tickleFds[1]:写端(tickle时写入一个字节)
- 之后,会将读端注册到
epoll,这里监听的是EPOLLIN(可读事件发生),并使用EPOLLET(边缘触发)。- 由于执行
Scheduler::run()的线程发现任务队列m_task没有任务的时候,会执行协程idle(),陷入到epoll_wait中等待事件的发生。 - 但是当其他线程执行
Scheduler::run()发现除了自己执行的任务之外还有任务的时候,会调用tickle()往写端写入数据时,此时读端变得可读,idle协程就会从epoll_wait立刻返回,从而唤醒idle。idle()协程会在本轮完成事件分发:将IO就绪事件/超时定时器对应的fiber/cb重新schedule()投递回调度器队列;随后yield()让出执行权,由Scheduler::run()再去真正执行这些任务。 - 为了避免读端阻塞,还需要把读端设为非阻塞模式。
- 由于执行
- 然后,初始化
fd上下文表:建立fd→FdContext的映射1
contextResize(32);
m_fdcontexts是一个 “fd下标 →FdContext” 的数组结构。- 构造阶段先预分配一段容量,后续
addEvent(fd, ...)时如果 fd 更大,再动态扩容。
- 最后,启动调度器:让
IOManager正式开始工作1
start();
- 这一步会启动
Scheduler的工作线程,并进入调度循环。之后IOManager的idle()协程会在空闲时阻塞在epoll_wait,等待IO事件或tickle唤醒;一旦事件发生,就将对应的fiber/cb投递回调度器执行。
- 这一步会启动
contextResize 函数
1 | void IOManager::contextResize(size_t size) |
作用是:扩容并初始化 IOManager 的 fd 上下文表 m_fdcontexts,建立 “fd → FdContext” 的映射,保证后续对某个 fd 注册事件时,能快速拿到对应的 FdContext。
tickle 函数
1 | void IOManager::tickle() |
IOManager::tickle() 的作用是:唤醒可能正在 epoll_wait 中阻塞的空闲线程(idle 协程),让调度器立刻重新检查任务队列/定时器,从而及时调度新任务。
它做法很经典:往 pipe 管道写一个字节制造“可读事件”。
addEvent 函数
1 | int IOManager::addEvent(int fd, Event event, std::function<void()> cb) |
IOManager::addEvent 的作用是:在指定 fd 上注册一个 IO 事件(READ/WRITE)到 epoll,同时把“事件触发后要执行的内容”绑定到该 fd 的上下文里。这样当 epoll 检测到该 fd 就绪时,IOManager 就能把对应的 cb 或 fiber 投递回 Scheduler 去执行。
解析
- 首先,创建或者定位
fd对应的FdContext。- 如果
m_fdcontexts的容量够用,直接返回fd对应的FdContext。 - 否则,对
m_fdcontexts进行扩容,再返回fd对应的FdContext。
- 如果
- 其次,保证不允许对同一
fd重复注册同一种事件。 - 然后,将对
fd的事件注册到epoll- 先决定操作类型
op:- 若该
fd之前没有注册过任何事件:EPOLL_CTL_ADD - 若已有
fd之前注册过事件event,比如之前已注册READ,现在加WRITE:EPOLL_CTL_MOD
- 若该
- 构造
epoll_eventepevent.events = EPOLLET | 旧事件 | 新事件epevent.data.ptr = fd_ctx,这里非常重要,因为当epoll检测到该fd就绪后,可以直接通过epoll_event.data.ptr找到注册到这个事件的协程/回调函数fiber/cb
- 调用
epoll_ctl注册/修改内核事件表m_epfd监听该fd的所关注事件。
- 先决定操作类型
- 之后,通过
m_pendingEventCount记录当前 “已注册但尚未触发” 的IO事件数,用于stopping()判断(是否还有IO事件挂着,不能退出)。 - 最后,绑定事件触发后的执行体(
cb或fiber)到EventContext。- 更新
fd_ctx关注的事件:fd_ctx->m_events |= event - 找到事件
event对应的协程/回调函数:EventContext& event_ctx = getEventContext(event) - 绑定调度器:
event_ctx.scheduler = Scheduler::GetThis(); - 绑定执行体:
- 如果传了
cb:保存回调函数,之后事件触发后就schedule这个cb函数 - 如果没传
cb:保存当前协程Fiber::GetThis(),表示“当前协程在等这个IO事件”。
- 如果传了
- 更新
delEvent 函数
同理于 addEvent 函数,只不过作用相反
cancelEvent 函数
1 | bool IOManager::cancelEvent(int fd, Event event) |
该函数主要用于取消某个 fd 上已经注册的指定事件(READ/WRITE),把它从 epoll 的监听集合里移除,同时“主动触发一次”该事件对应的 cb/fiber,让等待它的任务立刻被调度执行(通常用于唤醒正在等待 IO 的协程,避免永远挂起)
解析
1 | fd_ctx->triggerEvent(event); |
首先,addEvent 的本质:不是“监听” ,而是“挂起一个等待者”
- 当调用
addEvent(fd, READ)[不传cb]时,典型使用方式是:addEvent把当前协程保存到 `fd_ctx->read.fiber- 然后当前协程
yield(),让出执行权 - 等
READ就绪时,IOManager才会schedule(fiber)把它恢复。
所以这个fiber现在的状态是:我在等READ才能继续。
其次,如果只“取消监听”,但不唤醒,会发生什么?
- 这个协程已经
yield()了,它不会自己醒。 - 你又把
epoll监听删了,未来也不会再触发READ - 结果这个协程永远不会被恢复(逻辑死锁/资源泄漏)
cancelAll 函数
同理于 cancelEvent 函数,主要用于取消 fd 关注的所有事件。
idle 函数
IOManager 最重要的函数:主要描述了一个线程在无法在任务队列 m_task 取得任务 task 后,会将执行权给 idle 协程,idle 协程会给出线程在空闲时会做些什么!
1 | void IOManager::idle() |
IOManager::idle()作为IOManager 的“事件循环协程”。当调度器没有普通任务可跑时,线程会切换到 idle(),它负责:
- 阻塞在
epoll_wait上等待IO就绪或被tickle唤醒 - 处理定时器超时,把超时回调投递到调度器队列
- 把就绪的
READ/WRITE事件对应的协程/回调重新schedule()回调度器执行 - 最后
yield()让出执行权回到Scheduler::run(),让调度器去真正执行刚投递的任务
一句话:idle() 不执行业务逻辑,它只做 “等待 + 分发(投递)”。
解析
首先,准备 epoll 就绪事件数组 events
1 | const uint64_t MAX_EVENTS = 256; |
- 一次
epoll_wait最多处理 256 个就绪事件,超过的留到下一轮。
其次,循环运行:直到满足停止条件
- 通过
stopping(next_timeout)获取下一个定时器时间作为epoll_wait的超时时间timeout - 并且判断是否可以退出。
阻塞等待:epoll_wait + timeout
1 | rt = epoll_wait(m_epfd, events, MAX_EVENTS, (int)next_timeout); |
- 如果有
IO就绪 /tickle pipe可读,立刻返回 - 如果没有事件发生,但
timeout到了,处理定时器 - 被信号中断
EINTR则重试 - 最大阻塞时间为 5s,避免定时器很大导致线程长时间睡眠
处理所有已超时的定时器:只投递,不执行
1 | std::vector<std::function<void()>> cbs; |
- 主要是把定时器回调加入任务队列,真正执行要等
idle() yield后回到Scheduler::run()。
遍历 epoll 返回的就绪事件:只分发,不执行
对于每个 epoll_event 事件
如果是
tickle管道读端:tickle的目的只是唤醒idle协程让出执行权,所以读空pipe传来的字节即可。1
2
3
4
5
6
7
8if (event.data.fd == m_tickleFds[0])
{
// ticklefd[0] 用于通知协程调度,这时只需要把管道里的内容读完即可
uint8_t dummy[256];
while (read(m_tickleFds[0], dummy, sizeof(dummy)) > 0)
;
continue;
}如果是业务
fd- 找到对应的
FdContext,因为FdContext保存了fd关注的事件就绪后需要进行的cb/fiber。 - 判断事件
event类型如果是异常事件
EPOLLERR | EPOLLHUP,则将异常事件映射成read/write都就绪事件,避免等待在read/write的协程永远睡死。1
2
3
4if (event.events & (EPOLLERR | EPOLLHUP))
{
event.events |= (EPOLLERR | EPOLLHUP) & fd_ctx->m_events;
}如果是常规事件,则判断是读写事件,并且判断是否是
fd_ctx保存的fd所感兴趣的事件。1
2
3
4
5
6
7
8
9int real_events = NONE;
if (event.events & EPOLLIN)
{
real_events |= READ;
}
if (event.events & EPOLLOUT)
{
real_events |= WRITE;
}剔除已经发生的事件,将剩下的事件重新加入
epoll_wait1
2
3
4
5int left_events = (fd_ctx->m_events & ~real_events);
int op = left_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
event.events = EPOLLET | left_events;
int rt2 = epoll_ctl(m_epfd, op, fd_ctx->fd, &event);处理已发生的事件,将
fd感兴趣的事件event要进行的fiber/cb放入到任务队列中。1
2
3
4
5
6
7
8
9if (real_events & READ)
{
fd_ctx->triggerEvent(READ);
--m_pendingEventCount;
}
if (real_events & WRITE) {
fd_ctx->triggerEvent(WRITE);
--m_pendingEventCount;
}
- 找到对应的
释放执行权给 Scheduler::run()
一旦处理完所有的事件,idle 协程 yield,让出执行权,这样可以让调度协程 (Scheduler::run) 重新检查是否有新任务要执行。
