zeromq / libzmq

ZeroMQ core engine in C++, implements ZMTP/3.1
https://www.zeromq.org
Mozilla Public License 2.0

ZMQ_CONFLATE on PUB/SUB is broken when using filters #1688

Open petke opened 8 years ago

petke commented 8 years ago

I have a realtime PUB feed. I use the ZMQ_CONFLATE setting to make sure clients never get old queued up messages.

I have a hundred SUB clients that use filters to get only the messages with a key they are interested in. With the ZMQ_CONFLATE setting, each client's receive queue holds at most 1 message. That means even the most recent messages with keys the client is subscribed to get thrown away. Only one message remains in the queue, instead of one message for every key as one would like.

I tried the workaround of using one PUB/SUB socket per filter. That way every message key gets its own queue. But that can quickly use up thousands of sockets, which is probably not a good idea (on Windows, FD_SETSIZE defaults to only 64).

As a use case, let's imagine a stock price PUB feed. The clients are day traders using automated tools. They do not want to trade using old prices. Every millisecond counts. The filters are the IDs/names of the stocks they are trading.

petke commented 8 years ago

The workaround I made was to create a client that manually drops old queued messages. Googling, I found that many people have asked for the same thing. I do think ZeroMQ should provide functionality for this common use case somehow.

The recommendation to use the suicidal snail pattern for slow subscribers, and simply kill them, doesn't seem practical. What if we need to support clients where some are naturally slower than others? We might not have control over all clients. Some might live on slow computers, and some might be fast but only care to receive messages once in a while. Old messages are of little interest for realtime messaging.

Examples are, say, a feed server sending out current share prices (used for trading), a driver sending out the current mouse cursor position (for rendering), or an FPS game server sending out the current positions of the players (for clients to keep in sync with). Old messages are of little or no use to such clients. Only the latest messages matter. The old messages are the ones we want to drop in case the client can't keep up.

Anyway, here is my workaround. It seems a bit of a hack. I'm hoping you clever people could come up with something better internally in the library.

http://stackoverflow.com/questions/34503252/howto-make-zeromq-pub-sub-drop-old-messages-instead-of-new-for-realtime-feeds/34563635#34563635
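
For readers who want the general idea without following the link: the sketch below is not the code from the linked answer, only a minimal illustration of client-side dropping, under the assumption that each message arrives as a (key, payload) pair. The names `drain_latest` and `try_recv` are hypothetical. `try_recv` stands in for a non-blocking receive that returns None when nothing is queued; with pyzmq it could plausibly wrap `socket.recv_multipart(zmq.NOBLOCK)` and return None on `zmq.Again`.

```python
from typing import Callable, Dict, Optional, Tuple

# Assumption: a message is a (key frame, payload frame) pair.
Message = Tuple[bytes, bytes]


def drain_latest(try_recv: Callable[[], Optional[Message]]) -> Dict[bytes, bytes]:
    """Read everything currently queued and keep only the newest payload per key."""
    latest: Dict[bytes, bytes] = {}
    while True:
        msg = try_recv()
        if msg is None:  # nothing left in the queue
            break
        key, payload = msg
        latest[key] = payload  # a later message overwrites an earlier one
    return latest


# Stand-in for a socket backlog: two stale and one fresh message.
queued = iter([(b"AAPL", b"100"), (b"MSFT", b"50"), (b"AAPL", b"101")])
result = drain_latest(lambda: next(queued, None))
```

A slow client would call this once per processing cycle and act only on the returned dictionary, so it sees one message for every key rather than one message in total.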

valschmidt commented 7 years ago

+1 for this feature request (I would argue bug fix). Without it, ZMQ seems totally inappropriate for real-time data streams, for all the reasons petke states.

m2kz commented 5 years ago

This issue is almost three years old, but could I try to work on it?

m2kz commented 5 years ago

Okay, today I spent some time learning about the codebase and the contribution process. Could someone please confirm that my idea for solving this problem is correct: a new socket option would be added, let's call it ZMQ_UNIQUE_MSG, that when set on the PUB socket via zmq_setsockopt() would keep only one message per SUB filter. Is that the correct way?

samuelrohr commented 5 years ago

@mkmodrzew I don't think so. What would be great is an option to tell zmq that when the queue is full, it should drop the oldest message and replace it with the new one that has just arrived.

ZMQ_CONFLATE seems great, but for SUB with filters, and for cases where it just takes a bit longer to process a message on the SUB, it will just lose messages, as @petke said.

themightyoarfish commented 3 years ago

Is there any progress on this? No way to use Pub/Sub with filters and get non-stale messages? It seems like quite a problem to me.

ahoereth commented 2 years ago

Again bringing this up. Any news here?

ghost commented 2 years ago

A potential solution is the following. On the receiving end, set up a finite thread-safe queue with the policy that the queue drops the head element when it is full and a push occurs (e.g. a ring buffer). The push/pop/peek should be non-blocking (push drops the head and writes to the tail if the queue is full; pop and peek return an empty element or otherwise signal that the queue is empty). Set up two threads. The first one receives from ZMQ (possibly with filters in place) and pushes to your queue. The second one is your main processing thread that consumes from your finite queue.

Of course, this assumes that the first thread (that pops from ZMQ and pushes to your finite queue) runs fast enough to keep up with the ZMQ publisher.
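
The queue described above can be sketched roughly as follows. This is a minimal illustration, not a tested production component: the class name `DropOldestQueue` and the capacity of 3 are assumptions, and the receiver thread pushes plain integers where a real one would push whatever it received from the ZMQ socket.

```python
import threading
from collections import deque


class DropOldestQueue:
    """Bounded queue: a push onto a full queue discards the oldest element."""

    def __init__(self, capacity):
        # A deque with maxlen drops items from the head when appending at the tail.
        self._items = deque(maxlen=capacity)
        self._lock = threading.Lock()

    def push(self, item):
        with self._lock:
            self._items.append(item)  # never blocks

    def pop(self):
        with self._lock:
            # None signals "empty" instead of blocking.
            return self._items.popleft() if self._items else None

    def peek(self):
        with self._lock:
            return self._items[0] if self._items else None


q = DropOldestQueue(capacity=3)


def receiver():
    # Stand-in for the thread that receives from ZMQ and pushes to the queue.
    for n in range(10):
        q.push(n)


t = threading.Thread(target=receiver)
t.start()
t.join()  # joined here only to make the demonstration deterministic

# The processing side sees just the three newest items.
drained = []
while (item := q.pop()) is not None:
    drained.append(item)
```

In a real client the two threads would run concurrently and the processing thread would poll `pop()` in its own loop. Using None as the "empty" signal means None itself cannot be queued as a message.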