zeromq / libzmq

ZeroMQ core engine in C++, implements ZMTP/3.1
https://www.zeromq.org
Mozilla Public License 2.0

Using solarflare onload with epgm fails an assert in src/session_base.cpp #4319

Open nkarnev opened 2 years ago

nkarnev commented 2 years ago

Issue description

Running a binary under onload, where the binary uses the epgm protocol to create a ZMQ_PUB socket, results in an assertion failure even though zmq_bind returns 0 (no error). I suspect that onload intercepts the socket API and somehow changes the parameters being passed around. The problem does not appear when the TCP protocol is used with ZMQ_PUB.

The error message:

oo:pgm_publisher[54637]: Using OpenOnload 201710-u1.1 Copyright 2006-2018 Solarflare Communications, 2002-2005 Level 5 Networks [7]
oo:pgm_publisher[54637]: Importing OpenOnload 201710-u1.1 Copyright 2006-2018 Solarflare Communications, 2002-2005 Level 5 Networks [0,c54637-c0]
Invalid argument (src/session_base.cpp:723)
Aborted

Below are lines 721-723 of src/session_base.cpp:

int rc = pgm_sender->init (udp_encapsulation, _addr->address.c_str ());
errno_assert (rc == 0);
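Judging by the output above, the failed errno_assert reports the text for the current errno followed by the file and line, and "Invalid argument" is the text for EINVAL. A minimal sketch of how such a message can be composed is shown here; describe_errno_failure is a hypothetical helper written for illustration and is not libzmq's macro.

```cpp
#include <cassert>
#include <cerrno>
#include <cstring>
#include <string>

// Hypothetical helper (not part of libzmq): builds the kind of text an
// errno-style assertion prints, i.e. "<strerror text> (<file>:<line>)".
static std::string describe_errno_failure(int err, const char *file, int line)
{
    return std::string(std::strerror(err)) + " (" + file + ":"
           + std::to_string(line) + ")";
}
```

Called with EINVAL, "src/session_base.cpp" and 723, this helper produces the same text as the third line of the error message, which suggests that pgm_sender->init returned non-zero with errno set to EINVAL.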

Environment

g++ 9.2.0

Minimal test code / Steps to reproduce the issue

In addition to the code below, one has to start the resulting binary under onload, which also requires a Solarflare NIC. Understandably, that is quite limiting.

#include <zmq.h>
#include <cassert>
#include <cerrno>
#include <cstdio>

inline static void zmq_version_used()
{
    int major, minor, patch;
    zmq_version(&major, &minor, &patch);
    fprintf(stdout, "Current 0MQ version is %d.%d.%d\n", major, minor, patch);
}

int main()
{
    void *context = zmq_ctx_new();
    assert(context);
    zmq_version_used();
    void *pub = zmq_socket(context, ZMQ_PUB);
    int ttl = 30;
    int socket_opt_rc = zmq_setsockopt(pub, ZMQ_MULTICAST_HOPS, &ttl, sizeof(int));
    if (socket_opt_rc == -1)
        fprintf(stderr, "Failed to set multicast hops: %s\n", zmq_strerror(errno));

    // The endpoint below is a placeholder for the real interface, group and port.
    int rc = zmq_bind(pub, "epgm://interface;address_group:port");
    if (rc == -1)
        fprintf(stderr, "Some error %s\n", zmq_strerror(errno));
    else
        fprintf(stdout, "Bound socket\n");
    while (1) {
        zmq_send(pub, "TEST", 4, 0);
    }
    return 0;
}
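The endpoint in the repro is a placeholder of the form interface;address_group:port. As a sanity check to rule out before looking at onload itself, the sketch below splits such a string into its three parts and rejects it if any part is missing or the port is not numeric. The struct epgm_endpoint_t and the function split_epgm_endpoint are hypothetical names introduced here, and the sample address in the usage note is only illustrative.

```cpp
#include <cassert>
#include <string>

// Hypothetical holder for the three parts of an epgm endpoint.
struct epgm_endpoint_t {
    std::string iface;  // interface name or address
    std::string group;  // multicast group address
    int port;           // UDP port
};

// Hypothetical helper: splits "epgm://iface;group:port" into its parts.
// Returns false if the scheme is wrong, a part is empty, or the port is
// not a decimal number in the range 0..65535.
static bool split_epgm_endpoint(const std::string &endpoint, epgm_endpoint_t &out)
{
    const std::string scheme = "epgm://";
    if (endpoint.compare(0, scheme.size(), scheme) != 0)
        return false;
    const std::string rest = endpoint.substr(scheme.size());
    const std::string::size_type semi = rest.find(';');
    const std::string::size_type colon = rest.rfind(':');
    if (semi == std::string::npos || colon == std::string::npos || colon < semi)
        return false;
    const std::string iface = rest.substr(0, semi);
    const std::string group = rest.substr(semi + 1, colon - semi - 1);
    const std::string port_text = rest.substr(colon + 1);
    if (iface.empty() || group.empty() || port_text.empty() || port_text.size() > 5)
        return false;
    for (char c : port_text)
        if (c < '0' || c > '9')
            return false;
    const int port = std::stoi(port_text);
    if (port > 65535)
        return false;
    out.iface = iface;
    out.group = group;
    out.port = port;
    return true;
}
```

With this helper, a string such as "epgm://eth0;239.192.1.1:5555" splits cleanly, while the literal placeholder from the repro is rejected because "port" is not a number. It only checks the shape of the string and says nothing about whether the interface or group is usable on a given machine.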

What's the actual result? (include assertion message & call stack if applicable)

(gdb) bt
#0  0x00007ffff6ca1277 in __GI_raise (sig=sig@entry=6) at ../nptl/sysdeps/unix/sysv/linux/raise.c:56
#1  0x00007ffff6ca2968 in __GI_abort () at abort.c:90
#2  0x00000000004058c0 in zmq::zmq_abort (errmsg_=errmsg_@entry=0x7ffff6df299d "Invalid argument") at src/err.cpp:88
#3  0x000000000044ca3f in zmq::session_base_t::start_connecting(bool) () at src/session_base.cpp:734
#4  0x000000000041b483 in zmq::object_t::process_command (this=0x6ea130, cmd_=...) at src/object.cpp:87
#5  0x0000000000417a5c in zmq::io_thread_t::in_event (this=0x6e7e60) at src/io_thread.cpp:91
#6  0x0000000000416e06 in zmq::epoll_t::loop (this=0x6e8400) at src/epoll.cpp:206
#7  0x000000000042f8a1 in thread_routine (arg_=0x6e8458) at src/thread.cpp:402
#8  0x00007ffff7932e25 in start_thread (arg=0x7ffff5e4a700) at pthread_create.c:308
#9  0x00007ffff6d69bad in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:113
(gdb) quit
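The backtrace places the abort on an I/O thread (frames 3 to 7), not in the thread that called zmq_bind. That would be consistent with zmq_bind returning 0 before the PGM sender is initialized. The sketch below models that ordering with a plain command queue instead of real threads. Every name in it (mailbox, fake_pgm_init, fake_bind, drain_mailbox) is hypothetical and none of it is libzmq code.

```cpp
#include <cassert>
#include <cerrno>
#include <deque>
#include <functional>

// Hypothetical stand-in for the command queue an I/O thread reads from.
static std::deque<std::function<int()>> mailbox;

// Hypothetical stand-in for a transport init that fails with EINVAL.
static int fake_pgm_init()
{
    return EINVAL;
}

// Models a bind that only queues the real work and reports success right away.
static int fake_bind()
{
    mailbox.push_back(fake_pgm_init);
    return 0;
}

// Models the I/O thread running queued commands; returns the first non-zero
// result, or 0 if every command succeeded.
static int drain_mailbox()
{
    int first_error = 0;
    while (!mailbox.empty()) {
        const int rc = mailbox.front()();
        mailbox.pop_front();
        if (rc != 0 && first_error == 0)
            first_error = rc;
    }
    return first_error;
}
```

In this model fake_bind() returns 0 and the EINVAL only surfaces when drain_mailbox() runs later, so a caller that checks only the bind return value never sees the failure. In the real library the equivalent failure hits errno_assert and aborts the process.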

What's the expected result?

Publisher to start broadcasting data

ljluestc commented 9 months ago

#include <zmq.h>
#include <cassert>
#include <cerrno>
#include <cstdio>
#include <cstring>
#include <unistd.h>

// Function to check the version of ZeroMQ
inline static void zmq_version_used() {
    int major, minor, patch;
    zmq_version(&major, &minor, &patch);
    fprintf(stdout, "Current 0MQ version is %d.%d.%d\n", major, minor, patch);
}

int main() {
    // Create a ZeroMQ context
    void *context = zmq_ctx_new();
    assert(context);

    // Display the ZeroMQ version
    zmq_version_used();

    // Create a PUB socket
    void *pub = zmq_socket(context, ZMQ_PUB);
    assert(pub);

    // Set multicast hops
    int ttl = 30;
    int socket_opt_rc = zmq_setsockopt(pub, ZMQ_MULTICAST_HOPS, &ttl, sizeof(int));
    if (socket_opt_rc == -1) {
        fprintf(stderr, "Failed to set multicast hops: %s\n", zmq_strerror(errno));
    }

    // Bind the socket using the EPGM protocol
    int rc = zmq_bind(pub, "epgm://interface;address_group:port");
    if (rc == -1) {
        fprintf(stderr, "Some error %s\n", zmq_strerror(errno));
        zmq_close(pub);
        zmq_ctx_destroy(context);
        return 1;
    } else {
        fprintf(stdout, "Bound socket\n");
    }

    // Publish data indefinitely
    while (1) {
        zmq_send(pub, "TEST", 4, 0);
        usleep(1000000); // Sleep for 1 second
    }

    // Clean up (not reached: the loop above never exits)
    zmq_close(pub);
    zmq_ctx_destroy(context);

    return 0;
}