zeromq / cppzmq

Header-only C++ binding for libzmq
http://www.zeromq.org
MIT License
1.94k stars 757 forks source link

dealer can not receive all messages. #539

Closed wangyuqian1234 closed 2 years ago

wangyuqian1234 commented 2 years ago

use socket type: dealer(client)---router(server) send 100000 message, dealer will not receive all message.

Here's code, is it right? client code:

#define CLIENT_SEND_MAX_MESSAGE (100000)
class client_task {
public:
    client_task()
        : ctx_(1),
          client_socket_(ctx_, ZMQ_DEALER)
    {}
    void start() {
        char identity[10] = {};
        sprintf(identity, "%04X-%04X", within(0x20000), within(0x20000));
        int linger = 0;
        int sendbuf = 60000000;
        client_socket_.setsockopt(ZMQ_RCVHWM, &sendbuf, sizeof(int));
        client_socket_.setsockopt (ZMQ_LINGER, &linger, sizeof (linger));
        client_socket_.setsockopt(ZMQ_IDENTITY, identity, strlen(identity));
        client_socket_.connect("tcp://localhost:5570");
        int recv_cnt = 0;
        char msg[32] = {};
        for (int count = 0; count < 10000; count++) {
            sprintf(msg, "Hello world #%d", count);
            zmq::message_t message(strlen(msg));
                memcpy(message.data(), msg, strlen(msg));
                client_socket_.send(message);
        }
        try {
            zmq::pollitem_t items[] = {
            { client_socket_, 0, ZMQ_POLLIN, 0 } };
            zmq::message_t message1;
            int more;               //  Multipart detection
            for (int count = 0; count < CLIENT_SEND_MAX_MESSAGE + 1 
                && recv_cnt < CLIENT_SEND_MAX_MESSAGE; count++) {
                zmq::poll(items, 1, 5);
                if (items[0].revents & ZMQ_POLLIN) {
                    recv_cnt ++;
                    while(1){
                        client_socket_.recv(&message1);
                        size_t more_size = sizeof (more);
                        client_socket_.getsockopt(ZMQ_RCVMORE, &more, &more_size);
                        if (!more) {
                            break;      //  Last message part
                        }
                    }
                }
            }

        }
        catch (std::exception &e) {}
        std::cout << recv_cnt << " replies received " << std::endl;
    }
private:
    zmq::context_t ctx_;
    zmq::socket_t client_socket_;
};

int main (void)
{
    client_task ct1;
    ct1.start();
    getchar();
    return 0;
}

server code:


class server_worker {
public:
    server_worker(zmq::context_t &ctx, int sock_type)
        : ctx_(ctx),
          worker_(ctx_, sock_type)
    {}

    void work() {
            worker_.connect("inproc://backend");
            int linger = 0;
            worker_.setsockopt (ZMQ_LINGER, &linger, sizeof (linger));
        try {
            while (true) {
                zmq::message_t identity;
                zmq::message_t msg;
                zmq::message_t copied_id;
                zmq::message_t copied_msg;
                worker_.recv(&identity);
                worker_.recv(&msg);
                copied_id.copy(&identity);
                copied_msg.copy(&msg);
                worker_.send(copied_id, ZMQ_SNDMORE);
                worker_.send(copied_msg);
            }
        }
        catch (std::exception &e) {}
    }

private:
    zmq::context_t &ctx_;
    zmq::socket_t worker_;
};

class server_task {
public:
    server_task()
        : ctx_(1),
          frontend_(ctx_, ZMQ_ROUTER),
          backend_(ctx_, ZMQ_DEALER)
    {}

    enum { kMaxThread = 1 };

    void run() {
        frontend_.bind("tcp://*:5570");
        backend_.bind("inproc://backend");

        int sendbuf = 60000000;
        int linger = 0;
        frontend_.setsockopt(ZMQ_SNDHWM, &sendbuf, sizeof(int));
        frontend_.setsockopt (ZMQ_LINGER, &linger, sizeof (linger));
        backend_.setsockopt (ZMQ_LINGER, &linger, sizeof (linger));

        std::vector<server_worker *> worker;
        std::vector<std::thread *> worker_thread;
        for (int i = 0; i < kMaxThread; ++i) {
            worker.push_back(new server_worker(ctx_, ZMQ_DEALER));
            worker_thread.push_back(new std::thread(std::bind(&server_worker::work, worker[i])));
            worker_thread[i]->detach();
        }
    int i =0;
        try {
        zmq::pollitem_t items [] = {
            { frontend_, 0, ZMQ_POLLIN, 0 },
            { backend_, 0, ZMQ_POLLIN, 0 }
        };
        //  Switch messages between sockets
        while (1) {
            zmq::message_t message;
            int more;               //  Multipart detection
            zmq::poll (&items [0], 2, -1);
            if (items [0].revents & ZMQ_POLLIN) {
                while (1) {
                    frontend_.recv(&message);
                    size_t more_size = sizeof (more);
                    frontend_.getsockopt(ZMQ_RCVMORE, &more, &more_size);
                    backend_.send(message, more? ZMQ_SNDMORE: 0);
                    if (!more)
                        break;      //  Last message part
                }
            }
            if (items [1].revents & ZMQ_POLLIN) {
                while (1) {
                    backend_.recv(&message);
                    size_t more_size = sizeof (more);
                    backend_.getsockopt(ZMQ_RCVMORE, &more, &more_size);
                    frontend_.send(message, more? ZMQ_SNDMORE: 0);
                    if (!more) {
                        break;      //  Last message part
                    }
                }
            }
        }

        }
        catch (std::exception &e) {}
        for (int i = 0; i < kMaxThread; ++i) {
            delete worker[i];
            delete worker_thread[i];
        }
    }
private:
    zmq::context_t ctx_;
    zmq::socket_t frontend_;
    zmq::socket_t backend_;
};

int main (void)
{
    server_task st;
    std::thread t4(std::bind(&server_task::run, &st));
    t4.detach();
    getchar();
    return 0;
}