zeromq / zmqpp

0mq 'highlevel' C++ bindings
http://zeromq.github.io/zmqpp
Mozilla Public License 2.0
438 stars 195 forks source link

Sample tutorials for more functionality. #160

Open ghost opened 8 years ago

ghost commented 8 years ago

It will be nice to have some more tutorial and examples for functionality like Reactors and Pollers.

sunkin351 commented 8 years ago

Tutorials on actors and inter-thread communications would be very nice too.

benjamg commented 8 years ago

Would having versions of the examples from the zmq guide (https://github.com/imatix/zguide/tree/master/examples) build with zmqpp solve that? I'm not sure I'm all that good at writing a tutorial.

ghost commented 8 years ago

That would be very helpful. On May 19, 2016 11:40 AM, "Ben Gray" notifications@github.com wrote:

Would having versions of the examples from the zmq guide ( https://github.com/imatix/zguide/tree/master/examples) build with zmqpp solve that? I'm not sure I'm all that good at writing a tutorial.

— You are receiving this because you authored the thread. Reply to this email directly or view it on GitHub https://github.com/zeromq/zmqpp/issues/160#issuecomment-220262569

markns commented 8 years ago

+1

homer6 commented 8 years ago
#include <iostream>
#include <string>
#include <chrono>
#include <thread>
#include <vector>

using namespace std;

#include <zmqpp/context.hpp>
#include <zmqpp/socket.hpp>
#include <zmqpp/message.hpp>
#include <zmqpp/reactor.hpp>

#include <signal.h>

static volatile bool interrupted = false;

void my_signal_handler(int){
    cout << "signal" << endl;
    interrupted = true;
}

void producer_task(){

    try{

        zmqpp::context zmq_context;
        zmqpp::socket socket {
            zmq_context,
            zmqpp::socket_type::push
        };
        socket.bind( "tcp://*:4099" );

        int x = 0;

        while( 1 ){

            std::this_thread::sleep_for( std::chrono::seconds(1) );

            cout << "Message: " << x << endl;

            zmqpp::message request;
            request << "Message: " + std::to_string(x);
            request << x;
            socket.send(request);

            x++;

        }

    }catch( zmqpp::zmq_internal_exception& e ){

        cerr << "Exception: " << e.what() << endl;

    }catch( ... ){

        cerr << "Unknown Exception: " << endl;

    }

}

void consumer_task(){

    try{

        zmqpp::context zmq_context;

        zmqpp::socket socket_1 {
            zmq_context,
            zmqpp::socket_type::pull
        };
        socket_1.connect( "tcp://127.0.0.1:4099" );

        zmqpp::socket socket_2 {
            zmq_context,
            zmqpp::socket_type::pull
        };
        socket_2.connect( "tcp://127.0.0.1:4099" );

        zmqpp::reactor reactor;

        auto first_listener = [&socket_1](){
            zmqpp::message response;
            socket_1.receive(response);
            cout << "first_listener: " << response.get(0) << endl;
        };

        auto second_listener = [&socket_2](){
            zmqpp::message response;
            socket_2.receive(response);
            cout << "second_listener: " << response.get(0) << endl;
        };

        reactor.add( socket_1, first_listener );
        reactor.add( socket_2, second_listener );

        while( reactor.poll() && !interrupted ){

        }

        //interrupted
        cout << "interrupted" << endl;

    }catch( zmqpp::zmq_internal_exception& e ){

        cerr << "Exception: " << e.what() << endl;

    }catch( ... ){

        cerr << "Unknown Exception: " << endl;

    }

}

void usage(){

    cout << "usage: test [producer|consumer]" << endl;

}

int main( int argc, char** argv ){

    signal(SIGINT, my_signal_handler);

    string task;

    vector<string> arguments;
    if( argc > 1 ){
        int count = 0;
        while( count < argc ){          
            if( count == 1 ) task = string(argv[1]);
            arguments.push_back( argv[count] );
            count++;
        }
    }else{
        usage();
        return 0;
    }

    if( task == "producer" ){
        std::thread producer_thread( producer_task );
        producer_thread.join();
    }else if( task == "consumer" ){
        std::thread consumer_thread( consumer_task );
        consumer_thread.join();
    }else{
        usage();
        return 0;
    }

    return 0;

}
homer6 commented 8 years ago

Alternative using the reactor's poller and one callback:

        zmqpp::context zmq_context;

        zmqpp::socket socket_1 {
            zmq_context,
            zmqpp::socket_type::pull
        };
        socket_1.connect( "tcp://127.0.0.1:4099" );
        //socket_1.subscribe("");

        zmqpp::socket socket_2 {
            zmq_context,
            zmqpp::socket_type::pull
        };
        socket_2.connect( "tcp://127.0.0.1:4099" );
        //socket_2.subscribe("");

        zmqpp::socket pub_socket {
            zmq_context,
            zmqpp::socket_type::pub
        };
        pub_socket.bind( "tcp://127.0.0.1:4098" );

        zmqpp::reactor reactor;
        zmqpp::poller& poller = reactor.get_poller();

        auto socket_listener = [&poller, &socket_1, &socket_2, &pub_socket](){

            zmqpp::message response;

            if( poller.has_input(socket_1) ){
                socket_1.receive(response);
                cout << "1: " << response.get(0) << endl;
                pub_socket.send("OKAY 1");
            }

            if( poller.has_input(socket_2) ){
                socket_2.receive(response);
                cout << "2: " << response.get(0) << endl;
                pub_socket.send("OKAY 2");
            }

        };

        reactor.add( socket_1, socket_listener );
        reactor.add( socket_2, socket_listener );

        while( reactor.poll() != -1 && !interrupted ){

        }
jgornowich commented 4 years ago

+1