zeromq / netmq

A 100% native C# implementation of ZeroMQ for .NET

Ventilation inconsistent depending bind/connect on push(1)/pull(2) #381

Closed eric-b closed 9 years ago

eric-b commented 9 years ago

Hello,

I'm trying to understand a difference in behavior that depends on which socket is bound when using one push socket and two pull sockets.

If the push socket is bound, the example below works as I expect: messages are ventilated between the two pull sockets.

If I bind one of the two pull sockets and connect the other two sockets, the ventilation no longer works: the bound pull socket receives all the messages.

Did I miss something, or is it a bug?

Code example


void Main()
{
    var context = NetMQContext.Create();
    var push = context.CreatePushSocket();
    var pull1 = context.CreatePullSocket();
    var pull2 = context.CreatePullSocket();
    Poller poller = new Poller();
    try 
    {
        const string address = "ipc://test";
        push.Options.Linger = TimeSpan.FromSeconds(1);

        // OK : 
        push.Bind(address);
        pull1.Connect(address);
        pull2.Connect(address);

        /*
        // NOT OK : 
        pull2.Bind(address);
        pull1.Connect(address);
        push.Connect(address);
        */

        int rec1 = 0, rec2 = 0;
        pull1.ReceiveReady += (o,e)=>
        {
            Console.WriteLine("pull1.ReceiveReady #{0}", ++rec1);
            byte[] buffer;
            while (e.Socket.TryReceiveFrameBytes(out buffer)){}
        };

        pull2.ReceiveReady += (o,e)=>
        {
            Console.WriteLine("pull2.ReceiveReady #{0}", ++rec2);
            byte[] buffer;
            while (e.Socket.TryReceiveFrameBytes(out buffer)){}
        };

        poller.AddSocket(pull1);
        poller.AddSocket(pull2);
        var sendBuffer = new byte[8];
        var rnd = new Random();
        poller.PollTillCancelledNonBlocking();
        for (int i = 0; i < 1000; i++)
        {
            rnd.NextBytes(sendBuffer);
            push.SendFrame(sendBuffer); 
            if (rnd.Next(50) == 3)
                Thread.Sleep(50);
        }

        poller.CancelAndJoin();
        Console.WriteLine("REC1: {0} REC2: {1}", rec1, rec2);
    }
    finally
    {
        push.Dispose();
        poller.Dispose();
        pull1.Dispose();
        pull2.Dispose();
        context.Dispose();
    }
}

If I bind the push socket, the output looks like: REC1: 25 REC2: 26

If I bind the second pull socket, the output looks like: REC1: 0 REC2: 19

The same thing happens with the inproc transport.

Anyway, thanks a lot to all contributors for sharing their work with this awesome lib.

somdoron commented 9 years ago

Your second example won't work; Connect and Bind on inproc behave the same way as on TCP.

What you are actually doing is connecting a pull socket to a pull socket, which is not a valid connection. You are also connecting the push socket to the pull socket, and that will work.

Try this instead:

pull2.Bind("ipc://test2");
pull1.Bind("ipc://test");
push.Connect("ipc://test");
push.Connect("ipc://test2");

Or this:

pull2.Bind("ipc://test2");
push.Bind("ipc://test");
pull1.Connect("ipc://test");
push.Connect("ipc://test2");
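
For reference, the first suggestion can be dropped into the original program roughly as follows. This is a minimal sketch, not a verified reproduction: it reuses only the NetMQ calls that appear in the original post (the NetMQContext/Poller API of that release) and the endpoint names from the suggestion above. Two things are assumptions added for illustration: the handlers count received frames rather than ReceiveReady events, and a short sleep before cancelling the poller gives in-flight messages time to arrive.

```csharp
using System;
using System.Threading;
using NetMQ;

class Program
{
    static void Main()
    {
        using (var context = NetMQContext.Create())
        using (var push = context.CreatePushSocket())
        using (var pull1 = context.CreatePullSocket())
        using (var pull2 = context.CreatePullSocket())
        using (var poller = new Poller())
        {
            push.Options.Linger = TimeSpan.FromSeconds(1);

            // Each pull socket binds its own endpoint; the push socket
            // connects to both, so every connection is push-to-pull.
            pull1.Bind("ipc://test");
            pull2.Bind("ipc://test2");
            push.Connect("ipc://test");
            push.Connect("ipc://test2");

            // Assumption: count frames, not ReceiveReady events, so the
            // totals reflect how many messages each socket received.
            int rec1 = 0, rec2 = 0;
            pull1.ReceiveReady += (o, e) =>
            {
                byte[] frame;
                while (e.Socket.TryReceiveFrameBytes(out frame)) rec1++;
            };
            pull2.ReceiveReady += (o, e) =>
            {
                byte[] frame;
                while (e.Socket.TryReceiveFrameBytes(out frame)) rec2++;
            };

            poller.AddSocket(pull1);
            poller.AddSocket(pull2);
            poller.PollTillCancelledNonBlocking();

            var sendBuffer = new byte[8];
            var rnd = new Random();
            for (int i = 0; i < 1000; i++)
            {
                rnd.NextBytes(sendBuffer);
                push.SendFrame(sendBuffer);
            }

            // Assumption: a brief pause so queued messages are delivered
            // before the poller stops.
            Thread.Sleep(500);
            poller.CancelAndJoin();

            Console.WriteLine("REC1: {0} REC2: {1}", rec1, rec2);
        }
    }
}
```

With this layout both counters should be non-zero, since the push socket has two valid peers to distribute to. The exact split is not guaranteed and depends on when each connection is established.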


eric-b commented 9 years ago

OK, I understand. That behavior seems logical and conventional; I had misinterpreted the freedom of choice between bind and connect.

Thanks for your response!