boostorg / cobalt

Coroutines for C++20 & asio
https://www.boost.org/doc/libs/master/libs/cobalt/doc/html/index.html

Cancellation from external `asio::cancellation_signal`? #166

Closed: Lyulf closed this issue 3 months ago

Lyulf commented 3 months ago

I have coroutines that need to be cancelled so that a class can clean up correctly. I tried calling reset_cancellation_source directly on the promise_type (since cobalt::this_coro::reset_cancellation_source doesn't work outside cobalt::detached), but this has two problems: a cancellation signal is ignored if it is emitted before the cancellation source is reset, and cancellation stops working when it is requested directly on the coroutine (e.g. from cobalt::race).

I can probably work around these problems, but I was wondering whether there is a built-in way to do this. If not, are there any plans to add one in the future?

Here is some example code that hopefully demonstrates what I want to achieve (sorry for the length):

#include <coroutine>

#include <boost/asio.hpp>
#include <boost/cobalt.hpp>

namespace asio = boost::asio;
namespace cobalt = boost::cobalt;

// Required because cobalt::this_coro::reset_cancellation_source is available only for
// cobalt::detached
struct ResetCancellationSource
{
  asio::cancellation_slot slot;

  bool await_ready() { return false; }
  template <typename Promise>
  bool await_suspend(std::coroutine_handle<Promise> handle)
  {
    handle.promise().reset_cancellation_source(std::move(slot));
    return false;
  }
  void await_resume() {}
};

// Assume that implementations can change value of locked and assume that lockImpl can suspend and
// resume multiple times until lock is ready
cobalt::task<void> lockImpl(bool& locked);
cobalt::task<void> unlockImpl(bool& locked);

cobalt::task<void> cleanUp(bool& locked)
{
  // Without cancellation value could potentially be changed after cleanup
  if (locked)
    co_await unlockImpl(locked);
}

class Lock
{
public:
  Lock(cobalt::executor executor = cobalt::this_thread::get_executor()) : executor(executor) {}

  ~Lock()
  {
    signal.emit(asio::cancellation_type::all);
    cobalt::spawn(executor, cleanUp(locked), asio::use_future).get();
  }

  cobalt::promise<void> lock()
  {
    // Is there a way to do this that still allows the caller to cancel lock() and doesn't ignore
    // cancellation if the signal was triggered before resetting the source?
    co_await ResetCancellationSource{signal.slot()};

    // example impl
    co_await cobalt::spawn(executor, lockImpl(locked), cobalt::use_op);
  }

  cobalt::promise<void> unlock()
  {
    // same as above
    co_await ResetCancellationSource{signal.slot()};

    co_await cobalt::spawn(executor, unlockImpl(locked), cobalt::use_op);
  }

private:
  cobalt::executor executor;
  asio::cancellation_signal signal;
  bool locked = false;
};

klemens-morgenstern commented 3 months ago

I don't understand the problem. Why don't you just spawn a task and use asio::bind_cancellation_slot on the completion token?

Lyulf commented 3 months ago

This is exactly what I was looking for. I knew that it should be simple to achieve but I guess my lack of experience with asio must have been showing.

Lyulf commented 3 months ago

I guess I was too hasty in closing the issue. After more testing I found that using asio::bind_cancellation_slot still seems to prevent cancellation that is requested directly on a promise.

Here is an example:

#include <cassert>
#include <chrono>
#include <coroutine>
#include <boost/asio.hpp>
#include <boost/cobalt.hpp>

namespace asio = boost::asio;
namespace cobalt = boost::cobalt;

using namespace std::chrono_literals;

struct SuspendUntilCancelled
{
  struct Cancellation
  {
    cobalt::executor executor;
    void* address = nullptr;

    void operator()(asio::cancellation_type /*type*/)
    {
      if (address != nullptr)
        asio::post(executor, cobalt::unique_handle<void>(address));
    }
  };

  bool await_ready() { return false; }

  template <typename Promise>
  void await_suspend(std::coroutine_handle<Promise> handle)
  {
    auto executor = handle.promise().get_executor();
    slot = handle.promise().get_cancellation_slot();
    assert(slot.is_connected());
    slot.emplace<Cancellation>(executor, handle.address());
  }

  void await_resume()
  {
    if (slot.is_connected())
      slot.clear();
  }

  asio::cancellation_slot slot;
};

struct CancellableContainer
{
  cobalt::promise<void> suspendUntilCancel()
  {
    co_await cobalt::spawn(co_await cobalt::this_coro::executor,
                           suspend(),
                           asio::bind_cancellation_slot(
                             signal.slot(), cobalt::use_op)); // using just cobalt::use_op works just fine
  }

  void cancel() { signal.emit(asio::cancellation_type::all); }

private:
  cobalt::task<void> suspend() { co_await SuspendUntilCancelled{}; }

  asio::cancellation_signal signal;
};

cobalt::main co_main(int /*argc*/, char** /*argv*/)
{
  CancellableContainer container;
  auto promise = container.suspendUntilCancel();
  //container.cancel(); // works correctly
  promise.cancel();
  co_await promise; // permanently suspended if cancelled using promise.cancel()
  co_return 0;
}

klemens-morgenstern commented 3 months ago

In the above code you're intercepting the cancellation of the promise: because the completion token is bound to signal.slot(), the co_await on the spawn doesn't get the promise's cancel forwarded to it.

Lyulf commented 3 months ago

Then is there a way to allow cancellation from both cobalt::promise and asio::cancellation_signal or should I write something on my own?

klemens-morgenstern commented 3 months ago

You'd need to write it on your own. You can get the cancellation_state as described here.

Lyulf commented 3 months ago

Ok. Thank you for your help!