Codesire-Deng / co_context

A coroutine framework aimed at high-concurrency io with reasonable latency, based on io_uring.
Apache License 2.0
527 stars 46 forks source link

如何拓展lazy_io的范围 #93

Open wangzhankun opened 11 months ago

wangzhankun commented 11 months ago

我今天在学习io_uring时,发现其设计与我在用户态实现的异步IO几乎完全一致。进程间的异步IO是为了实现异步远程函数调用:

  1. 在两个进程之间通过共享内存的方式传输数据
  2. 共享内存被分为两个区域,A进程可读可写但B进程只读(类似io_uring的SQ);A进程可读但B进程可读可写(类似io_uring的CQ)
  3. 这两个队列是单生产者、单消费者模型,提供无锁接口,内部使用内存屏障做同步
  4. 当A进程发起RPC时,需要将参数以及要调用的函数ID写入到共享内存中,B进程在通过某种方法监测到有调用发生时,就会从共享内存中提取数据然后调用函数,之后将函数返回值写入到CQ
  5. A进程与B进程可以通过用户态中断(可以理解为signal机制,但是不需要Linux 内核参与,而是硬件实现的跨核中断)相互进行通知。(这样实现出来的就类似于io_uring的中断驱动机制)
  6. A进程和B进程也可以通过轮询的方法监测共享内存,类似于io_uring的轮询模式
  7. A进程异步,B进程轮询监测内存,类似于io_uring的内核轮询模式

请问我该如何适配co_context,时其支持我的设计方案呢?如果您方便的话,我想参考一下您的论文,如果不方便公开的话,可以发送到我的邮箱:bitwangzhankun@gmail.com

Codesire-Deng commented 11 months ago

你的设计方案包含一个完整的调度器,这是整个框架运行的核心逻辑。在 co_context 里,lazy_io 只负责告诉调度器“我想要等待什么”,调度器 io_context 的责任是监测结果和恢复协程。所以结论是:

  1. 你需要修改 io_context 的核心逻辑。它要能处理中断,或许还需要发起中断。(核心逻辑位于io_context::run() https://github.com/Codesire-Deng/co_context/blob/main/lib/co_context/io_context.cpp#L126-L151
  2. 你需要实现类似 lazy_io 的 C++20 的 awaiter。以 RPC 中的 caller 为例,awaiter 负责暂停协程,写共享内存,令 io_context 发出请求并监测结果,等待被 io_context 恢复;协程恢复时,读取共享内存以获得 RPC 的返回值。

awaiter 的一个 demo 是 lazy::yield https://github.com/Codesire-Deng/co_context/blob/main/include/co_context/detail/lazy_io_awaiter.hpp#L868C14-L879 。它暂停了协程,但又立即令自己加入就绪队列。

wangzhankun commented 11 months ago

我有没有可能在函数内部自定义调度呢?我看co_await的定义是会返回到caller:

task<> top1(){
    co_await top2();
}
task<> top2(){
    .... // 调度策略,如果接收到了用户态中断,选择被挂起的top3函数
    co_await top3();
}
task<> top3(){
}

我在top2中定义一下调度策略呢?或者说把调度策略作为函数添加到io_context中,然后在top2中调用。这样尽可能减少代码修改量。

wangzhankun commented 11 months ago

想到了侵入比较小的方法:

  1. rpc caller运行在单独的io_context线程中
  2. 实例化io_context时在构造函数传入一个bool判定是否是rpc的线程,如果是则run使用rpc的调度策略
Codesire-Deng commented 11 months ago

不侵入的办法是有的,可以在 io_context 下再挂一个你的 scheduler。但这样会遇到一个问题:当你管理的所有协程都陷入暂停,io_context 发现就绪队列里面没有任何东西,io_uring 里也没有任何 IO,就会结束自己。但事实上稍后到来的中断会使一些协程加入到就绪队列,但 io_context 对此一无所知。

针对这个问题,一个办法是再启动一个 io_context,使它们不敢主动退出。

你可以试试运行这个 demo,留意 ready_queue 并非线程安全:

#include <co_context/all.hpp>
#include <queue>

using co_context::task;
using co_context::counting_semaphore;

struct scheduler {
    counting_semaphore ready_count{0};
    std::queue<std::coroutine_handle<>> ready_queue; // NOT thread-safe
    bool poison{false};                              // NOT thread-safe

    void stop() noexcept {
        poison = true;
        ready_count.release();
    }

    void post(std::coroutine_handle<> job_handle) {
        ready_queue.push(job_handle);
        ready_count.release();
    }

    void post(task<> &&job) {
        std::coroutine_handle<> job_handle = job.get_handle();
        job.detach(); // prevent ~task() from killing the coroutine
        post(job_handle);
    }

    task<> run() {
        while (!poison) {
            printf("scheduler::run(): acquire...\n");
            co_await ready_count.acquire();
            printf("scheduler::run(): task found\n");
            if (poison) { // double check the poison
                break;
            }
            std::coroutine_handle<> handle = ready_queue.front();
            ready_queue.pop();
            handle.resume();
        }
    }
};

struct my_awaiter {
    constexpr bool await_ready() noexcept { return false; }

    void await_suspend(std::coroutine_handle<> current) noexcept {
        printf("my_awaiter: suspend\n");
        // Do your job. When finished, re-post it anytime, anywhere.
        my_context->post(current);
    }

    void await_resume() const noexcept { printf("my_awaiter: resume\n"); }

    scheduler *my_context;
};

task<> my_job(scheduler &my_context) {
    printf("my_job: begin\n");
    co_await my_awaiter{&my_context};
    printf("my_job: end\n");
}

int main() {
    co_context::io_context ctx;
    co_context::io_context background;

    scheduler sch;
    sch.post(my_job(sch)); // post the first job

    ctx.co_spawn(sch.run()); // launch the scheduler
    ctx.start();
    background.start();
    ctx.join();
    return 0;
}