leonoel / missionary

A functional effect and streaming system for Clojure/Script
Eclipse Public License 2.0

Question: Best way to limit concurrency of `?=`? #14

Closed tim-smart closed 4 years ago

tim-smart commented 4 years ago

Hi, I'm still quite new to Clojure, so be nice lol.

What is the best way to limit concurrency with the `?=` forking function? I'm currently using core.async channels to do this, but it feels hackish.

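(require '[missionary.core :as m]
         '[clojure.core.async :as async])

;; Uses a core.async channel with a buffer of `concurrency` slots as a semaphore.
(defn mergemap
  [func concurrency flow]
  (let [c (async/chan concurrency)]
    (m/ap
     (let [value (m/?= flow)]
       (async/>!! c true)            ; take a slot (blocks while the buffer is full)
       (let [r (m/? (func value))]
         (async/<!! c)               ; give the slot back
         r)))))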

;; Creates a flow that multiplies the numbers by 10 after 500ms.
;; Max concurrency of 5
(mergemap #(m/sleep 500 (* % 10)) 5 (m/enumerate (range 10000)))

Any suggestions?

leonoel commented 4 years ago

Your implementation uses a core.async channel to emulate a semaphore. missionary provides an asynchronous semaphore, `sem`, and a macro, `holding`, that runs a block of asynchronous code while holding a semaphore instance. With these, mergemap would look like this:

(defn mergemap [f n flow]
  (let [s (m/sem n)]
    (m/ap (let [x (m/?= flow)]
            (m/holding s (m/? (f x)))))))
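
As a rough illustration of how this version could be exercised at a JVM REPL, the sketch below repeats the definition so it is self-contained, then collects the results with m/reduce. It assumes a missionary version contemporary with this thread, where m/?= and m/enumerate are available; the sleep duration and input range are arbitrary. Results arrive in completion order, so no particular ordering should be relied on, and the last comment in this thread points to a flaw in this approach, so treat it as a demonstration only.

```clojure
(require '[missionary.core :as m])

;; mergemap as given above, repeated so the snippet stands alone.
(defn mergemap [f n flow]
  (let [s (m/sem n)]
    (m/ap (let [x (m/?= flow)]
            (m/holding s (m/? (f x)))))))

;; Run the flow to completion, accumulating every emitted value in a vector.
;; Calling m/? outside of a process blocks the calling thread (JVM only).
(def results
  (m/? (m/reduce conj []
                 (mergemap #(m/sleep 100 (* % 10)) 5 (m/enumerate (range 20))))))

;; Sort before inspecting, since emission order is not guaranteed.
(def sorted-results (sort results))
```

With at most 5 sleeps holding the semaphore at once, the 20 inputs should be processed in roughly four waves of 100 ms each.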

Note: missionary expects all code to be non-blocking. If you need to interop with a blocking operation such as async/>!! or async/<!!, you must run it on a dedicated thread pool, e.g. `(m/via m/blk (blocking-op))`.
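
A minimal sketch of that pattern on the JVM follows. The blocking-op function here is a hypothetical stand-in (a Thread/sleep followed by a multiplication) for whatever blocking call is actually needed; only m/via, m/blk and m/? come from missionary.

```clojure
(require '[missionary.core :as m])

;; Hypothetical blocking operation, standing in for async/>!! or blocking I/O.
(defn blocking-op [x]
  (Thread/sleep 50)
  (* x 10))

;; m/via builds a task that evaluates its body on the given executor;
;; m/blk is the executor missionary provides for blocking work.
(defn blocking-task [x]
  (m/via m/blk (blocking-op x)))

;; Running the task from the REPL thread (m/? blocks outside of a process).
(def result (m/? (blocking-task 4)))
```

Inside an m/ap or m/sp block the same task would be awaited with m/?, which parks the process instead of tying up one of the threads that runs the non-blocking code.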

tim-smart commented 4 years ago

Thanks for the explanation!

leonoel commented 2 years ago

This solution is flawed, cf. #58.