zeromq / netmq

A 100% native C# implementation of ZeroMQ for .NET
Other
2.93k stars 744 forks source link

xpub-xsub not working. #1052

Closed faust21 closed 11 months ago

faust21 commented 1 year ago

Environment

NetMQ Version:    4.0.1.11
Operating System:  win11_x64
.NET Version:     .net6

Expected behaviour

Subscriber will receive the messages.

Actual behaviour

Subscriber haven't received the messages.

Steps to reproduce the behaviour

the xpub-xsub's codes:

using NetMQ;
using NetMQ.Sockets;
using NetMQ.Monitoring;

static void OnPubMonitor(object? sender, NetMQMonitorEventArgs args)
{
    Console.WriteLine($"pub:{args.SocketEvent.ToString()}");
}

static void OnSubMonitor(object? sender, NetMQMonitorEventArgs args)
{
    Console.WriteLine($"sub:{args.SocketEvent.ToString()}");
}

string pubServer = "tcp://localhost:15555";
string subServer = "tcp://localhost:15556";

using var xPuber = new XPublisherSocket(pubServer);
using var xSuber = new XSubscriberSocket(subServer);

var monitorEvents = SocketEvents.Accepted
| SocketEvents.AcceptFailed
| SocketEvents.BindFailed
| SocketEvents.CloseFailed
| SocketEvents.Connected
| SocketEvents.Disconnected;

using var pubMonitor = new NetMQMonitor(xPuber, "inproc://pub", monitorEvents);
using var subMonitor = new NetMQMonitor(xSuber, "inproc://sub", monitorEvents);

pubMonitor.EventReceived += OnPubMonitor;
subMonitor.EventReceived += OnSubMonitor;

subMonitor.StartAsync();
pubMonitor.StartAsync();

var proxy = new Proxy(xSuber, xPuber);

proxy.Start();

publisher's codes:

using NetMQ;
using NetMQ.Sockets;

using var puber = new PublisherSocket(">tcp://localhost:15556");

var random = new Random();

while (true)
{
    Thread.Sleep(1000);
    int i = random.Next(1, 100);
    if (i % 2 == 0)
    {
        puber.SendMoreFrame("TopicA").SendFrame($"hello-{i}");
        Console.WriteLine($"sent: hello-{i}");
    }
    else
    {
        puber.SendMoreFrame("TopicB").SendFrame($"world-{i}");
        Console.WriteLine($"sent: world-{i}");
    }
}

subscriber's code:

using NetMQ;
using NetMQ.Sockets;

using var suber = new SubscriberSocket(">tcp://localhost:15555");

suber.Subscribe("TopicA");

while (true)
{
    string topic = suber.ReceiveFrameString();
    string msg = suber.ReceiveFrameString();

    Console.WriteLine(topic + ":" + msg);
}
dxdjgl commented 1 year ago

In your example your publisher is connecting to the XSubscriberSocket which by default connects to the port(not bind), since you have not specified bind in you connection string it will not work. So changing string subServer = "tcp://localhost:15556"; to string subServer = "@tcp://localhost:15556";

will make you code work, in general I like to be explicit about bind connect, and not rely on the underlying implementation.