Naios / continuable

C++14 asynchronous allocation aware futures (supporting then, exception handling, coroutines and connections)
https://naios.github.io/continuable/
MIT License
815 stars 44 forks source link

Asio integration - completion token #73

Open pfeatherstone opened 2 weeks ago

pfeatherstone commented 2 weeks ago

@Naios

I would like to use continuable to build Asio composed operations as described https://think-async.com/Asio/boost_asio_1_30_2/doc/html/boost_asio/overview/composition/compose.html but instead of using async_compose() or co_composed() or deferred(), I would like to use this library as i think it provides a better abstraction.

Is this possible? A few subtleties:


Commit Hash

23a724c

Expected Behavior

What I would like is a transformation algorithm cti::transforms::to_completion_token() which would allow the following to work:

#include <boost/asio.hpp>
#include <continuable/continuable.hpp>
#include <continuable/external/asio.hpp>

using namespace std::chrono_literals;

template <typename CompletionToken>
auto async_wait_twice(boost::asio::steady_timer& timer, CompletionToken&& token)
{
    return cti::make_ready_continuable()
        .then([&] {
            timer.expires_after(1s);
            return timer.async_wait(cti::use_continuable);
        })
        .then([&] {
            printf("First timer wait finished\n");
            timer.expires_after(1s);
            return timer.async_wait(cti::use_continuable);
        })
        .then([] {
            printf("Second timer wait finished\n");
            return cti::make_ready_continuable();
        })
        .apply(cti::transforms::to_completion_token(std::forward<CompletionToken>(token)));
}

int main()
{
    boost::asio::io_context ctx;
    boost::asio::steady_timer timer(ctx);
    auto guard = boost::asio::make_work_guard(ctx);
    std::thread t([&]{ctx.run();});

    async_wait_twice(timer, [] {
        printf("Two timers called\n");
    });

    ctx.stop();
    t.join();

    return 0;
}

Actual Behavior

N/A

Steps to Reproduce

N/A

Your Environment

Naios commented 1 week ago

Hi, I assume you want to attach a handler, with an associated executor (not a completion token). The completion token usually transforms the callback. Then you can use something like the following:

template <typename Handler>
auto to_completion_token(Handler&& token) {
  return [token = std::forward<Handler>(token)](auto&& continuation) {
    auto executor = asio::get_associated_executor(token);

    return std::forward<decltype(continuation)>(continuation)
        .then(
            [token = std::move(token)]() mutable {
              std::move(token)();
            },
            [executor = std::move(executor)](auto&& work) mutable {
              asio::post(executor, std::forward<decltype(work)>(work));
            });
  };
}

with


template <typename Handler>
auto async_wait_twice(asio::steady_timer& timer, Handler&& token) {
  return cti::async([&] {
           timer.expires_after(1s);
           return timer.async_wait(cti::use_continuable);
         })
      .then([&] {
        printf("First timer wait finished\n");
        timer.expires_after(1s);
        return timer.async_wait(cti::use_continuable);
      })
      .then([] {
        printf("Second timer wait finished\n");
        return cti::make_ready_continuable();
      })
      .apply(to_completion_token(std::forward<Handler>(token)));
}

int main() {
  asio::io_context ctx;
  asio::steady_timer timer(ctx);
  auto guard = asio::make_work_guard(ctx);
  std::thread t([&] {
    ctx.run();
  });

  std::latch latch(1);

  async_wait_twice(timer, [&] {
    printf("Two timers called\n");
    latch.count_down();
  });

  latch.wait();
  ctx.stop();
  t.join();

  return 0;
}

In case you intended to use a completion token like asio::deferred, then you can use asio::async_initiate instead of std::move(token)();, but this will get too complicated for a short answer.

pfeatherstone commented 1 week ago

That's a great answer thank you. But I was hopping to have an adaptor (or whatever we want to call it) which would allow me to use async_wait_twice() with any completion token like callbacks, asio::deferred, asio::use_awaitable, asio::experimental::use_promise etc.

But maybe that's too much work. Maybe the better approach is to juse use asio::deferred since it offers similar functionality to this library, i.e. continuations, and you can use it to build asio composed operations which then work with any completion token. It's just that i think continuable is better than asio::deferred

pfeatherstone commented 1 week ago

So the following is roughly equivalent using deferred and works with all Asio completion tokens:

template <typename CompletionToken>
auto async_wait_twice(boost::asio::steady_timer& timer, CompletionToken&& token)
{
    return deferred.values()
        | deferred([&] {
            timer.expires_after(1s);
            return timer.async_wait(deferred);
        })
        | deferred([&](std::error_code ec) {
            printf("First timer wait finished\n");
            timer.expires_after(1s);
            return timer.async_wait(deferred);
        })
        | deferred([&](std::error_code ec) {
            printf("Second timer wait finished\n");
            return deferred.values(ec);
        })
        | std::forward<CompletionToken>(token);
}

However i'm ignoring all the error codes. Each deferred handler has to manually handle ec whereas I think continuable manages the error path automatically??