1️⃣ 什么是协程调度器?

协程调度器(Scheduler)本质上是把“要运行的协程/任务”分配到“可用的线程”上执行的调度中心。在 Sylar 里它封装的是一种 N-M 调度模型:N 个协程(Fiber / 回调函数)复用 M 个线程。调度器内部维护一个线程池任务队列,外部通过 schedule()Fiberstd::function<void()> 投递进来;当队列从空变为非空时,调度器会通过 tickle() 唤醒处于 idle() 的空闲线程/空闲协程去取任务执行。这样做的好处是:协程切换发生在用户态、成本低,同时又能利用多线程把不同协程并行跑起来,实现高并发下的高效调度与资源利用。

2️⃣ 整体框架图

工作原理

3️⃣ 代码类图

Scheduler

4️⃣ 模块解析

私有成员变量

  • m_name:调度器的名称
  • m_mutex:互斥量
  • m_threads:线程池
  • m_tasks:任务队列
  • m_tids:线程池中包含的线程id
  • m_threadCount:线程数量
  • m_activeCount:活跃线程数量
  • m_idleCount:不活跃线程数量
  • m_useCaller:判断是否执行Scheduler构造函数的线程
  • m_rootFiber:当 m_userCaller = true 时,主协程以后的第一个协程
  • m_rootThread:当 m_userCaller = true 时,调度器所在线程的线程id
  • m_stopping:调度器是否正在停止

全局变量

  • t_schedule
    • 类型:static thread_local Scheduler*
    • 作用:
      • 每个线程都有一个独立的 t_schedule,指向“这个线程当前正在使用/运行的 Scheduler”。
      • Scheduler::GetThis() 能在任意位置直接拿到本线程的调度器。
  • t_schedule_fiber
    • 类型:static thread_local Fiber*
    • 作用:
      • 每个线程都有一个独立的 t_schedule_fiber,指向“本线程用于调度的那条主协程(调度协程)”。
      • Scheduler::GetMainFiber() 能返回本线程的主 Fiber,并且在 run()/idle() 等调度逻辑里实现“从工作协程切回调度协程/主协程”的切换。

嵌套类(SchedulerTask)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class ScheduleTask
{
public:
friend class Scheduler;

/**
* @brief 默认构造函数
*/
ScheduleTask()
{
threadid = -1;
}

/**
* @brief 构造函数
*/
ScheduleTask(Fiber::ptr f, int thr)
{
fiber = f;
threadid = thr;
}

ScheduleTask(Fiber::ptr* f, int thr)
{
fiber.swap(*f);
threadid = thr;
}

ScheduleTask(std::function<void()> c, int thr)
{
cb = c;
threadid = thr;
}

void reset()
{
fiber = nullptr;
cb = nullptr;
threadid = -1;
}

private:
Fiber::ptr fiber;
std::function<void()> cb;
int threadid;
};
  • 作用:定义了任务的类型,只接受协程用于执行的函数
  • 私有成员变量
    • fiber:协程
    • cb:函数
    • threadid:协程和函数运行在哪个线程
  • 重要函数
    • 只有构造函数用于创建一个需要调度的任务。

重要成员函数

接下来将根据:Scheduler()[ user_caller 也就是主线程 Main 是否需要下水干活]、start()[线程池如何启动]、schedule()[如何放任务进去]、run()[线程如何消费任务,如何切换协程]、idle()[没任务时干啥]、stop()[怎么结束]

构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Scheduler::Scheduler(size_t threads, bool use_caller, const std::string& name)
{
SYLAR_ASSERT(threads > 0);

m_useCaller = use_caller;
m_name = name;
if (use_caller) // 这表示打算将当前的线程用作调度线程
{
--threads; // 将调度线程的数量减一
Fiber::GetThis(); // 创建 caller 线程的主协程
SYLAR_ASSERT(GetThis() == nullptr);
t_schedule = this; // 将当前线程的调度器上下文表示为自己

/**
* caller 线程的主协程不会被调度协程run进行调度,而且,线程的调度协程停止时,应该返回到caller的主协程
* 在user caller情况下,把caller线程的主协程暂时保存起来,等调度协程结束时,再resume caller协程
*/

m_rootFiber.reset(new Fiber(std::bind(&Scheduler::run, this), 0, false)); // 创建主协程后的调度协程
Thread::setName(m_name); // 设置线程名称
t_schedule_fiber = m_rootFiber.get(); // 保存调度协程
m_rootThread = sylar::GetThreadId(); // 设置线程id
m_tids.push_back(m_rootThread); // 将当前线程id保存到线程id集合中
}
else
{
m_rootThread = -1;
}

m_threadCount = threads;
}
  • 作用
    • 如果打算将当前的线程用作调度线程,则创建针对当前线程的第一个协程,并通过 t_schedue 表示调度的线程是当前线程。
    • 之后创建一个子协程当成调度协程,将 run 函数当作协程的入口函数,并设置调度器的名字为当前线程的名字。
    • 最后将调度协程保存到 t_scheduler_fiber 中,并将当前线程 id 保存到线程池中。

start 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void Scheduler::start()
{
SYLAR_LOG_INFO(g_logger) << "start()";
MutexType::Lock lock(m_mutex);
if (m_stopping)
{
SYLAR_LOG_ERROR(g_logger) << "Scheduler is stopping";
return;
}
SYLAR_ASSERT(m_threads.empty());
m_threads.resize(m_threadCount);

for (size_t i = 0; i < m_threads.size(); ++i)
{
m_threads[i].reset(new Thread(std::bind(&Scheduler::run,this), m_name + "_" + std::to_string(i)));
m_tids.push_back(m_threads[i]->getId());
}
}
  • 作用:
    • 遍历当前的线程池,创建指定数量的线程来执行 run 函数,并将每个线程的 id 保存到线程 id 集合中。

schedule 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
template<typename FiberOrcb>
void schedule(FiberOrcb ft, size_t threadid = -1)
{
bool need_tickle = false;
{
MutexType::Lock lock(m_mutex);
need_tickle = scheduleNoLock(ft, threadid);
}

if (need_tickle) // 这说明现在任务队列中有任务,需要唤醒idle协程
{
tickle(); // 唤醒idle协程
}
}
1
2
3
4
5
6
7
8
9
10
11
template<typename FiberOrcb>
bool scheduleNoLock(FiberOrcb ft, size_t threadid)
{
bool need_tickle = m_tasks.empty();
ScheduleTask task(ft, threadid);
if (task.fiber || task.cb)
{
m_tasks.push_back(task);
}
return need_tickle;
}
  • 作用
    • 根据 Fiber 或者回调函数 cb 创建一个任务 task,放进任务队列中 m_task,并设置 need_tickletrue
    • 如果 need_tickle 不为 false,这说明现在任务队列中有任务,就需要唤醒 idle 协程。

run 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
void Scheduler::run()
{
SYLAR_LOG_INFO(g_logger) << "run";
// 这主要用于hook系统调用
set_hook_enable(true);
// 调度器创建的每个线程都会有个变量t_schedule指向调度器
setThis();
if (sylar::GetThreadId() != m_rootThread) // 这表示不是当前线程的id不是调度线程的id
{
t_schedule_fiber = sylar::Fiber::GetThis().get(); // 线程创建协程
}

Fiber::ptr idle(new Fiber(std::bind(&Scheduler::idle, this))); // 创建懒惰携程
Fiber::ptr cb_fiber;
ScheduleTask task;
while (true)
{
task.reset();
bool tickle_me = false; // 是否tickle其他线程进行任务调度
{
MutexType::Lock lock(m_mutex);
auto it = m_tasks.begin();
while (it != m_tasks.end())
{
/**
* @brief 情况一:当前任务指定了调度线程,但不是当前线程
*/
if (it->threadid != -1 && it->threadid != sylar::GetThreadId())
{
++it;
tickle_me = true;
continue;
}

// 判断任务至少存在
SYLAR_ASSERT(it->fiber || it->cb);

/**
* @brief 情况二:当前任务存在,但是正在运行在该线程的协程上
*/
if (it->fiber && it->fiber->getState() == Fiber::RUNNING)
{
++it;
continue;
}

/**
* @brief 情况三:当前调度线程找到了一个任务,准备开始调度(将其从任务队列中删除,活动线程数量加一)
*/
task = *it;
m_tasks.erase(it++);
++m_activeCount;
break;
}

// 当前线程如果拿到一个任务后,发现任务队列中还有任务,就tick其他线程
tickle_me |= (it != m_tasks.end());

}

// 通知其他线程
if (tickle_me)
{
tickle();
}

if (task.fiber)
{
// 使该task对应的协程执行,resume返回时,协程要么执行完了,要么半路yield了,总之这个任务就算完成了,活跃线程数减一
task.fiber->resume();
--m_activeCount;
task.reset();
}
else if (task.cb)
{
if (cb_fiber)
{
cb_fiber->reset(task.cb);
}
else
{
cb_fiber.reset(new Fiber(task.cb));
}
task.reset();
cb_fiber->resume();
--m_activeCount;
cb_fiber.reset();
}
else // 进入到这个分支,说明没有在任务队列中取到task
{
if (idle->getState() == Fiber::TERM)
{
// 这说明调度器已被停止
SYLAR_LOG_INFO(g_logger) << "idle fiber term";
break;
}
++m_idleCount;
idle->resume();
--m_idleCount;
}
}
SYLAR_LOG_INFO(g_logger) << "Scheduler::run() exit";
}
  • 作用:
    • 首先,setThis() 函数会使得调度器创建的每个线程都有个静态线程局部变量 t_schedule 指向调度器 Scheduler
    • 如果当前线程 id 不等于调度线程的 id,则需要创建当前线程主协程
    • 之后每个线程还需要创建懒惰协程 idle,入口函数为 idle(),每个懒惰协程先让出执行权。
    • 然后,本线程会进入到 while 循环中,持续判断任务队列 m_task 中是否有任务存在。如果当前线程已经取到了任务 task,但是任务队列 m_task 中还有任务 task 的话。则需要通知其他线程来进行干活。
    • 最后,将 cpu 的执行时间切换到 task 所保存的 协程 或者 回调函数,使该 task 对应的协程执行,resume 返回时,协程要么执行完了,要么半路yield了,总之这个任务就算完成了,活跃线程数减一。如果没有取到 task 的话,先判断当前调度器是否停止?如果没有停止的话,则切换到 idle 协程来执行对应的 idle(),当 idle 协程执行完时,当前线程就又需要重新进行抢夺任务来执行。

idle() 函数

1
2
3
4
5
6
7
8
void Scheduler::idle()
{
SYLAR_LOG_DEBUG(g_logger) << "idle";
while(!stopping())
{
sylar::Fiber::GetThis()->yield();
}
}
  • 作用:
    • 后面的模块会有继承 Scheduler 的类,其会修改 idle() 函数,用来执行线程没事的时候具体需要做些什么事情,而 Schedluer 这里只是简单的让 idle 协程让出执行权。

stop 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
void Scheduler::stop()
{
SYLAR_LOG_INFO(g_logger) << "stop";
if (stopping())
{
return;
}
m_stopping = true;

// 判断是否使用了caller线程
if (m_useCaller)
{
SYLAR_ASSERT(GetThis() == this);
}
else
{
SYLAR_ASSERT(GetThis() != this);
}
for (size_t i = 0; i < m_threadCount; i++)
{
tickle();
}

if (m_rootFiber)
{
tickle();
}

// 在use caller情况下,调度器协程结束时,应该返回caller协程
if (m_rootFiber)
{
m_rootFiber->resume();
SYLAR_LOG_DEBUG(g_logger) << "m_rootFiber end";
}

std::vector<Thread::ptr> thres;
{
MutexType::Lock lock(m_mutex);
thres.swap(m_threads);
}

for (auto& i : thres)
{
i->join();
}

}
  • 作用:stop 函数主要用于优雅停止调度器:它不是立刻杀掉线程,而是发出停止信号,唤醒所有调度线程/协程让它们自行退出调度循环,最后 join() 等待线程池完全结束,确保 stop 返回时调度器已经干净停机。
    • 提前返回:若 stopping() 已满足条件,直接返回,避免重复停止。
    • 置停止标志:置 m_stopping = true,让 run()/idle() 中的 stopping() 判断最终变为真,从而退出循环。
    • 断言校验调用上下文:通过 GetThis() 检查当前线程是否符合 use_caller 语义,避免在错误线程上下文里触发停止逻辑导致协程切换混乱。
    • tickle 唤醒:循环调用 tickle()用于叫醒可能在 idle()/等待中的调度线程,让它们尽快回到 run() 检查停止条件并退出,否则 join() 可能一直等。m_rootFiber 存在时额外 tickle,是为了兼顾 caller 线程上的调度协程。
    • resume rootFiber:当 use_caller=true 时,caller 线程的调度循环跑在 m_rootFiber 中。m_rootFiber->resume() 是为了把控制权切回调度协程,让其继续执行 run() 并走到退出点(通常由 idle() 观察到 stopping() 后结束,run()break 退出),从而正确回到 caller 主协程完成收尾。
    • 释放锁后 join:将 m_threads swap 到局部变量再逐个 join(),避免 join 时长时间持锁,保证 stop 返回时所有调度线程都已经退出。

参考文章

sylar 源码解析—协程模块

从零开始重写sylar C++高性能分布式服务器框架

C++高性能分布式服务器框架