【动手写协程库】系列笔记是学习sylar的协程库时的记录,参考了从零开始重写sylar C++高性能分布式服务器框架和代码随想录中的文档。文章并不是对所有代码的详细解释,而是为了自己理解一些片段所做的笔记。
IOManager
类中具体定义实现可以在这里查看:Github: src/iomanager.cpp
之前实现的协程调度器的功能其实非常简单,当添加任务后调度器只是单纯的从任务队列中取出任务交给协程去执行。sylar的协程库的关注对象是网络IO,如果采用这么简单的调度就根本没有用到协程的精髓。
sylar的IO协程调度解决了之前调度器在idle状态下忙等待导致CPU占用率高的问题。IO协程调度器使用一对管道fd来tickle调度协程,当调度器空闲时,idle协程通过epoll_wait阻塞在管道的读描述符上,等管道的可读事件。添加新任务时,tickle方法写管道,idle协程检测到管道可读后退出,调度器执行调度。
IOManager API
IOManager
的API如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
| #ifndef IOMANAGER_H_ #define IOMANAGER_H_
#include <cstddef> #include <sys/epoll.h>
#include <atomic> #include <functional> #include <mutex> #include <shared_mutex> #include <vector>
#include "coroutine.h" #include "scheduler.h" #include "timer.h"
enum Event { NONE = 0x0, READ = EPOLLIN, WRITE = EPOLLOUT, };
class IOManager : public Scheduler, public TimerManager { public: IOManager(size_t threads = 1, bool use_caller = true, const std::string& name = "IOManager");
~IOManager();
bool AddEvent(int fd, Event event, std::function<void()> cb = nullptr);
bool DelEvent(int fd, Event event);
bool CancelEvent(int fd, Event event);
bool CancelAllEvent(int fd);
static IOManager* GetIOManager();
protected: void Idle() override;
void Tickle() override;
void OnTimerInsertAtFront() override;
bool IsStop() override;
void ResizeContexts(size_t size);
private: struct FdContext { struct EventContext { Scheduler* scheduler = nullptr; Coroutine::Ptr coroutine; std::function<void()> callback; };
EventContext& GetEventContext(Event& e);
void ResetEventContext(EventContext& ectx);
void TriggerEvent(Event e);
EventContext read_ctx, write_ctx; int fd; Event events = Event::NONE; std::mutex mutex; };
private: int epfd_; int tickle_fd_[2]; std::atomic_size_t pending_evt_cnt_; std::mutex mutex_; std::shared_mutex rw_mutex_; std::vector<FdContext*> fd_contexts_; };
#endif
|
Scheduler::Run()
我们可以先回顾一下Scheduler::Run()
这个函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| void Scheduler::Run() { LOG << "Scheduler running...\n"; SetHookFlag(true); SetThisAsScheduler();
if (std::this_thread::get_id() != sched_id_) { sched_coroutine = Coroutine::GetNowCoroutine().get(); }
Coroutine::Ptr idle_co = std::make_shared<Coroutine>([this] { this->Idle(); }); Coroutine::Ptr callback_co;
SchedulerTask task;
while (true) { task.Reset(); bool tickle = false; { std::lock_guard lock(mutex_); auto iter = tasks_.begin(); while (iter != tasks_.end()) { if (iter->thread_id_ && *iter->thread_id_ != std::this_thread::get_id()) { ++iter; tickle = true; continue; } if (iter->coroutine_ && iter->coroutine_->GetState() != Coroutine::READY) { LOG << "Coroutine task's state should be READY!\n"; assert(false); } task = *iter; tasks_.erase(iter++); active_threads_++; break; } tickle |= (iter != tasks_.end()); } if (tickle) { Tickle(); }
if (task.coroutine_) { task.coroutine_->Resume(); active_threads_--; task.Reset(); } else if (task.callback_) { if (callback_co) { callback_co->Reset(task.callback_); } else { callback_co = std::make_shared<Coroutine>(task.callback_); } callback_co->Resume(); active_threads_--; callback_co.reset(); task.Reset(); } else { if (idle_co->GetState() == Coroutine::FINISH) { LOG << "Idle coroutine finish\n"; break; } idle_threads_++; idle_co->Resume(); idle_threads_--; } } LOG << "Scheduler Run() exit\n"; }
|
这个函数就是每个线程会启动的协程调度函数,负责管理和执行任务队列中的任务,包括协程和回调函数两种类型的任务,如果任务队列为空,则执行Idle协程。
在Scheduler::Idle()
函数中,仅仅只是做了一个简单的处理:调度器没有停止就让出当前正在执行的协程,我们要做的增强后的IOManager需要重写Idle函数,让它不断等待事件、处理事件、然后再次等待事件的循环过程,它在没有其他协程运行时保持系统的活跃度,并在有事件发生时进行相应的处理。
重写Idle函数
在IOManager
中,我们就需要重写Idle函数,我们需要它是一个不断等待事件、处理事件、然后再次等待事件的循环过程,它在没有其他协程运行时保持系统的活跃度,并在有事件发生时进行相应的处理:
- 我们会先找到最近一个定时器的超时时间,并将其与自定义最长超时时间(源码中是5s)进行比较取最小者作为
epoll_wait
的超时时间
- 将超时的定时器的回调函数加入调度器
- 处理
epoll_wait
送来的事件
- 将当前线程运行的协程暂停(也就是暂停Idle协程),并将执行权交给调度协程(
Scheduler::Run()
)
- 从1.又开始重复执行
具体操作可看代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
| void IOManager::Idle() { LOG << "idle coroutine start up\n";
const int MAX_EVENTS = 256; const int MAX_TIMEOUT = 5000; epoll_event events[MAX_EVENTS]{};
while (true) { if (IsStop()) { LOG << GetName() << "idle stop now\n"; break; }
uint64_t next_timeout = GetNextTimerInterval(); int triggered_events; do { next_timeout = next_timeout != ~0ull ? std::min(static_cast<int>(next_timeout), MAX_TIMEOUT) : MAX_TIMEOUT;
triggered_events = epoll_wait(epfd_, events, MAX_EVENTS, static_cast<int>(next_timeout)); if (triggered_events < 0 && errno == EINTR) { continue; } else { break; } } while (true);
std::vector<std::function<void()>> cbs = GetExpiredCbList(); for (auto& cb : cbs) { Sched(cb); }
for (int i = 0; i < triggered_events; i++) { epoll_event& event = events[i]; if (event.data.fd == tickle_fd_[0]) { char buf[256]{}; while (read(tickle_fd_[0], buf, sizeof(buf)) > 0) ; continue; }
FdContext* fd_ctx = static_cast<FdContext*>(event.data.ptr); std::lock_guard lock(fd_ctx->mutex);
if (event.events & (EPOLLERR | EPOLLHUP)) { event.events |= (EPOLLIN | EPOLLOUT) & fd_ctx->events; }
int real_event = Event::NONE; if (event.events & EPOLLIN) { real_event |= Event::READ; } if (event.events & EPOLLOUT) { real_event |= Event::WRITE; }
if ((fd_ctx->events & real_event) == Event::NONE) { continue; }
int rest_events = fd_ctx->events & ~real_event; int op = rest_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL; event.events = EPOLLET | rest_events; if (epoll_ctl(epfd_, op, fd_ctx->fd, &event) < 0) { LOG_ERROR << strerror(errno) << "\n"; continue; }
if (real_event & Event::READ) { fd_ctx->TriggerEvent(Event::READ); --pending_evt_cnt_; } if (real_event & Event::WRITE) { fd_ctx->TriggerEvent(Event::WRITE); --pending_evt_cnt_; } }
Coroutine::Ptr co_ptr = Coroutine::GetNowCoroutine(); auto co = co_ptr.get(); co_ptr.reset(); co->Yield(); } }
|
添加事件
IOManager除了重写Idle函数这个重要点外,还有个重要点就是为指定文件描述符添加事件。
IOManager内部有一个[FdContext
](#IOManger API)结构体用来封装socket fd的上下文(需要绑定的回调函数,对应的事件、协程),并使用一个vector保存这些FdContext。
我们在添加fd的事件时,需要将其加入vector中,并且需要通过epoll_ctl
注册fd的对应事件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| bool IOManager::AddEvent(int fd, Event event, std::function<void()> cb) { FdContext* fd_ctx = nullptr; { std::shared_lock rw_lock(rw_mutex_); if (fd_contexts_.size() > fd) { fd_ctx = fd_contexts_[fd]; rw_lock.unlock(); } else { rw_lock.unlock(); std::unique_lock rw_lock2(rw_mutex_); ResizeContexts(fd * 1.5); fd_ctx = fd_contexts_[fd]; } }
std::lock_guard lock(mutex_); if (fd_ctx->events & event) { LOG_ERROR << "A fd can't add same event\n"; return false; }
int op = fd_ctx->events ? EPOLL_CTL_MOD : EPOLL_CTL_ADD; epoll_event ep_evt{}; ep_evt.events = static_cast<int>(fd_ctx->events) | EPOLLET | event; ep_evt.data.ptr = fd_ctx;
int ret = epoll_ctl(epfd_, op, fd, &ep_evt); if (ret) { LOG_ERROR << "epoll_ctl " << strerror(errno); return false; }
++pending_evt_cnt_;
fd_ctx->events = static_cast<Event>(fd_ctx->events | event); auto& event_ctx = fd_ctx->GetEventContext(event); assert(!event_ctx.scheduler && !event_ctx.callback && !event_ctx.coroutine); event_ctx.scheduler = Scheduler::GetScheduler(); if (cb) { event_ctx.callback = cb; } else { event_ctx.coroutine = Coroutine::GetNowCoroutine(); assert(event_ctx.coroutine->GetState() == Coroutine::RUNNING); } return true; }
|