zeromq / cppzmq

Header-only C++ binding for libzmq
http://www.zeromq.org
MIT License
1.97k stars 757 forks source link

Crash in multithreaded server example #348

Open kakyoism opened 5 years ago

kakyoism commented 5 years ago

I'm new to zmq and cppzmq. While trying to run the multithreaded example in the official guide: http://zguide.zeromq.org/cpp:mtserver

My setup

I hit a few problems.

Problem 1

When running source code in the guide, it hangs forever without any stdout output shown up.

Here is the code directly copied from the Guide.

/*
    Multithreaded Hello World server in C
*/

#include <pthread.h>
#include <unistd.h>
#include <cassert>
#include <string>
#include <iostream>
#include <zmq.hpp>

void *worker_routine (void *arg)
{
    zmq::context_t *context = (zmq::context_t *) arg;

    zmq::socket_t socket (*context, ZMQ_REP);
    socket.connect ("inproc://workers");

    while (true) {
        //  Wait for next request from client
        zmq::message_t request;
        socket.recv (&request);
        std::cout << "Received request: [" << (char*) request.data() << "]" << std::endl;

        //  Do some 'work'
        sleep (1);

        //  Send reply back to client
        zmq::message_t reply (6);
        memcpy ((void *) reply.data (), "World", 6);
        socket.send (reply);
    }
    return (NULL);
}

int main ()
{
    //  Prepare our context and sockets
    zmq::context_t context (1);
    zmq::socket_t clients (context, ZMQ_ROUTER);
    clients.bind ("tcp://*:5555");
    zmq::socket_t workers (context, ZMQ_DEALER);
    workers.bind ("inproc://workers");

    //  Launch pool of worker threads
    for (int thread_nbr = 0; thread_nbr != 5; thread_nbr++) {
        pthread_t worker;
        pthread_create (&worker, NULL, worker_routine, (void *) &context);
    }
    //  Connect work threads to client threads via a queue
    zmq::proxy (static_cast<void*>(clients),
                static_cast<void*>(workers),
                nullptr);
    return 0;
}

It crashes soon after I put a breakpoin in the while loop of the worker.

Problem 2

Noticing that the compiler prompted me to replace deprecated API calls, I modified the above sample code to make the warnings disappear.

/*
 Multithreaded Hello World server in C
 */

#include <pthread.h>
#include <unistd.h>
#include <cassert>
#include <string>
#include <iostream>
#include <zmq.hpp>

void *worker_routine (void *arg)
{
    zmq::context_t *context = (zmq::context_t *) arg;

    zmq::socket_t socket (*context, ZMQ_REP);
    socket.connect ("inproc://workers");

    while (true) {
        //  Wait for next request from client
        std::array<char, 1024> buf{'\0'};
        zmq::mutable_buffer request(buf.data(), buf.size());
        socket.recv(request, zmq::recv_flags::dontwait);
        std::cout << "Received request: [" << (char*) request.data() << "]" << std::endl;

        //  Do some 'work'
        sleep (1);

        //  Send reply back to client
        zmq::message_t reply (6);
        memcpy ((void *) reply.data (), "World", 6);
        try {
            socket.send (reply, zmq::send_flags::dontwait);
        }
        catch (zmq::error_t& e) {
            std::cout << "ERROR: " << e.num() << std::endl;
        }
    }
    return (NULL);
}

int main ()
{
    //  Prepare our context and sockets
    zmq::context_t context (1);
    zmq::socket_t clients (context, ZMQ_ROUTER);
    clients.bind ("tcp://*:5555");  // who i talk to.
    zmq::socket_t workers (context, ZMQ_DEALER);
    workers.bind ("inproc://workers");

    //  Launch pool of worker threads
    for (int thread_nbr = 0; thread_nbr != 5; thread_nbr++) {
        pthread_t worker;
        pthread_create (&worker, NULL, worker_routine, (void *) &context);
    }
    //  Connect work threads to client threads via a queue
    zmq::proxy (clients, workers);
    return 0;
}

This code keeps giving me error number 156384763 from the try-catch block. I can't find the definition of the error number.

I'd appreciate it if anyone can point out where I did wrong.

gummif commented 5 years ago

First I would try changing the receiving part to something like

std::array<char, 1024> buf{};
auto ret = socket.recv(zmq::buffer(buf), zmq::recv_flags::none);
if (ret)
{
    std::cout << "Received request: [" << std::string(buf.data(), ret->size) << "]" << std::endl;
}
else
{
    std::cout << "Receive timeout" << std::endl;
    continue;
}

And also try changing e.num() to e.what() to get an error description. You can also simplify the send with socket.send(zmq::buffer(std::string_view("World")), zmq::send_flags::dontwait);.