zeromq / cppzmq

Header-only C++ binding for libzmq
http://www.zeromq.org
MIT License

Deadlock/blocking on zmq_proxy #571

Closed Alireza67 closed 1 year ago

Alireza67 commented 1 year ago

How can I stop zmq_proxy correctly? I am implementing a design with multiple publishers and multiple subscribers, using XSUB and XPUB sockets. The code works as expected, but when the program tries to exit, it blocks in the call to zmq_proxy.


#include <zmq.h>
#include <string>
#include <atomic>
#include <thread>
#include <iostream>
#include <mutex>
#include <vector>

using namespace std;

atomic<bool> kLiveFlag = true;
auto liveCouner = 0;
mutex lock_;

void Print(string msg)
{
    unique_lock<mutex>lk(lock_);
    cout << msg << endl;
}

auto ctx = zmq_ctx_new();

void Proxy(vector<string> publisherAddresses, string proxyPublisherAddress)
{
    auto xpub = zmq_socket(ctx, ZMQ_XPUB);
    auto res = zmq_bind(xpub, proxyPublisherAddress.c_str());

    auto xsub = zmq_socket(ctx, ZMQ_XSUB);
    for (auto& item : publisherAddresses)
    {
        res = zmq_connect(xsub, item.c_str());
    }   

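    // Note: zmq_proxy() blocks here and only returns (with -1) once the context is terminated,
    // so the loop and cleanup below are not reached while the context is alive.
    res = zmq_proxy(xsub, xpub, NULL);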

    while (kLiveFlag)
    {
        this_thread::sleep_for(1s);
    }

    auto lingerTime = 0;
    res = zmq_setsockopt(xsub, ZMQ_LINGER, &lingerTime, sizeof(lingerTime));
    res = zmq_setsockopt(xpub, ZMQ_LINGER, &lingerTime, sizeof(lingerTime));
    res = zmq_close(xsub);
    res = zmq_close(xpub);
}

void Publisher(string name, string address, int message)
{
    auto socketSender = zmq_socket(ctx, ZMQ_PUB);
    auto res = zmq_bind(socketSender, address.c_str());

    while (kLiveFlag.load())
    {
        res = zmq_send(socketSender, &message, sizeof(message), 0);
        if (res >= 0) // zmq_send returns the number of bytes sent, or -1 on error
        {
            //auto msg = "Publisher (" + name + ") send: " + to_string(message);
            //Print(msg);
        }
        this_thread::sleep_for(1s);
    }

    auto lingerTime = 0;
    res = zmq_setsockopt(socketSender, ZMQ_LINGER, &lingerTime, sizeof(lingerTime));
    res = zmq_close(socketSender);
}

void Subscriber(string name, string ProxyAddress, int filter)
{
    auto socketReceiver = zmq_socket(ctx, ZMQ_SUB);
    auto res = zmq_connect(socketReceiver, ProxyAddress.c_str());
    res = zmq_setsockopt(socketReceiver, ZMQ_SUBSCRIBE, &filter, sizeof(filter));
    auto timeout = 1000;
    res = zmq_setsockopt(socketReceiver, ZMQ_RCVTIMEO, &timeout, sizeof(timeout));

    int buffer;

    while (kLiveFlag.load())
    {
        res = zmq_recv(socketReceiver, &buffer, sizeof(buffer), 0);
        if (res < 0)
        {
            this_thread::sleep_for(1s);
        }
        else
        {
            auto msg = "Subscriber (" + name + ") receive: " + to_string(buffer);
            Print(msg);
        }
    }

    auto lingerTime = 0;
    res = zmq_setsockopt(socketReceiver, ZMQ_LINGER, &lingerTime, sizeof(lingerTime));
    res = zmq_close(socketReceiver);
}

int main()
{
    auto publisherPort = 9000;
    auto publisherAddress = "inproc://job_1";
    auto pub1 = thread(Publisher, "pub1"s, publisherAddress, 66);

    auto publisherPort2 = 9001;
    auto publisherAddress2 = "inproc://job_2";
    auto pub2 = thread(Publisher, "pub2"s, publisherAddress2, 77);

    auto publisherAddresses = vector<string>{ publisherAddress, publisherAddress2 };

    auto proxyPublisherPort = 10000;
    auto proxyPublisherAddress = "tcp://127.0.0.1:"s + to_string(proxyPublisherPort);
    auto proxy = thread(Proxy, publisherAddresses, proxyPublisherAddress);

    auto sub1 = thread(Subscriber, "sub1"s, proxyPublisherAddress, 66);
    auto sub2 = thread(Subscriber, "sub2"s, proxyPublisherAddress, 77);

    while (liveCouner < 5)
    {
        liveCouner++;
        this_thread::sleep_for(1s);
    }
    kLiveFlag.store(false);
    proxy.join();
    pub1.join();
    pub2.join();
    sub1.join();
    sub2.join();

    auto res = zmq_ctx_destroy(ctx);

}
Alireza67 commented 1 year ago

I figured out that if I destroy the context before joining the proxy thread, zmq_proxy no longer blocks.

The zmq_proxy() function always returns -1 and errno set to ETERM (the ØMQ context associated with either of the specified sockets was terminated).

Is this a safe approach? Is there a standard way to do it?

auto res = zmq_ctx_destroy(ctx);
proxy.join();

gummif commented 1 year ago

I think the safe way here is to call shutdown on the context, then join all threads, and finally terminate/destroy it.