zeromq / cppzmq

Header-only C++ binding for libzmq
http://www.zeromq.org
MIT License
1.93k stars 757 forks source link

zmq::monitor_t events not working? #541

Closed danielhalati closed 2 years ago

danielhalati commented 2 years ago

Hello everyone. I am trying to monitor (count) the incoming subscriptions in a PUB-SUB system. I have written the code below in C/C++ in two variants: one using the plain libzmq C API and one using zmq::monitor_t. Neither implementation works; both get stuck waiting for monitor events:

#include <unordered_map>
#include "zhelpers.hpp"
#include <thread>
#include <functional>

//#define CPP 1

// Topic frame sent ahead of every payload by the publisher loop in main().
std::string topic{"TEST_TOPIC"};
// Endpoint the PUB socket binds to in main().
std::string publish_address{"ipc://test"};

void req_socket_monitor(void* ctx, void* socket, const char* address)
{
    zmq_event_t event;
    int rc;

    rc = zmq_socket_monitor(socket, address, ZMQ_EVENT_CONNECTED);
    assert(rc == 0);

    void* monitor = zmq_socket(ctx, ZMQ_PAIR);
    assert(monitor);

    rc = zmq_connect(monitor, address);
    assert(rc == 0);
    while (true) {
        zmq_msg_t msg;
        zmq_msg_init(&msg);
        rc = zmq_recvmsg(monitor, &msg, 0);
        if (rc == -1 && zmq_errno() == ETERM)
            break;
        assert(rc != -1);
        memcpy(&event, zmq_msg_data(&msg), sizeof(event));
        switch (event.event) {
        case ZMQ_EVENT_CONNECTED:
            std::cout << "Subscription";
            break;
        case ZMQ_EVENT_CLOSED:
            // handle socket closed event
            break;
        }
    }
    zmq_close(monitor);
}

// Monitor that logs the started, accepted, connected and listening events to
// stdout. Each handler prints a message specific to its event so the output
// shows which event actually fired.
class connect_monitor_t : public zmq::monitor_t {
public:
    void on_monitor_started() override { std::cout << "monitor started" << std::endl; }
    // Raised on a bound socket when a remote peer connects to it.
    void on_event_accepted(const zmq_event_t& /*event*/, const char* addr) override
    {
        std::cout << "got connection from " << addr << std::endl;
    }
    // Raised on a connecting socket once the connection is established.
    void on_event_connected(const zmq_event_t& /*event*/, const char* addr) override
    {
        std::cout << "connected to " << addr << std::endl;
    }
    // Raised when the monitored socket has been bound to an address.
    void on_event_listening(const zmq_event_t& /*event*/, const char* addr) override
    {
        std::cout << "listening on " << addr << std::endl;
    }
};

// Binds a PUB socket, starts a thread that watches it for connection events,
// and then publishes topic/payload pairs forever.
int main()
{
    zmq::context_t context(1);
    zmq::socket_t backend(context, ZMQ_PUB);
    backend.bind(publish_address);
#ifdef CPP
    connect_monitor_t monitor;
    // The monitored socket is bound, so incoming peers are reported as
    // ZMQ_EVENT_ACCEPTED; ZMQ_EVENT_CONNECTED alone never fires here.
    monitor.init(backend, "inproc://monitor", ZMQ_EVENT_CONNECTED | ZMQ_EVENT_ACCEPTED);

    auto thread = std::thread([&monitor] {
        // Block until the next event; stop when check_event reports failure.
        while (monitor.check_event(-1)) {
        }
    });
#else
    // NOTE(review): req_socket_monitor registers the monitor on `backend` from
    // the new thread while this thread is already sending on it; libzmq
    // sockets are presumably not safe for such concurrent use - confirm.
    void* const ctx_handle = context.handle();
    void* const socket_handle = backend.handle();
    auto thread = std::thread([ctx_handle, socket_handle] {
        req_socket_monitor(ctx_handle, socket_handle, "inproc://monitor");
    });
#endif
    // Publish a two-part message (topic, payload) in an endless loop.
    while (true) {
        s_sendmore(backend, topic);
        s_send(backend, "TEST_DATA");
    }
    // Not reached while the loop above has no exit condition.
    thread.join();
    return 0;
}

Then I have my client written in Python. Nothing happens on the publisher side when the subscriber connects to the socket. I want to mention that the Python script that connects to it is able to receive data.


import zmq

# Subscriber used to reproduce the issue: connects to the publisher's IPC
# endpoint, subscribes to everything, and prints each received frame.
context = zmq.Context()

print("Connecting to hello world server…")
socket = context.socket(zmq.SUB)

socket.connect("ipc://test")
# An empty prefix subscribes to every topic.
socket.setsockopt(zmq.SUBSCRIBE, b'')

for request in range(100000):
    # recv() returns a single frame per call.
    message = socket.recv()
    print("Received reply %s [ %s ]" % (request, message))