zeromq / netmq

A 100% native C# implementation of ZeroMQ for .NET
Other
2.94k stars 742 forks source link

Possible problem with Poller and ordering of socket creation #1041

Open mpgerlek-apira opened 1 year ago

mpgerlek-apira commented 1 year ago

Environment

NetMQ Version:    4.0.1.10
Operating System:  Win11H2
.NET Version:     6.0

Expected behaviour

The test case below has a pub socket, a sub socket, a poller, and a queue to feed the pub socket. It creates the two sockets, adds them to the poller, and then calls RunAsync. A single message (17) is queued, retrieved from the queue by the pub's SendReady method, then received by the sub's ReceiveReady. Works great.

However, my use case is such that the sockets will come and go over the lifetime of the poller, so what I actually want is to call RunAsync and then add the sockets (as noted in code). But when I do that way, it seems that ReceiveReady is never triggered.

So either there's a bug in the poller, or (more likely) I'm doing something wrong... Does the call to Add need to happen inside the poller's thread maybe? I tried that -- using the poller's Run(Action) method -- but it didn't seem to work.

Actual behaviour

The code below, as written, passes cleanly.

Steps to reproduce the behaviour

Comment out the line poller.RunAsync(); and uncomment the line /* poller.RunAsync(); */. The test will now hang.

///////////////////////////////////////////////////////////////////////

[Fact]
public async void TestBug()
{
    var sentOk = false;
    var recvOk = false;

    var port = "tcp://localhost:55550";

    var queue = new NetMQQueue<byte>();

    using (var poller = new NetMQPoller())
    {
        // *** Having the poller start here, before the socket creation, fails: the
        // *** ReceiveReady event is never triggered.
        /* poller.RunAsync(); */

        var pub = new PublisherSocket(port);
        poller.Add(pub);

        var sub = new SubscriberSocket(port);
        sub.Subscribe("");
        poller.Add(sub);

        // Having the poller start here, after the socket creation, works fine.
        poller.RunAsync();

        sub.ReceiveReady += (s, e) =>
        {
            var data = e.Socket.ReceiveFrameBytes();
            Assert.Equal(17, data[0]);
            recvOk = true;
        };

        pub.SendReady += (s, e) =>
        {
            bool ok = queue.TryDequeue(out byte i, TimeSpan.FromMilliseconds(1));
            if (ok)
            {
                Assert.Equal(17, i);
                pub.SendFrame(new byte[] { i });
                sentOk = true;
            }
        };

        queue.Enqueue(17);

        // wait for the messages to go
        await Task.Delay(1000);

        Assert.True(sentOk);
        Assert.True(recvOk);

        poller.RemoveAndDispose(pub);
        poller.RemoveAndDispose(sub);

        poller.Stop();
    }
}