NetMQ / NetMQ3-x

NetMQ 3.x stable release branch - bug fixes only
Other
9 stars 17 forks source link

NetMQPoller - ReceiveReady in Router socket #9

Closed ciottied closed 7 years ago

ciottied commented 8 years ago

In NetMQ 3.3.3.4 I migrated my code from the old syntax/interface (context - poller) to the new one (NetMQPoller). I found that the ReceiveReady event doesn't fire anymore.

Here is my new non-working code (sorry, is vb.net but just a few lines...)

'//Class Declarations
Private Withevents RSocket As Sockets.RouterSocket = Nothing
Private WithEvents Poller As NetMQPoller = Nothing

Public Sub connect()
                '//Create Router-Reply socket
                RSocket = New Sockets.RouterSocket
                RSocket.Bind("tcp://" + prefs.ServerIP + ":" + prefs.ServerReqRepPort)

                '//Create Poller, Hooks receive event (generated by poller) 
                Poller = New NetMQPoller
                Poller.Add(RSocket)
                AddHandler RSocket.ReceiveReady, AddressOf ProduceIncomingBuffer
                Poller.RunAsync()
 End Sub

Private Sub ProduceIncomingBuffer(sender As Object, e As NetMQSocketEventArgs)
'//Parses incoming messages
End Sub

And this is the old working code:

'//Class Declarations
Private context As NetMQContext = Nothing
Private WithEvents RSocket As Sockets.RouterSocket = Nothing
Private WithEvents poller As Poller = Nothing

  Public Sub connect()
                '//Create context and socket
                context = NetMQContext.Create()
                RSocket = context.CreateRouterSocket

                '//Create Poller and hooks NetMQ events (generated by poller)
                poller = New Poller(RSocket)
                AddHandler RSocket.ReceiveReady, AddressOf ProduceIncomingBuffer

                '//Starts everything
                RSocket.Bind("tcp://" + prefs.ServerIP + ":" + prefs.ServerReqRepPort)
               Dim t1 As Task = Task.Factory.StartNew(Sub() poller.PollTillCancelled())
End Sub

Private Sub ProduceIncomingBuffer(sender As Object, e As NetMQSocketEventArgs)
'//Parses incoming messages
End Sub

Strangely, this happens only with the RouterSocket, beacause with the SubSocket the ReceiveReady event fires regularly with the new and with the old syntax/interface.

If something is wrong in my code I apologize in advance.

Regards Edoardo

somdoron commented 8 years ago

Can you include the client code? if you can create small vb project that will be great

On Oct 13, 2016 5:01 PM, "ciottied" notifications@github.com wrote:

In NetMQ 3.3.3.4 I migrated my code from the old syntax/interface (context

  • poller) to the new one (NetMQPoller). I found that the receiveready event doesn't fire anymore.

Here is my new non-working code (sorry, is vb.net but just a few lines...)

'//Class Declarations Private Withevents RSocket As Sockets.RouterSocket = Nothing Private WithEvents Poller As NetMQPoller = Nothing

Public Sub connect() If IsNothing(RSocket) Then

        '//Create Router-Reply socket in a new dedicated thread
        RSocket = New Sockets.RouterSocket
        RSocket.Bind("tcp://" + prefs.ServerIP + ":" + prefs.ServerReqRepPort)

        '//Create Poller, Hooks receive event (generated by poller)
        Poller = New NetMQPoller
        Poller.Add(RSocket)
        AddHandler RSocket.ReceiveReady, AddressOf ProduceIncomingBuffer
        Poller.RunAsync()

End Sub

Private Sub ProduceIncomingBuffer(sender As Object, e As NetMQSocketEventArgs) '//Parses incoming messagesr End Sub

And this is the old working code:

'//Class Declarations Private context As NetMQContext = Nothing Private WithEvents RSocket As Sockets.RouterSocket = Nothing Private WithEvents poller As Poller = Nothing

Public Sub connect() '//Create context and socket context = NetMQContext.Create() RSocket = context.CreateRouterSocket

        '//Create Poller and hooks NetMQ events (generated by poller)
        poller = New Poller(RSocket)
        AddHandler RSocket.ReceiveReady, AddressOf ProduceIncomingBuffer

        '//Starts everything
        RSocket.Bind("tcp://" + prefs.ServerIP + ":" + prefs.ServerReqRepPort)
       Dim t1 As Task = Task.Factory.StartNew(Sub() poller.PollTillCancelled())

End Sub

Private Sub ProduceIncomingBuffer(sender As Object, e As NetMQSocketEventArgs) '//Parses incoming messagesr End Sub

Strangely, this happens only with the RouterSocket, beacause with the SubSocket the ReceiveReady event fires regularly with the new and with the old syntax/interface.

If something is wrong in my code I apologize in advance.

Regards Edoardo

— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub https://github.com/NetMQ/NetMQ3-x/issues/9, or mute the thread https://github.com/notifications/unsubscribe-auth/AClv9iBqJeA_hQLo43a61BAigRTITnjKks5qzjmqgaJpZM4KV7Hs .

ciottied commented 8 years ago

vb project: I'll do it, but It'll take some time, I've to extrapolate from a big project. Client code is a basically a dealer socket:

'//Class Declarations
Private context As NetMQContext = Nothing
Private WithEvents DSocket As Sockets.DealerSocket = Nothing
Private WithEvents poller As Poller = Nothing
Private PollerTask As Task

        Public Sub Connect()
                '//Create context and socket
                DSocket = context.CreateDealerSocket
                DSocket.Options.Identity = XASCII.GetBytes(prefs.ClientID)

                '//CreatePoller
                poller = New Poller
                poller.AddSocket(DSocket)

                '// Hooks NetMQ events (generated by poller) 
                AddHandler DSocket.ReceiveReady, AddressOf ReceiveServerReply

                '//Starts everything
                PollerTask = Task.Factory.StartNew(Sub() poller.PollTillCancelled())
                DSocket.Connect("tcp://" + prefs.ServerIP(SID) + ":" + prefs.ServerReqRepPort(SID))
      End Sub
ciottied commented 8 years ago

Here it comes the vb.net project (VS2015 Net 4.6.1).

In the form you'll see two buttons, one works with the old syntax router, the other works with the new syntax. Just don't press the two buttons in the same debug session (the project does not close and reopen the sockets).

NB: I Included a Pub-Sub pair that works flawlessly with the new syntax.

FromEdoToDoronRouterDealer.zip

ciottied commented 8 years ago

Hi somdoron. Any news ?

ciottied commented 7 years ago

I solved the problem. The problem was not in receiving messages with NetMQpoller, but in SENDING them using the NetMQPoller as a Task Scheduler.

I was trying to send messages in a thread safe manner starting a consumer task (basically a concurrent.blockingcollection):

AddHandler RSock.ReceiveReady, AddressOf ProduceIncomingBuffer
Dim T As Task = New Task(AddressOf ConsumeOutgoingBuffer)
T.Start(Poller)
Poller.RunAsync()

I solved the problem using NetMQQueue:

Public OutgoingBuffer As New NetMQQueue(Of String)
'...
 Public Sub connect(IP As String, PT As String)
            If IsNothing(RSock) Then
                '--Create Router-Reply socket in a new dedicated thread
                RSock = New Sockets.RouterSocket
                RSock.Bind("tcp://" + IP + ":" + PT)
                '--Create Poller, Hooks receive event (generated by poller) to routines and starts Outgoing queue (using poller as custom scheduler)
                Poller = New NetMQPoller
                Poller.Add(RSock)
                Poller.Add(OutgoingBuffer)
                AddHandler RSock.ReceiveReady, AddressOf ConsumeIncomingBuffer
                AddHandler OutgoingBuffer.ReceiveReady, AddressOf ConsumeOutgoingBuffer
                Poller.RunAsync()
            End If
        End Sub

      Private Sub ConsumeOutgoingBuffer(sender As Object, e As NetMQQueueEventArgs(Of String))
            Dim msg As String = ""
            While e.Queue.TryDequeue(msg, TimeSpan.FromMilliseconds(10))
                RSock.SendMoreFrame(ClID) 'first frame clientID
                RSock.SendFrame(msg) 'second frame - payload
            End While
        End Sub

or using PairSockets (here example for dealer socket):

       Private PairSend As New Sockets.PairSocket
        Private PairReceive As New Sockets.PairSocket

        Public Sub New()
            PairSend.Bind("inproc://OutTask")
            PairReceive.Connect("inproc://OutTask")
            Task.Factory.StartNew(AddressOf ConsumeOutgoingBuffer)
        End Sub

        Public Sub Connect(IP As String, PT As String)
            If IsNothing(DSock) Then
                '--Create context and socket
                DSock = New Sockets.DealerSocket
                DSock.Options.Identity = XASCII.GetBytes(ClID)
                DSock.Connect("tcp://" + IP + ":" + PT)
                '--CreatePoller 
                poller = New NetMQPoller
                AddHandler DSock.ReceiveReady, AddressOf ProduceIncomingBuffer   '-- Hooks NetMQ events (generated by poller) to routines
                AddHandler PairReceive.ReceiveReady, AddressOf SyncedSend  '-- Hooks NetMQ events (generated by poller) to routines
                poller.Add(DSock)
                poller.Add(PairReceive)
                poller.RunAsync()
            End If
        End Sub

        Private Sub ConsumeOutgoingBuffer()
            For Each msg As String In OutgoingBuffer.GetConsumingEnumerable
                PairSend.SendFrame(msg)
            Next
        End Sub

        Private Sub SyncedSend(sender As Object, e As NetMQSocketEventArgs)
            Dim msg As String = e.Socket.ReceiveFrameString(False)
            Dim zmsg As New NetMQMessage
            zmsg.Append(msg)
            DSock.SendMultipartMessage(zmsg) 'frame1 - payload. Frame0-CliD set in socket properties
        End Sub

follows small test project with netmq library updated to 4.0.0.1.

NETMQ 4.0.0.1 TEST.zip](https://github.com/NetMQ/NetMQ3-x/files/1105167/NETMQ.4.0.0.1.TEST.zip)

ciottied commented 7 years ago

closing solved issue