zeromq / netmq

A 100% native C# implementation of ZeroMQ for .NET
Other
2.97k stars 743 forks source link

Pub/Sub multiple frame message, frames are out of order #1063

Closed zhaoyul closed 1 year ago

zhaoyul commented 1 year ago

Environment

NetMQ Version:   tested on "4.0.1.6" and 4.0.1.12
Operating System: win10, win11, osx
.NET Version:     . net 6.0

Expected behaviour

topic2|content2
topic2|content2
topic2|content2
topic2|content2
topic2|content2
topic1|content1
topic1|content1
topic2|content2
topic2|content2
topic2|content2
topic1|content1
topic2|content2
topic1|content1
topic2|content2
topic2|content2
topic2|content2
topic2|content2
topic2|content2
topic1|content1
....

Actual behaviour

topic2|content2
topic2|content2
topic2|content2
topic2|content2
topic2|content2
topic1|content1
topic1|content1
topic2|content2
topic2|content2
topic2|content2
topic1|content1
topic2|content2
topic1|content1
topic2|content2
topic2|content2
topic2|content2
topic2|content2
topic2|content2
topic1|content1
topic2|topic1|content2              <==  Error
content1                                     <== Error
topic1|content1

Steps to reproduce the behaviour

using NUnit.Framework;
using NetMQ;
using NetMQ.Sockets;
using System;
using System.Threading.Tasks;
using System.Threading;

namespace projectName;

public class Tests
{
    [SetUp]
    public void Setup()
    {
    }

    [Test]
    public void TestInprocPubSub()
    {
        PublisherSocket req = new PublisherSocket();
        SubscriberSocket rsp = new SubscriberSocket();
        req.Bind("inproc://good");
        rsp.Connect("inproc://good");
        rsp.Subscribe("");
        var rand = new Random();
        Task.Run(() =>
        {
            while(true)
            {
                Thread.Sleep(rand.Next(5));
                req.SendMoreFrame("topic1").SendFrame("content1");
            }
        } );

        Task.Run(() =>
        {
            while(true)
            {
                Thread.Sleep(rand.Next(3));
                req.SendMoreFrame("topic2").SendFrame("content2");
            }
        } );

        Task.Run(() => {
            while(true)
                Console.WriteLine(string.Join("|", rsp.ReceiveMultipartStrings()));
        });

        Thread.Sleep(10000);
    }

}
dxdjgl commented 1 year ago

Sockets are not threadsafe, see https://stackoverflow.com/questions/35245387/netmq-sockets-are-thread-safe