leonoel / missionary

A functional effect and streaming system for Clojure/Script
Eclipse Public License 2.0

dealing with blocking iterators #23

Open xificurC opened 3 years ago

xificurC commented 3 years ago

This is highly specific to my codebase so feel free to decline any help.

I have a pipeline implementation that weaves a bunch of java.util.Iterators together. It's just a bunch of functions composed together, where each one takes the previous Iterator + some options and returns a new Iterator. So something like

(-> (read-from-db nil {:connection "..."})
    (transform {:fn some-fn})
    (buffer {:size 32})
    (write-to-db {:connection "..."}))

is a composition of a bunch of functions that each return a new Iterator. To run it you need to walk (drain) the final Iterator.
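
A minimal sketch of what draining amounts to, assuming nothing beyond java.util.Iterator. The name `drain!` is hypothetical (it is not from the codebase described here); it simply walks the iterator for its side effects and counts the items.

```clojure
(import 'java.util.Iterator)

(defn drain!
  "Hypothetical helper: walks `it` to exhaustion, discarding every value.
  Returns the number of items pulled through the pipeline."
  [^Iterator it]
  (loop [n 0]
    (if (.hasNext it)
      (do (.next it) (recur (inc n)))
      n)))

;; Usage with any Iterator, here one backed by a vector:
(drain! (.iterator [1 2 3]))
```

Because every stage is lazy, nothing upstream runs until this loop calls hasNext/next on the last stage.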

This is purely sequential, so there's a lot of stalling happening, i.e. the next read is waiting for the previous write to finish. I'd like to inject a prefetch function into the pipeline, just before the write-to-db call so that the next value in the pipeline is eagerly pulled from upstream.

Here's an implementation of that strategy, previously named fork, without missionary:

(defn fork [in opts]
  (let [n (:n opts (.availableProcessors (Runtime/getRuntime)))
        tq (u/q n), q (u/q n), stop (volatile! false)
        _th (u/thread nil "pg-fork-coordinator"
              (try
                (loop []
                  (when-not @stop
                    (u/take tq)
                    (if (it/next? in)
                      (do (u/put q [:ok (it/next in)]) (recur))
                      (vreset! stop true))))
                (catch Throwable e (u/put q [:ex e]))))
        ini (volatile! nil)]
    (reify Iterator
      (hasNext [_]
        (try (when (nil? @ini) (vreset! ini true) (dotimes [_ n] (u/put tq ::go)))
             ;; TODO possible race while not stopped and q is empty but tq is not and we're waiting for a value
             (boolean (or (not @stop) (u/has? q)))
             (catch Throwable e (vreset! stop true) (throw e))))
      (next [_] (try (let [[st vl] (u/take q)] (if (= :ok st) (do (u/put tq ::go) vl) (throw vl)))
                     (catch Throwable e (vreset! stop true) (throw e)))))))

The other functions in that definition come from the alias it:

(defn next [it] (.next ^Iterator it))
(defn next? [it] (.hasNext ^Iterator it))

and from u:

(defn q ([] (LinkedTransferQueue.)) ([n] (LinkedBlockingQueue. (int n))))
(defn put   [q v] (if (instance? TransferQueue q) (.transfer ^TransferQueue q v) (.put ^BlockingQueue q v)) v)
(defn take  [q]   (.take ^BlockingQueue q))
(defn has? [q] (not (.isEmpty ^java.util.Collection q)))
(defmacro with   {:style/indent 1} [[s v] & body] `(let [rt# ~v ~s rt#] ~@body rt#))
(defmacro thread {:style/indent 2} [^ThreadGroup tg nm & code]
  `(with [t# (Thread. (or ~tg (.getThreadGroup (Thread/currentThread))) (fn [] ~@code) ~nm)] (.start t#)))

It basically forks a thread and uses 2 queues: a "token queue" that unblocks the loop to prefetch another value, and a result queue where the computed values get put. There are also 2 volatiles to coordinate initialization and stopping. As it stands it's pretty tricky code that took some time to get right.
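
For comparison, here is a stripped-down sketch of the same prefetching idea using a single bounded queue and an end-of-stream marker instead of a token queue. This is a different, simpler technique than the code above, and `prefetch` is a hypothetical name. It assumes the consumer always drains the iterator: if the consumer walks away early, the daemon thread stays blocked on the full queue.

```clojure
(import '(java.util Iterator NoSuchElementException)
        '(java.util.concurrent LinkedBlockingQueue))

(defn prefetch
  "Hypothetical helper: pulls items from `in` on a background thread,
  keeping at most `n` of them queued ahead of the consumer."
  [^Iterator in n]
  (let [q    (LinkedBlockingQueue. (int n))
        cur  (volatile! nil) ; message taken by hasNext, not yet handed out by next
        fill (fn []
               (try
                 (while (.hasNext in)
                   (.put q [:ok (.next in)])) ; blocks while the queue is full
                 (.put q [:done])
                 (catch Throwable e
                   (.put q [:ex e]))))]
    (doto (Thread. ^Runnable fill "prefetch")
      (.setDaemon true)
      (.start))
    (reify Iterator
      (hasNext [_]
        (when (nil? @cur) (vreset! cur (.take q)))
        (let [[st vl] @cur]
          (case st
            :ok   true
            :done false
            :ex   (throw vl))))
      (next [this]
        (if (.hasNext this)
          (let [[_ vl] @cur] (vreset! cur nil) vl)
          (throw (NoSuchElementException.)))))))

;; Usage: wrap any upstream iterator.
(vec (iterator-seq (prefetch (.iterator [1 2 3]) 2)))
```

In this sketch the memory bound is `n` queued items, plus one item the producer may be holding while blocked on put, plus one item held in `cur`.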

I wonder if a missionary solution would look cleaner? Do you have an idea of how you would go about implementing the function? I haven't written any missionary code yet so I'm having a hard time deciding which primitives would be useful. I guess a similar solution could be built with a mailbox and calculating in blk?

xificurC commented 3 years ago

I had to fix some edge cases, the current version:

(defn fork [in opts]
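  (let [n (opts :n 1), f (opts :fn identity), tq (u/q n), q (u/q n)
        ;; q holds [status value] pairs, so only cancel the futures carried by :ok entries
        cl! #(run! (fn [[st vl]] (when (= :ok st) (future-cancel vl))) q)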
        _th (u/thread nil "pg-fork-coordinator"
              (try (loop []
                     (u/take tq)
                     (if (it/next? in)
                       (let [v (it/next in)] (u/put q [:ok (future (f v))]) (recur))
                       (u/put q [:no])))
                   (catch Throwable e (cl!) (u/put q [:ex e]))))
        ini (volatile! nil), cur (volatile! :none)]
    (reify Iterator
      (hasNext [_]
        ;; (print (if (future? @cur) @@cur "nil")) (print " ") (pr (vec tq)) (print " ") (prn (mapv first q))
        (when (nil? @ini) (vreset! ini true) (dotimes [_ n] (u/put tq :go)))
        (if (nil? @cur)
          false
          (let [[st vl] (u/take q)]
            (case st
              :ok (do (u/put tq :go) (vreset! cur vl) true)
              :no (do (vreset! cur nil) false)
              :ex (throw vl)))))
      (next [_] (let [v @cur] (if (nil? v) (it/bad!) (try @v (catch Throwable e (cl!) (throw e)))))))))

I need to spawn a separate thread to do the processing (that's the whole point), which brings me to sending messages between the threads, Erlang-style. I don't see how to get rid of that though. If I could swap the futures out for tasks that depend on one another I could get rid of the cl! function, but I don't know how to model that. Apart from that little piece I have no clue how to rewrite the rest into missionary.

leonoel commented 3 years ago

If you need to interop with a blocking java.util.Iterator you can define a flow consuming its values, like so:

(defn iterator-consumer [^java.util.Iterator iterator]
  (m/ap
    (while (m/?? (m/enumerate (when (m/? (m/via m/blk (.hasNext iterator))) [false true]))))
    (.next iterator)))

Then you can use missionary's operators to build your pipeline, each stage defines its own logical process so prefetching becomes a non-issue.

Turning a flow into a blocking iterator is a bit trickier; here's a possible implementation:

(deftype FlowIterator [^:unsynchronized-mutable iterator
                       ^:unsynchronized-mutable pending?]
  clojure.lang.IFn
  (invoke [this]
    (set! iterator
          (iterator (partial this false)
                    (partial this true)))
    this)
  (invoke [this done?]
    (locking this
      (set! pending? false)
      (when done? (set! iterator nil))
      (.notify this)))
  java.util.Iterator
  (hasNext [this]
    (locking this
      (while pending?
        (try (.wait this)
             (catch InterruptedException _
               (iterator))))
      (some? iterator)))
  (next [this]
    (locking this
      (set! pending? true)
      @iterator)))

(defn flow->iterable [flow]
  (reify Iterable
    (iterator [_]
      ((->FlowIterator flow true)))))

xificurC commented 3 years ago

It took me a day to understand that while loop :) This forking with ?? takes a while to internalize. I have no clue what's happening in the second snippet since I don't understand what (iterator (partial this false) (partial this true)) and @iterator do. If I'm reading this correctly iterator is bound to a flow, so I would need to look up their IFn and IDeref implementations respectively.

each stage defines its own logical process so prefetching becomes a non-issue.

What do you mean by that? Right now with the fork function I posted above I have complete control over how many pages reside in memory, each fork adds max 1 page. Since some of my jobs are handling large amounts of data in bulk I need to really understand the upper bound of each pipeline memory-wise. Since missionary doesn't give out anything willy-nilly to some thread pool I understand I can retain that same level of control, but not sure how would I go about implementing it. I wanted to try and define the function with missionary, which as you noted means reading from a possibly blocking java Iterator and returning one in the end.

Thank you for your reply! I am closing this since this isn't an issue per se.

leonoel commented 3 years ago

It took me a day to understand that while loop :) This forking with ?? takes a while to internalize.

TBH I'm not sure it's the best way to write that. It could be a case where an amb operator would help, e.g.:

(defmacro amb [& forms]
  `(case (m/?? (m/enumerate (range ~(count forms))))
     ~@(interleave (range) forms)))

(defn iterator-consumer [^java.util.Iterator iterator]
  (m/ap
    (loop []
      (if (m/? (m/via m/blk (.hasNext iterator)))
        (amb (.next iterator) (recur))
        (amb)))))

I have no clue what's happening in the second snippet since I don't understand what (iterator (partial this false) (partial this true)) and @iterator do. If I'm reading this correctly iterator is bound to a flow, so I would need to look up their IFn and IDeref implementations respectively.

That's fine, this code relies on the low-level details of the flow protocol, you don't have to understand how it works unless you're writing a library. I'm still considering adding a flow->iterable operator to missionary.core, so if you find it useful feel free to elaborate.

What do you mean by that? Right now with the fork function I posted above I have complete control over how many pages reside in memory, each fork adds max 1 page. Since some of my jobs are handling large amounts of data in bulk I need to really understand the upper bound of each pipeline memory-wise. Since missionary doesn't give out anything willy-nilly to some thread pool I understand I can retain that same level of control, but not sure how would I go about implementing it. I wanted to try and define the function with missionary, which as you noted means reading from a possibly blocking java Iterator and returning one in the end.

With blocking iterators, each thread is associated with a pipeline stage. You can add more stages with fork, which increases parallelism and also the memory footprint, because stage synchronization requires buffering at least 1 item, as you said. With missionary, each flow transformation creates a new pipeline stage; stages are decoupled from threads, but the parallelism/buffering tradeoff is pretty much the same.

xificurC commented 3 years ago

After understanding your example I would have probably come up with

(m/ap (loop []
        (when (m/? (m/via m/blk (.hasNext it)))
          (if (m/?? (m/enumerate [true false]))
            (.next it)
            (recur)))))

With that written I see a direct resemblance with unix forking and would have come up with:

(defmacro fork [] `(m/?? (m/enumerate [true false])))
(m/ap (loop []
        (when (m/? (m/via m/blk (.hasNext it)))
          (if (fork) (.next it) (recur)))))

Since lisp has macros the if can be nicely hidden, leading to:

(defmacro fork [a b] `(if (m/?? (m/enumerate [true false])) ~a ~b))
(m/ap (loop []
        (when (m/? (m/via m/blk (.hasNext it)))
          (fork (.next it) (recur)))))

At this point I can see amb as a generalization of the unix fork.

leonoel commented 3 years ago

At this point I can see amb as a generalization of the unix fork.

Right, and the other way round. BTW to fully emulate unix fork, ?? should be replaced by ?= to allow both branches to run concurrently.
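
A small sketch of the difference between the two fork operators. It assumes a recent missionary release where `??` and `enumerate` go by `m/?>` and `m/seed`, and where `m/reduce` is available (as used later in this thread). The delay values are arbitrary.

```clojure
(require '[missionary.core :as m])

(def delays [300 100 200])

;; Sequential fork: the next value is pulled only once the current branch is done.
(def sequential
  (m/ap (let [ms (m/?> (m/seed delays))]
          (m/? (m/sleep ms ms)))))

;; Concurrent fork: each branch starts as soon as its value is available.
(def concurrent
  (m/ap (let [ms (m/?= (m/seed delays))]
          (m/? (m/sleep ms ms)))))

;; Blocks the calling thread until each flow is fully reduced (JVM only).
(def sequential-result (m/? (m/reduce conj sequential)))
(def concurrent-result (m/? (m/reduce conj concurrent)))
```

With the sequential fork the results should come out in source order after roughly the sum of the delays. With the concurrent fork they should come out in completion order after roughly the longest delay.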

xificurC commented 3 years ago

That's fine, this code relies on the low-level details of the flow protocol, you don't have to understand how it works unless you're writing a library. I'm still considering adding a flow->iterable operator to missionary.core, so if you find it useful feel free to elaborate.

I like to understand the mechanics of a library to get a feeling for the abstractions, the tradeoffs, the performance etc. For now missionary remains a black box. I managed to understand how cloroutine works, so I guess that's 1 step in the right direction :)

BTW to fully emulate unix fork, ?? should be replaced by ?= to allow both branches to run concurrently.

This also raises the question: how does ?= run code concurrently without using a threadpool? Is it parking the forked tasks at safepoints? What are those safepoints?

leonoel commented 3 years ago

I like to understand the mechanics of a library to get a feeling for the abstractions, the tradeoffs, the performance etc.

I suggest you take some time to understand task and flow. They're basically callback-based protocols wrapped in constructor functions, so you can easily see what happens at the REPL.
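
A rough REPL sketch of both protocols, assuming a recent missionary release (`m/seed` is the newer name for `enumerate`). The callbacks here only print or record events. This is also what `(iterator (partial this false) (partial this true))` and `@iterator` rely on in the FlowIterator snippet above.

```clojure
(require '[missionary.core :as m])

;; A task is a function of a success callback and a failure callback.
;; Calling it starts the work and returns a zero-arg canceller.
(def cancel-sleep!
  ((m/sleep 100 :woke-up)
   (fn [v] (println "success:" v))
   (fn [e] (println "failure:" e))))

;; A flow is a function of a notifier and a terminator, both zero-arg.
;; Calling it returns an object that is derefed to transfer the next value
;; and invoked with no arguments to cancel.
(def events (atom []))

(def it
  ((m/seed [1 2 3])
   #(swap! events conj :ready)   ; notifier: a value can be transferred
   #(swap! events conj :done)))  ; terminator: no more values

;; Safe here because m/seed has each value ready immediately; in general,
;; deref only after the notifier has fired.
(def transferred [@it @it @it])
```

Inspecting `@events` afterwards shows when the notifier and terminator fired relative to the transfers.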

Is it parking the forked tasks at safepoints?

Parking happens when the computation requires a value that is not immediately available. It doesn't use a threadpool, the code is run synchronously by the thread responsible for unparking.
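
One way to observe this, sketched under the assumption that `m/via` completes its task from the executor thread: record the thread name before and after parking on a `m/via m/blk` task. `thread-name` is a helper defined only for this example.

```clojure
(require '[missionary.core :as m])

(defn thread-name []
  (.getName (Thread/currentThread)))

;; The sp block starts on the calling thread, parks on the via task,
;; and is resumed by whichever thread completes that task.
(def names
  (m/? (m/sp
         (let [before (thread-name)
               _      (m/? (m/via m/blk (Thread/sleep 10)))
               after  (thread-name)]
           [before after]))))
```

If the description above holds, the two names in `names` should differ: the first is the thread that started the block, the second is the blocking-pool thread that ran the sleep and then resumed the continuation.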

I'll keep this issue open until the blocking iterator case is settled, please open a new one if you want to discuss the execution model further.

leonoel commented 3 years ago

The object returned by this function could reasonably implement java.lang.Iterable, clojure.lang.IReduceInit and clojure.lang.Sequential, and a possible name could be educe to match clojure.core/eduction's semantics (lazy but not memoized).
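
A sketch of the shape such an object could take, not the missionary API. `educe-like` is a hypothetical name, and it takes a zero-arg function returning a fresh java.util.Iterator (for a flow, that could be the iterator built by flow->iterable above). Each traversal asks for a new iterator, which is what makes it lazy but not memoized.

```clojure
(defn educe-like
  "Hypothetical sketch: wraps `new-iterator`, a zero-arg fn returning a fresh
  java.util.Iterator, as an Iterable + IReduceInit + Sequential object."
  [new-iterator]
  (reify
    Iterable
    (iterator [_] (new-iterator))

    clojure.lang.IReduceInit
    (reduce [_ rf init]
      (let [^java.util.Iterator it (new-iterator)]
        (loop [acc init]
          (if (and (not (reduced? acc)) (.hasNext it))
            (recur (rf acc (.next it)))
            (unreduced acc)))))

    clojure.lang.Sequential))

;; Usage with a plain vector as the source:
(def e (educe-like #(.iterator [1 2 3])))
(reduce + 0 e)
(into [] (map inc) e)
```

Every call to reduce, into or seq walks the source again from the start, the same way clojure.core/eduction re-applies its transducer on each traversal.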

The ClojureScript version would hardly be useful because we can't block, but we can still provide a degraded version of the same function that throws an exception when a result is not immediately available.

dustingetz commented 1 year ago

(ns dustingetz.scratch
  (:require [missionary.core :as m]
            [hyperfiddle.rcf :refer [tests]]))

(defn iterator-consumer "blocking iterable pattern"
  [^java.util.Iterator it]
  ; why not one thread tied to the iterator extent?
  ; (future (while (.hasNext it) (! (.next it))))
  (m/ap
    (loop []
      (if (m/? (m/via m/blk (.hasNext it)))
        (m/amb (m/? (m/via m/blk (.next it))) (recur))
        (m/amb)))))

(defn seq-consumer [xs]
  (m/ap
    (loop [xs xs]
      (if (m/? (m/via m/blk (seq xs)))
        (m/amb (m/? (m/via m/blk (first xs))) (recur (rest xs)))
        (m/amb)))))

(tests
  (def !it (.iterator (.keySet (java.lang.System/getProperties))))
  (->> (iterator-consumer !it)
       (m/eduction (take 3))
       (m/reduce conj []) m/?)
  := ["java.specification.version" "sun.jnu.encoding" "java.class.path"]

  ; careful, Java iterator is stateful

  (def xs (iterator-seq (.iterator (.keySet (java.lang.System/getProperties)))))
  (take 3 xs) := ["java.specification.version" "sun.jnu.encoding" "java.class.path"]

  (->> (seq-consumer xs)
       (m/eduction (take 3))
       (m/reduce conj []) m/?)
  := ["java.specification.version" "sun.jnu.encoding" "java.class.path"])