zeromq / azmq

C++ language binding library integrating ZeroMQ with Boost Asio
Boost Software License 1.0
318 stars 108 forks source link

Completion of async operation using boost::asio::yield_context not working #212

Open ACvanWyk opened 9 months ago

ACvanWyk commented 9 months ago

When a boost::asio::yield_context is used with the async receive/send functions there seems to be threading issues where after the function calls the thread that called the async function call is not the same one that was used to launch the async function. This causes threading issues where after the async function calls the same thread that launched the async functions are not the same thread after the async function finishes causing race conditions.

Running something like below

        boost::asio::io_service ios;
    boost::asio::strand<boost::asio::executor> strand{ios.get_executor()};

    azmq::socket sb(ios, ZMQ_ROUTER);
    sb.bind(subj(BOOST_CURRENT_FUNCTION));

    azmq::socket sc(ios, ZMQ_DEALER);
    sc.connect(subj(BOOST_CURRENT_FUNCTION));

    //send coroutine task
    boost::asio::spawn(strand, [&](boost::asio::yield_context yield) {
        REQUIRE(strand.running_in_this_thread());
        boost::system::error_code ecc;
        auto const btc = azmq::async_send(sc, snd_bufs, yield[ecc]);
        REQUIRE(strand.running_in_this_thread());
        REQUIRE(!ecc);
        REQUIRE(btc == 4);
    });

    //receive coroutine task
    boost::asio::spawn(strand, [&](boost::asio::yield_context yield) {
        std::array<char, 5> ident;
        std::array<char, 2> a;
        std::array<char, 2> b;
x
        std::array<boost::asio::mutable_buffer, 3> rcv_bufs = { {boost::asio::buffer(ident),
                        boost::asio::buffer(a),
                        boost::asio::buffer(b)}};

        boost::system::error_code ecc;

        REQUIRE(strand.running_in_this_thread());
        auto const btb = azmq::async_receive(sb, rcv_bufs, yield[ecc]);
        REQUIRE(strand.running_in_this_thread());
        REQUIRE(!ecc);
        REQUIRE(btb == 9);

        REQUIRE(message_ref(snd_bufs.at(0)) == boost::string_ref(a.data(), 2));
        REQUIRE(message_ref(snd_bufs.at(1)) == boost::string_ref(b.data(), 2));
    });

    ios.run();

will not work since when the completion handlers of the async functions are called there is no guarantee that the executor that is assigned the the completion handler is the same one which was used to create to completion handler. The REQUIRE(strand.running_in_this_thread()); before the async function call works but fails after the async function call.