【动手写协程库 4】IO协程调度器

【动手写协程库】系列笔记是学习sylar的协程库时的记录,参考了从零开始重写sylar C++高性能分布式服务器框架和代码随想录中的文档。文章并不是对所有代码的详细解释,而是为了自己理解一些片段所做的笔记。

IOManager类中具体定义实现可以在这里查看:Github: src/iomanager.cpp

之前实现的协程调度器的功能其实非常简单,当添加任务后调度器只是单纯的从任务队列中取出任务交给协程去执行。sylar的协程库的关注对象是网络IO,如果采用这么简单的调度就根本没有用到协程的精髓。

sylar的IO协程调度解决了之前调度器在idle状态下忙等待导致CPU占用率高的问题。IO协程调度器使用一对管道fd来tickle调度协程,当调度器空闲时,idle协程通过epoll_wait阻塞在管道的读描述符上,等管道的可读事件。添加新任务时,tickle方法写管道,idle协程检测到管道可读后退出,调度器执行调度。

IOManager API

IOManager的API如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#ifndef IOMANAGER_H_
#define IOMANAGER_H_

#include <cstddef>
#include <sys/epoll.h>

#include <atomic>
#include <functional>
#include <mutex>
#include <shared_mutex>
#include <vector>

#include "coroutine.h"
#include "scheduler.h"
#include "timer.h"

// 事件:无、读、写
enum Event {
NONE = 0x0,
READ = EPOLLIN,
WRITE = EPOLLOUT,
};

// IO协程调度
class IOManager : public Scheduler, public TimerManager {
public:
IOManager(size_t threads = 1, bool use_caller = true, const std::string& name = "IOManager");

~IOManager();

bool AddEvent(int fd, Event event, std::function<void()> cb = nullptr);

bool DelEvent(int fd, Event event);

bool CancelEvent(int fd, Event event);

bool CancelAllEvent(int fd);

static IOManager* GetIOManager();

protected:
void Idle() override;

void Tickle() override;

void OnTimerInsertAtFront() override;

bool IsStop() override;

void ResizeContexts(size_t size);

private:
// socket fd 上下文
struct FdContext {
struct EventContext {
Scheduler* scheduler = nullptr;
Coroutine::Ptr coroutine;
std::function<void()> callback;
};

// 根据类型获取对应的上下文
EventContext& GetEventContext(Event& e);

void ResetEventContext(EventContext& ectx);

void TriggerEvent(Event e);

EventContext read_ctx, write_ctx;
int fd;
Event events = Event::NONE;
std::mutex mutex;
};

private:
int epfd_;
int tickle_fd_[2];
std::atomic_size_t pending_evt_cnt_;
std::mutex mutex_;
std::shared_mutex rw_mutex_;
// 利用fd作为下标来获取对应的FdContext*,也可以使用哈希表代替
std::vector<FdContext*> fd_contexts_;
};

#endif /* IOMANAGER_H_ */

Scheduler::Run()

我们可以先回顾一下Scheduler::Run()这个函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
void Scheduler::Run() {
LOG << "Scheduler running...\n";
SetHookFlag(true);
SetThisAsScheduler();

// 如果当前线程不是调度器所在线程,设置调度的协程为当前线程运行的协程
if (std::this_thread::get_id() != sched_id_) {
sched_coroutine = Coroutine::GetNowCoroutine().get();
}

Coroutine::Ptr idle_co = std::make_shared<Coroutine>([this] { this->Idle(); });
Coroutine::Ptr callback_co;

SchedulerTask task;

while (true) {
task.Reset();
bool tickle = false;
{
std::lock_guard lock(mutex_);
auto iter = tasks_.begin();
while (iter != tasks_.end()) {
// 当前遍历的task已经分配了线程去执行且这个线程不是当前线程,则不用管
if (iter->thread_id_ && *iter->thread_id_ != std::this_thread::get_id()) {
++iter;
tickle = true;
continue;
}
if (iter->coroutine_ && iter->coroutine_->GetState() != Coroutine::READY) {
LOG << "Coroutine task's state should be READY!\n";
assert(false);
}
task = *iter;
tasks_.erase(iter++);
active_threads_++;
break;
}
// 有任务可以去执行,需要tickle一下
tickle |= (iter != tasks_.end());
}
if (tickle) {
Tickle();
}

// 子协程执行完毕后yield会回到Run()中
// 注意,每次运行了一个task后需要Reset一下
if (task.coroutine_) {
// 任务类型为协程
task.coroutine_->Resume();
active_threads_--;
task.Reset();
} else if (task.callback_) {
// 任务类型为回调函数,将其包装为协程
if (callback_co) {
callback_co->Reset(task.callback_);
} else {
callback_co = std::make_shared<Coroutine>(task.callback_);
}
callback_co->Resume();
active_threads_--;
callback_co.reset();
task.Reset();
} else {
// 无任务,任务队列为空
if (idle_co->GetState() == Coroutine::FINISH) {
LOG << "Idle coroutine finish\n";
break;
}
idle_threads_++;
idle_co->Resume(); // Idle最后Yeild时回到这里
idle_threads_--;
}
}
LOG << "Scheduler Run() exit\n";
}

这个函数就是每个线程会启动的协程调度函数,负责管理和执行任务队列中的任务,包括协程和回调函数两种类型的任务,如果任务队列为空,则执行Idle协程。

Scheduler::Idle()函数中,仅仅只是做了一个简单的处理:调度器没有停止就让出当前正在执行的协程,我们要做的增强后的IOManager需要重写Idle函数,让它不断等待事件、处理事件、然后再次等待事件的循环过程,它在没有其他协程运行时保持系统的活跃度,并在有事件发生时进行相应的处理。

重写Idle函数

IOManager中,我们就需要重写Idle函数,我们需要它是一个不断等待事件、处理事件、然后再次等待事件的循环过程,它在没有其他协程运行时保持系统的活跃度,并在有事件发生时进行相应的处理:

  1. 我们会先找到最近一个定时器的超时时间,并将其与自定义最长超时时间(源码中是5s)进行比较取最小者作为epoll_wait的超时时间
  2. 将超时的定时器的回调函数加入调度器
  3. 处理epoll_wait送来的事件
  4. 将当前线程运行的协程暂停(也就是暂停Idle协程),并将执行权交给调度协程(Scheduler::Run()
  5. 从1.又开始重复执行

具体操作可看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
void IOManager::Idle() {
LOG << "idle coroutine start up\n";

const int MAX_EVENTS = 256;
const int MAX_TIMEOUT = 5000;
epoll_event events[MAX_EVENTS]{};

while (true) {
// LOG << "in idle now\n";
if (IsStop()) {
LOG << GetName() << "idle stop now\n";
break;
}

uint64_t next_timeout = GetNextTimerInterval();
int triggered_events;
do {
// 如果时间堆中有超时的定时器,则比较这个超时定时器的下一次触发的时间与MAX_TIMEOUT(5s),选取最小值作为超时时间
next_timeout = next_timeout != ~0ull ? std::min(static_cast<int>(next_timeout), MAX_TIMEOUT) : MAX_TIMEOUT;

// 没有事件到来时会阻塞在epoll_wait上,除非到了超时时间
triggered_events = epoll_wait(epfd_, events, MAX_EVENTS, static_cast<int>(next_timeout));
if (triggered_events < 0 && errno == EINTR) {
continue;
} else {
break;
}
} while (true); // 用while(true)的目的是确保在出现特定错误情况时能够重新尝试执行 epoll_wait

// 将超时的定时器的回调函数加入调度器
// 这些回调函数的作用可能是关闭连接等操作
std::vector<std::function<void()>> cbs = GetExpiredCbList();
for (auto& cb : cbs) {
Sched(cb);
}

// 处理事件
for (int i = 0; i < triggered_events; i++) {
epoll_event& event = events[i];
// 是一个用于通知协程调度的事件
// epoll中监听了用于通知的管道读端fd,当有数据到时即会触发
if (event.data.fd == tickle_fd_[0]) {
char buf[256]{};
// 将管道内的数据读完
while (read(tickle_fd_[0], buf, sizeof(buf)) > 0)
;
continue;
}

// FdContext* fd_ctx = (FdContext*) event.data.ptr;
FdContext* fd_ctx = static_cast<FdContext*>(event.data.ptr);
std::lock_guard lock(fd_ctx->mutex);

// 发生错误时,如果原来的文件描述符上下文(fd_ctx)中有可读或可写事件标志被设置,那么现在将重新触发这些事件
if (event.events & (EPOLLERR | EPOLLHUP)) {
event.events |= (EPOLLIN | EPOLLOUT) & fd_ctx->events;
}

// 获取fd_ctx对应的事件
int real_event = Event::NONE;
if (event.events & EPOLLIN) {
real_event |= Event::READ;
}
if (event.events & EPOLLOUT) {
real_event |= Event::WRITE;
}

if ((fd_ctx->events & real_event) == Event::NONE) {
continue;
}

// 如果还有剩余事件,则修改;否则将其从epoll中删除
// 注意获取rest_events时不是使用的event.events & ~real_event,因为是要去除fd_ctx->fd中本次触发的事件
int rest_events = fd_ctx->events & ~real_event;
int op = rest_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
event.events = EPOLLET | rest_events;
if (epoll_ctl(epfd_, op, fd_ctx->fd, &event) < 0) {
LOG_ERROR << strerror(errno) << "\n";
continue;
}

if (real_event & Event::READ) {
fd_ctx->TriggerEvent(Event::READ);
--pending_evt_cnt_;
}
if (real_event & Event::WRITE) {
fd_ctx->TriggerEvent(Event::WRITE);
--pending_evt_cnt_;
}
}

// 将当前线程运行的协程暂停(也就是暂停Idle协程),并将执行权交给调度协程
Coroutine::Ptr co_ptr = Coroutine::GetNowCoroutine();
auto co = co_ptr.get();
co_ptr.reset();
co->Yield();
}
}

添加事件

IOManager除了重写Idle函数这个重要点外,还有个重要点就是为指定文件描述符添加事件。

IOManager内部有一个[FdContext](#IOManger API)结构体用来封装socket fd的上下文(需要绑定的回调函数,对应的事件、协程),并使用一个vector保存这些FdContext。

我们在添加fd的事件时,需要将其加入vector中,并且需要通过epoll_ctl注册fd的对应事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
bool IOManager::AddEvent(int fd, Event event, std::function<void()> cb) {
FdContext* fd_ctx = nullptr;
{
std::shared_lock rw_lock(rw_mutex_);
if (fd_contexts_.size() > fd) {
fd_ctx = fd_contexts_[fd];
rw_lock.unlock();
} else {
rw_lock.unlock();
std::unique_lock rw_lock2(rw_mutex_);
ResizeContexts(fd * 1.5);
fd_ctx = fd_contexts_[fd];
}
}

std::lock_guard lock(mutex_);
if (fd_ctx->events & event) {
LOG_ERROR << "A fd can't add same event\n";
return false;
}

int op = fd_ctx->events ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
epoll_event ep_evt{};
ep_evt.events = static_cast<int>(fd_ctx->events) | EPOLLET | event;
ep_evt.data.ptr = fd_ctx; // 在Idle()中将使用fd对应的这个ep_evt

int ret = epoll_ctl(epfd_, op, fd, &ep_evt);
if (ret) {
LOG_ERROR << "epoll_ctl " << strerror(errno);
return false;
}

++pending_evt_cnt_;

// 设置fd对应事件的EventContext
fd_ctx->events = static_cast<Event>(fd_ctx->events | event);
// 使用event_ctx相当于使用fd_ctx->read_ctx or fd_ctx->write_ctx(注意是auto&而不是auto)
auto& event_ctx = fd_ctx->GetEventContext(event);
assert(!event_ctx.scheduler && !event_ctx.callback && !event_ctx.coroutine);
event_ctx.scheduler = Scheduler::GetScheduler();
if (cb) {
event_ctx.callback = cb;
} else {
// 设置fd相关事件触发时使用的协程为当前
event_ctx.coroutine = Coroutine::GetNowCoroutine();
assert(event_ctx.coroutine->GetState() == Coroutine::RUNNING);
}
return true;
}