zeromq / netmq

A 100% native C# implementation of ZeroMQ for .NET
Other
2.97k stars 746 forks source link

NetMQ 3.3.0.11, XSUB socket with Poller Not Working #217

Closed hivanov closed 9 years ago

hivanov commented 9 years ago

The example provided in http://netmq.readthedocs.org/en/latest/xpub-xsub/ uses Proxy to bridge XPub and XSub sockets of the Intermediary. When using Poller and implementing the logic by hand, the Poller does not fire the ReceiveReady event of the XSub socket. The problem is with PUB-XSub socket pairs only (Pub-Sub works just fine).

ndextraze-pbp commented 9 years ago

Can you reproduce the same issue with the code at https://github.com/somdoron/netmq. (or update to netmq 3.3.0.12-rc1 using nuget)

hivanov commented 9 years ago

3.3.0.12-rc1 has the same issue.

ndextraze-pbp commented 9 years ago

The Poller doesn't trigger the ReceiveReady event for every message received by the XSub socket or only for the subscribe message?

ndextraze-pbp commented 9 years ago

My bad that doesn't make any sense.

Did you send a subscribing message with the XSub socket first, otherwise it will never receive any messages. The subscription has to be done manually with the XSub socket.

somdoron commented 9 years ago

Can you send your implementation? On Jan 29, 2015 7:48 PM, "hivanov" notifications@github.com wrote:

The example provided in http://netmq.readthedocs.org/en/latest/xpub-xsub/ uses Proxy to bridge XPub and XSub sockets of the Intermediary. When using Poller and implementing the logic by hand, the Poller does not fire the ReceiveReady event of the XSub socket. The problem is with PUB-XSub socket pairs only (Pub-Sub works just fine).

— Reply to this email directly or view it on GitHub https://github.com/zeromq/netmq/issues/217.

hivanov commented 9 years ago

I can send a reference example on Monday. In the meantime, here are some answers/clues: 1) My Intermediary component is started first (the socket has to be bound first before receiving any messages); 2) The Intermediary component does not report that it is ready until the Poller interface has been up and running; 3) My set-up (to see if everything is working correctly) is the following: a) Intermediary starts up, doing bind() and then the Poller is started with

m_Socket = m_Context.CreateXSubSocket();
m_Socket.Options.ReceiveHighWatermark = 0;
m_Socket.ReceiveReady += (sender, e) =>
{
  Console.WriteLine("Received!");
  var message = m_Socket.ReceiveMessage();
  m_XSubSocket.SendMessage(message);
}
m_Socket.Bind(<<xsub_address>>);
Task.Factory.StartNew(m_Poller.Start);
do
{
  Thread.Sleep(1)
} while (!m_Poller.IsRunning);

b) The client and server are done in the same thread, using the same context:

using (var context = NetMQContext.Create())
using (var socket = context.CreatePublisherSocket())
{
  socket.Connect(<<xsub_address>>);
  // wait before send
  Thread.Sleep(2500);
  var message = new Message();
  message.Append("Hello!");
  socket.SendMessage(message);
}

4) I am omitting the set-up of the other components as they seem irrelevant (the event is never called); 5) The same set-up works just by changing the Poller to Proxy bridging the XSub with XPub sockets; 6) The firewall is off and both set-ups are using the same ports (so the message is not filtered before receiving).

Please see if this helps. If not, I could send you the project sources for debugging purposes.

somdoron commented 9 years ago

You need to forward the subscription from the XPublisher to the XSubscriber, if you are not doing that you will never get messages from the subscriber.

On Thu, Jan 29, 2015 at 10:25 PM, hivanov notifications@github.com wrote:

I can send a reference example on Monday. In the meantime, here are some answers/clues: 1) My Intermediary component is started first (the socket has to be bound first before receiving any messages); 2) The Intermediary component does not report that it is ready until the Poller interface has been up and running; 3) My set-up (to see if everything is working correctly) is the following: a) Intermediary starts up, doing bind() and then the Poller is started with

m_Socket = m_Context.CreateXSubSocket(); m_Socket.Options.ReceiveHighWatermark = 0; m_Socket.ReceiveReady += (sender, e) => { Console.WriteLine("Received!"); var message = m_Socket.ReceiveMessage(); m_XSubSocket.SendMessage(message); } m_Socket.Bind(<>); Task.Factory.StartNew(m_Poller.Start);do { Thread.Sleep(1) } while (!m_Poller.IsRunning);

b) The client and server are done in the same thread, using the same context:

using (var context = NetMQContext.Create())using (var socket = context.CreatePublisherSocket()) { socket.Connect(<>); // wait before send Thread.Sleep(2500); var message = new Message(); message.Append("Hello!"); socket.SendMessage(message); }

4) I am omitting the set-up of the other components as they seem irrelevant (the event is never called); 5) The same set-up works just by changing the Poller to Proxy bridging the XSub with XPub sockets; 6) The firewall is off and both set-ups are using the same ports (so the message is not filtered before receiving).

Please see if this helps. If not, I could send you the project sources for debugging purposes.

— Reply to this email directly or view it on GitHub https://github.com/zeromq/netmq/issues/217#issuecomment-72097635.

hivanov commented 9 years ago

How to do that? Is th

hivanov commented 9 years ago

Sorry for closing the issue -- it may be documentation-related. How to distinguish between the regular and the subscription messages?

somdoron commented 9 years ago

Listen for messages from the publisher and send them as is to the subscriber On Jan 30, 2015 10:40 AM, "hivanov" notifications@github.com wrote:

How to do that? Is th

— Reply to this email directly or view it on GitHub https://github.com/zeromq/netmq/issues/217#issuecomment-72176397.

somdoron commented 9 years ago

You listen for messages from xpub. If it starts with 0 or 1 is a subscription or unsubscription send that include the first byte to the xsub. On Jan 30, 2015 10:48 AM, "hivanov" notifications@github.com wrote:

Sorry for closing the issue -- it may be documentation-related. How to distinguish between the regular and the subscription messages?

— Reply to this email directly or view it on GitHub https://github.com/zeromq/netmq/issues/217#issuecomment-72177261.