leonoel / missionary

A functional effect and streaming system for Clojure/Script
Eclipse Public License 2.0
620 stars 25 forks

behavior of `ap` with huge parallelism #108

Open leonoel opened 3 months ago

leonoel commented 3 months ago

Duplicating a comment from #35, as it's likely a separate issue.

(require '[missionary.core :as m])

(let [begin (System/currentTimeMillis)
      ;; create a flow of values generated by asynchronous tasks
      inputs (repeat 100000 (m/via m/cpu "hi")) ;; a task has no identity, it can be reused
      values (m/ap
               (let [flow (m/seed inputs)     ;; create a flow of tasks to execute
                     task (m/?> ##Inf flow)]  ;; from here, fork on every task in **parallel**
                 (m/? task)))                 ;; get a forked task value when available

      ;; drain the flow of values and count them
      n (m/? ;; tasks are executed and the flow is consumed here
         (m/reduce (fn [acc v]
                     (assert (= "hi" v))
                     (inc acc))
                   0 values))]
  (println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms"))

This is the example from the Tasks & Flows Walkthrough, but with 100K inputs instead of 1K.
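To compare input sizes and parallelism levels side by side, the snippet above can be wrapped in a function. This is only a sketch: `run-ap` is a hypothetical helper name, not part of missionary, and it assumes the same `m` alias and JVM setup as the original. It uses only the calls already present in the snippet.

(require '[missionary.core :as m])

;; Hypothetical helper: same pipeline as the snippet above, with the input
;; count `n` and the fork parallelism `par` passed in as arguments.
(defn run-ap [n par]
  (let [begin  (System/currentTimeMillis)
        inputs (repeat n (m/via m/cpu "hi"))       ;; n copies of the same task
        values (m/ap
                 (let [task (m/?> par (m/seed inputs))] ;; fork with parallelism `par`
                   (m/? task)))                         ;; await each forked task
        cnt    (m/? (m/reduce (fn [acc _] (inc acc)) 0 values))] ;; drain and count
    {:count cnt
     :ms    (- (System/currentTimeMillis) begin)}))

(comment
  ;; 1K inputs, as in the walkthrough
  (run-ap 1000 ##Inf)
  ;; 100K inputs, as in this issue
  (run-ap 100000 ##Inf)
  ;; bounded parallelism, for comparison
  (run-ap 100000 8))

The returned map carries the element count and the elapsed milliseconds, so runs at different sizes can be compared directly.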