zeromq / cppzmq

Header-only C++ binding for libzmq
http://www.zeromq.org
MIT License
1.9k stars 757 forks source link

Radio Dish UDP cppzmq #624

Open vlad777442 opened 7 months ago

vlad777442 commented 7 months ago

Please use this template for reporting suspected bugs or requests for help.

Issue description

I attempted to use Radio Dish in ZeroMQ to send messages using UDP, but the client isn't receiving anything. Do you have any ideas on what might be wrong? I'm trying to send messages from Radio to Dish.

Environment

Minimal test code / Steps to reproduce the issue

Radio:

#define ZMQ_BUILD_DRAFT_API
#include <zmq.hpp>
#include <iostream>
#include <chrono>
#include <thread>

int main() {
    using namespace std::chrono_literals;

    zmq::context_t context{1};

    zmq::socket_t socket{context, zmq::socket_type::radio};
    socket.connect("udp://127.0.0.1:5555");

    const std::string data{"Hello"};

    while (true) {
        // Send the reply to the client
        socket.send(zmq::buffer(data), zmq::send_flags::none);

        // Simulate work
        std::this_thread::sleep_for(1s);
    }

    return 0;
}

Dish:

#define ZMQ_BUILD_DRAFT_API
#include <zmq.hpp>
#include <iostream>

int main() {
    // Initialize the ZeroMQ context with a single IO thread
    zmq::context_t context{1};

    zmq::socket_t socket{context, zmq::socket_type::dish};
    socket.bind("udp://127.0.0.1:5555");

    while (true) {
        zmq::message_t reply{};
        (void)socket.recv(reply, zmq::recv_flags::none);

        std::cout << "Received " << reply.to_string(); 
    }

    return 0;
}
gummif commented 7 months ago

I don't have any experience with these sockets. Does it work with tcp? Don't you need to join a group?

vlad777442 commented 5 months ago

I don't have any experience with these sockets. Does it work with tcp? Don't you need to join a group?

I tried to set a group, but I get an error undefined reference to `zmq_msg_set_group'. The version is 4.3.4

#define ZMQ_BUILD_DRAFT_API
#include <iostream>
//#include "zmq.h"
#include <zmq.hpp>
#include <string>
#include <cstring>

using namespace std;

// RADIO

void assert(bool a) {
    if (!a) {
        cout << "failed - " << zmq_errno << endl;
        throw;
    }
}

int main() {
        //printf("ZMQ version: %d.%d.%d\n", ZMQ_VERSION_MAJOR, ZMQ_VERSION_MINOR, ZMQ_VERSION_PATCH);

    void* context = zmq_ctx_new();
    assert(context != nullptr);
    int rc = zmq_ctx_set(context, ZMQ_IO_THREADS, 1);
    assert(rc == 0);

    void* radio = zmq_socket(context, ZMQ_RADIO);
    assert(radio != nullptr);

    rc = zmq_connect(radio, "udp://224.0.0.1:28650");
    assert(rc == 0);

    cout << "init success" << endl;

    while (1) {
        string t = "Test string";

        zmq_msg_t msg;
        rc = zmq_msg_init(&msg);
        assert(rc == 0);
        rc = zmq_msg_init_size(&msg, t.size());
        assert(rc == 0);
        if (t.size()) {
            memcpy(zmq_msg_data(&msg), t.data(), t.size());
        }
        //zmq::message_t msg{ t.data(), t.size() };
        //msg.set_group("telemetry");

        //rc = zmq_msg_set_group(&msg, "telemetry");
        //assert(rc == 0);

        cout << "sending now" << endl;

        int b = zmq_msg_send(&msg, radio, 0);
        cout << "sent bytes - " << b << endl;
        //rc = zmq_msg_init_size(&);
        //Sleep(100);
    }

    /*zmq::context_t ctx(1);
    zmq::socket_t r(ctx, ZMQ_RADIO);

    r.connect("udp://224.0.0.1:28650");
    while (1) {
        string t = "Test string";
        zmq::message_t msg{ t.data(), t.size() };
        msg.set_group("telemetry");
        cout << "sending data" << endl;
        r.send(msg, 0);
        //zmq_msg_send(msg.handle(), r.handle(), 0);
        Sleep(100);
    }*/

    return 0;
}