pebbe / zmq4

A Go interface to ZeroMQ version 4
BSD 2-Clause "Simplified" License
1.17k stars 163 forks source link

How to read and write one socket from the same thread without delay? #134

Open djmarcin opened 6 years ago

djmarcin commented 6 years ago

I'm having trouble grokking the right way to use zmq4 for the following situation. I have a server that accepts connections from workers. The workers register with the server, receive control messages from the server, and send data to the server. The current worker setup is a Dealer socket. The workers generate a stream of data that must be sent on the Dealer socket as soon as possible. Control messages from the server should also be read from the Dealer socket as soon as possible.

The problem I'm running into is that there doesn't seem to be a way to write a select statement with cases for both the stream of generated data (which arrives on a go channel) and control messages from the zeroMQ Dealer socket. I also can't just transform the read into a go channel in a goroutine because then I would be reading and writing a single socket from multiple threads.

One potential solution is to create an inproc Push/Pull socket in the worker with one goroutine writing to the Push socket and another goroutine executing a Poller on the Pull socket and the Dealer socket. When the Pull socket gets data, the goroutine simply copies it to the Dealer socket and restarts the Poller. This would seem to fulfill my desire to publish messages to the Dealer as soon as possible, as well as read messages from the Dealer as they arrive.

However, this feels weird. Ultimately, I guess the inproc channel is basically like a go channel without the fancy language support. Is this the right way to accomplish what I want with the current library? Are there any plans to make zmq4 support go channels as a method for reading and writing sockets so this can be written using the native language (i.e. select statements) instead?

pebbe commented 6 years ago

Have a look at NewReactor. Perhaps this will solve your needs. There are a few of the included examples that use this.

You could wrap a socket in a goroutine, and communicate with it through Go channels.

There are some new socket types that are thread save, meant to replace the non-thread safe versions. These are in "draft" state, and subject to change. To use these, you need to install ZeroMQ with the configure option --enable-drafts, end then import the draft version of zmq4:

import zmq "github.com/pebbe/zmq4/draft"
djmarcin commented 6 years ago

The comments on NewReactor.Run indicate that it will alternate polling the socket for the given interval and the channel. This means that new data on the channel may have to wait for the entire polling interval before it is handled, but I want to ensure it is handled immediately.

Wrapping the socket in a goroutine was my initial thought but I don't see how it solves the problem of Socket.ReadMessage or Poller.Poll being blocking calls. I wouldn't be able to respond to data on the incoming channel and send it on the socket until after the timeout, similar to the Reactor solution. The issue is basically how do I respond immediately (i.e. without a timeout delay) to incoming data on either a Socket or a Channel.

The inproc Push/Pull channel does seem to solve this problem and I could probably abstract that into a channel based wrapper if you think it would be generally useful.

pebbe commented 6 years ago

If time-outs are an obstacle, then the last option would be best, probably. Instead of the Reactor, you can use the Poller, and include the Pull socket in the llist of regular sockets being polled.

I don't think a wrapper in zmq4 would be useful. It should be simple enough to tailor your own channel-to-socket routine fitted to your specific needs, whenever you need it.