booksbyus / zguide

Learning and Using ØMQ
http://zguide.zeromq.org
Other
3.45k stars 1.65k forks source link

Attempt to update tripping.c example from czmq v3 to v4 #720

Open datgrog opened 6 years ago

datgrog commented 6 years ago

Hi,

New to zeromq, I tried to play with .c examples but some (or many?) are not up to date. As a good exercice for me, I tried to update the tripping.c from czmq v3 to v4, as zthread is now deprecated. After "try harding" a while (see asynchronous-majordomo-pattern), here is my unproper update below. Unproper because I didn't figure it out how to manage the termination of the program correctly neither the handling of the Ctrl+C ( ^C ).

If someone could review, give me some hint or just finish it that could be nice.

//  Round-trip demonstrator
//  While this example runs in a single process, that is just to make
//  it easier to start and stop the example. The client task signals to
//  main when it's ready.

#include <czmq.h>

static void
client_task (zsock_t *pipe, void *args)
{
    assert (streq ((char *) args, "Hello, Client"));
    zsock_signal (pipe, 0);

    zsock_t *client = zsock_new (ZMQ_DEALER);
    zsock_connect (client, "tcp://127.0.0.1:5555");

    printf ("Setting up test...\n");
    zclock_sleep (100);

    int requests;
    int64_t start;

    printf ("Synchronous round-trip test...\n");
    start = zclock_time ();
    for (requests = 0; requests < 10000; requests++) {
        zstr_send (client, "hello");

        zmsg_t *msgh = zmsg_recv (client);
        zmsg_destroy (&msgh);

    }
    printf (" %d calls/second\n",
        (1000 * 10000) / (int) (zclock_time () - start));

    printf ("Asynchronous round-trip test...\n");
    start = zclock_time ();
    for (requests = 0; requests < 100000; requests++) {
        zstr_send (client, "hello");
    }
    for (requests = 0; requests < 100000; requests++) {
        char *reply = zstr_recv (client);
        zstr_free (&reply);
    }
    printf (" %d calls/second\n",
        (1000 * 100000) / (int) (zclock_time () - start));

    zstr_send (pipe, "done");
    printf("send 'done' to pipe\n");
}

//  Here is the worker task. All it does is receive a message, and
//  bounce it back the way it came:

static void
worker_task (zsock_t *pipe, void *args)
{
    assert (streq ((char *) args, "Hello, Worker"));
    zsock_signal (pipe, 0);

    zsock_t *worker = zsock_new (ZMQ_DEALER);
    zsock_connect (worker, "tcp://127.0.0.1:5556");

    bool terminated = false;
    while (!terminated) {
        zmsg_t *msg = zmsg_recv (worker);
        zmsg_send (&msg, worker);
        // zstr_send (worker, "hello back"); // Give better perf I don't know why

    }
    zsock_destroy (&worker);
}

//  Here is the broker task. It uses zproxy actor instance to switch
//  messages between frontend and backend:

static void
broker_task (zsock_t *pipe, void *args)
{
    assert (streq ((char *) args, "Hello, Task"));
    zsock_signal (pipe, 0);

    //  Prepare our proxy and its sockets
    zactor_t *proxy = zactor_new (zproxy, NULL);
    zstr_sendx (proxy, "FRONTEND", "DEALER", "tcp://127.0.0.1:5555", NULL);
    zsock_wait (proxy);
    zstr_sendx (proxy, "BACKEND", "DEALER", "tcp://127.0.0.1:5556", NULL);
    zsock_wait (proxy);

    bool terminated = false;
    while (!terminated) {
        zmsg_t *msg = zmsg_recv (pipe);
        if (!msg)
            break;              //  Interrupted
        char *command = zmsg_popstr (msg);

        if (streq (command, "$TERM")) {
            terminated = true;
            printf("broker received $TERM\n");
        }

        freen (command);
        zmsg_destroy (&msg);
    }

    zactor_destroy (&proxy);
}

//  Finally, here's the main task, which starts the client, worker, and
//  broker, and then runs until the client signals it to stop:

int main (void)
{

    //  Create threads
    zactor_t *client = zactor_new (client_task, "Hello, Client");
    assert (client);
    zactor_t *worker = zactor_new (worker_task, "Hello, Worker");
    assert (worker);
    zactor_t *broker = zactor_new (broker_task, "Hello, Task");
    assert (broker);

    char *signal = zstr_recv (client);
    printf("signal %s\n", signal);
    zstr_free (&signal);

    zactor_destroy (&client);
    printf("client done\n");
    zactor_destroy (&worker);
    printf("worker done\n");
    zactor_destroy (&broker);
    printf("broker done\n");

    return 0;
}
sappo commented 6 years ago

Hi onepix,

you want to do something like this:

if (zsys_interrupted)
        terminated = true;

Am 03.03.2018 2:00 nachm. schrieb "onepix" notifications@github.com:

Hi,

New to zeromq, I tried to play with .c examples but some (or many?) are not up to date. As a good exercice for me, I tried to update the tripping.c from czmq v3 to v4, as zthread is now deprecated. After "try harding" a while (see asynchronous-majordomo-pattern https://stackoverflow.com/questions/49019846/asynchronous-majordomo-pattern-example-using-the-czmq-4-1-0-new-zsock-api-update), here is my unproper update below. Unproper because I didn't figure it out how to manage the termination of the program correctly neither the handling of the Ctrl+C ( ^C ).

If someone could review, give me some hint or just finish it that could be nice.

// Round-trip demonstrator // While this example runs in a single process, that is just to make // it easier to start and stop the example. The client task signals to // main when it's ready.

include

static void client_task (zsock_t pipe, void args) { assert (streq ((char *) args, "Hello, Client")); zsock_signal (pipe, 0);

zsock_t *client = zsock_new (ZMQ_DEALER);
zsock_connect (client, "tcp://127.0.0.1:5555");

printf ("Setting up test...\n");
zclock_sleep (100);

int requests;
int64_t start;

printf ("Synchronous round-trip test...\n");
start = zclock_time ();
for (requests = 0; requests < 10000; requests++) {
    zstr_send (client, "hello");

    zmsg_t *msgh = zmsg_recv (client);
    zmsg_destroy (&msgh);

}
printf (" %d calls/second\n",
    (1000 * 10000) / (int) (zclock_time () - start));

printf ("Asynchronous round-trip test...\n");
start = zclock_time ();
for (requests = 0; requests < 100000; requests++) {
    zstr_send (client, "hello");
}
for (requests = 0; requests < 100000; requests++) {
    char *reply = zstr_recv (client);
    zstr_free (&reply);
}
printf (" %d calls/second\n",
    (1000 * 100000) / (int) (zclock_time () - start));

zstr_send (pipe, "done");
printf("send 'done' to pipe\n");

}

// Here is the worker task. All it does is receive a message, and // bounce it back the way it came:

static void worker_task (zsock_t pipe, void args) { assert (streq ((char *) args, "Hello, Worker")); zsock_signal (pipe, 0);

zsock_t *worker = zsock_new (ZMQ_DEALER);
zsock_connect (worker, "tcp://127.0.0.1:5556");

bool terminated = false;
while (!terminated) {
    zmsg_t *msg = zmsg_recv (worker);
    zmsg_send (&msg, worker);
    // zstr_send (worker, "hello back"); // Give better perf I

don't know why

}
zsock_destroy (&worker);

}

// Here is the broker task. It uses zproxy actor instance to switch // messages between frontend and backend:

static void broker_task (zsock_t pipe, void args) { assert (streq ((char *) args, "Hello, Task")); zsock_signal (pipe, 0);

//  Prepare our proxy and its sockets
zactor_t *proxy = zactor_new (zproxy, NULL);
zstr_sendx (proxy, "FRONTEND", "DEALER", "tcp://127.0.0.1:5555", NULL);
zsock_wait (proxy);
zstr_sendx (proxy, "BACKEND", "DEALER", "tcp://127.0.0.1:5556", NULL);
zsock_wait (proxy);

bool terminated = false;
while (!terminated) {
    zmsg_t *msg = zmsg_recv (pipe);
    if (!msg)
        break;              //  Interrupted
    char *command = zmsg_popstr (msg);

    if (streq (command, "$TERM")) {
        terminated = true;
        printf("broker received $TERM\n");
    }

    freen (command);
    zmsg_destroy (&msg);
}

zactor_destroy (&proxy);

}

// Finally, here's the main task, which starts the client, worker, and // broker, and then runs until the client signals it to stop:

int main (void) {

//  Create threads
zactor_t *client = zactor_new (client_task, "Hello, Client");
assert (client);
zactor_t *worker = zactor_new (worker_task, "Hello, Worker");
assert (worker);
zactor_t *broker = zactor_new (broker_task, "Hello, Task");
assert (broker);

char *signal = zstr_recv (client);
printf("signal %s\n", signal);
zstr_free (&signal);

zactor_destroy (&client);
printf("client done\n");
zactor_destroy (&worker);
printf("worker done\n");
zactor_destroy (&broker);
printf("broker done\n");

return 0;

}

— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub https://github.com/booksbyus/zguide/issues/720, or mute the thread https://github.com/notifications/unsubscribe-auth/AAeGulLzLuWunNEQuRzUaAoWaFtLjXd_ks5tapP2gaJpZM4Sa6Qw .

tan-wei commented 2 years ago

So I have the same attempt. Any better solution?