【动手写协程库】系列笔记是学习sylar的协程库时的记录,参考了从零开始重写sylar C++高性能分布式服务器框架和代码随想录中的文档。文章并不是对所有代码的详细解释,而是为了自己理解一些片段所做的笔记。
Scheduler
类中其他函数的定义可以在这里查看:Github: src/scheduler.cpp
Sylar的协程调度器是一个N-M模型,意味着N个线程可以运行M个协程,协程能够在线程之间进行切换,也可以被绑定到特定的线程上执行。
调度器可以由应用程序中的任何线程创建,但创建它的线程(称为caller线程)可以选择是否参与协程的调度。如果caller线程参与调度,那么调度器的线程数会相应减少一个,因为caller线程本身也会作为一个调度线程。
Scheduler相关API如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
| #ifndef SCHEDULER_H_ #define SCHEDULER_H_
#include <atomic> #include <cstddef> #include <functional> #include <iostream> #include <list> #include <memory> #include <mutex> #include <thread> #include <utility> #include <vector> #include "coroutine.h" #include "util.h"
class Scheduler { private: using ThreadIdPtr = std::shared_ptr<std::thread::id>;
struct SchedulerTask { ThreadIdPtr thread_id_; Coroutine::Ptr coroutine_; std::function<void()> callback_;
SchedulerTask() {}
SchedulerTask(Coroutine::Ptr co, ThreadIdPtr id) : coroutine_(co) , thread_id_(std::move(id)) {}
SchedulerTask(std::function<void()> callback, ThreadIdPtr id) : callback_(callback) , thread_id_(std::move(id)) {}
void Reset() { thread_id_.reset(); callback_ = nullptr; coroutine_ = nullptr; } };
public: Scheduler(size_t threads = 1, bool use_caller = true, const std::string& name = "scheduler");
virtual ~Scheduler();
std::string GetName() const { return name_; }
void Start();
void Stop();
template <typename TaskType> void Sched(TaskType t, ThreadIdPtr id = nullptr) requires(std::invocable<TaskType> || std::same_as<TaskType, Coroutine::Ptr>) { bool is_need_tick = false;
{ std::lock_guard lock(mutex_); is_need_tick = tasks_.empty(); SchedulerTask task(t, id); if (task.callback_ || task.coroutine_) { tasks_.push_back(task); } }
if (is_need_tick) { Tickle(); } }
public: static Scheduler* GetScheduler();
static Coroutine* GetSchedCoroutine();
protected: virtual void Tickle();
void Run();
void SetThisAsScheduler();
virtual void Idle();
virtual bool IsStop();
bool HasIdleThreads() { return idle_threads_ > 0; }
private: std::string name_; std::mutex mutex_; std::vector<std::thread> thread_pool_; std::vector<std::thread::id> thread_ids_; std::list<SchedulerTask> tasks_;
size_t threads_size_; std::atomic_size_t active_threads_{0}; std::atomic_size_t idle_threads_{0};
std::thread::id sched_id_; Coroutine::Ptr sched_co_; bool is_stop_; bool is_use_caller_; };
#endif
|
调度器的工作流大致为:
- 协程调度器在初始化时可传入线程数和一个布尔型的
use_caller
参数,表示是否使用caller线程。在使用caller线程的情况下,线程数自动减一,并且调度器内部会初始化一个属于caller线程的调度协程并保存起来(比如,在main函数中创建的调度器,如果use_caller为true,那调度器会初始化一个属于main函数线程的调度协程)。
- 调度器创建好后 ,即可调用调度器的
Sched
函数向调度器添加调度任务,但此时调度器并不会立刻执行这些任务,而是将它们保存到内部的一个任务队列中。
- 调用
Scheduler::Start()
函数启动调度 。调用Start会创建调度线程池,线程数量由初始化时的线程数和use_caller确定。调度线程一旦创建,就会立刻从任务队列里取任务执行。比较特殊的一点是,如果初始化时指定线程数为1且use_caller为true,那么Start方法什么也不做,因为不需要创建新线程用于调度。并且,由于没有创建新的调度线程,那只能由caller线程的调度协程来负责调度协程(这里有点绕),而caller线程的调度协程的执行时机与Start函数并不在同一个地方。caller线程的调度协程的执行时机在Stop函数中。
- 接下来是调度协程,对应
Scheduler::Run()
。调度协程负责从调度器的任务队列中取任务执行。取出的任务即子协程,每个子协程执行完后都必须返回调度协程,由调度协程重新从任务队列中取新的协程并执行。如果任务队列空了,那么调度协程会切换到一个idle协程,等有新任务进来时,idle协程才会退出并回到调度协程,重新开始下一轮调度。(在Scheduler中,idle函数的定义十分简单粗暴,因为实际使用协程库时并不是直接使用Scheduler类,而是使用它的派生类,在派生类中将会实现更为完善的调度)
- 接下来是添加调度任务,对应
Scheduler::Sched()
,这个方法支持传入协程或函数,并且支持一个线程id参数,表示是否将这个协程或函数绑定到一个具体的线程上执行。如果任务队列为空,那么在添加任务之后,要调用一次tickle方法以通知各调度线程的调度协程有新任务来了。在执行调度任务时,还可以通过调度器的GetScheduler()
获取到当前调度器,再通过Sched
函数继续添加新的任务,这就变相实现了在子协程中创建并运行新的子协程的功能。
- 接下来是调度器的停止。调度器的停止行为要分两种情况讨论,首先是use_caller为false的情况,这种情况下,由于没有使用caller线程进行调度,那么只需要简单地等各个调度线程的调度协程退出就行了。如果use_caller为true,表示caller线程也要参于调度,这时,调度器初始化时记录的属于caller线程的调度协程就要起作用了,在调度器停止前,应该让这个caller线程的调度协程也运行一次,让caller线程完成调度工作后再退出。如果调度器只使用了caller线程进行调度,那么所有的调度任务要在调度器停止时才会被调度。
调度器中最重要的一个函数我认为就是Run()
函数了,这个函数用于协程的调度,或者,你可以将他理解为是一个调度协程(名词)。
创建Scheduler时会为每一个内部线程池中的每一个线程都绑定一个调度协程,线程数量默认为1,此时也默认会使用caller线程,也就是使用的主线程。调度协程Scheduler::Run()
会从任务队列Task Queue中不断去取任务去执行。如果有任务可执行,那就切换至任务协程执行,任务协程执行完毕后又切换回调度协程;无任务执行时,调度协程切换至Idle协程进行等待。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
| void Scheduler::Run() { LOG << "Scheduler running...\n"; SetThisAsScheduler();
if (std::this_thread::get_id() != sched_id_) { sched_coroutine = Coroutine::GetNowCoroutine().get(); }
Coroutine::Ptr idle_co = std::make_shared<Coroutine>([this] { this->Idle(); }); Coroutine::Ptr callback_co;
SchedulerTask task;
while (true) { task.Reset(); bool tickle = false; { std::lock_guard lock(mutex_); auto iter = tasks_.begin(); while (iter != tasks_.end()) { if (iter->thread_id_ && *iter->thread_id_ != std::this_thread::get_id()) { ++iter; tickle = true; continue; } if (iter->coroutine_ && iter->coroutine_->GetState() != Coroutine::READY) { LOG << "Coroutine task's state should be READY!\n"; assert(false); } task = *iter; tasks_.erase(iter++); active_threads_++; break; } tickle |= (iter != tasks_.end()); } if (tickle) { Tickle(); }
if (task.coroutine_) { task.coroutine_->Resume(); active_threads_--; } else if (task.callback_) { if (callback_co) { callback_co->Reset(task.callback_); } else { callback_co = std::make_shared<Coroutine>(task.callback_); } callback_co->Resume(); active_threads_--; } else { if (idle_co->GetState() == Coroutine::FINISH) { LOG << "Idle coroutine finish\n"; break; } idle_threads_++; idle_co->Resume(); idle_threads_--; } } LOG << "Scheduler Run() exit\n"; }
|
这个Scheduler是一个很简单的调度器,要对任务做更好的调度,少不了Idle协程的帮助。Idle协程的具体实现要在之后的IOManager中,其继承自Scheduler,重写了Tickle()
、Idle()
等函数,并且使用epoll来实现在不同的 I/O 事件发生时,触发相应的处理逻辑。这使得程序可以以非阻塞的方式处理多个 I/O 操作,而不必等待每个操作完成后再进行下一个操作。