zeromq / netmq

A 100% native C# implementation of ZeroMQ for .NET
Other
2.93k stars 744 forks source link

Two xpublishers and single subscriber,Why do I only get one? #1053

Closed goodspring8 closed 1 year ago

goodspring8 commented 1 year ago

Environment

<Project Sdk="Microsoft.NET.Sdk">
  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>net7.0</TargetFramework>
    <ImplicitUsings>enable</ImplicitUsings>
    <Nullable>enable</Nullable>
  </PropertyGroup>
    <ItemGroup>
        <PackageReference Include="NetMQ" Version="4.0.1.11" />
    </ItemGroup>

</Project>

XPublisherA

using NetMQ;
using NetMQ.Sockets;

namespace XPublisherA
{
    internal class Program
    {
        static void Main(string[] args)
        {
            using (var pubSocket = new PublisherSocket(">tcp://127.0.0.1:5666"))
            {
                Console.WriteLine("Publisher A socket connecting...");
                pubSocket.Options.SendHighWatermark = 1000;
                var rand = new Random(50);
                while (true)
                {
                    var randomizedTopic = rand.NextDouble();
                    if (randomizedTopic > 0.5)
                    {
                        var msg = "TopicA msg-" + randomizedTopic;
                        Console.WriteLine("Sending message : {0}", msg);
                        pubSocket.SendMoreFrame("TopicA").SendFrame(msg);
                    }
                    else
                    {
                        var msg = "TopicB msg-" + randomizedTopic;
                        Console.WriteLine("Sending message : {0}", msg);
                        pubSocket.SendMoreFrame("TopicB").SendFrame(msg);
                    }
                    Thread.Sleep(500);
                }
            }
        }
    }
}

XPublisherB

using NetMQ;
using NetMQ.Sockets;

namespace XPublisherB
{
    internal class Program
    {
        static void Main(string[] args)
        {
            using (var pubSocket = new PublisherSocket(">tcp://127.0.0.1:5777"))
            {
                Console.WriteLine("Publisher B socket connecting...");
                pubSocket.Options.SendHighWatermark = 1000;
                var rand = new Random(50);
                while (true)
                {
                    var randomizedTopic = rand.NextDouble();
                    if (randomizedTopic > 0.5)
                    {
                        var msg = "TopicAAAA msg-" + randomizedTopic;
                        Console.WriteLine("Sending message : {0}", msg);
                        pubSocket.SendMoreFrame("TopicAAAA").SendFrame(msg);
                    }
                    else
                    {
                        var msg = "TopicBBBB msg-" + randomizedTopic;
                        Console.WriteLine("Sending message : {0}", msg);
                        pubSocket.SendMoreFrame("TopicBBBB").SendFrame(msg);
                    }
                    Thread.Sleep(600);
                }
            }
        }
    }
}

Intermediary

using NetMQ;
using NetMQ.Sockets;

namespace Intermediary
{
    internal class Program
    {
        static void Main(string[] args)
        {
            using (var xPub = new XPublisherSocket("@tcp://127.0.0.1:1234"))
            using (var xSub = new XSubscriberSocket("@tcp://127.0.0.1:5666"))
            using (var xSub7 = new XSubscriberSocket("@tcp://127.0.0.1:5777"))
            {
                Console.WriteLine("Intermediary started, and waiting for messages");
                // proxy messages between frontend / backend
                var proxy = new Proxy(xSub, xPub); 
                Task.Factory.StartNew(proxy.Start);
                //proxy.Start();
                var proxy7 = new Proxy(xSub7, xPub);
                Task.Factory.StartNew(proxy7.Start);
                //proxy.Start();

                Console.ReadKey();
            }
        }
    }
}

XSubscriber

using NetMQ.Sockets;
using NetMQ;

namespace XSubscriberA
{
    internal class Program
    {
        static void Main(string[] args)
        {
            using (var subSocket = new SubscriberSocket(">tcp://127.0.0.1:1234"))
            {
                subSocket.Options.ReceiveHighWatermark = 1000;
                subSocket.Subscribe("");
                Console.WriteLine("Subscriber socket connecting...");
                while (true)
                {
                    string messageTopicReceived = subSocket.ReceiveFrameString();
                    string messageReceived = subSocket.ReceiveFrameString();
                    Console.WriteLine($"{messageTopicReceived} ==> {messageReceived}");
                }
            }
        }
    }
}

Debug

Received only one publisher,XPublisherB or XPublisherA,Why? How do I optimize my intermediaries? Thanks!

goodspring8 commented 1 year ago

Environment

<Project Sdk="Microsoft.NET.Sdk">
  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>net7.0</TargetFramework>
    <ImplicitUsings>enable</ImplicitUsings>
    <Nullable>enable</Nullable>
  </PropertyGroup>
  <ItemGroup>
      <PackageReference Include="NetMQ" Version="4.0.1.11" />
  </ItemGroup>

</Project>

XPublisherA

using NetMQ;
using NetMQ.Sockets;

namespace XPublisherA
{
    internal class Program
    {
        static void Main(string[] args)
        {
            using (var pubSocket = new PublisherSocket(">tcp://127.0.0.1:5666"))
            {
                Console.WriteLine("Publisher A socket connecting...");
                pubSocket.Options.SendHighWatermark = 1000;
                var rand = new Random(50);
                while (true)
                {
                    var randomizedTopic = rand.NextDouble();
                    if (randomizedTopic > 0.5)
                    {
                        var msg = "TopicA msg-" + randomizedTopic;
                        Console.WriteLine("Sending message : {0}", msg);
                        pubSocket.SendMoreFrame("TopicA").SendFrame(msg);
                    }
                    else
                    {
                        var msg = "TopicB msg-" + randomizedTopic;
                        Console.WriteLine("Sending message : {0}", msg);
                        pubSocket.SendMoreFrame("TopicB").SendFrame(msg);
                    }
                    Thread.Sleep(500);
                }
            }
        }
    }
}

XPublisherB

using NetMQ;
using NetMQ.Sockets;

namespace XPublisherB
{
    internal class Program
    {
        static void Main(string[] args)
        {
            using (var pubSocket = new PublisherSocket(">tcp://127.0.0.1:5777"))
            {
                Console.WriteLine("Publisher B socket connecting...");
                pubSocket.Options.SendHighWatermark = 1000;
                var rand = new Random(50);
                while (true)
                {
                    var randomizedTopic = rand.NextDouble();
                    if (randomizedTopic > 0.5)
                    {
                        var msg = "TopicAAAA msg-" + randomizedTopic;
                        Console.WriteLine("Sending message : {0}", msg);
                        pubSocket.SendMoreFrame("TopicAAAA").SendFrame(msg);
                    }
                    else
                    {
                        var msg = "TopicBBBB msg-" + randomizedTopic;
                        Console.WriteLine("Sending message : {0}", msg);
                        pubSocket.SendMoreFrame("TopicBBBB").SendFrame(msg);
                    }
                    Thread.Sleep(600);
                }
            }
        }
    }
}

Intermediary

using NetMQ;
using NetMQ.Sockets;

namespace Intermediary
{
    internal class Program
    {
        static void Main(string[] args)
        {
            using (var xPub = new XPublisherSocket("@tcp://127.0.0.1:1234"))
            using (var xSub = new XSubscriberSocket("@tcp://127.0.0.1:5666"))
            using (var xSub7 = new XSubscriberSocket("@tcp://127.0.0.1:5777"))
            {
                Console.WriteLine("Intermediary started, and waiting for messages");
                // proxy messages between frontend / backend
                var proxy = new Proxy(xSub, xPub); 
                Task.Factory.StartNew(proxy.Start);
                //proxy.Start();
                var proxy7 = new Proxy(xSub7, xPub);
                Task.Factory.StartNew(proxy7.Start);
                //proxy.Start();

                Console.ReadKey();
            }
        }
    }
}

XSubscriber

using NetMQ.Sockets;
using NetMQ;

namespace XSubscriberA
{
    internal class Program
    {
        static void Main(string[] args)
        {
            using (var subSocket = new SubscriberSocket(">tcp://127.0.0.1:1234"))
            {
                subSocket.Options.ReceiveHighWatermark = 1000;
                subSocket.Subscribe("");
                Console.WriteLine("Subscriber socket connecting...");
                while (true)
                {
                    string messageTopicReceived = subSocket.ReceiveFrameString();
                    string messageReceived = subSocket.ReceiveFrameString();
                    Console.WriteLine($"{messageTopicReceived} ==> {messageReceived}");
                }
            }
        }
    }
}

Debug

Received only one publisher,XPublisherB or XPublisherA,Why? How do I optimize my intermediaries? Thanks!