zeromq / netmq

A 100% native C# implementation of ZeroMQ for .NET
Other
2.95k stars 744 forks source link

Socket.TryReceiveMultipartMessage(timeout, ref receivedMessage) Timeout not reliable #980

Open Belisama-Jonathan opened 3 years ago

Belisama-Jonathan commented 3 years ago

Hello, Above all thank you for this amazing Library.

Environment

NetMQ Version:    4.0.16
Operating System: Windows 10 Pro 19042.985
.NET Version: 4.8

Expected behaviour

A client has a DealerSocket and is connected to the server's RouterSocket. 10 clients are started at the same time (1 thread = 1 client = 1 Dealer Socket). Each client sends a message to the server and TryReceive an answer with Socket.TryReceiveMultipartMessage(timeout, ref answer) The server won't answer within the timeout delay. So, TryReceiveMultipartMessage on client side should return false after the timeout delay.

Actual behaviour

Some Clients' TryReceiveMultipartMessage return false immediately (although I wasn't able to reproduce this case with the code given below) Some Clients' TryReceiveMultipartMessage return false after timeout*INT with INT>1

Steps to reproduce the behaviour (sometimes, everything will work perfectly sometimes it won't)

Program

using System;
using System.Threading;
using System.Collections.Generic;

internal class Program
{
    internal static string address = "tcp://127.0.0.1:1985";
    static void Main()
    {
        var nt = new Thread(() =>
        {
            var server = new Server();
            server.Run(7000);
        });
        nt.Start();
        Console.WriteLine(\"Server launched in thread {0}", nt.ManagedThreadId);

        var threads = new List<Thread>();
        for (int i = 0; i < 10; i++)
        {
            var client = new Client("C" + i);
            new Thread(() =>
            {
                client.Run(1500);
            }).Start();
            Thread.Sleep(10);
        }
    }
}

Server

using NetMQ;
using NetMQ.Sockets;
using System.Threading;

internal class Server
{
    RouterSocket Socket { get; set; } = new RouterSocket();

    /// <summary>
    /// Slow answering server
    /// </summary>
    /// <param name="delayBeforeAnswer">Milliseconds</param>
    public void Run(int delayBeforeAnswer)
    {
        Socket.Bind(Program.address);
        var request = new NetMQMessage();

        using (var poller = new NetMQPoller() { Socket })
        {
            Socket.ReceiveReady += (s, a) =>
            {
                if (a.Socket.TryReceiveMultipartMessage(ref request, 3))
                {
                    var answer = new NetMQMessage(2);
                    answer.Append(request[0]);
                    answer.Append("Whatever");
                    Thread.Sleep(delayBeforeAnswer);
                    a.Socket.TrySendMultipartMessage(answer);
                }
            };

            poller.Run();
        }
    }
}

Client

using NetMQ;
using NetMQ.Sockets;
using System.Threading;
using System;

class Client
{
    internal string Name { get; set; }
    DealerSocket Socket { get; set; } = new DealerSocket();

    internal Client(string name)
    {
        Name = name;
        Socket.Options.Identity = System.Text.Encoding.UTF8.GetBytes(Name);
        Socket.Connect(Program.address);
    }

    /// <summary>
    /// Run Client
    /// </summary>
    /// <param name="timeout">MilliSeconds</param>
    public void Run(double timeout)
    {

        var message = new NetMQMessage();

        if (!Socket.TrySendFrame("Hello"))
        {
            Console.WriteLine("{0} => message NOT sent", Name);
            return;
        }

        var start = DateTime.UtcNow;
        if (!Socket.TryReceiveMultipartMessage(TimeSpan.FromMilliseconds(timeout), ref message, 1))
        {
            Console.WriteLine("{0} => Timeout after {1}ms [timeout={2}] [Thread={3}]", Name, DateTime.UtcNow.Subtract(start).TotalMilliseconds.ToString("0"), timeout.ToString("0"), Thread.CurrentThread.ManagedThreadId);
        }
        else
        {
            Console.WriteLine("{0} <= {1} received after {2}ms [timeout={3}] [Thread={4}]", Name, message[0].ConvertToString(), DateTime.UtcNow.Subtract(start).TotalMilliseconds.ToString("0"), timeout.ToString("0"), Thread.CurrentThread.ManagedThreadId);
        }
        Socket.Close();
    }
}

Output (Sometimes there is an issue sometimes there is not)

Server launched in thread 3 C1 => Timeout after 1517ms [timeout=1500] [Thread=7] C2 => Timeout after 1504ms [timeout=1500] [Thread=8] C3 => Timeout after 1503ms [timeout=1500] [Thread=9] C4 => Timeout after 1505ms [timeout=1500] [Thread=10] C5 => Timeout after 1506ms [timeout=1500] [Thread=11] C6 => Timeout after 1507ms [timeout=1500] [Thread=12] C7 => Timeout after 1506ms [timeout=1500] [Thread=13] C8 => Timeout after 1509ms [timeout=1500] [Thread=14] C9 => Timeout after 1509ms [timeout=1500] [Thread=15] C0 => Timeout after 2916ms [timeout=1500] [Thread=6]

And here is the kind of output I get with my real application:

21.05.29_20h55:42.693] Pete <= LoginServer| Receive timeout (Awaited time for receiving = 4/4965ms) [thread=57] 21.05.29_20h55:42.693] Robert <= LoginServer| Receive timeout (Awaited time for receiving = 4/4967ms) [thread=60] 21.05.29_20h55:47.736] Jaja <= LoginServer| Receive timeout (Awaited time for receiving = 5007/5000ms) [thread=56] 21.05.29_20h55:47.736] Camille <= LoginServer| Receive timeout (Awaited time for receiving = 5006/4999ms) [thread=59] 21.05.29_20h55:47.736] Swen <= LoginServer| Receive timeout (Awaited time for receiving = 5006/5000ms) [thread=61]