clj-commons / manifold

A compatibility layer for event-driven abstractions

Splicing a channel into another channel, one result at a time #70

Closed. lvh closed this issue 8 years ago.

lvh commented 8 years ago

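I have a piece of software that I'm porting from core.async to manifold. It has a simple interpreter that interprets "plans", which consist of individual steps (dispatched on their :type), ordered plans (vectors of plans), and unordered plans (sets of plans).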

This is implemented with a simple dispatch multimethod:

(defmulti execute
  (fn [x] (cond (set? x) ::unordered-plans
                (vector? x) ::ordered-plans
                :else (:type x))))

Executing a step returns a chan (soon: stream) of results. Usually there's only one result on that stream, but e.g. an HTTP request for a large file might send a progress report every 10%.
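To make the dispatch concrete, here is a small sketch that pulls the dispatch function out under a hypothetical name (plan-kind) so it can be called directly. The step types :http-get and :delete-file and the example plan are made up for illustration; they are not from the original code base.

```clojure
;; Illustrative only: same logic as the defmulti's dispatch fn above,
;; given a name (plan-kind, hypothetical) so it can be called on its own.
(defn plan-kind
  [x]
  (cond (set? x)    ::unordered-plans
        (vector? x) ::ordered-plans
        :else       (:type x)))

;; :http-get and :delete-file are invented step types.
(def example-plan
  [{:type :http-get :url "http://example.com/big-file"}
   #{{:type :delete-file :path "/tmp/a"}
     {:type :delete-file :path "/tmp/b"}}])

(plan-kind example-plan)          ;; => ::ordered-plans   (a vector)
(plan-kind (second example-plan)) ;; => ::unordered-plans (a set)
(plan-kind (first example-plan))  ;; => :http-get         (the step's own :type)
```

So a single nested value can mix all three kinds, and execute recurses through it via the multimethod.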

The first implementation in core.async was:

(defmethod execute ::unordered-plans
  [plans]
  (async/merge (map execute plans)))

which was easy to port to:

(defmethod execute ::unordered-plans
  [plans]
  (ms/concat (ms/map execute plans)))

The second part was trickier. It was originally spelled:

(defmethod execute ::ordered-plans
  [plans]
  (let [out (chan)]
    (go (loop [plans plans]
          (let [results (<! (async/into [] (execute (first plans))))]
            (<! (async/onto-chan out results false)))
          (if (seq (rest plans))
            (recur (rest plans))
            (async/close! out))))
    out))

My first thought was to write:

(defmethod execute ::ordered-plans
  [plans]
  (let [out (ms/stream)]
    (ms/connect-via (ms/->source plans)
                    (fn [plan]
                      (ms/connect (execute plan) out))
                    out)
    out))

... but connect doesn't tell me when (execute plan) is done, so that doesn't work. Instead:

(defn impl-b
  [plans]
  (let [out (ms/stream)]
    (ms/connect-via (ms/->source plans)
                    (fn [plan]
                      ;; stream->seq's seq will block :(
                      (ms/put-all! out (ms/stream->seq (ms/->source (execute plan)))))
                    out)
    out))

... which works except for the seemingly unnecessary seq with blocking backpressure mechanics. I think I can just write it with deferred tricks:

(defn impl-c
  [plans]
  (let [out (ms/stream)]
    (ms/connect-via (ms/->source plans)
                    (fn [plan]
                      (let [s (execute plan)]
                        (md/loop [v (ms/take! s)]
                          (when (some? v)
                            (ms/put! out v)
                            (recur (ms/take! s)))
                          (ms/close! s))))
                    out)
    out))

... but that seems more involved than it needs to be, since it's spelling most of put-all!. Am I missing something, or is this how you write it? I guess what I want is put-all! for streams instead of seqs?

Is this a pattern you've noticed before? On a related note, (let [...] (connect(-via) in f out) out) seems to be a recurring pattern; do you have any opinions on the return values of connect(-via)?

ztellman commented 8 years ago

I think the analogue for core.async's into is (manifold.stream/reduce conj []), which I think will make this a lot simpler. If I'm missing something about what you're trying to do, let me know, but hopefully that's enough of a push in the right direction.
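As a rough illustration of that suggestion (a sketch, assuming manifold is on the classpath and aliased as ms like elsewhere in this thread): ms/reduce returns a deferred that is only realized once the source is drained, so every result is collected before anything is available downstream. The keywords stand in for a step's results.

```clojure
(require '[manifold.stream :as ms])

;; ms/reduce returns a deferred; deref it to get the collected vector.
;; The deferred is realized only after the source has been drained.
(def collected
  @(ms/reduce conj [] (ms/->source [:started :half-way :done])))

;; collected is [:started :half-way :done]
```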

lvh commented 8 years ago

Ah, right. Collecting everything first is more of a leftover from the core.async version than desired behavior. I was hoping to forward elements to the "out" stream as they came in, hence the connect-via. Contrast that with async/into or reduce conj [], which first collect all the results and only then send them out.

According to the tests, the ::ordered-plans implementation works fine. That surprised me, because the deferred in that inner loop evaluates to nil (from close!, which appears to generally return nil), and according to the connect-via docstring that means it "assumes the downstream is closed". I'm not sure exactly what that means, but I guess it doesn't close downstream by itself, so that's fine: I just want the connection to be severed.

I ended up writing the following function separately:

(defn drain-to
  "Drains stream in into out; returning a deferred when done."
  [in out]
  (md/loop [d (ms/take! in)]
    (md/chain'
     d
     (fn [v]
       (when v
         (ms/put! out v)))
     (fn [success?]
       (when success?
         (md/recur (ms/take! in)))))))

... which I guess is what I was asking for originally. Is that something better spelled some other way? If not, something useful to add to manifold? That last callback fn might need to differentiate between nil and false, or at least be smarter about failed takes vs nil values; but at least it should communicate what I'm trying to achieve :)
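One way to address the nil/false caveat is to pass a sentinel as the default value to take!, so that "source drained" is distinguishable from a falsey message. The following is only a sketch under that assumption: drain-to*, ordered, and run-plan are hypothetical names, and the "plans" in the usage are plain vectors standing in for whatever execute would return.

```clojure
(require '[manifold.deferred :as md]
         '[manifold.stream :as ms])

(defn drain-to*
  "Hypothetical variant of drain-to. Forwards every message from `in` to
  `out`, including falsey ones. Returns a deferred yielding true once
  `in` is drained, or false if a put onto `out` is rejected."
  [in out]
  (md/loop []
    (md/chain
     ;; ::drained is returned only when `in` is exhausted.
     (ms/take! in ::drained)
     (fn [v]
       (if (identical? ::drained v)
         true
         (md/chain (ms/put! out v)
                   (fn [accepted?]
                     (if accepted? (md/recur) false))))))))

(defn ordered
  "Runs `run-plan` (a stand-in for execute) on each plan in order and
  forwards results to the returned stream as they arrive."
  [run-plan plans]
  (let [out (ms/stream)]
    (ms/connect-via (ms/->source plans)
                    (fn [plan] (drain-to* (run-plan plan) out))
                    out)
    out))

;; Each fake "plan" is just a vector of results; one result is false.
(def results
  (vec (ms/stream->seq (ordered ms/->source [[1 false] [3]]))))

;; results should be [1 false 3]
```

Because the callback's deferred is realized only after the last put! for a plan has completed, connect-via does not move on to the next plan early, which preserves ordering between plans.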

Surprisingly, the tests seem to be failing because of the (ms/concat (ms/map execute plans)) bit. If I'm reading the test failure correctly, it doesn't produce all of the results you should get from executing all the plans. I'll have to take a look at that tomorrow or later today :)

lvh commented 8 years ago

Not sure if this should be a separate ticket, but I think I've found a minimal reproducing case:

(->> (range 10)
     (ms/map (fn [i] (ms/->source [i]))) ;; => stream of 10 single-element streams
     (ms/concat)                         ;; => stream of the elements of (range 10)
     (ms/stream->seq))                   ;; expected (range 10); got (butlast (range 10))

Am I wrong in what I was expecting that to do?

ztellman commented 8 years ago

That looks to be a bug in connect-via-proxy, which I'm currently trying to figure out a good fix for. Thanks for the report.

ztellman commented 8 years ago

I've also added a drain-into function, implemented here: https://github.com/ztellman/manifold/blob/master/src/manifold/stream.clj#L513

lvh commented 8 years ago

Awesome! Thanks :D

lvh commented 8 years ago

FYI; this fix worked for icecap, and icecap master is going to run on manifold about 30s from now.