chriskohlhoff / asio

Asio C++ Library
http://think-async.com/Asio

How to Create and then Await/Co_Await a Promise Set in another function? #1175

Open WarrenNiles opened 1 year ago

WarrenNiles commented 1 year ago

I was wondering whether it is possible to co_await a promise that is set by another thread in Boost.Asio. I am asking because many third-party DB connector libraries implement their own thread management and async engines. Their functions are async, but they do not integrate with Boost in a readable way. However, these libraries typically let you register a handler function that is called when the operation completes. My goal is to:

1) Create a promise with Boost.
2) Create a lambda that will be the callback for the third-party library and that sets the Asio promise's value to the resulting future from the third-party library.
3) In my webserver implementation, call the third-party library's non-blocking function, registering the callback from 2).
4) co_await the Boost promise.
5) Use the returned third-party future as if it were a normal variable in my coroutine.

For example, using the DataStax Cassandra driver (https://docs.datastax.com/en/developer/cpp-driver/2.16/topics/basics/futures/):

void on_result(CassFuture* cass_future, boost::promise<CassFuture*>* boost_promise) {
  /* This will set our boost_promise with the CassFuture that now has a value so we can use the future in the boost awaitable function */
  boost_promise->set(cass_future);
}

asio::awaitable<int> third_party_call() {
  CassFuture* third_party_future = NULL;

  /* Do some operation to get a Cass future */
  third_party_future = cass_session_connect(session, cluster);

  /* Create a Boost Asio Promise */
  boost::promise<CassFuture*> boost_promise;

  /* Set a callback instead of waiting for the result to be returned. When 3rd party engine finishes, it will set the promise we pass to on_result with our completed CassFuture*/
  cass_future_set_callback(third_party_future, on_result, &boost_promise);

  /* Await the Promise and set our 3rd party future to it as the callback consumes the initial one */
  third_party_future = co_await boost_promise.async_wait(asio::use_awaitable);
  co_return 1;
}

The problem is that I cannot find a way to do this with the promise mechanism. I am coming from std::promise and am new to Boost.Asio, and I am struggling with the documentation. That is why I am wondering whether this is even possible: according to the documentation, the promise does not seem to have a set method.

chloro-pn commented 1 year ago

You can wrap the asynchronous function directly as an awaitable operation and co_await it. The code is as follows (it adds an asynchronous operation to the echo_server example):

//
// echo_server.cpp
// ~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2022 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

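#include <asio/associated_executor.hpp>
#include <asio/async_result.hpp>
#include <asio/awaitable.hpp>
#include <asio/buffer.hpp>
#include <asio/co_spawn.hpp>
#include <asio/detached.hpp>
#include <asio/dispatch.hpp>
#include <asio/io_context.hpp>
#include <asio/ip/tcp.hpp>
#include <asio/signal_set.hpp>
#include <asio/use_awaitable.hpp>
#include <asio/write.hpp>

#include <chrono>
#include <cstdio>
#include <iostream>
#include <string>
#include <thread>
#include <utility>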

using asio::ip::tcp;
using asio::awaitable;
using asio::co_spawn;
using asio::detached;
using asio::use_awaitable;
namespace this_coro = asio::this_coro;

#if defined(ASIO_ENABLE_HANDLER_TRACKING)
# define use_awaitable \
  asio::use_awaitable_t(__FILE__, __LINE__, __PRETTY_FUNCTION__)
#endif

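// Wraps a callback-style operation that runs on a foreign thread so it can be co_awaited.
auto myAsyncTask(const std::string& msg, asio::use_awaitable_t<> h = {}) {
  // Initiation function: receives the completion handler that resumes the coroutine.
  auto initiate = []<typename Handler>(Handler&& handler, const std::string& msg) mutable {
    // Stand-in for a third-party library doing its work on its own thread.
    std::thread th(
      [handler = std::move(handler), msg = msg]() mutable -> void {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        // Deliver the result on the handler's associated executor, not on this thread.
        auto ex = asio::get_associated_executor(handler);
        asio::dispatch(ex, [handler = std::move(handler), msg = std::move(msg)] () mutable -> void {
          handler(msg.size());
        });
      });
    th.detach();
  };
  // The signature void(size_t) means the co_await expression yields a size_t.
  return asio::async_initiate<asio::use_awaitable_t<>, void(size_t)>(initiate, h, msg);
}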

awaitable<void> echo(tcp::socket socket)
{
  try
  {
    char data[1024];
    for (;;)
    {
      std::size_t n = co_await socket.async_read_some(asio::buffer(data), use_awaitable);
      size_t length = co_await myAsyncTask("hello world");
      std::cout << "my async task return " << length << std::endl;
      co_await async_write(socket, asio::buffer(data, n), use_awaitable);
    }
  }
  catch (std::exception& e)
  {
    std::printf("echo Exception: %s\n", e.what());
  }
}

awaitable<void> listener()
{
  auto executor = co_await this_coro::executor;
  tcp::acceptor acceptor(executor, {tcp::v4(), 55555});
  for (;;)
  {
    tcp::socket socket = co_await acceptor.async_accept(use_awaitable);
    co_spawn(executor, echo(std::move(socket)), detached);
  }
}

int main()
{
  try
  {
    asio::io_context io_context(1);

    asio::signal_set signals(io_context, SIGINT, SIGTERM);
    signals.async_wait([&](auto, auto){ io_context.stop(); });

    co_spawn(io_context, listener(), detached);

    io_context.run();
  }
  catch (std::exception& e)
  {
    std::printf("Exception: %s\n", e.what());
  }
}

In addition, if you can understand Chinese, you can read this article:)

WarrenNiles commented 1 year ago

Yeah, this doesn't address the use of promises and futures, but it is interesting nonetheless.
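
For comparison, the promise/future shape described in the original question can be sketched with std::promise alone. This is only an illustration under stated assumptions: fake_driver_query, result_callback and query_and_wait are hypothetical stand-ins, not part of Asio or of the Cassandra driver, and future.get() blocks the calling thread, which is exactly what a co_await-based solution is meant to avoid. The callback takes a void* data argument in the style of C driver callbacks.

```cpp
#include <chrono>
#include <future>
#include <string>
#include <thread>

// Hypothetical C-style callback type: the library passes back the result and
// the opaque `data` pointer supplied at registration time.
using result_callback = void (*)(int result, void* data);

// Hypothetical stand-in for a third-party async call. It does its work on its
// own thread and then invokes the callback. The thread is returned only so
// that the caller can join it in this demo.
std::thread fake_driver_query(const std::string& query, result_callback callback,
                              void* data) {
  return std::thread([size = static_cast<int>(query.size()), callback, data] {
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    callback(size, data);
  });
}

// Callback registered with the fake driver: recovers the promise from `data`
// and fulfils it with the result.
void on_result(int result, void* data) {
  static_cast<std::promise<int>*>(data)->set_value(result);
}

// Starts the operation, then waits for the promise to be set by the callback.
int query_and_wait(const std::string& query) {
  std::promise<int> promise;
  std::future<int> future = promise.get_future();
  std::thread worker = fake_driver_query(query, on_result, &promise);
  int value = future.get();  // blocks until on_result runs on the worker thread
  worker.join();             // the promise must outlive the callback
  return value;
}
```

In this sketch query_and_wait("SELECT 1") returns the length of the query string. The promise is joined against the worker before it goes out of scope, so the callback never touches a destroyed object.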