drogonframework / drogon

Drogon: A C++14/17/20 based HTTP web application framework running on Linux/macOS/Unix/Windows
MIT License
11.62k stars 1.12k forks source link

Intended `queueInLoopCoro()` usage #1790

Open prm-james-hill opened 1 year ago

prm-james-hill commented 1 year ago

I have a Drogon (1.8.4) server that I just upgraded to C++20 so I’ve started moving things over to coroutines. queueInLoopCoro() seems relatively new, doesn’t have any examples in https://github.com/drogonframework/drogon/wiki/ENG-16-Coroutines, and I’m relying on an internal namespace so I wanted to make sure I was using it properly. My code flow looks effectively like this:

auto callback = []() -> drogon::Task<> {
    …
    co_return;
};
drogon::internal::EventLoopAwaiter<drogon::Task<>> task = drogon::queueInLoopCoro<drogon::Task<>>(drogon::app().getIOLoop(...), std::move(callback)));
co_await co_await task;
…

It seems to work but the repeated co_awaits is unusual enough that I want to confirm. The right co_await is for the drogon::internal::EventLoopAwaiter, which I think just indicates when the loop is being held, and the left co_await is for the drogon::Task, which is my custom logic. Is this the intended usage?

prm-james-hill commented 1 year ago

For posterity: I moved away from drogon::internal::EventLoopAwaiter because it felt like I was using it inappropriately. All the existing uses in Drogon seemed to be for HTTP handlers and I'm not familiar enough with them to map them to what I'm trying to do. Switched to a hand-rolled loop awaiter that wraps a coroutine. Probably the only interesting part of the impl:

  void await_suspend(std::coroutine_handle<> handle) {
    drogon::app().getIOLoop(...)->queueInLoop(
      [this, handle]() {
        coro_.resume();
        if (coro_.done()) {
          handle.resume();
        }
      });
  }

I'm still not sure how to queue the top-level coroutine because anything based on drogon::AsyncTask seems to leak memory.

marty1885 commented 1 year ago

Hi,

I don't fully get what you are trying to achieve. IIRC I wrote that function as a way to test if our coroutine implementation is working and handles properties coroutines have. What you wrote is basically saying "drogon, execute the function callback on the specified thread. Upon finish, continue the execution of the current coroutine. If you simply want to run a coroutine. Simply do:

auto callback = []() -> drogon::Task<> {
    …
    co_return;
};
co_await callback();

queueInLoopCoro runs a function on a specified thread, then forcibly switch the thread executing the coroutine (if provided). Internally, drogon assigns each TCP connection (no matter it's for HTTP, DB, NoSQL) and some objects to be associated with a thread in the thread pool. Then all IO operations are done on that thread exclusively. This avoids the messiness of managing locks. However, since coroutines in C++ can switch switch thread of execution upon awaiting. queueInLoopCoro allows us to run functions on another thread and be back on the current thread later. It's a very low level optimization for very high concurrent environment.

The internal namespace is fine. We made it that way so people won't try to inherent or extend that class (which you should never do unless you know exactly what you are doing).

For example.

// assuming you are running on IO thread 0 (app().getIoLoop(0)).
auto sourceLoop = trantor::getEventLoopOfCurrentThread();
auto db = app().getDbClient();
auto redis = app().getRedisClient();

co_await db->execSqlCoro(...);
// Now since DB clients runs on it's own thread. The coroutine is now resumed on the DB thread.
// But we wish to cache out data in Redis. Calling Redis and awaiting will result in the coroutine
// run on the redis thread. Yet if we `co_return` on the thread owning the Redis client
// The underlying TCP connection cannot write to the socket directly due to thread association. It had to
// synchronize across threads to send the HTTP response. To avoid that cost, use `queueInLoopCoro`

// But, it's advice to simply use "fast" clients so they all live on the same IO thread. This is much simpler
// and faster.

co_await queueInLoopCoro(redis->getLoop(), [redis] -> {....}, sourceLoop);
// The lambda runs in the redis thread
// Now we are back on IO thread 0 again. We can return an know that the underlying TCP don't need to sync

co_return HttpResponse::newHttpResponse(...);
prm-james-hill commented 1 year ago

Hey Marty,

Thanks for the example, and the explanation of why queueInLoopCoro() exists. I can explain more about my use case:

I occasionally get large, synchronous chunks of work in my worker loops. To speed it up, and to prevent occupying one loop for very long, I split it up (into multiple lambdas) and spread it across multiple of the loops. (The initial chunk of work can exceed 10ms so other threading setups would probably be fine, but it’s easier for me to just think in terms of the event loops.)

What I’m trying to do is replace these (potentially multiple, and potentially nested) lambdas with coroutines. I think the code sample in my second post supports that (though I think I still need to implement some kind of when_all).

I'm new to coroutines so please let me know if these questions don't make sense;

  1. Does EventLoopAwaiter support this? The only way I could emulate it was with the repeated co_awaits. From what you said it at least doesn't seem like I'm the intended user of it.
  2. My top-level HTTP handler doesn’t support coroutines yet, so I can't do co_return HttpResponse::newHttpResponse(...); for now. So, in terms of your first code sample, this would just be a std::function running on the event loop for me. coroutine.h has a bunch of utils for going back and forth between std::functions and coroutines so I had some success hacking stuff together, but ultimately it relied on AsyncTask, which I’m assuming is a misuse because it was leaking memory. Am I missing something here? (EDIT: After digging into it more, I guess the situation with AsyncTask is that, because of how it's adapting a coroutine into a non-coroutine, it has no clean way of knowing when it's done. I was able to coordinate my AsyncTask creation so that I can save a copy of the coro handle in something I know will live longer.)
marty1885 commented 1 year ago

Does EventLoopAwaiter support this?

No, this is actually one of the issues with C++ coroutines. Unlike JS and Rust, most C++ runs coroutines when you await on it. Not when creating the coroutine. For example

auto foo = []() -> Task<> { cout << "running" << endl; };
auto awaitible = foo(); // Coroutine created but not running
co_await awaitible; // now it prints

I see what you want. I need time to figure out a clean interface for this. Give me some time to come up with an implementation plan. Please do poke me if I haven't post in some time. I might get sidetracked into other stuff.

My top-level HTTP handler doesn’t support coroutines yet

You should be able to use coroutines nevertheless. Check the 1st example in the using coroutine section. Declare your handler as [](HttpRequestPtr req, std::function<void(const HttpResponsePtr&)> callback) -> Task<> so you can keep the callback function while able to await for coroutines to finish.

prm-james-hill commented 1 year ago

most C++ runs coroutines when you await on it [...] I need time to figure out a clean interface for this [...] Please do poke me if I haven't post in some time

Got it. I guess the issue is basing behavior on the promise type (which is currently erased)? And will do, but I’ll give it a couple weeks/months…

You should be able to use coroutines nevertheless. Check the 1st example in the using coroutine section

Sorry, my point was more that the majority of my executive logic is still based around callback-chaining with queueing on the event loop, rather than this being a handler limitation. (I also eventually understood what AsyncTask is really doing and how to use it safely, so my original point is no longer relevant.)

As far as this “issue” goes, you’ve cleared up my misunderstandings, so thanks :) Should I mark this resolved or leave it open to nag you about my feature request?