zeromq / netmq

A 100% native C# implementation of ZeroMQ for .NET
Other
2.95k stars 744 forks source link

Unhandled Exceptions in NetMQ.NetMQPoller.RunPoller() & NetMQ.Core.Utils.Proactor.Loop() #971

Open SteveHarveyUK opened 3 years ago

SteveHarveyUK commented 3 years ago

Environment

NetMQ Version:    v4.0.1.6
Operating System: Windows Server 2008R2/20012R2
.NET Version:     .Net v4.7.2

Context

I'm using NetMQ for client/server comms for a pricing system using Pub/Sub for price data and Req/Res for C&C. It's working very well except that I'm seeing random and unreproducible exceptions occurring in the bowels of the NetMQ library in my client service.

It's possible that these exceptions are a result of network issues, but even so being able to handle that fault with out the service failing is mission critical.

Expected behaviour

Some way to suppress or handle exceptions within the internal NetMQ stack.

Actual behaviour

Unhandled exceptions causing the service to fail.

Steps to reproduce the behaviour

The client is using the following constructs:

NetMQBeacon in a NetMQPoller via RunAsync. SubscriberSocket in a NetMQPoller via RunAsync. RequestSocket directly via SendMoreFrame/TrySendFrame and TryReceiveFrameString/TryReceiveFrameBytes.

Example Exceptions Seen:

Link to complete list: ExceptionsSpreadsheet

Exception Info: System.ArgumentOutOfRangeException
   at System.ThrowHelper.ThrowArgumentOutOfRangeException(System.ExceptionArgument, System.ExceptionResource)
   at NetMQ.Core.Patterns.Utils.ArrayExtensions.Swap[[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]](System.Collections.Generic.List`1<System.__Canon>, Int32, Int32)
   at NetMQ.Core.Patterns.Utils.Distribution.Activated(NetMQ.Core.Pipe)
   at NetMQ.Core.ZObject.ProcessCommand(NetMQ.Core.Command)
   at NetMQ.Core.SocketBase.ProcessCommands(Int32, Boolean, System.Threading.CancellationToken)
   at NetMQ.Core.SocketBase.GetSocketOption(NetMQ.Core.ZmqSocketOption)
   at NetMQ.NetMQSelector.Select(Item[], Int32, Int64)
   at NetMQ.NetMQPoller.RunPoller()
   at NetMQ.NetMQPoller.Run(System.Threading.SynchronizationContext)
   at System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
   at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
   at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object)
   at System.Threading.ThreadHelper.ThreadStart()
Exception Info: NetMQ.FaultException
   at NetMQ.Msg.Close()
   at NetMQ.Core.Pipe.ProcessPipeTermAck()
   at NetMQ.Core.ZObject.ProcessCommand(NetMQ.Core.Command)
   at NetMQ.Core.IOThread.Ready()
   at NetMQ.Core.Utils.Proactor.Loop()
   at System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
   at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
   at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object)
   at System.Threading.ThreadHelper.ThreadStart()
Exception Info: NetMQ.FaultException
   at NetMQ.Msg.Close()
   at NetMQ.Core.Transports.EncoderBase.Encode(NetMQ.Core.Transports.ByteArraySegment ByRef, Int32)
   at NetMQ.Core.Transports.StreamEngine.BeginSending()
   at NetMQ.Core.Transports.StreamEngine.ProcessHandshakeCommand(NetMQ.Msg ByRef)
   at NetMQ.Core.Transports.StreamEngine.ProcessInput()
   at NetMQ.Core.Transports.StreamEngine.FeedAction(Action, System.Net.Sockets.SocketError, Int32)
   at NetMQ.Core.Utils.Proactor.Loop()
   at System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
   at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
   at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object)
   at System.Threading.ThreadHelper.ThreadStart()
Exception Info: NetMQ.FaultException
   at NetMQ.Msg.Close()
   at NetMQ.Core.Transports.EncoderBase.Encode(NetMQ.Core.Transports.ByteArraySegment ByRef, Int32)
   at NetMQ.Core.Transports.StreamEngine.BeginSending()
   at NetMQ.Core.Transports.StreamEngine.FeedAction(Action, System.Net.Sockets.SocketError, Int32)
   at NetMQ.Core.Utils.Proactor.Loop()
   at System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
   at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
   at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object)
   at System.Threading.ThreadHelper.ThreadStart()
Exception Info: NetMQ.FaultException
   at NetMQ.Msg.Close()
   at NetMQ.Core.Transports.Pgm.PgmSession.DropSubscriptions()
   at NetMQ.Core.ZObject.ProcessCommand(NetMQ.Core.Command)
   at NetMQ.Core.IOThread.Ready()
   at NetMQ.Core.Utils.Proactor.Loop()
   at System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
   at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
   at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object)
   at System.Threading.ThreadHelper.ThreadStart()
Exception Info: System.ArgumentOutOfRangeException
   at System.ThrowHelper.ThrowArgumentOutOfRangeException(System.ExceptionArgument, System.ExceptionResource)
   at NetMQ.Core.Patterns.Utils.ArrayExtensions.Swap[[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]](System.Collections.Generic.List`1<System.__Canon>, Int32, Int32)
   at NetMQ.Core.Patterns.Utils.Distribution.Activated(NetMQ.Core.Pipe)
   at NetMQ.Core.ZObject.ProcessCommand(NetMQ.Core.Command)
   at NetMQ.Core.SocketBase.ProcessCommands(Int32, Boolean, System.Threading.CancellationToken)
   at NetMQ.Core.SocketBase.GetSocketOption(NetMQ.Core.ZmqSocketOption)
   at NetMQ.NetMQSelector.Select(Item[], Int32, Int64)
   at NetMQ.NetMQPoller.RunPoller()
   at NetMQ.NetMQPoller.Run(System.Threading.SynchronizationContext)
   at System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
   at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
   at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object)
   at System.Threading.ThreadHelper.ThreadStart()
Exception Info: System.IndexOutOfRangeException
   at NetMQ.Core.Patterns.Utils.Trie.Check(System.Span`1<Byte>)
   at NetMQ.Core.Patterns.XSub.XHasIn()
   at NetMQ.Core.SocketBase.GetSocketOption(NetMQ.Core.ZmqSocketOption)
   at NetMQ.NetMQSelector.Select(Item[], Int32, Int64)
   at NetMQ.NetMQPoller.RunPoller()
   at NetMQ.NetMQPoller.Run(System.Threading.SynchronizationContext)
   at System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
   at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
   at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object)
   at System.Threading.ThreadHelper.ThreadStart()
Exception Info: System.IndexOutOfRangeException
   at NetMQ.Core.Utils.YQueue`1[[NetMQ.Msg, NetMQ, Version=4.0.1.4, Culture=neutral, PublicKeyToken=a6decef4ddc58b3a]].Push(NetMQ.Msg ByRef)
   at NetMQ.Core.YPipe`1[[NetMQ.Msg, NetMQ, Version=4.0.1.4, Culture=neutral, PublicKeyToken=a6decef4ddc58b3a]].Write(NetMQ.Msg ByRef, Boolean)
   at NetMQ.Core.Pipe.Write(NetMQ.Msg ByRef)
   at NetMQ.Core.Patterns.XSub+<>c.<.cctor>b__8_0(Byte[], Int32, System.Object)
   at NetMQ.Core.Patterns.Utils.Trie.ApplyHelper(Byte[], Int32, Int32, TrieDelegate, System.Object)
8< -- Many repeated lines -- >8
   at NetMQ.Core.Patterns.Utils.Trie.ApplyHelper(Byte[], Int32, Int32, TrieDelegate, System.Object)
   at NetMQ.Core.Patterns.XSub.XAttachPipe(NetMQ.Core.Pipe, Boolean)
   at NetMQ.Core.SocketBase.AttachPipe(NetMQ.Core.Pipe, Boolean)
   at NetMQ.Core.ZObject.ProcessCommand(NetMQ.Core.Command)
   at NetMQ.Core.SocketBase.ProcessCommands(Int32, Boolean, System.Threading.CancellationToken)
   at NetMQ.Core.SocketBase.GetSocketOption(NetMQ.Core.ZmqSocketOption)
   at NetMQ.NetMQSelector.Select(Item[], Int32, Int64)
   at NetMQ.NetMQPoller.RunPoller()
   at NetMQ.NetMQPoller.Run(System.Threading.SynchronizationContext)
   at System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
   at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
   at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object)
   at System.Threading.ThreadHelper.ThreadStart()
Exception Info: System.IndexOutOfRangeException
   at NetMQ.Core.Utils.YQueue`1[[NetMQ.Msg, NetMQ, Version=4.0.1.6, Culture=neutral, PublicKeyToken=a6decef4ddc58b3a]].Push(NetMQ.Msg ByRef)
   at NetMQ.Core.YPipe`1[[NetMQ.Msg, NetMQ, Version=4.0.1.6, Culture=neutral, PublicKeyToken=a6decef4ddc58b3a]].Write(NetMQ.Msg ByRef, Boolean)
   at NetMQ.Core.Pipe.Write(NetMQ.Msg ByRef)
   at NetMQ.Core.Patterns.XSub+<>c.<.cctor>b__8_0(Byte[], Int32, System.Object)
   at NetMQ.Core.Patterns.Utils.Trie.ApplyHelper(Byte[], Int32, Int32, TrieDelegate, System.Object)
8< -- Many repeated lines -- >8
   at NetMQ.Core.Patterns.Utils.Trie.ApplyHelper(Byte[], Int32, Int32, TrieDelegate, System.Object)
   at NetMQ.Core.Patterns.XSub.XHiccuped(NetMQ.Core.Pipe)
   at NetMQ.Core.Pipe.ProcessHiccup(System.Object)
   at NetMQ.Core.ZObject.ProcessCommand(NetMQ.Core.Command)
   at NetMQ.Core.SocketBase.ProcessCommands(Int32, Boolean, System.Threading.CancellationToken)
   at NetMQ.Core.SocketBase.GetSocketOption(NetMQ.Core.ZmqSocketOption)
   at NetMQ.NetMQSelector.Select(Item[], Int32, Int64)
   at NetMQ.NetMQPoller.RunPoller()
   at NetMQ.NetMQPoller.Run(System.Threading.SynchronizationContext)
   at System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
   at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
   at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object)
   at System.Threading.ThreadHelper.ThreadStart()
Exception Info: System.Net.Sockets.SocketException
   at System.Net.Sockets.Socket.Receive(Byte[], Int32, Int32, System.Net.Sockets.SocketFlags)
   at NetMQ.Core.Mailbox.TryRecv(Int32, NetMQ.Core.Command ByRef)
   at NetMQ.Core.Reaper.InEvent()
   at NetMQ.Core.Utils.Poller.Loop()
   at System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
   at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
   at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object)
   at System.Threading.ThreadHelper.ThreadStart()
Exception Info: System.Net.Sockets.SocketException
   at System.Net.Sockets.Socket.Receive(Byte[], Int32, Int32, System.Net.Sockets.SocketFlags)
   at NetMQ.Core.Mailbox.TryRecv(Int32, NetMQ.Core.Command ByRef)
   at NetMQ.Core.SocketBase.ProcessCommands(Int32, Boolean, System.Threading.CancellationToken)
   at NetMQ.Core.SocketBase.GetSocketOption(NetMQ.Core.ZmqSocketOption)
   at NetMQ.NetMQSelector.Select(Item[], Int32, Int64)
   at NetMQ.NetMQPoller.RunPoller()
   at NetMQ.NetMQPoller.Run(System.Threading.SynchronizationContext)
   at System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
   at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
   at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object)
   at System.Threading.ThreadHelper.ThreadStart()

--

nxip commented 2 years ago

is there any fix or suggestion for this exception? Facing same issue after some hours of service working, pattern is Dealer router, on Router side this exception happened

Code : smartDeviceRouterServer.Options.RouterHandover = true; smartDeviceRouterServer.Options.IPv4Only = false; smartDeviceRouterServer.ReceiveReady += SmartDeviceRouterServer_ReceiveReady; smartDeviceRouterPoller = new NetMQPoller(); smartDeviceRouterPoller.Add(smartDeviceRouterServer); smartDeviceRouterPoller.RunAsync();

Exception Info: System.Net.Sockets.SocketException at System.Net.Sockets.Socket.Receive(Byte[], Int32, Int32, System.Net.Sockets.SocketFlags) at NetMQ.Core.Mailbox.TryRecv(Int32, NetMQ.Core.Command ByRef) at NetMQ.Core.SocketBase.ProcessCommands(Int32, Boolean, System.Threading.CancellationToken) at NetMQ.Core.SocketBase.GetSocketOption(NetMQ.Core.ZmqSocketOption) at NetMQ.NetMQSelector.Select(Item[], Int32, Int64) at NetMQ.NetMQPoller.RunPoller() at NetMQ.NetMQPoller.Run(System.Threading.SynchronizationContext) at System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean) at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean) at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object) at System.Threading.ThreadHelper.ThreadStart()

SteveHarveyUK commented 2 years ago

Afternoon @IPMADAR, In my case, I believe that at least some of the issues were caused by cross threaded use of sockets... specifically the Subscribe/Unsubscribe calls on the SubscriberSocket. Presumably they are actually sending data to the PublisherSocket at the other end so have the same prohibition to a single thread.