clj-commons / manifold

A compatibility layer for event-driven abstractions
Communicating backpressure when using consume-async #63

Closed (lvh closed 8 years ago)

lvh commented 8 years ago

I have a paginated REST API, and I'm trying to shovel all of its data into a stream. Every response from the API contains n maps of data (which go into a stream) and, possibly, a link to the next page. I have something like this:

(defn scans!
  []
  (let [urls-stream (ms/stream 100) ;; absolutely no science here
        scans-stream (ms/stream 1000) ;; nor here
        shovel (fn [url]
                 (md/chain
                  (get-page! url)
                  (fn [{:keys [pagination scans]}]
                    (ms/put! urls-stream (:next pagination))
                    (ms/put-all! scans-stream scans))))]
    (ms/consume-async shovel urls-stream)
    scans-stream))

This somewhat communicates backpressure in that as soon as scans-stream is full or closed, put-all! will return false, and consume-async will interpret that as time to abort. What I really wanted (I think) is a deferred that fires with true once everything has successfully been put on scans-stream. That would stop consume-async from consuming another URL temporarily, which is the backpressure I was looking for.

I think that means I want something like:

(defn carefully-put!
  "Try putting msg on sink, waiting at most timeout msec. If it fails, try
  again indefinitely. Returns a deferred that fires true once successful.
  "
  [sink msg timeout]
  (md/loop [success? false]
    (if success?
      true
      (md/recur (ms/try-put! sink msg timeout)))))

(defn carefully-put-all!
  "Attempts to put msgs on sink, in order. Returns a deferred that
  fires with true when all msgs have successfully been put onto sink."
  [sink msgs timeout]
  (md/loop [[x & xs] (seq msgs)]
    (carefully-put! sink x timeout)
    (if (seq xs)
      (md/recur xs)
      true)))

(WARNING: I have not run this; there might be stupid bugs or typos.)

Presumably this needs to be a bit more clever. For example, if the sink is closed, or becomes closed in the meantime, it should stop trying and just return false.

Thoughts?

lvh commented 8 years ago

Turns out I don't know how md/recur works: specifically, you can't give it a deferred and expect it to unwrap the value for you. Not sure if that's intentional or a useful feature I should report in a different issue :)

ztellman commented 8 years ago

Oh, hmm. I can definitely see how it might be useful for recur to unwrap the deferred value. I'll think about it. As to the rest, carefully-put! would look something like:

(defn carefully-put!
  "Try putting msg on sink, waiting at most timeout msec. If it fails, try
  again indefinitely. Returns a deferred that fires true once successful.
  "
  [sink msg timeout]
  (md/loop []
    (md/chain (ms/try-put! sink msg timeout ::timeout)
      (fn [result]
        (if (identical? ::timeout result)
          (md/recur)
          result)))))

This is also untested.

lvh commented 8 years ago

Here's the code I ended up with:

(defn carefully-put!
  "Try putting msg on sink, waiting at most timeout msec each time.
  If it times out, try again indefinitely. Returns a deferred that fires
  true once successful, or false if the put can't succeed (e.g. because the sink is closed).
  "
  [sink msg timeout]
  (md/loop [status ::try]
    (case status
      ::try (do (info "trying to put msg onto sink")
                (md/chain (ms/try-put! sink msg timeout ::try)
                          md/recur))
      (true false) status)))

(defn carefully-put-all!
  "Attempts to put msgs on sink, in order. Returns a deferred that
  fires with true when all msgs have successfully been put onto sink."
  [sink msgs timeout]
  (md/loop [remaining (seq msgs)]
    (let [[x & xs] remaining]
      (md/chain (carefully-put! sink x timeout)
                (fn [result]
                  (case result
                    false false
                    true (if (seq xs)
                           (md/recur xs)
                           true)))))))

... which does look pretty similar. If I end up testing this properly, would you like me to submit it back in a PR?

ztellman commented 8 years ago

Okay, sorry, I may not have read that carefully enough the first time around. put-all! will return false if any put fails, and true if all the puts succeed. If the stream is full midway through the puts, it will just wait. I'm not really sure what this is accomplishing that put-all! doesn't already. What am I missing?

lvh commented 8 years ago

If the stream is full midway through the puts, it will return false and therefore consume-async will give up and stop consuming, right?

ztellman commented 8 years ago

If the stream is closed midway through the puts, it will return false. If it's full, it will just pause.

lvh commented 8 years ago

Derp. I didn't realize that; I will submit a PR to amend the docs to clarify that point :)

lvh commented 8 years ago

I interpreted the non-blocking property as "will try to put everything synchronously right now"; I realize now that all it means is "this returns a deferred and will never do anything synchronously".
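
For anyone landing here later, the behavior described above can be checked at the REPL with a rough sketch like the one below. It assumes Manifold is on the classpath and uses the same ms/md aliases as the rest of the thread. The names full-stream, pending, and closed-stream are made up for illustration. The expectation, going by the comments above, is that put-all! on a full stream simply stays unrealized until space frees up, while put-all! on a closed stream yields false.

```clojure
(require '[manifold.stream :as ms]
         '[manifold.deferred :as md])

;; Case 1: a full (but open) stream. The buffer holds two messages, so
;; the third put has to wait for a consumer.
(def full-stream (ms/stream 2))
(def pending (ms/put-all! full-stream [1 2 3 4]))

;; While the buffer is full the deferred should still be unrealized:
;; put-all! is pausing, not failing.
(md/realized? pending)

;; Take all four messages; each take frees space for a waiting put.
(dotimes [_ 4]
  @(ms/take! full-stream))

;; Deref with a timeout so a wrong assumption cannot hang the REPL.
(deref pending 1000 ::timed-out)

;; Case 2: a closed stream. Puts cannot succeed, so put-all! reports false.
(def closed-stream (ms/stream 2))
(ms/close! closed-stream)
@(ms/put-all! closed-stream [1 2])
```

With consume-async, this means a callback that returns the put-all! deferred only stops consumption when the sink is closed; a full sink just delays the next message.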