zeromq / perlzmq

version agnostic Perl bindings for zeromq

dealer with AE::io doesn't get any event when watch_write is 0 #53

Open Evan-Adam opened 9 months ago

Evan-Adam commented 9 months ago

Hello everyone, first, thanks for this software, it's wonderful and very well documented. I'm new to zmq and inherited a code base using a router and multiple dealers connecting to it. I tried to make a small working example of this architecture, using ZMQ::FFI and AE::io with a router and a dealer.

It led me to these two scripts:

router program

```perl
use strict;
use warnings;

$| = 1; # autoflush stdout after each print

use ZMQ::FFI qw(ZMQ_DEALER);
use ZMQ::FFI::Constants qw(ZMQ_PULL ZMQ_PUSH ZMQ_SUB ZMQ_DONTWAIT ZMQ_SNDMORE ZMQ_ROUTER ZMQ_RCVHWM ZMQ_SNDHWM);
use AnyEvent;

# creating the context
my $context = ZMQ::FFI->new();

# create a new socket in router mode
my $router = $context->socket(ZMQ_ROUTER);
$router->set_identity("test-router");
$router->die_on_error(1);

my $ipcLocation = 'ipc:///tmp/dealer-test1';
my $err = $router->bind($ipcLocation);
print "binding to ipc $ipcLocation, err is : $err \n";

print "getting the first message from the dealer to know it's identity\n";
my ($identity, $msg) = $router->recv_multipart();
print "dealer sent : $msg\n";

for (1..100){
    $router->send_multipart([$identity, 'looping message']);
}
print "all message were sent\n";

sleep(5);
$router->close();
print "script ended\n";
```

listener program

```perl
use strict;
use warnings;
use v5.10;

$| = 1; # autoflush stdout after each print

use ZMQ::FFI qw(ZMQ_DEALER);
use ZMQ::FFI::Constants qw(ZMQ_PULL ZMQ_PUSH ZMQ_SUB ZMQ_DONTWAIT ZMQ_SNDMORE ZMQ_RCVHWM ZMQ_SNDHWM);
use AnyEvent;
use EV;

my $while_counter = 0;
my $ae_counter    = 0;

my $mode = 0;
print "mode : $mode\n";

# creating the context
my $context = ZMQ::FFI->new();

# create a new socket in dealer mode.
my $receiver = $context->socket(ZMQ_DEALER);
$receiver->set_identity("test-identity");
$receiver->die_on_error(1);

my $ipcLocation = 'ipc:///tmp/dealer-test1';
my $err = $receiver->connect($ipcLocation);
print "Connecting to ipc $ipcLocation, err is : $err";

$receiver->send_multipart(['this is a message to set the identity in the router']);

my $w1 = AE::signal INT => sub {
    print "\nAE count : $ae_counter\nwhile count : $while_counter\n";
    EV::break(EV::BREAK_ALL)
};

my $watcher_timer;
$watcher_timer = AE::io $receiver->get_fd, $mode, sub {
    $ae_counter++;
    while ( $receiver->has_pollin ) {
        my $msg = $receiver->recv();
        $while_counter++;
        print $msg . "\n";
    }
};

EV::run();
```

When I run both of them at the same time, the router correctly gets the message from the dealer and sends back 100 messages. The dealer never receives a single message; if I press Ctrl+C after a while, I get this:

mode : 0
Connecting to ipc ipc:///tmp/dealer-test1, err is : ^C
AE count : 0
while count : 0 

If I change the AE::io mode to 1 instead of 0, I correctly get every message, but this is a busy loop that eats 100% of my CPU.

Here are the versions I use:

* OS: Debian 11
* Perl: perl 5, version 32, subversion 1 (v5.32.1) built for x86_64-linux-gnu-thread-multi
* AnyEvent: 7.17
* libev-perl: 4.33-1+b1
* libevent: 2.1-7
* FFI::Platypus: 1.34
* ZMQ::FFI: tried 1.17 and 1.19, with the same result

I'm not sure how to debug this. I saw a non-blocking example and a dealer/router example in the README, and a router/req example in the zmq guide, but nothing combining both. Can anyone reproduce this behaviour, or does anyone have an idea of what's going on?

calid commented 9 months ago

You are mixing send_multipart with recv in the listener, does switching to recv_multipart fix it?

On Fri, Jan 12, 2024, 5:55 AM Evan-Adam @.***> wrote:

Evan-Adam commented 9 months ago

Hello Dylan, Thanks for your response. I see the same behaviour with recv_multipart() instead of recv. I believe that, since I send a message with no empty frame, the router pops the first part to find where it should send it, and the dealer gets only one frame. I tried with more frames, but the result is the same: the dealer's AE loop never calls my function. Regards,
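
The framing described in this comment can be illustrated with a toy model. This is only a sketch in plain Perl with no ZeroMQ involved: the helper names `router_recv_view` and `router_send_view` are hypothetical and exist purely to show which frames each side sees, assuming the usual ROUTER behaviour of prepending the peer identity on receive and consuming the first frame as the routing ID on send.

```perl
use strict;
use warnings;

# Toy model only: frames are plain strings, nothing here talks to ZeroMQ.

# What the application sees when a ROUTER socket receives a message:
# the sending peer's identity is prepended to the frames it sent.
sub router_recv_view {
    my ($peer_identity, @frames) = @_;
    return ($peer_identity, @frames);
}

# What happens when the application sends through a ROUTER socket:
# the first frame is consumed as the routing ID, and only the
# remaining frames are delivered to that peer.
sub router_send_view {
    my (@frames) = @_;
    my $routing_id = shift @frames;
    return ($routing_id, \@frames);
}

# The dealer sends one frame; the router application sees two.
my @seen_by_router = router_recv_view('test-identity', 'hello');

# The router sends [identity, payload]; the dealer gets the payload only.
my ($to, $delivered) = router_send_view('test-identity', 'looping message');
print "to=$to frames=", scalar(@$delivered), "\n";
```

Under this model the dealer receives a single frame per message, so `recv` and `recv_multipart` would both return the payload, which matches the observation that switching between them changes nothing.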

calid commented 9 months ago

Ah, hoped I had found a quick fix :) It's been a minute since I looked at the code (or zeromq for that matter), so I'll let @zmughal or @ghenry chime in with any thoughts, as they're fresher with it.

Evan-Adam commented 7 months ago

Hello, does anyone have time to look into this? I don't know FFI or XS, so I really don't know where to look for this strange behaviour.

garnier-quentin commented 1 month ago

As I understand it, the current method seems to cause a few problems:
* EV does not seem to work as we expect: in my tests, EV's io() function did not make it possible to retrieve the zmq messages: https://github.com/zeromq/perlzmq/issues/53
* this clientzmq module is synchronous, partly, I think, because of the problem mentioned above

Indeed, in the clientzmq part there is a synchronous section during the connection. But the patch does not change that behaviour, so it should be integrated.

As for the EV issue, the behaviour is peculiar but seems consistent with how ZMQ works. The patch gives the correct behaviour and works around the EV and ZMQ problems.
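
The thread does not say which ZMQ behaviour is meant here. One plausible reading, offered as an assumption rather than a confirmed diagnosis, is the documented edge-triggered nature of the `ZMQ_FD` descriptor: readability of the fd does not by itself mean a message is waiting, and libzmq asks applications to re-check `ZMQ_EVENTS` after every send or receive. A minimal sketch of that pattern follows. `drain_socket` and `watch_socket` are hypothetical helper names, the socket is assumed to be a ZMQ::FFI socket (only `get_fd`, `has_pollin` and `recv_multipart` are used), and whether this resolves the behaviour reported in this issue has not been verified.

```perl
use strict;
use warnings;

# Read every message currently queued on $socket and pass its frames
# to $on_message. Returns the number of messages read.
# $socket is any object with has_pollin() and recv_multipart(),
# for example a ZMQ::FFI socket.
sub drain_socket {
    my ($socket, $on_message) = @_;
    my $count = 0;
    while ( $socket->has_pollin ) {
        my @frames = $socket->recv_multipart();
        $on_message->(@frames);
        $count++;
    }
    return $count;
}

# Hypothetical helper: watch the socket's fd for readability and drain
# the socket on every wakeup, plus once immediately.
sub watch_socket {
    my ($socket, $on_message) = @_;

    require AnyEvent;    # loaded here so drain_socket works without it

    # Second argument 0 means "watch for read events".
    my $watcher = AE::io( $socket->get_fd, 0, sub {
        drain_socket( $socket, $on_message );
    } );

    # Assumption: messages may already be queued from before the watcher
    # existed, and an edge-triggered fd would not signal them again.
    drain_socket( $socket, $on_message );

    return $watcher;     # the caller must keep this alive
}
```

In the listener script this would stand in for the `AE::io` block, for example `my $w = watch_socket($receiver, sub { print "@_\n" });`, with `drain_socket` called again after any later send on the same socket, since a send may leave the socket readable without a new event on the fd.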