zeromq / netmq

A 100% native C# implementation of ZeroMQ for .NET

Using NetMQ async with pub/sub #1007

Closed FreDP47 closed 2 years ago

FreDP47 commented 2 years ago

Environment

NetMQ Version: 4.0.1.8
Operating System: Windows 10 64-bit
.NET Version: 3.1

Expected behaviour

We expect to be able to use the async method ReceiveFrameStringAsync to receive frames from the publisher, and to stop receiving at any moment using the cancellation token. We cannot do this because the method does not receive anything in the first place.

Actual behaviour

When the code shared below gets executed, the Subscriber never receives the frames published by the Publisher - it will just hang forever.

Steps to reproduce the behaviour

static void Main(string[] args)
{
    using (var runtime = new NetMQRuntime())
    {
        runtime.Run(SubscriberAsync(), PublisherAsync());
    }
}

static async Task PublisherAsync()
{
    using (var publisher = new PublisherSocket())
    {
        publisher.Bind("tcp://localhost:50080");
        for (int i = 0; i < 1000; i++)
        {
            await Task.Delay(100);
            publisher.SendFrame($"Greeting {i}");
            await Task.Delay(100);
        }
    }
}

static async Task SubscriberAsync()
{
    var ct = new CancellationToken();
    using (var subscriber = new SubscriberSocket())
    {
        subscriber.Connect("tcp://localhost:50080");
        while (true)
        {
            var (message, _) = await subscriber.ReceiveFrameStringAsync(ct);
            Console.WriteLine(message);
            await Task.Delay(100);
        }
    }
}

fluzionhanaj commented 2 years ago

You are not receiving published messages because the subscriber has not subscribed to any topic. By default, a newly created subscriber has no subscriptions.

You can use subscriber.SubscribeToAnyTopic() to subscribe to all topics, or subscriber.Subscribe("topic-name") to subscribe to a specific topic. You can subscribe at any time, but remember that you only receive messages published after the subscription is in place. So, to receive all of the messages above, you need to subscribe before the Connect call on the subscriber. Keep in mind that subscriptions should be updated on the same thread that created the subscriber (the runtime, in this case), since the socket is not thread-safe.

Another key point is that if the publisher starts sending before the subscriber starts listening, some messages might be lost. It is the application's job to synchronize the data.
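
As an illustration of the two subscription calls mentioned above, here is a minimal sketch that subscribes to a single topic before connecting. It assumes the NetMQ 4.x extension methods `SendMoreFrame`, `SendFrame`, `TryReceiveFrameString`, and `ReceiveFrameString`; the topic names, the two-frame message layout (topic frame, then body frame), and the 500 ms settling delay are illustrative assumptions, not part of the original report.

```csharp
using System;
using System.Threading;
using NetMQ;
using NetMQ.Sockets;

static class TopicSubscriptionSketch
{
    static void Main()
    {
        using (var publisher = new PublisherSocket())
        using (var subscriber = new SubscriberSocket())
        {
            publisher.Bind("tcp://127.0.0.1:50080");

            // Subscribe first, then connect. Matching is done on the
            // leading bytes of the first frame (a prefix match).
            subscriber.Subscribe("weather");
            subscriber.Connect("tcp://127.0.0.1:50080");

            // Assumption: a crude delay so the connection and the
            // subscription can reach the publisher before it sends.
            Thread.Sleep(500);

            // First frame is the topic, second frame is the body.
            publisher.SendMoreFrame("weather").SendFrame("sunny");
            publisher.SendMoreFrame("sports").SendFrame("3:1");

            // Drain whatever arrives within one second per message.
            while (subscriber.TryReceiveFrameString(TimeSpan.FromSeconds(1), out string topic))
            {
                string body = subscriber.ReceiveFrameString();
                Console.WriteLine($"{topic}: {body}");
            }
        }

        // Release NetMQ's background resources before exiting.
        NetMQConfig.Cleanup();
    }
}
```

With this setup, only messages whose first frame starts with "weather" should reach the subscriber. Replacing `Subscribe("weather")` with `SubscribeToAnyTopic()` would let both messages through.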

FreDP47 commented 2 years ago

Thank you for your response! This is the solution that works, although we are missing the first few frames at the beginning. Are there any recommendations on how to avoid that loss?

static Task Main(string[] args)
{
    var cts = new CancellationTokenSource();
    cts.CancelAfter(TimeSpan.FromSeconds(10));
    using (var runtime = new NetMQRuntime())
    {
        runtime.Run(SubscriberAsync(cts.Token), PublisherAsync(cts.Token));
    }
    return Task.CompletedTask;
}

static async Task PublisherAsync(CancellationToken cancellationToken)
{
    using (var publisher = new PublisherSocket())
    {
        publisher.Bind("tcp://localhost:50080");
        for (int i = 0; i < 1000; i++)
        {
            if (cancellationToken.IsCancellationRequested)
            {
                throw new OperationCanceledException();
            }
            await Task.Delay(100);
            publisher.SendFrame($"Greeting {i}");
        }
    }
}

static async Task SubscriberAsync(CancellationToken cancellationToken)
{
    using (var subscriber = new SubscriberSocket())
    {
        subscriber.Connect("tcp://localhost:50080");
        subscriber.SubscribeToAnyTopic();
        while (!cancellationToken.IsCancellationRequested)
        {
            var (message, _) = await subscriber.ReceiveFrameStringAsync(cancellationToken);
            Console.WriteLine(message);
            await Task.Delay(100);
        }
    }
}

fluzionhanaj commented 2 years ago

That loss is caused by the time it takes the subscriber to connect to the publisher. You can arrange for the subscriber to connect before the publisher starts sending, or you have to handle it in application logic.

In complex scenarios where you send thousands of updates, you can include a version number in each message and correlate it on the subscriber side. The subscriber also requests a snapshot from the publisher over a separate connection (typically a router-dealer topology). This makes it possible to synchronize to the latest state. This is a known problem, and you can find more in the ZeroMQ guide's material on advanced pub/sub patterns.
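
The version-and-snapshot idea can be sketched without any sockets. The following is a minimal, hypothetical illustration of the subscriber-side bookkeeping only: the `Update` and `StateReplica` types, the key/value state model, and the assumption that versions increase monotonically and updates arrive in order are all invented for this example and are not NetMQ APIs. In a real system, `OnUpdate` would be fed from the SUB socket and `OnSnapshot` from the separate snapshot connection.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message shape: every update carries an increasing version.
sealed class Update
{
    public long Version { get; }
    public string Key { get; }
    public string Value { get; }

    public Update(long version, string key, string value)
    {
        Version = version;
        Key = key;
        Value = value;
    }
}

// Hypothetical subscriber-side state holder.
sealed class StateReplica
{
    private readonly Dictionary<string, string> _state = new Dictionary<string, string>();
    private readonly List<Update> _pending = new List<Update>();
    private bool _hasSnapshot;

    public long Version { get; private set; }

    // Called for every update received over pub/sub.
    public void OnUpdate(Update update)
    {
        if (!_hasSnapshot)
        {
            // Buffer until the snapshot arrives; we cannot order it yet.
            _pending.Add(update);
            return;
        }
        Apply(update);
    }

    // Called once with the snapshot fetched over the second connection.
    public void OnSnapshot(long version, IDictionary<string, string> snapshot)
    {
        _state.Clear();
        foreach (var pair in snapshot)
        {
            _state[pair.Key] = pair.Value;
        }
        Version = version;
        _hasSnapshot = true;

        // Replay buffered updates; Apply drops those the snapshot already covers.
        foreach (var update in _pending)
        {
            Apply(update);
        }
        _pending.Clear();
    }

    public bool TryGet(string key, out string value)
    {
        return _state.TryGetValue(key, out value);
    }

    private void Apply(Update update)
    {
        if (update.Version <= Version)
        {
            return; // Already reflected in the current state.
        }
        _state[update.Key] = update.Value;
        Version = update.Version;
    }
}

static class Program
{
    static void Main()
    {
        var replica = new StateReplica();

        // Updates arrive before the snapshot does.
        replica.OnUpdate(new Update(5, "a", "stale"));
        replica.OnUpdate(new Update(7, "b", "new"));

        // Snapshot taken at version 6 already contains the effect of update 5.
        replica.OnSnapshot(6, new Dictionary<string, string> { { "a", "fresh" } });

        replica.TryGet("a", out var a);
        replica.TryGet("b", out var b);
        Console.WriteLine($"a={a}, b={b}, version={replica.Version}");
        // prints "a=fresh, b=new, version=7"
    }
}
```

Buffering before the snapshot and discarding by version means the subscriber can start listening first and ask for the snapshot afterwards, so updates published while the snapshot is in flight are not lost. It does not recover frames dropped before the subscription took effect unless the snapshot covers them.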

I have used these patterns successfully to build a pub-sub library used in production at our company with impressive results and performance.

FreDP47 commented 2 years ago

The advanced pub/sub pattern doesn't completely solve our problems but thanks for suggesting it. Will close this issue now.