zeromq / azmq

C++ language binding library integrating ZeroMQ with Boost Asio
Boost Software License 1.0

async_receive is not cancellable by the OR awaitable operator #209

Open magni-mar opened 1 year ago

magni-mar commented 1 year ago

I ran into this problem during development and reduced it to the minimal example (MVE) below.

#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <azmq/socket.hpp>
#include <array>
#include <chrono>
#include <cstddef>
#include <iostream>

namespace asio = boost::asio;
using namespace asio::experimental::awaitable_operators;

auto timer(std::chrono::steady_clock::duration dur) -> asio::awaitable<void> {
  asio::steady_timer timer(co_await asio::this_coro::executor);
  timer.expires_after(dur);
  co_await timer.async_wait(asio::use_awaitable);
}

auto foo(asio::io_context& ctx) -> asio::awaitable<void> {
  azmq::sub_socket socket_ = azmq::sub_socket(ctx);

  std::array<std::byte, 4096> buffer{};

  auto value = co_await (azmq::async_receive(socket_, asio::buffer(buffer), asio::as_tuple(asio::use_awaitable)) ||
                timer(std::chrono::seconds(1)));

  std::cout << "\n\n\nvalue: " << value.index() << std::endl;

}

auto main() -> int {
  asio::io_context ctx{};
  co_spawn(ctx, foo(ctx), asio::detached);
  ctx.run();
  return 0;
} 

The OR awaitable operator (operator||) waits for either of the two awaitables to complete and then cancels the other one.

When I call async_receive on a socket that has no pending message, the coroutine simply hangs. When the timer expires, the async_receive is not cancelled.

Output (using boost asio logging feature):

@asio|1691763374.475866|0^1|in 'co_spawn_entry_point' (/home/magni-the-developer/Documents/framework/vcpkg-sysroot/x64-linux-gcc/includ:147)
@asio|1691763374.475866|0*1|io_context@0x7ffe3742f260.execute
@asio|1691763374.475906|>1|
@asio|1691763374.476192|1^2|in 'co_spawn_entry_point' (/home/magni-the-developer/Documents/framework/vcpkg-sysroot/x64-linux-gcc/includ:89)
@asio|1691763374.476192|1^2|called from 'operator||<std::tuple<boost::system::error_code, long unsigned int>, boost::asio' (/home/magni-the-developer/Documents/framework/vcpkg-sysroot/x64-linux-gcc/includ:345)
@asio|1691763374.476192|1*2|descriptor@0x55859345ea28.async_read_some(null_buffers)
@asio|1691763374.476204|1^3|in 'co_spawn_entry_point' (/home/magni-the-developer/Documents/framework/vcpkg-sysroot/x64-linux-gcc/includ:147)
@asio|1691763374.476204|1^3|called from 'operator||<std::tuple<boost::system::error_code, long unsigned int>, boost::asio' (/home/magni-the-developer/Documents/framework/vcpkg-sysroot/x64-linux-gcc/includ:345)
@asio|1691763374.476204|1*3|deadline_timer@0x558593463cc8.async_wait
@asio|1691763374.476211|<1|
@asio|1691763375.476231|>3|ec=system:0
@asio|1691763375.476246|3|deadline_timer@0x558593463cc8.cancel
@asio|1691763375.476252|<3|

If it were working correctly the line

value: <index>

would be printed once the timer expires.

jbbjarnason commented 1 year ago

Looking a bit further into this, it seems that the reactor_op would need a cancellation key, or some other way to support per-operation cancellation, similar to what was done here: https://github.com/chriskohlhoff/asio/commit/6bfcc13e996eb9842bb1c33333c9ca5e0910898f#diff-b4f6761af627b21d4c9ba176b0110d2c4682e913722102bee2500ac366f379f3R56

When async_initiate is called, the completion token carries a cancellation slot whose handler is still nullptr. The reactor_op would need to assign a handler to that slot, similar to how it is done in Boost.Asio.
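To illustrate the point about the slot, here is a rough standard-library-only model of the mechanism. It is not azmq or Boost.Asio code: toy_slot, toy_op, start_without_cancellation and start_with_cancellation are hypothetical names made up for this sketch, and the real reactor_op is more involved. The idea it tries to show is only that a cancellation request reaches an operation if, and only if, the initiating code installed a handler in the slot.

```cpp
#include <functional>
#include <string>
#include <utility>

// Hypothetical stand-in for a cancellation slot: it holds at most one
// handler, which the owner of the slot invokes to request cancellation.
class toy_slot {
public:
  void assign(std::function<void()> handler) { handler_ = std::move(handler); }

  // Request cancellation. Returns false when no handler was ever installed,
  // which corresponds to the nullptr-handler situation described above.
  bool emit() {
    if (!handler_) return false;
    auto handler = std::move(handler_);
    handler_ = nullptr;  // a handler is used at most once
    handler();
    return true;
  }

private:
  std::function<void()> handler_;
};

// Hypothetical pending operation, e.g. a receive waiting for a message.
struct toy_op {
  bool done = false;
  std::string result;

  void complete(std::string r) {
    if (done) return;  // complete only once
    done = true;
    result = std::move(r);
  }
};

// Initiation that ignores the slot: nothing can cancel the operation later.
void start_without_cancellation(toy_op&, toy_slot&) {}

// Initiation that installs a handler: a cancellation request completes the
// operation with an "aborted" result. The op must outlive the slot handler.
void start_with_cancellation(toy_op& op, toy_slot& slot) {
  slot.assign([&op] { op.complete("operation_aborted"); });
}
```

In this model, calling emit() on a slot passed to start_without_cancellation returns false and leaves the operation pending, which mirrors the hang in the report. With start_with_cancellation, emit() returns true and the operation completes with "operation_aborted".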