leonoel / missionary

A functional effect and streaming system for Clojure/Script
Eclipse Public License 2.0

parallel processing #58

Closed leonoel closed 1 year ago

leonoel commented 2 years ago

Goal

Find the right pattern to parallelize processing on some part of a flow pipeline.

Solution 1 : emulate channels

Using rdv and dfv, we can build a channel-like primitive and use ap + ?= to run a single producer concurrently with an arbitrary number of consumers. The producer feeds values into the channel, closes it on termination and emits nothing. Each consumer reads values from the channel, passes them through a user-provided pipeline and emits the resulting values.

cf POC

Problem : the pattern may be too complex to be implemented manually in user space.

Solution 2 : ?= + sem

via Panel on slack

(let [sem (m/sem 2)]
  (m/ap
   (let [batch (m/?= (->> (fetch-ids)
                          (m/eduction (partition-all 20))))]
     (m/holding sem (->> (fetch-projects batch)
                         (m/reduce conj [])
                         m/?)))))

Problem : while the semaphore effectively ensures no more than 2 fetch-projects instances run concurrently, the memory footprint is unbounded because ?= pulls input and spawns new branches as soon as possible. If fetch-ids is infinite and able to produce batches faster than the maximal processing throughput (in this case, 2 divided by the average delay of fetch-projects), then the semaphore queue grows steadily.
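To make the bound concrete, here is a worked example with made-up numbers (none of them come from the thread). Let c be the number of permits, d the average delay of fetch-projects, λ the rate at which batches are produced and Q the number of branches waiting on the semaphore:

```latex
\mu = \frac{c}{d} = \frac{2}{0.5\,\mathrm{s}} = 4\ \text{batches/s}
\qquad
\frac{dQ}{dt} = \lambda - \mu = 10 - 4 = 6\ \text{batches/s}
\qquad
Q(60\,\mathrm{s}) \approx 6 \times 60 = 360\ \text{batches}
```

So with an assumed producer rate of 10 batches/s and a 0.5 s average fetch, about 360 batches (each holding up to 20 ids) would be parked on the semaphore after one minute, and the backlog keeps growing for as long as λ stays above μ.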

Solution 3 : fix ?=

Add an optional argument to ?= to specify an upper bound on the number of concurrent branches. When this number is reached, values stop being pulled from input and resume again when a branch terminates.

(m/ap
  (let [batch (m/?= 2 (->> (fetch-ids)
                        (m/eduction (partition-all 20))))]
    (->> (fetch-projects batch)
      (m/reduce conj [])
      m/?)))

Prior art : ReactiveX - Flowable/flatMap

leonoel commented 2 years ago

I see zero downside to solution 3 so far, it seems to be the way. Moreover, assuming a mechanism to limit concurrency, ?> and ?= actually unify because they're special cases for bounds 1 and ##Inf respectively.
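A minimal sketch of that unification, assuming the bound ends up as an optional first argument to ?> (this is my reading of the released API, the thread itself only shows the proposed ?= form). process and run-with-bound are hypothetical names introduced for illustration, and the blocking m/? call at the top level assumes the JVM.

```clojure
(require '[missionary.core :as m])

(defn process
  "Hypothetical stand-in for per-item work: a task that sleeps 100 ms,
   then completes with x."
  [x]
  (m/sleep 100 x))

(defn run-with-bound
  "Runs `process` on every input with at most `par` concurrent branches.
   Returns the results as a set, since completion order may vary."
  [par inputs]
  ;; m/? outside of sp/ap blocks the calling thread (JVM only).
  (m/? (m/reduce conj #{}
         (m/ap
           ;; Assumed form: (m/?> par flow) forks one branch per value,
           ;; with no more than `par` branches alive at once.
           (let [x (m/?> par (m/seed inputs))]
             (m/? (process x)))))))

(run-with-bound 1 [1 2 3 4])     ; sequential, same as plain ?>
(run-with-bound 2 [1 2 3 4])     ; at most 2 branches at a time
(run-with-bound ##Inf [1 2 3 4]) ; unbounded, same as the old ?=
```

All three calls should yield the same set of results; only the degree of concurrency, and therefore the elapsed time and the number of values pulled ahead of processing, should differ.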

In fact it's unclear to me if there's any valid use case for unbounded concurrent forking. If the input size has a known upper bound then you can specify it, otherwise you don't want to pull values indefinitely so you have to choose a bound anyway.

leonoel commented 2 years ago

So here's a plan :

The bad news is, we have a naming problem again. ReactiveX tricked me into thinking there were 3 forking operators, and I thought it was a good idea to steal their naming scheme - cf #44. It turns out there are only 2, one giving priority to the parent and the other giving priority to the children.

leonoel commented 1 year ago

Released in b.27