clj-commons / manifold

A compatibility layer for event-driven abstractions

joining streams - best practices #144

Closed by mccraigmccraig 6 years ago

mccraigmccraig commented 6 years ago

i have a need to join some streams

the best approach i've come up with so far (one that doesn't blow the stack with callbacks) is to use an intermediate stream: a consumer of that stream forwards each output value to the output stream and then puts the next value onto the intermediate stream

this works, but it seems awkward - am i missing something ?

here's the core fn

https://gist.github.com/mccraigmccraig/67bd74b4bb306631cd7e868d3a854b32

and the context

https://gist.github.com/mccraigmccraig/12027eb01be94459240193e69d7c1519

dm3 commented 6 years ago

I did things similar to that before - here's something that seems to work in my REPL:

(require '[manifold.deferred :as d])
(require '[manifold.stream :as s])

(defn- connect-pairs [pairs op]
  ;; need this in order not to lose values to other consumers of `srcs` streams
  (doseq [[a b] pairs]
    (s/connect-via a #(s/put! b %) b {:description {:op op}})))

(defn merge-sorted-by
  "Returns a stream containing elements from sorted `srcs` streams in order
  determined by `keyfn` and optional `comp`. Returned stream is closed when all
  of the inputs are drained."
  ([keyfn srcs] (merge-sorted-by keyfn compare srcs))
  ([keyfn ^java.util.Comparator comp srcs]
   (let [cnt (count srcs)]
     (cond
       (= 0 cnt) (s/->source [])
       (= 1 cnt) (first srcs)
       :else
       (let [intermediates (repeatedly (count srcs) s/stream)
             dst (s/stream)
             ^java.util.Queue q
             (java.util.PriorityQueue.
               (reify java.util.Comparator
                 (compare [_ [_ a] [_ b]]
                   (. comp (compare (keyfn a) (keyfn b))))))]

         (connect-pairs (map list srcs intermediates) "merge-sorted-by")

         (-> (->> (map #(s/take! % ::done) intermediates)
                  (apply d/zip))
             (d/chain
               (fn [xs]
                 (->> (map vector intermediates xs)
                      (remove (fn [[_ v]] (= v ::done)))
                      (.addAll q)))
               (fn [_]
                 (d/loop []
                   (if-let [[src v] (.poll q)]
                     (-> (s/put! dst v)
                         (d/chain
                           (fn [ok?]
                             (if ok?
                               (s/take! src ::done)
                               (do (.clear q) ::finished)))
                           (fn [src-v]
                             (when-not (#{::done ::finished} src-v)
                               (.add q [src src-v]))
                             (d/recur))))
                     (s/close! dst))))))

         (s/source-only dst))))))

mccraigmccraig commented 6 years ago

ah, d/loop would be what i was needing to avoid blowing the stack with callbacks... otherwise your approach is quite similar to how mine looked before i started using an intermediate stream to avoid the blown stack
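
To illustrate the point about `d/loop`, here is a minimal sketch of draining a stream without nesting callbacks. The function name `drain-into-vector` and the `::drained` sentinel are made up for this example and do not come from either gist; only `d/loop`, `d/recur`, `d/chain`, `s/take!` and `s/->source` are manifold functions.

```clojure
(require '[manifold.deferred :as d]
         '[manifold.stream :as s])

(defn drain-into-vector
  "Hypothetical helper: takes every value from `src` and returns a deferred
  that yields them as a vector. `d/loop` plus `d/recur` re-enters the loop
  body for each value instead of nesting one callback inside the previous one."
  [src]
  (d/loop [acc []]
    (d/chain (s/take! src ::drained)
             (fn [v]
               (if (= v ::drained)
                 acc                          ; source exhausted: finish the loop
                 (d/recur (conj acc v)))))))  ; otherwise go round again

;; usage: many takes in a row, still no deep callback chain
(count @(drain-into-vector (s/->source (range 100000))))
```

The same shape, a `d/chain` whose last step either returns a result or calls `d/recur`, is what the `merge-sorted-by` code above uses for its main loop.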

neither of our approaches involves any connection between the upstream sources and the downstream output though - so presumably errors on the upstream channels will need some explicit handling?

mccraigmccraig commented 6 years ago

ha, this looks very familiar after reading your code @dm3 :D https://github.com/ztellman/manifold/blob/master/src/manifold/stream.clj#L644

dm3 commented 6 years ago

> neither of our approaches involves any connection between the upstream sources and the downstream output though - so presumably errors on the upstream channels will need some explicit handling?

not sure what you mean by that? If one of the srcs gets closed, it simply stops being joined into the dst. If you have some transformations beforehand, their errors will have to be handled separately.

> ha, this looks very familiar after reading your code @dm3 :D

Yeah, this pattern is very common when doing stream combinators.

mccraigmccraig commented 6 years ago

i mean that if there is an error take!ing from a src then that error doesn't get propagated to the dst - or am i missing something ?

dm3 commented 6 years ago

Hmm, I didn't think take! ever throws - when does that happen?

mccraigmccraig commented 6 years ago

hmm. i think i am misunderstanding something about how errors propagate across streams...

i was expecting

(require '[manifold.deferred :as d] '[manifold.stream :as st])
(def s (->> [(d/error-deferred (ex-info "boo" {}))] (st/->source) (st/realize-each)))
@(st/take! s ::drained)

to propagate the error, but it doesn't - the stream just gets closed and ::drained returned

mccraigmccraig commented 6 years ago

ok, so does that mean there is no way of telling whether an upstream stream was closed because of an error ? i would ideally like errors on upstream streams to be able to propagate errors downstream, but i can't see how to do that now

dm3 commented 6 years ago

No, not currently. You have to add your own error propagation logic of some sort. See #95 for the errors issue.
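
One possible shape for such hand-rolled propagation, as a sketch only: turn a failed deferred into an ordinary value before it reaches `realize-each`, so the error travels downstream as data instead of silently closing the stream. The name `realize-each-keeping-errors` is hypothetical and not part of manifold. Passing the `Throwable` itself as the value is just one convention, chosen here for brevity; it assumes the stream never carries exceptions as regular values.

```clojure
(require '[manifold.deferred :as d]
         '[manifold.stream :as s])

(defn realize-each-keeping-errors
  "Hypothetical variant of `s/realize-each`: a deferred that fails is realized
  as its Throwable instead of closing the resulting stream."
  [src]
  (->> src
       ;; d/catch returns non-deferred values untouched; for a deferred it
       ;; yields the handler's result (here: the exception itself) on error
       (s/map (fn [x] (d/catch x identity)))
       s/realize-each))

;; usage: the consumer decides what an error means for the downstream
(def out
  (realize-each-keeping-errors
    (s/->source [(d/success-deferred 1)
                 (d/error-deferred (ex-info "boo" {}))
                 3])))

(defn error-value?
  "Hypothetical check a consumer would run on every value it takes."
  [v]
  (instance? Throwable v))
```

A combinator such as `merge-sorted-by` could then test each taken value with `error-value?` and, for example, put the error on `dst` and close it, rather than treating the source as merely drained.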

mccraigmccraig commented 6 years ago

cool - thanks for your help @dm3