zeromq / azmq

C++ language binding library integrating ZeroMQ with Boost Asio
Boost Software License 1.0
319 stars 109 forks source link

cancel() runs completion handlers immediately but should post/defer them instead #210

Open dkl opened 1 year ago

dkl commented 1 year ago

Hi,

socket::cancel() appears to call pending completion handlers immediately, instead of delaying their execution until after the call returns. As a result there are dead locks when calling more socket operations (such as async_send()) from the completion handlers, because the socket uses a non-recursive mutex internally. This differs from other boost::asio objects such as boost::asio::steady_timer, which allow this case, so for example you can restart a timer from inside the operation_aborted completion handler.

An example to show the issue:

// Build: g++ -Wall -g azmq_cancel_timing.cpp -lzmq -lboost_filesystem -o azmq_cancel_timing

#include <azmq/socket.hpp>
#include <azmq/version.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/udp.hpp>
#include <boost/asio/steady_timer.hpp>
#include <chrono>
#include <stdio.h>
#include <zmq.hpp>

int main()
{
    printf("boost version: %i\n", BOOST_VERSION);
    printf("azmq version: %i\n", AZMQ_VERSION);

    boost::asio::io_context ioctx;

    boost::asio::steady_timer timer(ioctx);
    timer.expires_from_now(std::chrono::seconds(1));
    timer.async_wait(
        [](boost::system::error_code const& ec)
        {
            printf("timer async_wait completion handler, ec = %s\n", ec.message().c_str());
        }
    );

    boost::asio::ip::udp::socket udpsocket(ioctx, boost::asio::ip::udp::endpoint(boost::asio::ip::make_address("127.0.0.1"), 0));
    std::array<uint8_t, 1000> buffer1;
    udpsocket.async_receive(boost::asio::buffer(buffer1),
        [](boost::system::error_code const& ec, size_t)
        {
            printf("udp socket async_receive completion handler, ec = %s\n", ec.message().c_str());
        }
    );

    azmq::socket azmqsocket(ioctx, ZMQ_PULL);
    azmqsocket.set_option(azmq::socket::linger(0));
    azmqsocket.connect("tcp://127.0.0.1:12345");
    std::array<uint8_t, 1000> buffer2;
    azmqsocket.async_receive(boost::asio::buffer(buffer2),
        [](boost::system::error_code const& ec, size_t)
        {
            printf("azmq socket async_receive completion handler, ec = %s\n", ec.message().c_str());
        }
    );

    printf("timer.cancel()...\n");
    timer.cancel();
    printf("timer.cancel()... done\n");

    printf("udpsocket.cancel()...\n");
    udpsocket.cancel();
    printf("udpsocket.cancel()... done\n");

    printf("azmqsocket.cancel()...\n");
    azmqsocket.cancel();
    printf("azmqsocket.cancel()... done\n");

    printf("io_context.run()...\n");
    ioctx.run();
    printf("io_context.run()... done\n");

    return 0;
}

Actual output, azmq socket competion handler called during cancel(), instead of later like the others:

boost version: 108200
azmq version: 10002
timer.cancel()...
timer.cancel()... done
udpsocket.cancel()...
udpsocket.cancel()... done
azmqsocket.cancel()...
azmq socket async_receive completion handler, ec = Operation canceled
azmqsocket.cancel()... done
io_context.run()...
timer async_wait completion handler, ec = Operation canceled
udp socket async_receive completion handler, ec = Operation canceled
io_context.run()... done

Expected output: All completion handlers are called later through the io_service.

Degoah commented 11 months ago

Any plans to work on this finding? I have also been caught by this one.

sehe commented 10 months ago

This behaviour absolutely needs to be fixed because it violates [Asio guarantees](https://www.boost.org/doc/libs/1_83_0/doc/html/boost_asio/overview/core/threads.html#:~:text=Asynchronous%20completion%20handlers%20will%20only%20be%20called%20from%20threads%20that%20are%20currently%20calling%20io_context%3A%3Arun()):

_"Asynchronous completion handlers will only be called from threads that are currently calling io_context::run()_

@Degoah conjectures here simply skipping cancel_ops() appears to maybe fix the surface symptom:

https://github.com/zeromq/azmq/blob/master/azmq/detail/socket_service.hpp#L483

However, I'm not qualified to judge whether this is okay w.r.t. lifetimes (as the descriptor is being unregistered - which may or may not be okay?).

Also, note it doesn't solve the issue that the executor is not being honored as it should according to the Asio specs:

https://compiler-explorer.com/z/W83xMGqve, which (with the cancel_ops() removed as described) outputs

image

That's still wrong, as the azmqsocket callback should be invoked ON the strand.

Degoah commented 10 months ago

However, I'm not qualified to judge whether this is okay w.r.t. lifetimes (as the descriptor is being unregistered - which may or may not be okay?).

What I have understood from the implementation is, that de-registering the descriptor is ok, as it gets implicitly registered again in context of a new call to async_receive/async_send and an internal call to schedule(...).