zeromq / cppzmq

Header-only C++ binding for libzmq
http://www.zeromq.org
MIT License

zmq::proxy_steerable with capture does not shutdown #478

Closed. jlouazel closed this issue 3 years ago.

jlouazel commented 3 years ago

While using zmq::proxy_steerable, I've come across an interesting issue. Everything seems to work according to the documentation, except when I plug in a capture socket as well.

So with a little bit of code, here's what's happening:

```cpp
zmq::socket_t frontend(context, zmq::socket_type::router);
zmq::socket_t backend(context, zmq::socket_type::dealer);
zmq::socket_t capture(context, zmq::socket_type::pair);
zmq::socket_t control(context, zmq::socket_type::sub);

frontend.bind("inproc://frontend");
backend.connect("inproc://backend");
capture.connect("inproc://capture");
control.connect("inproc://control");

control.set(zmq::sockopt::subscribe, zmq::str_buffer(""));

// Case 1: using `proxy_steerable` without capture, everything goes fine
zmq::proxy_steerable(frontend,
                     backend,
                     zmq::socket_ref(),
                     control);

// Case 2: using `proxy_steerable` with capture doesn't end the proxy
zmq::proxy_steerable(frontend,
                     backend,
                     capture,
                     control);
```

I'm using the following code from another thread to communicate my intent to close the proxy:

```cpp
zmq::socket_t control(context, zmq::socket_type::pub);
control.bind("inproc://control");
control.send(zmq::str_buffer("TERMINATE"), zmq::send_flags::none);
control.close();
```

I'm using cppzmq 4.7.1 on macOS, in case that helps narrow things down. In this situation I expect to benefit from both the capture and the control capabilities of the proxy; I'm just wondering whether that is possible at all.

Thanks in advance for your answers.

gummif commented 3 years ago

I'm not sure what could cause this. Could you try sleeping for maybe 10 ms around the line `control.send(zmq::str_buffer("TERMINATE"), zmq::send_flags::none);` to check whether this is a timing issue? If that doesn't help, you could try asking in the libzmq repository.
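A rough sketch of that timing check, in case it helps; the `send_terminate` helper name and the 10 ms delays are illustrative only, meant to rule out a race rather than to be a real fix:

```cpp
#include <zmq.hpp>

#include <chrono>
#include <thread>

void send_terminate(zmq::context_t& context) {
    zmq::socket_t control(context, zmq::socket_type::pub);
    control.bind("inproc://control");

    // Give the proxy's SUB socket a moment to connect and subscribe.
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    control.send(zmq::str_buffer("TERMINATE"), zmq::send_flags::none);
    // Give the message a moment to be delivered before closing the socket.
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    control.close();
}
```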

jlouazel commented 3 years ago

I actually succeeded in reproducing it properly. I put together a client/server example where the client sends an incrementing counter through the router to the server every second. If you run it, you'll see that both the backend and the capture receive the messages.

The problem, as stated in the original issue, is really in the deinitialization phase.

If we consider the code below:

Blocking router deinitialization:

```cpp
#include <zmq.hpp>
#include <zmq_addon.hpp>

#include <cassert>
#include <chrono>
#include <iostream>
#include <iterator>
#include <string>
#include <thread>

// Runs the steerable proxy with frontend, backend, capture and control sockets.
void router_fn(zmq::context_t& ctx) {
  zmq::socket_t frontend(ctx, zmq::socket_type::router);
  zmq::socket_t backend(ctx, zmq::socket_type::dealer);
  zmq::socket_t capture(ctx, zmq::socket_type::pair);
  zmq::socket_t control(ctx, zmq::socket_type::sub);

  frontend.bind("inproc://frontend");
  backend.bind("inproc://backend");
  capture.connect("inproc://capture");
  control.connect("inproc://control.kill");

  control.set(zmq::sockopt::subscribe, zmq::str_buffer(""));

  zmq::proxy_steerable(zmq::socket_ref(frontend),
                       zmq::socket_ref(backend),
                       zmq::socket_ref(capture),
                       zmq::socket_ref(control));

  frontend.close();
  backend.close();
  capture.close();
  control.close();
  std::cout << "router successfully finished" << std::endl;
}

// Receives proxied messages until told to stop via its killer socket.
void backend_fn(zmq::context_t& ctx) {
  zmq::socket_t killer(ctx, zmq::socket_type::pair);
  zmq::socket_t backend(ctx, zmq::socket_type::router);

  killer.bind("inproc://backend.kill");
  backend.connect("inproc://backend");

  zmq::pollitem_t poll_items[]{{killer, 0, ZMQ_POLLIN, 0},
                               {backend, 0, ZMQ_POLLIN, 0}};

  while (true) {
    assert(zmq::poll(poll_items, 2));
    if (poll_items[0].revents & ZMQ_POLLIN) {
      break;
    } else if (poll_items[1].revents & ZMQ_POLLIN) {
      zmq::multipart_t mp;
      assert(zmq::recv_multipart(backend, std::back_inserter(mp)));
      // multipart_size = 3, mp[0] == `router_identity`,
      // mp[1] == `frontend_identity`, mp[2] == `msg`
      if (mp.size() == 3) {
        std::cout << "backend received: " << mp[2].to_string() << '\n';
      }
    }
  }

  killer.close();
  backend.close();
  std::cout << "backend successfully finished" << std::endl;
}

// Receives everything the proxy forwards to its capture socket.
void capture_fn(zmq::context_t& ctx) {
  zmq::socket_t killer(ctx, zmq::socket_type::pair);
  zmq::socket_t capture(ctx, zmq::socket_type::pair);

  killer.bind("inproc://capture.kill");
  capture.bind("inproc://capture");

  zmq::pollitem_t poll_items[]{{killer, 0, ZMQ_POLLIN, 0},
                               {capture, 0, ZMQ_POLLIN, 0}};

  while (true) {
    assert(zmq::poll(poll_items, 2));
    if (poll_items[0].revents & ZMQ_POLLIN) {
      break;
    } else if (poll_items[1].revents & ZMQ_POLLIN) {
      zmq::multipart_t mp;
      assert(zmq::recv_multipart(capture, std::back_inserter(mp)));
      // multipart_size = 2, mp[0] == `identity`, mp[1] == `msg`
      if (mp.size() == 2) {
        std::cout << "capture received: " << mp[1].to_string() << '\n';
      }
    }
  }

  killer.close();
  capture.close();
  std::cout << "capture successfully finished" << std::endl;
}

// Sends an incrementing counter through the proxy once a second.
void frontend_fn(zmq::context_t& ctx) {
  zmq::socket_t killer(ctx, zmq::socket_type::pair);
  zmq::socket_t frontend(ctx, zmq::socket_type::dealer);

  killer.bind("inproc://frontend.kill");
  frontend.connect("inproc://frontend");

  uint32_t counter = 0;
  while (true) {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    zmq::message_t msg;
    if (killer.recv(msg, zmq::recv_flags::dontwait)) {
      break;
    }
    frontend.send(zmq::message_t(std::to_string(counter++)),
                  zmq::send_flags::none);
  }

  killer.close();
  frontend.close();
  std::cout << "frontend successfully finished" << std::endl;
}

int main() {
  zmq::context_t ctx;

  std::thread router_thread(&router_fn, std::ref(ctx));
  std::thread capture_thread(&capture_fn, std::ref(ctx));
  std::thread backend_thread(&backend_fn, std::ref(ctx));
  std::thread frontend_thread(&frontend_fn, std::ref(ctx));

  std::cout << "Press enter to end the program..." << std::endl;
  std::cin.get();

  zmq::socket_t frontend_killer(ctx, zmq::socket_type::pair);
  zmq::socket_t backend_killer(ctx, zmq::socket_type::pair);
  zmq::socket_t capture_killer(ctx, zmq::socket_type::pair);
  zmq::socket_t control_killer(ctx, zmq::socket_type::pub);

  // Kill `frontend`.
  frontend_killer.connect("inproc://frontend.kill");
  frontend_killer.send(zmq::message_t(), zmq::send_flags::none);
  frontend_killer.close();

  // Kill `backend`.
  backend_killer.connect("inproc://backend.kill");
  backend_killer.send(zmq::message_t(), zmq::send_flags::none);
  backend_killer.close();

  // Kill `capture`.
  capture_killer.connect("inproc://capture.kill");
  capture_killer.send(zmq::message_t(), zmq::send_flags::none);
  capture_killer.close();

  // Kill router using `control` socket.
  control_killer.bind("inproc://control.kill");
  control_killer.send(zmq::str_buffer("TERMINATE"), zmq::send_flags::none);
  control_killer.close();

  // Join threads.
  frontend_thread.join();
  backend_thread.join();
  capture_thread.join();
  router_thread.join();

  return 0;
}
```

you can reproduce the behavior of the router being stuck forever. Now the interesting part is what happens when we change the order in which things are deinitialized:

Diff to apply to avoid the router blocking:

```diff
diff --git a/sample/sample.cpp b/sample/sample.cpp
index 6ee758f..00c5a5e 100644
--- a/sample/sample.cpp
+++ b/sample/sample.cpp
@@ -145,6 +145,11 @@ int main() {
   zmq::socket_t capture_killer(ctx, zmq::socket_type::pair);
   zmq::socket_t control_killer(ctx, zmq::socket_type::pub);
 
+  // Kill router using `control` socket.
+  control_killer.bind("inproc://control.kill");
+  control_killer.send(zmq::str_buffer("TERMINATE"), zmq::send_flags::none);
+  control_killer.close();
+
   // Kill `frontend`.
   frontend_killer.connect("inproc://frontend.kill");
   frontend_killer.send(zmq::message_t(), zmq::send_flags::none);
@@ -160,11 +165,6 @@ int main() {
   capture_killer.send(zmq::message_t(), zmq::send_flags::none);
   capture_killer.close();
 
-  // Kill router using `control` socket.
-  control_killer.bind("inproc://control.kill");
-  control_killer.send(zmq::str_buffer("TERMINATE"), zmq::send_flags::none);
-  control_killer.close();
-
   // Join threads.
   frontend_thread.join();
   backend_thread.join();
```

And with this, it works! 🤔

In summary:

- With the original teardown order (the capture receiver is closed before `TERMINATE` is sent on the control socket), the router thread stays stuck in `zmq::proxy_steerable` forever.
- Sending `TERMINATE` on the control socket before tearing down the capture receiver lets the router finish cleanly.

As a bonus, leaving the capture socket empty within the router (passing an empty `zmq::socket_ref` as the capture argument) works as well! I suppose all of this trouble has something to do with that, but I cannot figure out why. Maybe your sharp eyes will spot something I didn't!

gummif commented 3 years ago

Ok, thanks for that. I can imagine that sending messages to the capture socket in the implementation uses a blocking send, which would block forever if the receiving side is closed before the proxy finishes. The implementation is non-trivial, so I can't tell for sure: https://github.com/zeromq/libzmq/blob/master/src/proxy.cpp.
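If that guess is right, the hang can be illustrated in isolation. A minimal sketch (this is not the libzmq proxy code; the endpoint name and the 100 ms delay are made up for the example) showing that a PAIR socket whose inproc peer has been closed goes mute, so a blocking send on it would never return:

```cpp
#include <zmq.hpp>

#include <chrono>
#include <iostream>
#include <thread>

int main() {
    zmq::context_t ctx;

    zmq::socket_t receiver(ctx, zmq::socket_type::pair);
    zmq::socket_t sender(ctx, zmq::socket_type::pair);
    receiver.bind("inproc://capture-demo");
    sender.connect("inproc://capture-demo");

    // Close the receiving side first, mimicking the teardown order that hangs.
    receiver.close();
    std::this_thread::sleep_for(std::chrono::milliseconds(100));  // let the disconnect propagate

    // With send_flags::none this call would block indefinitely; dontwait
    // lets us observe the mute state instead.
    auto result = sender.send(zmq::str_buffer("captured"), zmq::send_flags::dontwait);
    std::cout << (result ? "sent" : "send would block") << std::endl;
    return 0;
}
```

Swapping `dontwait` for `send_flags::none` makes the call hang, which would match the router never returning from the proxy.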

Anyway, I would suggest posting this to https://github.com/zeromq/libzmq so that an expert can take a look.

jlouazel commented 3 years ago

Thanks @gummif. As you suggested, I fixed this by making sure the capture socket is deinitialized properly relative to the rest of the shutdown sequence. That did the trick!

Hopefully, this thread can help people in the same situation to understand what's going on. As far as I'm concerned, we're done here.
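For anyone landing here later, a condensed, self-contained sketch of the shutdown order that worked (the endpoint names and the 100 ms delay are illustrative; this is not the full sample from above): tell the proxy to terminate via the control socket first, and only then tear down the capture receiver.

```cpp
#include <zmq.hpp>

#include <chrono>
#include <thread>

int main() {
    zmq::context_t ctx;

    // Capture receiver lives in the main thread for brevity.
    zmq::socket_t capture_rx(ctx, zmq::socket_type::pair);
    capture_rx.bind("inproc://capture");

    std::thread proxy_thread([&ctx] {
        zmq::socket_t frontend(ctx, zmq::socket_type::router);
        zmq::socket_t backend(ctx, zmq::socket_type::dealer);
        zmq::socket_t capture(ctx, zmq::socket_type::pair);
        zmq::socket_t control(ctx, zmq::socket_type::sub);
        frontend.bind("inproc://frontend");
        backend.bind("inproc://backend");
        capture.connect("inproc://capture");
        control.connect("inproc://control");
        control.set(zmq::sockopt::subscribe, zmq::str_buffer(""));
        zmq::proxy_steerable(frontend, backend, capture, control);  // returns on TERMINATE
    });

    // 1. Stop the proxy first, so it no longer writes to its capture socket.
    zmq::socket_t control_tx(ctx, zmq::socket_type::pub);
    control_tx.bind("inproc://control");
    std::this_thread::sleep_for(std::chrono::milliseconds(100));  // crude slow-joiner guard
    control_tx.send(zmq::str_buffer("TERMINATE"), zmq::send_flags::none);
    proxy_thread.join();

    // 2. Only now tear down the capture receiver.
    capture_rx.close();
    return 0;
}
```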

navono commented 3 years ago

@jlouazel Do you have a complete demo?