leonoel / missionary

A functional effect and streaming system for Clojure/Script
Eclipse Public License 2.0

execution model #30

Closed xificurC closed 3 years ago

xificurC commented 3 years ago

Opening a new issue to continue the discussion from #23, which was drifting off-topic there. The initial question was:

how does ?= run code concurrently without using a threadpool?

The answer was

Parking happens when the computation requires a value that is not immediately available. It doesn't use a threadpool, the code is run synchronously by the thread responsible for unparking.

This "value that is not immediately available" point is what eludes me. Consider a simple example:

% clj -Sdeps '{:deps {missionary {:mvn/version "b.17"}}}'
Clojure 1.10.1
user=> (require '[missionary.core :as m])
nil
user=> (defn fetch [n] (Thread/sleep 100) (* 2 n))
#'user/fetch
user=> (time (m/? (m/aggregate conj (m/ap (let [n (m/?= (m/enumerate (range 10)))] (fetch n))))))
"Elapsed time: 1004.458757 msecs"
[0 2 4 6 8 10 12 14 16 18]

Treat fetch as a black-box IO operation, i.e. one we cannot reimplement. Can the example above be rewritten so that the calls run concurrently? The only solution I could come up with is:

user=> (time (m/? (m/aggregate conj (m/ap (let [n (m/?= (m/enumerate (range 10)))] (m/? (m/via m/blk (fetch n))))))))
"Elapsed time: 102.950868 msecs"
[0 2 4 6 8 10 12 14 16 18]

leonoel commented 3 years ago

Yes, that's how it's meant to be done. missionary assumes no threads, and via is provided to interoperate with the blocking world.
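
For contrast with the blocking fetch above, here is a minimal sketch of the parking case, assuming the same b.17 API used in this thread (m/aggregate, m/enumerate, m/?=) and that m/sleep accepts a delay in milliseconds plus a value to complete with. fetch-task and fetch-all are hypothetical names introduced only for illustration. Because m/sleep returns a task rather than blocking the calling thread, each branch forked by ?= can park on its own timer, so the waits should overlap without via or a thread pool.

```clojure
(require '[missionary.core :as m])

;; Hypothetical non-blocking stand-in for fetch (not part of the original
;; example): m/sleep returns a task that completes with (* 2 n) after 100 ms
;; instead of holding a thread the way Thread/sleep does.
(defn fetch-task [n]
  (m/sleep 100 (* 2 n)))

;; Each branch forked by ?= parks on its own sleep task, so the ten waits
;; can overlap rather than run back to back.
(defn fetch-all []
  (m/? (m/aggregate conj
         (m/ap (let [n (m/?= (m/enumerate (range 10)))]
                 (m/? (fetch-task n)))))))

;; The order in which concurrent branches complete is not assumed here.
(def results (fetch-all))
```

Wrapping the call in time, as in the transcripts above, should report roughly one delay rather than ten. A black-box function that calls Thread/sleep or performs blocking IO gives missionary nothing to park on, which is why it has to go through via.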

xificurC commented 3 years ago

OK, thanks