clj-commons / manifold

A compatibility layer for event-driven abstractions
1.02k stars 106 forks source link

Consumer groups in event bus? #185

Closed jdormit closed 4 years ago

jdormit commented 4 years ago

I need an event bus where consumers can be grouped and each message on a topic is only delivered to one of the consumers within the group. My current implementation of this involves having each consumer in the group poll the event bus topic stream, taking turns pulling new messages off of the topic. Something like this:

(require '[manifold.bus :as bus]
         '[manifold.deferred :as d]
         '[manifold.stream :as s])

(def bus (bus/event-bus))

(def topic-stream (bus/subscribe bus :my-topic))

(defn make-consumer []
  (let [consumer-stream (s/stream)]
    (d/future
      (loop []
        (when-not (s/closed? consumer-stream)
          @(s/put! consumer-stream
                   @(s/take! topic-stream))
          (recur))))
    consumer-stream))

This approach works, but it feels really janky to poll to topic stream for new messages.

Is there a more idiomatic way to do this with Manifold?

jdormit commented 4 years ago

That code snippet actually has a bug - if the consumer group stream is closed after the take! from the topic stream occurs but before the put! into the consumer stream, the topic message is just lost. So any insight would be much appreciated 🙏

ztellman commented 4 years ago

Manifold doesn't have a built-in demux mechanism, and the polling you're doing there is a reasonable way to approach this. If you always have at least one consumer, you could do something like this (totally untested):

(def consumers (atom #{...}))

(def consumer-stream 
  (->> (repeatedly #(deref consumers))
    (apply concat)
   s/->stream))

(s/consume-async
  (fn [[consumer msg]] (s/put! consumer msg))
  (s/zip topic consumer-stream))

But this doesn't seem less janky to me. Let me know if you have any other questions.