Closed (leonoel closed this 4 months ago)
Could it be the reason why this locks up for me?
(require '[missionary.core :as m])

(let [begin (System/currentTimeMillis)
      ;; create a flow of values generated by asynchronous tasks
      inputs (repeat 100000 (m/via m/cpu "hi")) ;; a task has no identity, it can be reused
      values (m/ap
               (let [flow (m/seed inputs)      ;; create a flow of tasks to execute
                     task (m/?> ##Inf flow)]   ;; from here, fork on every task in **parallel**
                 (m/? task)))                  ;; get a forked task value when available
      ;; drain the flow of values and count them
      n (m/? ;; tasks are executed, and flow is consumed here!
          (m/reduce (fn [acc v]
                      (assert (= "hi" v))
                      (inc acc))
                    0 values))]
  (println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms"))
It's from the Tasks & Flows Walkthrough, but with 100K inputs instead of 1K.
Race condition fixed in b.37
`ap` issue moved to #108
In `Thunk.java`: if the run method terminates concurrently with a cancellation, the interruption flag may be reset before the runner thread is interrupted by the canceller thread, leaving the interruption flag set when the runner thread starts the next task. This is not a problem with the executors exposed by missionary, because `j.u.c.ThreadPoolExecutor` resets the flag between successive tasks anyway, but it could be problematic if `via` is used with another executor implementation.
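To make the hazard concrete, here is a minimal Java sketch of the outcome described above. It does not reproduce the race in `Thunk.java` itself: the first task simply interrupts its own thread as a deterministic stand-in for a cancellation that lands just as the run ends. `InterruptLeakSketch`, `naiveWorker` and `threadPoolWorker` are hypothetical names made up for this illustration. The "naive" worker stands for any executor that reuses a thread without clearing the flag.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

public class InterruptLeakSketch {

    // Hypothetical executor-like worker: one thread runs two tasks back to back
    // and never touches the interruption flag between them.
    public static boolean naiveWorker() throws InterruptedException {
        AtomicBoolean seenByNextTask = new AtomicBoolean();
        Thread worker = new Thread(() -> {
            Runnable[] tasks = {
                // Stand-in for a task whose cancellation interrupt arrives late.
                () -> Thread.currentThread().interrupt(),
                // The next task records the flag it starts with.
                () -> seenByNextTask.set(Thread.currentThread().isInterrupted())
            };
            for (Runnable task : tasks) {
                task.run();
            }
        });
        worker.start();
        worker.join();
        return seenByNextTask.get();
    }

    // Same two tasks on a single-threaded pool backed by ThreadPoolExecutor,
    // whose worker loop clears the flag before running each task.
    public static boolean threadPoolWorker() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Runnable leaveFlagSet = () -> Thread.currentThread().interrupt();
            Callable<Boolean> probe = () -> Thread.currentThread().isInterrupted();
            pool.submit(leaveFlagSet).get(); // wait so both tasks reuse the same worker
            return pool.submit(probe).get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("naive worker, next task sees flag: " + naiveWorker());
        System.out.println("thread pool, next task sees flag: " + threadPoolWorker());
    }
}
```

A custom executor that wants the same guarantee could call `Thread.interrupted()` before handing each task to its worker thread; whether that is the right fix for a given executor is an assumption to check against its own cancellation semantics.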