zeromq / netmq

A 100% native C# implementation of ZeroMQ for .NET

Proactor to Asyncio issue on PC but not Mac #836

Closed bc closed 3 years ago

bc commented 4 years ago

Environment

NetMQ Version:    4.0.0.207
Operating System: PC Windows 10 
.NET Version:     .NET 4.x via Unity 2019.3.0f1 (NetMQ installed via NuGet; AsyncIO version 0.1.69)

Expected behaviour

I want to build a C# DealerSocket and connect it to a Router at a known external address. I want to send messages to the Router frequently, and leave the Dealer up for the entire game.

Actual behaviour

On Mac it works perfectly, with no errors. On PC I get the following error, and the dealer is prevented from sending messages:

SocketException: The operation completed successfully.

AsyncIO.Windows.Socket.SetSocketOption (System.Net.Sockets.SocketOptionLevel optionLevel, System.Net.Sockets.SocketOptionName optionName, System.Int32 optionValue) (at <3c72113274de4b3face0e2579126a901>:0)
AsyncIO.AsyncSocket.set_NoDelay (System.Boolean value) (at <3c72113274de4b3face0e2579126a901>:0)
NetMQ.Core.Transports.Tcp.TcpConnector.OutCompleted (System.Net.Sockets.SocketError socketError, System.Int32 bytesTransferred) (at <21696b85a92a4a0eb8332ff57aebfd69>:0)
NetMQ.Core.Transports.Tcp.TcpConnector.StartConnecting () (at <21696b85a92a4a0eb8332ff57aebfd69>:0)
NetMQ.Core.Transports.Tcp.TcpConnector.ProcessPlug () (at <21696b85a92a4a0eb8332ff57aebfd69>:0)
NetMQ.Core.ZObject.ProcessCommand (NetMQ.Core.Command cmd) (at <21696b85a92a4a0eb8332ff57aebfd69>:0)
NetMQ.Core.IOThread.Ready () (at <21696b85a92a4a0eb8332ff57aebfd69>:0)
NetMQ.Core.IOThreadMailbox.RaiseEvent () (at <21696b85a92a4a0eb8332ff57aebfd69>:0)
NetMQ.Core.Utils.Proactor.Loop () (at <21696b85a92a4a0eb8332ff57aebfd69>:0)
System.Threading.ThreadHelper.ThreadStart_Context (System.Object state) (at <437ba245d8404784b9fbab9b439ac908>:0)
System.Threading.ExecutionContext.RunInternal (System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, System.Object state, System.Boolean preserveSyncCtx) (at <437ba245d8404784b9fbab9b439ac908>:0)
System.Threading.ExecutionContext.Run (System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, System.Object state, System.Boolean preserveSyncCtx) (at <437ba245d8404784b9fbab9b439ac908>:0)
System.Threading.ExecutionContext.Run (System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, System.Object state) (at <437ba245d8404784b9fbab9b439ac908>:0)
System.Threading.ThreadHelper.ThreadStart () (at <437ba245d8404784b9fbab9b439ac908>:0)
UnityEngine.<>c:<RegisterUECatcher>b__0_0(Object, UnhandledExceptionEventArgs)

Steps to reproduce the behaviour

Code for the Dealer (no other dependencies in the project):

using System; // needed for Console and Convert below
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;
using UnityEngine;

public class Dealer : MonoBehaviour
{
    private bool _dealerCancelled;
    private IEnumerator _dealerWorkerRoutine;
    public Queue<string> OutBox = new Queue<string>();
    public Queue<string> Inbox = new Queue<string>();
    public List<float> pings = new List<float>();
    private DealerSocket _sock;
    public delegate void SocketDel();

    private void Start()
    {
        //,"tcp://192.168.1.67:8000"
        var connections = new List<string>() {$"tcp://40.117.225.101:8000"};
        SetupDealer(connections);
    }

    static void Client_ReceiveReady(object sender, NetMQSocketEventArgs e)
    {
        Msg myMessage = new Msg();
        e.Socket.Receive(ref myMessage);
        Debug.Log(myMessage);
    }
    private void SetupDealer(List<string> connections)
    {

        using (var poller = new NetMQPoller())
        {
            var resTask = Task.Factory.StartNew(() =>
            {
                DealerSocket client = null;
                var clientSocketPerThread = new ThreadLocal<DealerSocket>();
                if (!clientSocketPerThread.IsValueCreated)
                {
                    client = new DealerSocket();
                    client.Options.Identity =
                        Encoding.Unicode.GetBytes(5.ToString());
                    client.Connect(connections[0]);
                    client.ReceiveReady += Client_ReceiveReady;
                    clientSocketPerThread.Value = client;
                    poller.Add(client);
                }
                else
                {
                    client = clientSocketPerThread.Value;
                }

                while (true)
                {
                    var messageToServer = new NetMQMessage();
                    messageToServer.AppendEmptyFrame();
                    messageToServer.Append("messageToServer");
                    Console.WriteLine("======================================");
                    Console.WriteLine(" OUTGOING MESSAGE TO SERVER ");
                    Console.WriteLine("======================================");
                    Debug.Log($"Client Sending {messageToServer}");
                    client.SendMultipartMessage(messageToServer);
                }
            },  TaskCreationOptions.LongRunning);

            poller.RunAsync();
            Debug.Log($"task:{resTask.Status.ToString()}");
            Debug.Log($"poller:{poller.IsRunning}");
        }
    }

    // will handle the incoming message with the special 'HEARTBEAT'.
    // Returns true if the message is not a special and should be enqueued to the list.
    // returns false if the message is a heartbeat throwaway.
    private static bool DealerHandleIngress(string message, ref List<float> pingList)
    {
        if (message.StartsWith("HEARTBEAT"))
        {
            var splits = message.Split('|').Skip(1).Select(Convert.ToDouble).ToArray();
            pingList.Add((float) Time.time - (float)splits[0]);
            pingList.Remove(0);
            return false;
        }
        else
        {
            return true;
        }
    }
}
That's my code, which runs in a Unity MonoBehaviour. The main thread in Start() should start the long-running task that handles the Dealer socket and all of its communication. I thought it might be a threading issue, so I reimplemented it as a coroutine and got exactly the same error:

using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Threading;
using NetMQ;
using NetMQ.Sockets;
using UnityEngine;

public class TestCoroutine : MonoBehaviour
{
private bool _dealerCancelled;
private IEnumerator _dealerWorkerRoutine;
public Queue<string> OutBox = new Queue<string>();
public Queue<string> Inbox = new Queue<string>();
public List<float> pings = new List<float>();
private DealerSocket _sock;
public delegate void SocketDel();

private void Start()
{
    //,"tcp://192.168.1.67:8000"
    var connections = new List<string>() {$"tcp://40.117.225.101:8000"};
    StartDealer(connections);
}

private void StartDealer(List<string> targetConnections)
{
    _dealerWorkerRoutine = DealerWorker(targetConnections);
    StartCoroutine(_dealerWorkerRoutine);
    pings.Clear();
}

private IEnumerator DealerWorker(List<string> toConnect)
{
    _sock = new DealerSocket();
    _sock.Options.SendHighWatermark = 1000;
    foreach (var x in toConnect)
    {
        _sock.Connect(x);
    }
    _dealerCancelled = false;
    Debug.Log("St");
    Thread.Sleep(2000);
    Debug.Log("looping");
    while (!_dealerCancelled)
    {

        // handle incoming
        while (_sock.HasIn &&
               _sock.TryReceiveFrameString(out var message))
        {
            if (DealerHandleIngress(message, ref pings))
            {
                Inbox.Enqueue(message);
            }
        }

        // handle outgoing
        while (OutBox.Count > 0)
        {
            _sock.SendFrame(OutBox.Dequeue());
        }

        _sock.SendFrame($"HEARTBEAT|{(long)Time.time}");

        yield return new WaitForSeconds(0.1f);
    }

    Debug.Log("post");
}

// will handle the incoming message with the special 'HEARTBEAT'.
// Returns true if the message is not a special and should be enqueued to the list.
// returns false if the message is a heartbeat throwaway.
private static bool DealerHandleIngress(string message, ref List<float> pingList)
{
    if (message.StartsWith("HEARTBEAT"))
    {
        var splits = message.Split('|').Skip(1).Select(Convert.ToDouble).ToArray();
        pingList.Add((float) Time.time - (float)splits[0]);
        pingList.Remove(0);
        return false;
    }
    else
    {
        return true;
    }
}
}


This also works perfectly on Mac, but not at all on PC.

1. What is the recommended way to run a Dealer in Unity as a long-running process?
2. How can I debug this issue with the SocketException?
3. Am I missing any other references, permissions, etc?

I've learned a ton through the problem-solving process, but would love some help. Thanks!
bc commented 4 years ago

I added this line above the Coroutine just now: AsyncIO.ForceDotNet.Force(); and it now works.
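
For readers following along, here is a minimal sketch of where that call could sit in a Unity component. It assumes, based on the reports in this thread, that AsyncIO.ForceDotNet.Force() has to run before the first NetMQ socket is created. The class name ForcedDotNetDealer and the endpoint are hypothetical placeholders, not taken from the code above.

```csharp
using System.Collections;
using NetMQ;
using NetMQ.Sockets;
using UnityEngine;

// Hypothetical component name, for illustration only.
public class ForcedDotNetDealer : MonoBehaviour
{
    // Placeholder endpoint; replace with the Router's real address.
    private const string Endpoint = "tcp://127.0.0.1:8000";

    private DealerSocket _sock;

    private void Awake()
    {
        // Assumption from this thread: this must happen before any
        // NetMQ socket exists, so it goes in Awake rather than in the loop.
        AsyncIO.ForceDotNet.Force();
    }

    // Unity runs Start as a coroutine when it returns IEnumerator.
    private IEnumerator Start()
    {
        _sock = new DealerSocket();
        _sock.Connect(Endpoint);

        while (true)
        {
            // Non-blocking send, so a full send queue never stalls a frame.
            _sock.TrySendFrame($"HEARTBEAT|{Time.time}");
            yield return new WaitForSeconds(0.1f);
        }
    }

    private void OnDestroy()
    {
        // Release the socket, then NetMQ's background threads.
        _sock?.Dispose();
        NetMQConfig.Cleanup(false);
    }
}
```

Disposing the socket and calling NetMQConfig.Cleanup on teardown is a design choice in this sketch, intended to avoid leftover NetMQ threads when the component is destroyed.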

However, when I try the same thing on my Task-based approach, I get a new error:


Parameter name: socket
AsyncIO.Windows.CompletionPort.AssociateSocket (AsyncIO.AsyncSocket socket, System.Object state) (at <3c72113274de4b3face0e2579126a901>:0)
NetMQ.Core.Utils.Proactor.AddSocket (AsyncIO.AsyncSocket socket, NetMQ.Core.IProactorEvents proactorEvents) (at <21696b85a92a4a0eb8332ff57aebfd69>:0)
NetMQ.Core.IOObject.AddSocket (AsyncIO.AsyncSocket socket) (at <21696b85a92a4a0eb8332ff57aebfd69>:0)
NetMQ.Core.Transports.Tcp.TcpConnector.StartConnecting () (at <21696b85a92a4a0eb8332ff57aebfd69>:0)
NetMQ.Core.Transports.Tcp.TcpConnector.ProcessPlug () (at <21696b85a92a4a0eb8332ff57aebfd69>:0)
NetMQ.Core.ZObject.ProcessCommand (NetMQ.Core.Command cmd) (at <21696b85a92a4a0eb8332ff57aebfd69>:0)
NetMQ.Core.IOThread.Ready () (at <21696b85a92a4a0eb8332ff57aebfd69>:0)
NetMQ.Core.IOThreadMailbox.RaiseEvent () (at <21696b85a92a4a0eb8332ff57aebfd69>:0)
NetMQ.Core.Utils.Proactor.Loop () (at <21696b85a92a4a0eb8332ff57aebfd69>:0)
System.Threading.ThreadHelper.ThreadStart_Context (System.Object state) (at <437ba245d8404784b9fbab9b439ac908>:0)
System.Threading.ExecutionContext.RunInternal (System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, System.Object state, System.Boolean preserveSyncCtx) (at <437ba245d8404784b9fbab9b439ac908>:0)
System.Threading.ExecutionContext.Run (System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, System.Object state, System.Boolean preserveSyncCtx) (at <437ba245d8404784b9fbab9b439ac908>:0)
System.Threading.ExecutionContext.Run (System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, System.Object state) (at <437ba245d8404784b9fbab9b439ac908>:0)
System.Threading.ThreadHelper.ThreadStart () (at <437ba245d8404784b9fbab9b439ac908>:0)
UnityEngine.<>c:<RegisterUECatcher>b__0_0(Object, UnhandledExceptionEventArgs)
bc commented 4 years ago

Apologies, the first line of the error message got cut off. It is: ArgumentException: socket must be of type Windows.Socket

stale[bot] commented 3 years ago

This issue has been automatically marked as stale because it has not had activity for 365 days. It will be closed if no further activity occurs within 56 days. Thank you for your contributions.

liamjwang commented 1 year ago

I was getting the same ArgumentException: socket must be of type Windows.Socket

I found that calling ForceDotNet.Force(); before using NetMQ seemed to work around the issue.

woodj80 commented 7 months ago

I was also having a socket issue on Windows, but not Mac. I had moved my socket creation out of a using declaration so I could catch exceptions. I needed to call ForceDotNet.Force() before the socket was created instead of in the listener thread. The old order still works on Mac, but stopped working on Windows for some reason. Thanks for this clue!
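
The ordering described in this comment can be sketched roughly as follows, outside of Unity. This is only an illustration under stated assumptions: DealerWorker, its constructor parameters, and the callbacks are hypothetical names, and the sketch assumes ForceDotNet.Force() must run on the creating thread before any NetMQ object (including NetMQQueue, which uses sockets internally) is constructed. The socket lives inside a try/catch so exceptions on the listener thread can be reported instead of being lost.

```csharp
using System;
using System.Threading;
using NetMQ;
using NetMQ.Sockets;

// Hypothetical helper, not part of NetMQ.
public sealed class DealerWorker : IDisposable
{
    private readonly NetMQQueue<string> _outbox;
    private readonly NetMQPoller _poller;
    private readonly Thread _thread;

    public DealerWorker(string endpoint, Action<string> onMessage, Action<Exception> onError)
    {
        // Force the .NET socket implementation first, on the creating thread,
        // before any NetMQ object is constructed.
        AsyncIO.ForceDotNet.Force();

        _outbox = new NetMQQueue<string>();
        _poller = new NetMQPoller();
        _thread = new Thread(() => Run(endpoint, onMessage, onError)) { IsBackground = true };
        _thread.Start();
    }

    // Safe to call from any thread; the poller thread does the actual send.
    public void Send(string message) => _outbox.Enqueue(message);

    private void Run(string endpoint, Action<string> onMessage, Action<Exception> onError)
    {
        try
        {
            using (var socket = new DealerSocket())
            {
                socket.Connect(endpoint);
                socket.ReceiveReady += (s, e) =>
                {
                    if (e.Socket.TryReceiveFrameString(out var msg)) onMessage(msg);
                };
                _outbox.ReceiveReady += (s, e) =>
                {
                    while (e.Queue.TryDequeue(out var msg, TimeSpan.Zero)) socket.SendFrame(msg);
                };
                _poller.Add(socket);
                _poller.Add(_outbox);
                _poller.Run(); // blocks until Stop() is called
            }
        }
        catch (Exception ex)
        {
            onError(ex); // surface socket errors to the caller
        }
    }

    public void Dispose()
    {
        if (_poller.IsRunning) _poller.Stop();
        _thread.Join();
        _poller.Dispose();
        _outbox.Dispose();
        NetMQConfig.Cleanup(false);
    }
}
```

In Unity, onMessage and onError would run on the worker thread, so anything touching UnityEngine objects would need to be handed back to the main thread, for example through a thread-safe queue drained in Update. Disposing the worker immediately after constructing it can race with the poller starting, which this sketch does not handle.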