zeromq / netmq

A 100% native C# implementation of ZeroMQ for .NET
Other
2.97k stars 743 forks source link

invalid socket in NetMQMonitor Disconnected event #126

Closed tmuthmann closed 4 years ago

tmuthmann commented 10 years ago

The following example uses a ROUTER server and two DEALER clients connecting the server on localhost. I want to log all connects and disconnects from the server and all clients and used NetMQMonitor for this.

The Problem now is that the first call of repmon.Disconnected (after closing client1) has an useable socket, but on the second call of repmon.Disconnected (after closing client2) the socket "a.Socket" is already destroyed (CleanedUp is true) and so RemoteEndPoint is invalid.

I tested NetMQ 3.3.0.9 via NuGet using Visual Studio 2013, NET 4.5.

To test set an breakpoint on Line 76 (Console.WriteLine("test");), and run.

Now you get something like this:

client1 con 127.0.0.1:3470 to 127.0.0.1:5002
client2 con 127.0.0.1:3471 to 127.0.0.1:5002
server listen on 127.0.0.1:5002
server accept from 127.0.0.1:3470
server accept from 127.0.0.1:3471
wait key to stop client1

now press a key in the console to stop client1 and you stop at the breakpoint. you now can inspect "a.Socket" and it is valid. continue the program and you get this:

wait key to stop client2
test
server discon from 127.0.0.1:3470

now press a key in the console to stop client2 and you stop at the breakpoint. you now can inspect "a.Socket" and it is INVALID. RemoteEndPoint throws an exceptions, so only the first WriteLine with "test" goes out. continue the program and you get this:

wait key to end
test

I'm missing the Line:

server discon from 127.0.0.1:3471

used example:

using System;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Monitoring;
using NetMQ.Sockets;
using NetMQ.zmq;

namespace test2
{
   internal class Program
   {
      private class Client
      {
         public void Start(NetMQContext contex, string name, CancellationToken ct)
         {
            ct_link = CancellationTokenSource.CreateLinkedTokenSource(ct, cts_int.Token).Token;

            using(var req = contex.CreateDealerSocket())
            using(var reqmon = new NetMQMonitor(contex, req, "inproc://" + name,
                                                SocketEvent.Connected | SocketEvent.Disconnected))
            {
               reqmon.Connected +=
                  (s, a) =>
                  {
                     Console.WriteLine(name + " con " + a.Socket.LocalEndPoint.ToString() + " to " +
                                       a.Socket.RemoteEndPoint.ToString());
                  };
               reqmon.Disconnected +=
                  (s, a) =>
                  {
                     Console.WriteLine(name + " discon " + a.Socket.LocalEndPoint.ToString() + " to " +
                                       a.Socket.RemoteEndPoint.ToString());
                  };

               reqmon.Timeout = TimeSpan.FromMilliseconds(100);
               Task.Factory.StartNew(reqmon.Start);

               req.Connect(addr);

               while(!ct_link.IsCancellationRequested)
               {
                  req.Poll(TimeSpan.FromMilliseconds(100));
               }
            }
         }

         public void Stop()
         {
            cts_int.Cancel();
         }

         private readonly CancellationTokenSource cts_int = new CancellationTokenSource();
         private CancellationToken ct_link;
      }

      private class Server
      {
         public void Start(NetMQContext contex, string name, CancellationToken ct)
         {
            ct_link = CancellationTokenSource.CreateLinkedTokenSource(ct, cts_int.Token).Token;

            using(var rep = contex.CreateRouterSocket())
            using(var repmon = new NetMQMonitor(contex, rep, "inproc://" + name,
                                                SocketEvent.Accepted | SocketEvent.Disconnected |
                                                SocketEvent.Listening))
            {
               repmon.Accepted +=
                  (s, a) =>
                  {
                     Console.WriteLine(name + " accept from " + a.Socket.RemoteEndPoint.ToString());
                  };
               repmon.Disconnected +=
                  (s, a) =>
                  {
                     Console.WriteLine("test");
                     Console.WriteLine(name + " discon from " + a.Socket.RemoteEndPoint.ToString());
                  };
               repmon.Listening +=
                  (s, a) =>
                  {
                     Console.WriteLine(name + " listen on " + a.Socket.LocalEndPoint.ToString());
                  };

               repmon.Timeout = TimeSpan.FromMilliseconds(100);
               Task.Factory.StartNew(repmon.Start);

               rep.Bind(addr);

               while(!ct_link.IsCancellationRequested)
               {
                  rep.Poll(TimeSpan.FromMilliseconds(100));
               }
            }
         }

         public void Stop()
         {
            cts_int.Cancel();
         }

         private readonly CancellationTokenSource cts_int = new CancellationTokenSource();
         private CancellationToken ct_link;
      }

      private static void Main()
      {
         var server = new Server();
         var client1 = new Client();
         var client2 = new Client();

         using(var contex = NetMQContext.Create())
         {
            Task.Factory.StartNew(() => server.Start(contex, "server", cts.Token), cts.Token);
            Task.Factory.StartNew(() => client1.Start(contex, "client1", cts.Token), cts.Token);
            Task.Factory.StartNew(() => client2.Start(contex, "client2", cts.Token), cts.Token);

            Thread.Sleep(3000);

            Console.WriteLine("wait key to stop client1");
            Console.ReadKey();
            client1.Stop();

            Console.WriteLine("wait key to stop client2");
            Console.ReadKey();
            client2.Stop();

            Console.WriteLine("wait key to end");
            Console.ReadKey();

            cts.Cancel();
         }
      }

      private static readonly CancellationTokenSource cts = new CancellationTokenSource();
      private static string addr = "tcp://127.0.0.1:5002";
   }
}
somdoron commented 10 years ago

I cannot think of a good solution for this, because after you disconnect the socket get disposed by the framework.

Maybe instead of passing the Socket we can only pass some information but that will be a big change...

The only think I can think of is you managing a dictionary of Socket to remote endpoint (add the socket on Accepted and remove on Disconnected).

tmuthmann commented 10 years ago

I don't see the reason, why closing the first socket disposes the second socket. I use two independent client classes, each with it's own Dealer socket and own monitor. So i expected no interference of any type between the two clients. Is there another solution to track connects and disconnects?

somdoron commented 10 years ago

because once you dispose the client socket netmq recognize that and dispose the peer socket on the server side because the channel is close.

as I suggested you can manage a dictionary, on Accepted add the socket to a dictionary and the remoteEndpoint as value, on disconnected use the socket as key (don't use any of the properties) and get the remote endpoint from the dictionary.

That way you are not accessing the disposed object and can access the remote endpoint you saved on the dictionary.

ndextraze-pbp commented 9 years ago

There's a patch merged for this. Is this issue fixed?

stale[bot] commented 4 years ago

This issue has been automatically marked as stale because it has not had activity for 365 days. It will be closed if no further activity occurs within 56 days. Thank you for your contributions.

SecondDim commented 1 year ago

Hello, I think this issue still exist. I'm try same way and get same error.

NetMQ: 4.0.1.13 C# .NET 6.0