Open · bradgessler opened this issue 8 years ago
If my understanding is correct, the current behavior exists because it is possible for one or more messages to have been published while the callback is run.
One possible approach is to group together all the subscribers that are at the same message sequence. We could ask Redis for the current message sequence and list once, then fan the response out to all the subscribers at that sequence.

We'd need to be careful with this approach, though. A naïve implementation could mean that no client gets the next message until every client has received the previous one; if that happened, a single slow client could disrupt everyone (similar to a slowloris attack).
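A minimal sketch of what that grouping could look like, using the plain redis-rb client. The class name, key layout, newest-first ordering of the message list, and the `last_sequence`/`deliver` subscriber interface are all assumptions for illustration, not Firehose's actual internals:

```ruby
require "redis"

# Hypothetical fanout helper: hits Redis once per distinct subscriber
# sequence instead of once per subscriber.
class GroupedFanout
  def initialize(redis, sequence_key, list_key)
    @redis        = redis
    @sequence_key = sequence_key   # assumed key holding the channel's current sequence
    @list_key     = list_key       # assumed key holding recent messages, newest first
  end

  def fanout(subscribers)
    subscribers.group_by(&:last_sequence).each do |sequence, group|
      # One pipelined round trip per group, not per subscriber.
      current_sequence, messages = @redis.pipelined do |pipe|
        pipe.get(@sequence_key)
        pipe.lrange(@list_key, 0, -1)
      end

      missed  = current_sequence.to_i - sequence
      payload = missed > 0 ? messages.first(missed).reverse : []

      # Delivery stays per-subscriber so a slow client only delays itself.
      group.each { |subscriber| subscriber.deliver(payload, current_sequence.to_i) }
    end
  end
end
```

Because delivery is still issued per subscriber, a slow client only stalls its own writes rather than holding the next lookup back for the whole group.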
The Redis client should not skip callbacks.
The client won't skip callbacks in the `pubsub.on(:message)` callback; we'd just need to expose the on-message callback to the consumer instead of hiding it behind the deferrable that we create in the `Channel` class. Relevant code for that is at https://github.com/firehoseio/firehose/blob/master/lib/firehose/server/subscriber.rb#L23.

I propose that instead of passing messages through the channel deferrable, we pass some sort of `MessagesEmitter` that can be called from Ruby either when pulling messages from the Redis list or via the Redis pubsub callback. This class would be `Enumerable` or an `Enumerator`, so that WebSockets can simply call `each` and HttpLongPoll can call `take(1)`.
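Roughly, the emitter could look like this; the thread-safe queue is just a stand-in for however the Redis list backlog and the pubsub callback would actually feed it, and the names are illustrative rather than an actual Firehose API:

```ruby
# Sketch of the proposed MessagesEmitter: messages are pushed in from either
# the Redis list (catch-up) or the pubsub on(:message) callback, and consumers
# pull them out through the Enumerable interface.
class MessagesEmitter
  include Enumerable

  def initialize
    @queue = Queue.new
  end

  # Called by the subscriber when it pulls messages from the Redis list or
  # when the pubsub callback fires.
  def push(message)
    @queue << message
  end

  # Blocks until messages arrive, yielding each one in order.
  def each
    return enum_for(:each) unless block_given?
    loop { yield @queue.pop }
  end
end

emitter = MessagesEmitter.new
emitter.push("hello")

# A WebSocket consumer would iterate indefinitely:
#   emitter.each { |msg| socket.send(msg) }
#
# HTTP long-polling takes a single message and responds:
emitter.take(1)  # => ["hello"], via Enumerable#take
```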
From @ProTip:
It appears to me that firehose pubsub occurs only once before Redis is polled again; am I reading this correctly?
If this is the case, then the pubsubbing is far less efficient and Redis gets hit far more often than I would have expected for a pubsub fanout.
To follow this along a bit:
For a channel with 6k subscribers:
Vote occurs -> Redis pubs -> 6k deferrables are resolved -> Redis is pinged for the latest sequence and message_list 6k times (I think the two calls are pipelined, otherwise it would be 12k).

If not for the latency, this situation appears to be trying really hard at N**2'ing the Redis lookups.
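To make the 6k-vs-12k point concrete: the per-subscriber sequence GET and message-list LRANGE can share one pipelined round trip, which is why it comes out to 6k round trips rather than 12k. A rough illustration with redis-rb, using placeholder key names:

```ruby
require "redis"

redis = Redis.new

# One subscriber's lookup: both commands in a single pipelined round trip.
sequence, messages = redis.pipelined do |pipe|
  pipe.get("firehose:channel:/votes:sequence")       # current sequence (placeholder key)
  pipe.lrange("firehose:channel:/votes:list", 0, -1) # recent messages (placeholder key)
end
```

With 6k subscribers that is still 6k round trips per publish; grouping subscribers by sequence, as suggested above, would collapse it to one round trip per distinct sequence.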