【动手写协程库】系列笔记是学习sylar的协程库时的记录,参考了从零开始重写sylar C++高性能分布式服务器框架和代码随想录中的文档。文章并不是对所有代码的详细解释,而是为了自己理解一些片段所做的笔记。

Scheduler类中其他函数的定义可以在这里查看:Github: src/scheduler.cpp

Sylar的协程调度器是一个N-M模型,意味着N个线程可以运行M个协程,协程能够在线程之间进行切换,也可以被绑定到特定的线程上执行。

调度器可以由应用程序中的任何线程创建,但创建它的线程(称为caller线程)可以选择是否参与协程的调度。如果caller线程参与调度,那么调度器的线程数会相应减少一个,因为caller线程本身也会作为一个调度线程。

Scheduler相关API如下:

#ifndef SCHEDULER_H_
#define SCHEDULER_H_

#include <atomic>
#include <cstddef>
#include <functional>
#include <iostream>
#include <list>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>
#include "coroutine.h"
#include "util.h"

// 协程调度器
class Scheduler {
private:
    // 调度任务,任务类型可以是协程/函数二选一,并且可指定调度线程
    using ThreadIdPtr = std::shared_ptr<std::thread::id>;

    struct SchedulerTask {
        ThreadIdPtr thread_id_;
        Coroutine::Ptr coroutine_;
        std::function<void()> callback_;

        SchedulerTask() {}

        SchedulerTask(Coroutine::Ptr co, ThreadIdPtr id)
            : coroutine_(co)
            , thread_id_(std::move(id)) {}

        SchedulerTask(std::function<void()> callback, ThreadIdPtr id)
            : callback_(callback)
            , thread_id_(std::move(id)) {}

        void Reset() {
            thread_id_.reset();
            callback_ = nullptr;
            coroutine_ = nullptr;
        }
    };

public:
    Scheduler(size_t threads = 1, bool use_caller = true, const std::string& name = "scheduler");

    virtual ~Scheduler();

    std::string GetName() const {
        return name_;
    }

    void Start();

    void Stop();

    template <typename TaskType>
    void Sched(TaskType t, ThreadIdPtr id = nullptr) requires(std::invocable<TaskType> || std::same_as<TaskType, Coroutine::Ptr>) {
        bool is_need_tick = false;

        {
            std::lock_guard lock(mutex_);
            is_need_tick = tasks_.empty();
            SchedulerTask task(t, id);
            if (task.callback_ || task.coroutine_) {
                tasks_.push_back(task);
            }
        }

        if (is_need_tick) {
            Tickle();
        }
    }

public:
    static Scheduler* GetScheduler();

    static Coroutine* GetSchedCoroutine();

protected:
    virtual void Tickle();

    void Run();

    void SetThisAsScheduler();

    virtual void Idle();

    virtual bool IsStop();

    bool HasIdleThreads() {
        return idle_threads_ > 0;
    }

private:
    std::string name_;
    std::mutex mutex_;
    std::vector<std::thread> thread_pool_;
    std::vector<std::thread::id> thread_ids_;
    std::list<SchedulerTask> tasks_;

    size_t threads_size_;
    std::atomic_size_t active_threads_{0};
    std::atomic_size_t idle_threads_{0};

    std::thread::id sched_id_; // use_caller为true时,调度器所在的线程id
    Coroutine::Ptr sched_co_;  // use_caller为true时调度器所在线程的调度协程
    bool is_stop_;
    bool is_use_caller_;
};

#endif /* SCHEDULER_H_ */

调度器的工作流大致为:

  • 协程调度器在初始化时可传入线程数和一个布尔型的use_caller参数,表示是否使用caller线程。在使用caller线程的情况下,线程数自动减一,并且调度器内部会初始化一个属于caller线程的调度协程并保存起来(比如,在main函数中创建的调度器,如果use_caller为true,那调度器会初始化一个属于main函数线程的调度协程)。
  • 调度器创建好后 ,即可调用调度器的Sched函数向调度器添加调度任务,但此时调度器并不会立刻执行这些任务,而是将它们保存到内部的一个任务队列中。
  • 调用Scheduler::Start()函数启动调度 。调用Start会创建调度线程池,线程数量由初始化时的线程数和use_caller确定。调度线程一旦创建,就会立刻从任务队列里取任务执行。比较特殊的一点是,如果初始化时指定线程数为1且use_caller为true,那么Start方法什么也不做,因为不需要创建新线程用于调度。并且,由于没有创建新的调度线程,那只能由caller线程的调度协程来负责调度协程(这里有点绕),而caller线程的调度协程的执行时机与Start函数并不在同一个地方。caller线程的调度协程的执行时机在Stop函数中。
  • 接下来是调度协程,对应Scheduler::Run() 。调度协程负责从调度器的任务队列中取任务执行。取出的任务即子协程,每个子协程执行完后都必须返回调度协程,由调度协程重新从任务队列中取新的协程并执行。如果任务队列空了,那么调度协程会切换到一个idle协程,等有新任务进来时,idle协程才会退出并回到调度协程,重新开始下一轮调度。(在Scheduler中,idle函数的定义十分简单粗暴,因为实际使用协程库时并不是直接使用Scheduler类,而是使用它的派生类,在派生类中将会实现更为完善的调度)
  • 接下来是添加调度任务,对应Scheduler::Sched() ,这个方法支持传入协程或函数,并且支持一个线程id参数,表示是否将这个协程或函数绑定到一个具体的线程上执行。如果任务队列为空,那么在添加任务之后,要调用一次tickle方法以通知各调度线程的调度协程有新任务来了。在执行调度任务时,还可以通过调度器的GetScheduler()获取到当前调度器,再通过Sched函数继续添加新的任务,这就变相实现了在子协程中创建并运行新的子协程的功能。
  • 接下来是调度器的停止。调度器的停止行为要分两种情况讨论,首先是use_caller为false的情况,这种情况下,由于没有使用caller线程进行调度,那么只需要简单地等各个调度线程的调度协程退出就行了。如果use_caller为true,表示caller线程也要参于调度,这时,调度器初始化时记录的属于caller线程的调度协程就要起作用了,在调度器停止前,应该让这个caller线程的调度协程也运行一次,让caller线程完成调度工作后再退出。如果调度器只使用了caller线程进行调度,那么所有的调度任务要在调度器停止时才会被调度。

调度器中最重要的一个函数我认为就是Run()函数了,这个函数用于协程的调度,或者,你可以将他理解为是一个调度协程(名词)。

创建Scheduler时会为每一个内部线程池中的每一个线程都绑定一个调度协程,线程数量默认为1,此时也默认会使用caller线程,也就是使用的主线程。调度协程Scheduler::Run()会从任务队列Task Queue中不断去取任务去执行。如果有任务可执行,那就切换至任务协程执行,任务协程执行完毕后又切换回调度协程;无任务执行时,调度协程切换至Idle协程进行等待。

// 用于协程的调度
void Scheduler::Run() {
    LOG << "Scheduler running...\n";
    SetThisAsScheduler();

    // 如果当前线程不是调度器所在线程,设置调度的协程为当前线程运行的协程
    if (std::this_thread::get_id() != sched_id_) {
        sched_coroutine = Coroutine::GetNowCoroutine().get();
    }

    Coroutine::Ptr idle_co = std::make_shared<Coroutine>([this] { this->Idle(); });
    Coroutine::Ptr callback_co;

    SchedulerTask task;

    while (true) {
        task.Reset();
        bool tickle = false;
        {
            std::lock_guard lock(mutex_);
            auto iter = tasks_.begin();
            while (iter != tasks_.end()) {
                // 当前遍历的task已经分配了线程去执行且这个线程不是当前线程,则不用管
                if (iter->thread_id_ && *iter->thread_id_ != std::this_thread::get_id()) {
                    ++iter;
                    tickle = true;
                    continue;
                }
                if (iter->coroutine_ && iter->coroutine_->GetState() != Coroutine::READY) {
                    LOG << "Coroutine task's state should be READY!\n";
                    assert(false);
                }
                task = *iter;
                tasks_.erase(iter++);
                active_threads_++;
                break;
            }
            // 有任务可以去执行,需要tickle一下
            tickle |= (iter != tasks_.end());
        }
        if (tickle) {
            Tickle();
        }

        // 子协程执行完毕后yield会回到Run()中
        if (task.coroutine_) {
            // 任务类型为协程
            task.coroutine_->Resume();
            active_threads_--;
        } else if (task.callback_) {
            // 任务类型为回调函数
            if (callback_co) {
                callback_co->Reset(task.callback_);
            } else {
                callback_co = std::make_shared<Coroutine>(task.callback_);
            }
            callback_co->Resume();
            active_threads_--;
        } else {
            // 无任务,任务队列为空
            if (idle_co->GetState() == Coroutine::FINISH) {
                LOG << "Idle coroutine finish\n";
                break;
            }
            idle_threads_++;
            idle_co->Resume(); // Idle最后Yeild时回到这里
            idle_threads_--;
        }
    }
    LOG << "Scheduler Run() exit\n";
}

这个Scheduler是一个很简单的调度器,要对任务做更好的调度,少不了Idle协程的帮助。Idle协程的具体实现要在之后的IOManager中,其继承自Scheduler,重写了Tickle()Idle()等函数,并且使用epoll来实现在不同的 I/O 事件发生时,触发相应的处理逻辑。这使得程序可以以非阻塞的方式处理多个 I/O 操作,而不必等待每个操作完成后再进行下一个操作。