1️⃣ 什么是协程调度器?
协程调度器(Scheduler)本质上是把“要运行的协程/任务”分配到“可用的线程”上执行的调度中心。在 Sylar 里它封装的是一种 N-M 调度模型:N 个协程(Fiber / 回调函数)复用 M 个线程。调度器内部维护一个线程池和任务队列,外部通过 schedule() 把 Fiber 或 std::function<void()> 投递进来;当队列从空变为非空时,调度器会通过 tickle() 唤醒处于 idle() 的空闲线程/空闲协程去取任务执行。这样做的好处是:协程切换发生在用户态、成本低,同时又能利用多线程把不同协程并行跑起来,实现高并发下的高效调度与资源利用。
2️⃣ 整体框架图

3️⃣ 代码类图

4️⃣ 模块解析
私有成员变量
m_name:调度器的名称
m_mutex:互斥量
m_threads:线程池
m_tasks:任务队列
m_tids:线程池中包含的线程id
m_threadCount:线程数量
m_activeCount:活跃线程数量
m_idleCount:不活跃线程数量
m_useCaller:判断是否执行Scheduler构造函数的线程
m_rootFiber:当 m_userCaller = true 时,主协程以后的第一个协程
m_rootThread:当 m_userCaller = true 时,调度器所在线程的线程id
m_stopping:调度器是否正在停止
全局变量
t_schedule
- 类型:
static thread_local Scheduler*
- 作用:
- 每个线程都有一个独立的
t_schedule,指向“这个线程当前正在使用/运行的 Scheduler”。
- 让
Scheduler::GetThis() 能在任意位置直接拿到本线程的调度器。
t_schedule_fiber
- 类型:
static thread_local Fiber*
- 作用:
- 每个线程都有一个独立的
t_schedule_fiber,指向“本线程用于调度的那条主协程(调度协程)”。
- 让
Scheduler::GetMainFiber() 能返回本线程的主 Fiber,并且在 run()/idle() 等调度逻辑里实现“从工作协程切回调度协程/主协程”的切换。
嵌套类(SchedulerTask)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| class ScheduleTask { public: friend class Scheduler;
ScheduleTask() { threadid = -1; }
ScheduleTask(Fiber::ptr f, int thr) { fiber = f; threadid = thr; }
ScheduleTask(Fiber::ptr* f, int thr) { fiber.swap(*f); threadid = thr; }
ScheduleTask(std::function<void()> c, int thr) { cb = c; threadid = thr; } void reset() { fiber = nullptr; cb = nullptr; threadid = -1; } private: Fiber::ptr fiber; std::function<void()> cb; int threadid; };
|
- 作用:定义了任务的类型,只接受协程和用于执行的函数
- 私有成员变量
fiber:协程
cb:函数
threadid:协程和函数运行在哪个线程
- 重要函数
重要成员函数
接下来将根据:Scheduler()[ user_caller 也就是主线程 Main 是否需要下水干活]、start()[线程池如何启动]、schedule()[如何放任务进去]、run()[线程如何消费任务,如何切换协程]、idle()[没任务时干啥]、stop()[怎么结束]
构造函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| Scheduler::Scheduler(size_t threads, bool use_caller, const std::string& name) { SYLAR_ASSERT(threads > 0);
m_useCaller = use_caller; m_name = name; if (use_caller) { --threads; Fiber::GetThis(); SYLAR_ASSERT(GetThis() == nullptr); t_schedule = this;
m_rootFiber.reset(new Fiber(std::bind(&Scheduler::run, this), 0, false)); Thread::setName(m_name); t_schedule_fiber = m_rootFiber.get(); m_rootThread = sylar::GetThreadId(); m_tids.push_back(m_rootThread); } else { m_rootThread = -1; }
m_threadCount = threads; }
|
- 作用
- 如果打算将当前的线程用作调度线程,则创建针对当前线程的第一个协程,并通过
t_schedue 表示调度的线程是当前线程。
- 之后创建一个子协程当成调度协程,将
run 函数当作协程的入口函数,并设置调度器的名字为当前线程的名字。
- 最后将调度协程保存到
t_scheduler_fiber 中,并将当前线程 id 保存到线程池中。
start 函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| void Scheduler::start() { SYLAR_LOG_INFO(g_logger) << "start()"; MutexType::Lock lock(m_mutex); if (m_stopping) { SYLAR_LOG_ERROR(g_logger) << "Scheduler is stopping"; return; } SYLAR_ASSERT(m_threads.empty()); m_threads.resize(m_threadCount);
for (size_t i = 0; i < m_threads.size(); ++i) { m_threads[i].reset(new Thread(std::bind(&Scheduler::run,this), m_name + "_" + std::to_string(i))); m_tids.push_back(m_threads[i]->getId()); } }
|
- 作用:
- 遍历当前的线程池,创建指定数量的线程来执行
run 函数,并将每个线程的 id 保存到线程 id 集合中。
schedule 函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| template<typename FiberOrcb> void schedule(FiberOrcb ft, size_t threadid = -1) { bool need_tickle = false; { MutexType::Lock lock(m_mutex); need_tickle = scheduleNoLock(ft, threadid); }
if (need_tickle) { tickle(); } }
|
1 2 3 4 5 6 7 8 9 10 11
| template<typename FiberOrcb> bool scheduleNoLock(FiberOrcb ft, size_t threadid) { bool need_tickle = m_tasks.empty(); ScheduleTask task(ft, threadid); if (task.fiber || task.cb) { m_tasks.push_back(task); } return need_tickle; }
|
- 作用
- 根据
Fiber 或者回调函数 cb 创建一个任务 task,放进任务队列中 m_task,并设置 need_tickle 为 true。
- 如果
need_tickle 不为 false,这说明现在任务队列中有任务,就需要唤醒 idle 协程。
run 函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
| void Scheduler::run() { SYLAR_LOG_INFO(g_logger) << "run"; set_hook_enable(true); setThis(); if (sylar::GetThreadId() != m_rootThread) { t_schedule_fiber = sylar::Fiber::GetThis().get(); }
Fiber::ptr idle(new Fiber(std::bind(&Scheduler::idle, this))); Fiber::ptr cb_fiber; ScheduleTask task; while (true) { task.reset(); bool tickle_me = false; { MutexType::Lock lock(m_mutex); auto it = m_tasks.begin(); while (it != m_tasks.end()) {
if (it->threadid != -1 && it->threadid != sylar::GetThreadId()) { ++it; tickle_me = true; continue; }
SYLAR_ASSERT(it->fiber || it->cb);
if (it->fiber && it->fiber->getState() == Fiber::RUNNING) { ++it; continue; }
task = *it; m_tasks.erase(it++); ++m_activeCount; break; } tickle_me |= (it != m_tasks.end()); }
if (tickle_me) { tickle(); }
if (task.fiber) { task.fiber->resume(); --m_activeCount; task.reset(); } else if (task.cb) { if (cb_fiber) { cb_fiber->reset(task.cb); } else { cb_fiber.reset(new Fiber(task.cb)); } task.reset(); cb_fiber->resume(); --m_activeCount; cb_fiber.reset(); } else { if (idle->getState() == Fiber::TERM) { SYLAR_LOG_INFO(g_logger) << "idle fiber term"; break; } ++m_idleCount; idle->resume(); --m_idleCount; } } SYLAR_LOG_INFO(g_logger) << "Scheduler::run() exit"; }
|
- 作用:
- 首先,
setThis() 函数会使得调度器创建的每个线程都有个静态线程局部变量 t_schedule 指向调度器 Scheduler。
- 如果当前线程
id 不等于调度线程的 id,则需要创建当前线程的主协程。
- 之后每个线程还需要创建懒惰协程
idle,入口函数为 idle(),每个懒惰协程先让出执行权。
- 然后,本线程会进入到
while 循环中,持续判断任务队列 m_task 中是否有任务存在。如果当前线程已经取到了任务 task,但是任务队列 m_task 中还有任务 task 的话。则需要通知其他线程来进行干活。
- 最后,将
cpu 的执行时间切换到 task 所保存的 协程 或者 回调函数,使该 task 对应的协程执行,resume 返回时,协程要么执行完了,要么半路yield了,总之这个任务就算完成了,活跃线程数减一。如果没有取到 task 的话,先判断当前调度器是否停止?如果没有停止的话,则切换到 idle 协程来执行对应的 idle(),当 idle 协程执行完时,当前线程就又需要重新进行抢夺任务来执行。
idle() 函数
1 2 3 4 5 6 7 8
| void Scheduler::idle() { SYLAR_LOG_DEBUG(g_logger) << "idle"; while(!stopping()) { sylar::Fiber::GetThis()->yield(); } }
|
- 作用:
- 后面的模块会有继承
Scheduler 的类,其会修改 idle() 函数,用来执行线程没事的时候具体需要做些什么事情,而 Schedluer 这里只是简单的让 idle 协程让出执行权。
stop 函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| void Scheduler::stop() { SYLAR_LOG_INFO(g_logger) << "stop"; if (stopping()) { return; } m_stopping = true;
if (m_useCaller) { SYLAR_ASSERT(GetThis() == this); } else { SYLAR_ASSERT(GetThis() != this); } for (size_t i = 0; i < m_threadCount; i++) { tickle(); }
if (m_rootFiber) { tickle(); }
if (m_rootFiber) { m_rootFiber->resume(); SYLAR_LOG_DEBUG(g_logger) << "m_rootFiber end"; }
std::vector<Thread::ptr> thres; { MutexType::Lock lock(m_mutex); thres.swap(m_threads); } for (auto& i : thres) { i->join(); } }
|
- 作用:
stop 函数主要用于优雅停止调度器:它不是立刻杀掉线程,而是发出停止信号,唤醒所有调度线程/协程让它们自行退出调度循环,最后 join() 等待线程池完全结束,确保 stop 返回时调度器已经干净停机。
- 提前返回:若
stopping() 已满足条件,直接返回,避免重复停止。
- 置停止标志:置
m_stopping = true,让 run()/idle() 中的 stopping() 判断最终变为真,从而退出循环。
- 断言校验调用上下文:通过
GetThis() 检查当前线程是否符合 use_caller 语义,避免在错误线程上下文里触发停止逻辑导致协程切换混乱。
tickle 唤醒:循环调用 tickle()用于叫醒可能在 idle()/等待中的调度线程,让它们尽快回到 run() 检查停止条件并退出,否则 join() 可能一直等。m_rootFiber 存在时额外 tickle,是为了兼顾 caller 线程上的调度协程。
resume rootFiber:当 use_caller=true 时,caller 线程的调度循环跑在 m_rootFiber 中。m_rootFiber->resume() 是为了把控制权切回调度协程,让其继续执行 run() 并走到退出点(通常由 idle() 观察到 stopping() 后结束,run() 再 break 退出),从而正确回到 caller 主协程完成收尾。
- 释放锁后
join:将 m_threads swap 到局部变量再逐个 join(),避免 join 时长时间持锁,保证 stop 返回时所有调度线程都已经退出。
参考文章
sylar 源码解析—协程模块
从零开始重写sylar C++高性能分布式服务器框架
C++高性能分布式服务器框架