【动手写协程库 2】协程调度器

【动手写协程库】系列笔记是学习sylar的协程库时的记录,参考了从零开始重写sylar C++高性能分布式服务器框架和代码随想录中的文档。文章并不是对所有代码的详细解释,而是为了自己理解一些片段所做的笔记。

Scheduler类中其他函数的定义可以在这里查看:Github: src/scheduler.cpp

Sylar的协程调度器是一个N-M模型,意味着N个线程可以运行M个协程,协程能够在线程之间进行切换,也可以被绑定到特定的线程上执行。

调度器可以由应用程序中的任何线程创建,但创建它的线程(称为caller线程)可以选择是否参与协程的调度。如果caller线程参与调度,那么调度器的线程数会相应减少一个,因为caller线程本身也会作为一个调度线程。

Scheduler相关API如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#ifndef SCHEDULER_H_
#define SCHEDULER_H_

#include <atomic>
#include <cstddef>
#include <functional>
#include <iostream>
#include <list>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>
#include "coroutine.h"
#include "util.h"

// 协程调度器
class Scheduler {
private:
// 调度任务,任务类型可以是协程/函数二选一,并且可指定调度线程
using ThreadIdPtr = std::shared_ptr<std::thread::id>;

struct SchedulerTask {
ThreadIdPtr thread_id_;
Coroutine::Ptr coroutine_;
std::function<void()> callback_;

SchedulerTask() {}

SchedulerTask(Coroutine::Ptr co, ThreadIdPtr id)
: coroutine_(co)
, thread_id_(std::move(id)) {}

SchedulerTask(std::function<void()> callback, ThreadIdPtr id)
: callback_(callback)
, thread_id_(std::move(id)) {}

void Reset() {
thread_id_.reset();
callback_ = nullptr;
coroutine_ = nullptr;
}
};

public:
Scheduler(size_t threads = 1, bool use_caller = true, const std::string& name = "scheduler");

virtual ~Scheduler();

std::string GetName() const {
return name_;
}

void Start();

void Stop();

template <typename TaskType>
void Sched(TaskType t, ThreadIdPtr id = nullptr) requires(std::invocable<TaskType> || std::same_as<TaskType, Coroutine::Ptr>) {
bool is_need_tick = false;

{
std::lock_guard lock(mutex_);
is_need_tick = tasks_.empty();
SchedulerTask task(t, id);
if (task.callback_ || task.coroutine_) {
tasks_.push_back(task);
}
}

if (is_need_tick) {
Tickle();
}
}

public:
static Scheduler* GetScheduler();

static Coroutine* GetSchedCoroutine();

protected:
virtual void Tickle();

void Run();

void SetThisAsScheduler();

virtual void Idle();

virtual bool IsStop();

bool HasIdleThreads() {
return idle_threads_ > 0;
}

private:
std::string name_;
std::mutex mutex_;
std::vector<std::thread> thread_pool_;
std::vector<std::thread::id> thread_ids_;
std::list<SchedulerTask> tasks_;

size_t threads_size_;
std::atomic_size_t active_threads_{0};
std::atomic_size_t idle_threads_{0};

std::thread::id sched_id_; // use_caller为true时,调度器所在的线程id
Coroutine::Ptr sched_co_; // use_caller为true时调度器所在线程的调度协程
bool is_stop_;
bool is_use_caller_;
};

#endif /* SCHEDULER_H_ */

调度器的工作流大致为:

  • 协程调度器在初始化时可传入线程数和一个布尔型的use_caller参数,表示是否使用caller线程。在使用caller线程的情况下,线程数自动减一,并且调度器内部会初始化一个属于caller线程的调度协程并保存起来(比如,在main函数中创建的调度器,如果use_caller为true,那调度器会初始化一个属于main函数线程的调度协程)。
  • 调度器创建好后 ,即可调用调度器的Sched函数向调度器添加调度任务,但此时调度器并不会立刻执行这些任务,而是将它们保存到内部的一个任务队列中。
  • 调用Scheduler::Start()函数启动调度 。调用Start会创建调度线程池,线程数量由初始化时的线程数和use_caller确定。调度线程一旦创建,就会立刻从任务队列里取任务执行。比较特殊的一点是,如果初始化时指定线程数为1且use_caller为true,那么Start方法什么也不做,因为不需要创建新线程用于调度。并且,由于没有创建新的调度线程,那只能由caller线程的调度协程来负责调度协程(这里有点绕),而caller线程的调度协程的执行时机与Start函数并不在同一个地方。caller线程的调度协程的执行时机在Stop函数中。
  • 接下来是调度协程,对应Scheduler::Run() 。调度协程负责从调度器的任务队列中取任务执行。取出的任务即子协程,每个子协程执行完后都必须返回调度协程,由调度协程重新从任务队列中取新的协程并执行。如果任务队列空了,那么调度协程会切换到一个idle协程,等有新任务进来时,idle协程才会退出并回到调度协程,重新开始下一轮调度。(在Scheduler中,idle函数的定义十分简单粗暴,因为实际使用协程库时并不是直接使用Scheduler类,而是使用它的派生类,在派生类中将会实现更为完善的调度)
  • 接下来是添加调度任务,对应Scheduler::Sched() ,这个方法支持传入协程或函数,并且支持一个线程id参数,表示是否将这个协程或函数绑定到一个具体的线程上执行。如果任务队列为空,那么在添加任务之后,要调用一次tickle方法以通知各调度线程的调度协程有新任务来了。在执行调度任务时,还可以通过调度器的GetScheduler()获取到当前调度器,再通过Sched函数继续添加新的任务,这就变相实现了在子协程中创建并运行新的子协程的功能。
  • 接下来是调度器的停止。调度器的停止行为要分两种情况讨论,首先是use_caller为false的情况,这种情况下,由于没有使用caller线程进行调度,那么只需要简单地等各个调度线程的调度协程退出就行了。如果use_caller为true,表示caller线程也要参于调度,这时,调度器初始化时记录的属于caller线程的调度协程就要起作用了,在调度器停止前,应该让这个caller线程的调度协程也运行一次,让caller线程完成调度工作后再退出。如果调度器只使用了caller线程进行调度,那么所有的调度任务要在调度器停止时才会被调度。

调度器中最重要的一个函数我认为就是Run()函数了,这个函数用于协程的调度,或者,你可以将他理解为是一个调度协程(名词)。

创建Scheduler时会为每一个内部线程池中的每一个线程都绑定一个调度协程,线程数量默认为1,此时也默认会使用caller线程,也就是使用的主线程。调度协程Scheduler::Run()会从任务队列Task Queue中不断去取任务去执行。如果有任务可执行,那就切换至任务协程执行,任务协程执行完毕后又切换回调度协程;无任务执行时,调度协程切换至Idle协程进行等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// 用于协程的调度
void Scheduler::Run() {
LOG << "Scheduler running...\n";
SetThisAsScheduler();

// 如果当前线程不是调度器所在线程,设置调度的协程为当前线程运行的协程
if (std::this_thread::get_id() != sched_id_) {
sched_coroutine = Coroutine::GetNowCoroutine().get();
}

Coroutine::Ptr idle_co = std::make_shared<Coroutine>([this] { this->Idle(); });
Coroutine::Ptr callback_co;

SchedulerTask task;

while (true) {
task.Reset();
bool tickle = false;
{
std::lock_guard lock(mutex_);
auto iter = tasks_.begin();
while (iter != tasks_.end()) {
// 当前遍历的task已经分配了线程去执行且这个线程不是当前线程,则不用管
if (iter->thread_id_ && *iter->thread_id_ != std::this_thread::get_id()) {
++iter;
tickle = true;
continue;
}
if (iter->coroutine_ && iter->coroutine_->GetState() != Coroutine::READY) {
LOG << "Coroutine task's state should be READY!\n";
assert(false);
}
task = *iter;
tasks_.erase(iter++);
active_threads_++;
break;
}
// 有任务可以去执行,需要tickle一下
tickle |= (iter != tasks_.end());
}
if (tickle) {
Tickle();
}

// 子协程执行完毕后yield会回到Run()中
if (task.coroutine_) {
// 任务类型为协程
task.coroutine_->Resume();
active_threads_--;
} else if (task.callback_) {
// 任务类型为回调函数
if (callback_co) {
callback_co->Reset(task.callback_);
} else {
callback_co = std::make_shared<Coroutine>(task.callback_);
}
callback_co->Resume();
active_threads_--;
} else {
// 无任务,任务队列为空
if (idle_co->GetState() == Coroutine::FINISH) {
LOG << "Idle coroutine finish\n";
break;
}
idle_threads_++;
idle_co->Resume(); // Idle最后Yeild时回到这里
idle_threads_--;
}
}
LOG << "Scheduler Run() exit\n";
}

这个Scheduler是一个很简单的调度器,要对任务做更好的调度,少不了Idle协程的帮助。Idle协程的具体实现要在之后的IOManager中,其继承自Scheduler,重写了Tickle()Idle()等函数,并且使用epoll来实现在不同的 I/O 事件发生时,触发相应的处理逻辑。这使得程序可以以非阻塞的方式处理多个 I/O 操作,而不必等待每个操作完成后再进行下一个操作。