Naios / continuable

C++14 asynchronous allocation aware futures (supporting then, exception handling, coroutines and connections)
https://naios.github.io/continuable/
MIT License

question: is_continuation<> and asio::strand::post #44

Closed Spongman closed 2 years ago

Spongman commented 2 years ago

I have some existing code that posts synchronous methods to a boost strand in order to queue their execution on a thread pool. However, in order to reduce the number of waiting threads, I'd like to convert these methods to continuable-returning coroutines. I have made all the asio i/o calls asynchronous via cti::use_continuable, but what I'm trying to work out is whether posting a coroutine to a strand will still serialize the invocation of those asynchronous methods. I notice there's a specialization of is_continuation in erasure.hpp; is that related somehow?

i'm using continuable 4.1.0 & boost 1.77, if that's important.

Spongman commented 2 years ago

@Naios sorry, just atting you here since you mentioned before you didn't get notified.

Naios commented 2 years ago

@Spongman I think it is difficult to answer your question without a minimal example of what you are trying to accomplish. As far as I understand your question, you are trying to dispatch a continuable coroutine or continuation from an asio strand.

If the coroutine can be invoked from anywhere, it is possible to suspend the coroutine and invoke it from within the strand.

continuable<> invoke_on_strand() {
  co_await strand.post(cti::use_continuable); // or something like this (pseudocode)

  // Continue logic on your strand...

  // After an async task that might switch the execution,
  // continue the execution on the strand
  co_await do_sth_async().via(strand.post(cti::use_continuable));

  // ... more program logic

  co_return;
}

However, after awaiting an asynchronous task that might resume on a different execution context, you have to re-schedule the coroutine on the strand yourself. As far as I know, this is a limitation of the C++20 coroutine standard and cannot be solved more elegantly.

Note that you cannot use continuable to turn synchronous waits into asynchronous waits. is_continuation is an implementation detail and is not related to what you are trying to achieve.
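
To make the re-scheduling requirement concrete, here is a minimal single-threaded sketch that uses only the standard library and plain callbacks instead of coroutines. Context, do_sth_async and run_task are hypothetical names made up for this illustration; they are not part of asio or continuable, and Context only imitates the queueing behaviour of a strand. The sketch records which context each step of one logical task runs on:

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an execution context such as a strand:
// work is queued by post() and executed in FIFO order by run_one().
struct Context {
  std::string name;
  std::deque<std::function<void()>> queue;

  void post(std::function<void()> work) { queue.push_back(std::move(work)); }

  // Runs the oldest queued item, if any; returns whether something ran.
  bool run_one() {
    if (queue.empty()) return false;
    auto work = std::move(queue.front());
    queue.pop_front();
    current() = name;
    work();
    current().clear();
    return true;
  }

  // Name of the context whose work is currently executing ("" if none).
  static std::string& current() {
    static std::string value;
    return value;
  }
};

// Hypothetical async operation: it completes on `io`,
// not on the context of whoever started it.
void do_sth_async(Context& io, std::function<void()> on_done) {
  io.post(std::move(on_done));
}

// Returns the context name observed at each step of the logical task.
std::vector<std::string> run_task() {
  Context strand{"strand", {}};
  Context io{"io", {}};
  std::vector<std::string> seen;

  strand.post([&] {
    seen.push_back(Context::current());    // step 1 runs on the strand
    do_sth_async(io, [&] {
      seen.push_back(Context::current());  // the completion arrives on "io"
      strand.post([&] {                    // explicit hop back to the strand
        seen.push_back(Context::current());
      });
    });
  });

  // Drive both contexts until no work is left.
  while (strand.run_one() || io.run_one()) {
  }
  return seen;
}
```

Without the inner strand.post call, the work after the async operation would run on whatever context delivered the completion, which is the situation the extra co_await in the pseudocode above is meant to avoid.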

Spongman commented 2 years ago

sorry, i should have been clearer about what i'm trying to do.

i'm actually starting with code that looks something like this:

strand.post([&]() {
  stream_.read_some(..., [](auto...) {
    // handle read, do more io...
  });
});

which is obviously bad because it's blocking not only the strand, but the underlying context thread.

and i'd like to turn it into something like this (handwaving):

strand.post([&]() {
  auto result = co_await stream_.async_read_some(buffer_, cti::use_continuable);
  // handle read
  // do more io...
});

which ideally, while blocking the strand, would not block the underlying thread.

if i could co_await that post() call, that would be even better, but that doesn't look possible based on the boost code.

Naios commented 2 years ago

Ok, I see your issue. However, asio should provide an initiation token for post and its derivatives that can be awaited. Independent of that, I changed the use_continuable initiation token so that it can now also be used with post:

template <typename T>
auto through_post(T& postable) {
  return [&postable](auto&& work) mutable {
    asio::post(postable, std::forward<decltype(work)>(work));
  };
}

void using_strand() {
  asio::io_context ioc(1);
  asio::io_context::strand strand(ioc);

  asio::post(strand, cti::use_continuable).then([]() {
    puts("Dispatched through initiation token");
  });

  cti::async_on(
      []() mutable {
        puts("Dispatched through executor");
      },
      through_post(strand));

  ioc.run();
}
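
As the through_post helper suggests, the executor handed to async_on is just a callable that accepts a unit of work and decides where to run it. The following standard-library-only sketch shows that shape in isolation. ManualQueue, run_on and demo are hypothetical names made up for this illustration, and this through_post variant calls a member post() on the hypothetical queue instead of asio::post; none of it is the actual continuable or asio implementation:

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a strand: stores work and runs it in FIFO order.
class ManualQueue {
public:
  void post(std::function<void()> work) { pending_.push_back(std::move(work)); }

  // Runs all queued work, including work queued while running.
  // Returns the number of items executed.
  std::size_t run() {
    std::size_t count = 0;
    while (!pending_.empty()) {
      auto work = std::move(pending_.front());
      pending_.pop_front();
      work();
      ++count;
    }
    return count;
  }

private:
  std::deque<std::function<void()>> pending_;
};

// Same shape as through_post above, adapted to the hypothetical queue.
template <typename T>
auto through_post(T& postable) {
  return [&postable](auto&& work) {
    postable.post(std::forward<decltype(work)>(work));
  };
}

// Hypothetical helper: hands `work` to `executor` instead of calling it inline.
template <typename Work, typename Executor>
void run_on(Work&& work, Executor&& executor) {
  std::forward<Executor>(executor)(std::forward<Work>(work));
}

std::vector<std::string> demo() {
  ManualQueue queue;
  std::vector<std::string> log;
  auto executor = through_post(queue);

  run_on([&log] { log.push_back("first"); }, executor);
  run_on([&log] { log.push_back("second"); }, executor);
  log.push_back("nothing ran yet");  // the work is only queued at this point

  queue.run();  // now the queued work executes, in submission order
  return log;
}
```

Because the adapter captures the queue by reference, the queue has to outlive every executor created from it, which also applies to the strand captured by through_post in the example above.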

This should make it possible to turn your example into:

co_await strand.post(cti::use_continuable);

auto result = co_await stream_.async_read_some(buffer_, cti::use_continuable);

co_await strand.post(cti::use_continuable);

or

co_await strand.post(cti::use_continuable);

auto result = co_await stream_.async_read_some(buffer_, cti::use_continuable).via(through_post(strand));

// where

template <typename T>
auto through_post(T& postable) {
  return [&postable](auto&& work) mutable {
    asio::post(postable, std::forward<decltype(work)>(work));
  };
}

Spongman commented 2 years ago

ok, that is pretty amazing. it looks like it works perfectly and the co_await strand.post thing makes it even simpler to use than i had imagined!

Naios commented 2 years ago

It seems like this issue was resolved. If you have any further questions or issue reports, please open a new ticket.

Spongman commented 1 year ago

btw: i just noticed that the docs for async and async_on are missing. they appear in search & the changelog, but the page linked to doesn't contain anything.