zeromq / libzmq

ZeroMQ core engine in C++, implements ZMTP/3.1
https://www.zeromq.org
Mozilla Public License 2.0
9.75k stars 2.36k forks source link

ZMQ_SNDBUF&&ZMQ_RCVBUF on ZMQ_PUB&&ZMQ_SUB no effect #4415

Open kongshuiJ opened 2 years ago

kongshuiJ commented 2 years ago

Issue description

Setting ZMQ_SNDBUF and ZMQ_RCVBUF on ZMQ_PUB and ZMQ_SUB sockets has no effect. However, after I executed the following command, the buffer limit did take effect: sudo sysctl -w net.core.wmem_default=5000

Environment

Is that why?

Code

sender

//
// Created by kongshui on 22-8-23.
//
#include <unistd.h>
#include <iostream>
#include <zmq.hpp>

/**
 * Publisher side of the reproduction.
 *
 * Creates a ZMQ_PUB socket, configures timeouts, high-water marks and the
 * send buffer size, binds it to ipc:///tmp/qos_out, waits three seconds and
 * then publishes an increasing counter as text (one message roughly every
 * millisecond) until a send fails or `first_count` messages have been sent.
 *
 * The ZeroMQ context is not terminated here; main() returns right after this
 * function, so it is left for process exit to reclaim.
 */
void qqq()
{
    const int first_count = 10000000;
    const int timeout_ms = 10; // ZMQ_SNDTIMEO / ZMQ_RCVTIMEO value
    const int hwm = 5;
    const int buf_size = 5000;

    auto con = zmq_ctx_new();

    // Set up bind socket
    void *pub_socket = zmq_socket(con, ZMQ_PUB);
    if (pub_socket == nullptr)
    {
        perror("zmq_socket");
        return;
    }

    zmq_setsockopt(pub_socket, ZMQ_SNDTIMEO, &timeout_ms, sizeof(timeout_ms));
    zmq_setsockopt(pub_socket, ZMQ_RCVTIMEO, &timeout_ms, sizeof(timeout_ms));
    zmq_setsockopt(pub_socket, ZMQ_SNDHWM, &hwm, sizeof(hwm));
    zmq_setsockopt(pub_socket, ZMQ_RCVHWM, &hwm, sizeof(hwm));

    // ZMQ_SNDBUF has to be configured BEFORE zmq_bind: per the
    // zmq_setsockopt documentation most options (this one included) only
    // take effect for subsequent bind/connect calls.
    // NOTE(review): whether the ipc:// transport applies this value to the
    // underlying OS socket at all should be confirmed against libzmq.
    auto rrr = zmq_setsockopt(pub_socket, ZMQ_SNDBUF, &buf_size, sizeof(buf_size));
    printf("rrr: %d", rrr);

    // Read the option back into its own variable instead of reusing `hwm`.
    int reported_buf = 0;
    size_t reported_len = sizeof(reported_buf);
    rrr = zmq_getsockopt(pub_socket, ZMQ_SNDBUF, &reported_buf, &reported_len);
    printf("rrr: %d, size: %d\n", rrr, reported_buf);

    if (zmq_bind(pub_socket, "ipc:///tmp/qos_out") != 0)
    {
        perror("zmq_bind");
        zmq_close(pub_socket);
        return;
    }

    // Give subscribers time to connect before publishing starts.
    sleep(3);

    // Send messages
    int send_count = 0;
    std::string msg;
    while (send_count < first_count)
    {
        msg = std::to_string(send_count);
        const int res = zmq_send(pub_socket, msg.c_str(), msg.length(), ZMQ_DONTWAIT);

        printf("res:%d, msg:%s", res, msg.c_str());

        // Stop on error (negative result) or on a short send; the explicit
        // sign check avoids comparing a negative int against a size_t.
        if (res < 0 || static_cast<size_t>(res) != msg.length())
            break;
        ++send_count;

        usleep(1000);
    }

    // Clean up
    zmq_close(pub_socket);
}

void ttt()
{
    zmq::context_t context(1);

    const int first_count = 15;
    int hwm = 10;
    int buf_size = 10000;
    zmq::socket_t pub_socket(context, ZMQ_PUB);
    zmq_setsockopt(pub_socket, ZMQ_SNDHWM, &hwm, sizeof(hwm));
    zmq_setsockopt(pub_socket, ZMQ_SNDBUF, &buf_size, sizeof(buf_size));

    pub_socket.bind("ipc:///tmp/qos_in");

    std::string msg = "111";

    while (true)
    {
        auto res = zmq_send(pub_socket, msg.c_str(), msg.length(), ZMQ_DONTWAIT);
        printf("res:%d\n", res);
    }
}

// Entry point of the sender: runs the qqq() publisher experiment.
// Command-line arguments are ignored.
int main(int /*argc*/, char ** /*argv*/)
{
    qqq();
    return 0;
}

receiver

//
// Created by kongshui on 22-8-23.
//
#include <unistd.h>
#include <iostream>
#include <zmq.hpp>

/**
 * Subscriber side of the reproduction.
 *
 * Creates a ZMQ_SUB socket, configures timeouts, high-water marks and the
 * receive buffer size, connects to ipc:///tmp/qos_out, subscribes to every
 * message and waits three seconds. It then drains whatever is already
 * queued as fast as possible, and afterwards keeps receiving forever while
 * sleeping one second after each message (a deliberately slow consumer).
 *
 * This function never returns once the second loop is entered.
 */
void qqq()
{
    const int first_count = 15;
    const int timeout_ms = 10; // ZMQ_SNDTIMEO / ZMQ_RCVTIMEO value
    const int hwm = 10;
    const int buf_size = 5000;

    auto con = zmq_ctx_new();

    // Set up connect socket
    void *sub_socket = zmq_socket(con, ZMQ_SUB);
    if (sub_socket == nullptr)
    {
        perror("zmq_socket");
        return;
    }

    zmq_setsockopt(sub_socket, ZMQ_SNDTIMEO, &timeout_ms, sizeof(timeout_ms));
    zmq_setsockopt(sub_socket, ZMQ_RCVTIMEO, &timeout_ms, sizeof(timeout_ms));
    zmq_setsockopt(sub_socket, ZMQ_SNDHWM, &hwm, sizeof(hwm));
    zmq_setsockopt(sub_socket, ZMQ_RCVHWM, &hwm, sizeof(hwm));

    // ZMQ_RCVBUF is set before zmq_connect because, per the zmq_setsockopt
    // documentation, it only takes effect for subsequent bind/connect calls.
    // NOTE(review): whether the ipc:// transport applies this value to the
    // underlying OS socket at all should be confirmed against libzmq.
    auto rrr = zmq_setsockopt(sub_socket, ZMQ_RCVBUF, &buf_size, sizeof(buf_size));
    printf("rrr: %d", rrr);

    // Read the option back into its own variable instead of reusing `hwm`.
    int reported_buf = 0;
    size_t reported_len = sizeof(reported_buf);
    rrr = zmq_getsockopt(sub_socket, ZMQ_RCVBUF, &reported_buf, &reported_len);
    printf("rrr: %d, size: %d\n", rrr, reported_buf);

    if (zmq_connect(sub_socket, "ipc:///tmp/qos_out") != 0)
    {
        perror("zmq_connect");
        zmq_close(sub_socket);
        return;
    }

    // Subscribe to everything: a zero-length filter, passed as an empty
    // string (the form used in the ZeroMQ documentation) rather than nullptr.
    zmq_setsockopt(sub_socket, ZMQ_SUBSCRIBE, "", 0);

    sleep(3);

    int recv_count = 0;

    // The buffer is larger than the 1021-byte receive limit and is zeroed
    // after every message, so `res` is always NUL-terminated for printf.
    char res[1024] = {0};

    // Phase 1: drain everything that is already queued, without blocking.
    while (zmq_recv(sub_socket, res, 1021, ZMQ_DONTWAIT) > 0)
    {
        ++recv_count;
        printf("res:%s", res);
        printf("first_count: %d, recv_count: %d\n", first_count, recv_count);
        memset(res, 0, sizeof(res));
    }

    // Phase 2: slow consumer. Blocking receive bounded by ZMQ_RCVTIMEO
    // instead of ZMQ_DONTWAIT, so an idle socket does not busy-spin a core.
    while (1)
    {
        if (zmq_recv(sub_socket, res, 1021, 0) > 0)
        {
            ++recv_count;
            printf("res:%s", res);
            memset(res, 0, sizeof(res));
            printf("first_count: %d, recv_count: %d\n", first_count, recv_count);

            sleep(1);
        }
    }

    // Clean up (not reached: the loop above has no exit condition).
    zmq_close(sub_socket);
}

void ttt()
{
    zmq::context_t context(1);

    const int first_count = 15;
    int hwm = 10;
    int buf_size = 10000;
    zmq::socket_t pub_socket(context, ZMQ_PUB);
    zmq_setsockopt(pub_socket, ZMQ_SNDHWM, &hwm, sizeof(hwm));
    zmq_setsockopt(pub_socket, ZMQ_SNDBUF, &buf_size, sizeof(buf_size));

    pub_socket.bind("ipc:///tmp/qos_in");

    std::string msg = "111";

    while (true)
    {
        auto res = zmq_send(pub_socket, msg.c_str(), msg.length(), ZMQ_DONTWAIT);
        printf("res:%d\n", res);
    }
}

// Entry point of the receiver: runs the qqq() subscriber experiment.
// Command-line arguments are ignored.
int main(int /*argc*/, char ** /*argv*/)
{
    qqq();
    return 0;
}
DrCorvax commented 2 years ago

Try zmq_setsockopt(sub_socket, ZMQ_SUBSCRIBE, "", 0); instead of zmq_setsockopt(sub_socket, ZMQ_SUBSCRIBE, nullptr, 0);