zeromq / netmq

A 100% native C# implementation of ZeroMQ for .NET
Other
2.94k stars 742 forks source link

Run Push/Pull Example Error:https://netmq.readthedocs.io/en/latest/push-pull/ #972

Open mainxx opened 3 years ago

mainxx commented 3 years ago

Environment

NetMQ Version:   4.0.1.6
Operating System: windows
.NET Version:  .net5.0   and   .net4.7.2

Expected behaviour

run the example : https://netmq.readthedocs.io/en/latest/push-pull/

Actual behaviour

System.NotSupportedException HResult=0x80131515 Message=Pull socket doesn't support sending Source=NetMQ StackTrace: at NetMQ.Sockets.PullSocket.TrySend(Msg& msg, TimeSpan timeout, Boolean more) at NetMQ.OutgoingSocketExtensions.Send(IOutgoingSocket socket, Msg& msg, Boolean more) at NetMQ.OutgoingSocketExtensions.SendFrame(IOutgoingSocket socket, String message, Boolean more) at Ventilator.Program.Main(String[] args) in G:\VS_Repos\NetMQPushPullDemo\NetMQPushPullDemo\Ventilator\Program.cs:line 23

Steps to reproduce the behaviour

Ventilator

using System;
using NetMQ;
using NetMQ.Sockets; //new using

namespace Ventilator
{
    public class Program
    {
        public static void Main(string[] args)
        {
            // Task Ventilator
            // Binds PUSH socket to tcp://localhost:5557
            // Sends batch of tasks to workers via that socket
            Console.WriteLine("====== VENTILATOR ======");
            using (var sender = new PushSocket("@tcp://*:5557"))
            using (var sink = new PullSocket(">tcp://localhost:5558"))
            {
                Console.WriteLine("Press enter when worker are ready");
                Console.ReadLine();
                //the first message it "0" and signals start of batch
                //see the Sink.csproj Program.cs file for where this is used
                Console.WriteLine("Sending start of batch to Sink");
                sink.SendFrame("");
                Console.WriteLine("Sending tasks to workers");
                //initialise random number generator
                Random rand = new Random(0);
                //expected costs in Ms
                int totalMs = 0;
                //send 100 tasks (workload for tasks, is just some random sleep time that
                //the workers can perform, in real life each work would do more than sleep
                for (int taskNumber = 0; taskNumber < 100; taskNumber++)
                {
                    //Random workload from 1 to 100 msec
                    int workload = rand.Next(0, 100);
                    totalMs += workload;
                    Console.WriteLine("Workload : {0}", workload);
                    sender.SendFrame(workload.ToString());
                }
                Console.WriteLine("Total expected cost : {0} msec", totalMs);
                Console.WriteLine("Press Enter to quit");
                Console.ReadLine();
            }
        }
    }
}

Sink

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;  //new using

namespace Sink
{
    public class Program
    {
        public static void Main(string[] args)
        {
            // Task Sink
            // Bindd PULL socket to tcp://localhost:5558
            // Collects results from workers via that socket
            Console.WriteLine("====== SINK ======");

            //socket to receive messages on
            using (var receiver = new PullSocket("@tcp://localhost:5558"))
            {
                //wait for start of batch (see Ventilator.csproj Program.cs)
                var startOfBatchTrigger = receiver.ReceiveFrameString();
                Console.WriteLine("Seen start of batch");

                //Start our clock now
                var watch = Stopwatch.StartNew();

                for (int taskNumber = 0; taskNumber < 100; taskNumber++)
                {
                    var workerDoneTrigger = receiver.ReceiveFrameString();
                    if (taskNumber % 10 == 0)
                    {
                        Console.Write(":");
                    }
                    else
                    {
                        Console.Write(".");
                    }
                }
                watch.Stop();
                //Calculate and report duration of batch
                Console.WriteLine();
                Console.WriteLine("Total elapsed time {0} msec", watch.ElapsedMilliseconds);
                Console.ReadLine();
            }
        }
    }
}

Worker

using System;
using System.Threading;
using NetMQ;
using NetMQ.Sockets;  //new using

namespace Worker
{
    public class Program
    {
        public static void Main(string[] args)
        {
            // Task Worker
            // Connects PULL socket to tcp://localhost:5557
            // collects workload for socket from Ventilator via that socket
            // Connects PUSH socket to tcp://localhost:5558
            // Sends results to Sink via that socket
            Console.WriteLine("====== WORKER ======");
            using (var receiver = new PullSocket(">tcp://localhost:5557"))
            using (var sender = new PushSocket(">tcp://localhost:5558"))
            {
                //process tasks forever
                while (true)
                {
                    //workload from the vetilator is a simple delay
                    //to simulate some work being done, see
                    //Ventilator.csproj Proram.cs for the workload sent
                    //In real life some more meaningful work would be done
                    string workload = receiver.ReceiveFrameString();
                    //simulate some work being done
                    Thread.Sleep(int.Parse(workload));
                    //send results to sink, sink just needs to know worker
                    //is done, message content is not important, just the presence of
                    //a message means worker is done.
                    //See Sink.csproj Proram.cs
                    Console.WriteLine("Sending to Sink");
                    sender.SendFrame(string.Empty);
                }
            }
        }
    }
}
stale[bot] commented 2 years ago

This issue has been automatically marked as stale because it has not had activity for 365 days. It will be closed if no further activity occurs within 56 days. Thank you for your contributions.

spliffli commented 4 months ago

I just ran into the exact same problem so it seems that they haven't fixed their docs. The code examples they give are slightly wrong. In the Ventilator program, it declares the sink as a Pull Socket which it tries to connect to and then send a frame, which causes this exception. The sink itself is a Pull socket which can only receive, and when the worker sends a message to the sink it uses a Push socket, so clearly the sink in Ventilator should also be a Push socket.

Just change part where it says: var sink = new PullSocket(">tcp://localhost:5558") to var sink = new PushSocket(">tcp://localhost:5558") and then it will work.

I might also make a pull request in NetMQ to update the docs so the code example actually works.

The Ventilator code should look like this:

using System;
using NetMQ;
using NetMQ.Sockets;

namespace Ventilator
{
    public class Program
    {
        public static void Main(string[] args)
        {
            // Task Ventilator
            // Binds PUSH socket to tcp://localhost:5557
            // Connects sink PUSH socket to tcp://localhost:5558
            // Sends batch of tasks to workers via that socket
            Console.WriteLine("====== VENTILATOR ======");
            using (var sender = new PushSocket("@tcp://*:5557"))
            using (var sink = new PushSocket(">tcp://localhost:5558"))
            {
                Console.WriteLine("Press enter when worker are ready");
                Console.ReadLine();
                //the first message it "0" and signals start of batch
                //see the Sink.csproj Program.cs file for where this is used
                Console.WriteLine("Sending start of batch to Sink");
                sink.SendFrame("0");
                Console.WriteLine("Sending tasks to workers");
                //initialise random number generator
                Random rand = new Random(0);
                //expected costs in Ms
                int totalMs = 0;
                //send 100 tasks (workload for tasks, is just some random sleep time that
                //the workers can perform, in real life each work would do more than sleep
                for (int taskNumber = 0; taskNumber < 100; taskNumber++)
                {
                    //Random workload from 1 to 100 msec
                    int workload = rand.Next(0, 100);
                    totalMs += workload;
                    Console.WriteLine("Workload : {0}", workload);
                    sender.SendFrame(workload.ToString());
                }
                Console.WriteLine("Total expected cost : {0} msec", totalMs);
                Console.WriteLine("Press Enter to quit");
                Console.ReadLine();
            }
        }
    }
}
spliffli commented 4 months ago

So I just checked the docs in the NetMQ repo, and the code example for Ventilator in push-pull.md is the same as the one that I wrote. So it seems that was a mistake in the docs which they already fixed, but it must have not been updated on the readthedocs website.