leonoel / missionary

A functional effect and streaming system for Clojure/Script
Eclipse Public License 2.0
630 stars 26 forks source link

Concurrent forking within Preemptive forking not being cancelled #38

Open mjmeintjes opened 2 years ago

mjmeintjes commented 2 years ago

Hope I can explain this properly:

  (require '[missionary.core :as ms])
  (def cancel
    (do
      (when (bound? (resolve 'cancel))
        (cancel))
      (def mbx (ms/mbx))
      (let [mbx-flow (ms/ap
                      (->> (repeat mbx)
                           ms/seed
                           ms/?>
                           ms/?))
            flow (ms/ap
                  (let [next (ms/?< mbx-flow)]
                    (try
                      (let [task-id (ms/?= (ms/seed next))]
                        (loop []
                          (println "RUNNING TASK" (ms/? (ms/sleep 1000 task-id)))
                          (recur)))
                      (catch Exception ex
                        (ms/?> ms/none)))))
            task (->> (ms/reduce (constantly nil) flow))]
        (task println println))))

  (cancel)
  (mbx [1 2 3])  ;; [1]
  (mbx [4 5 6])  ;; [2]

Running [1] starts printing as expecting:

RUNNING TASK 1
RUNNING TASK 2
RUNNING TASK 3

I would expect that running [2] would stop printing 1,2,3 and instead start printing only 4,5,6. However, currently it continues printing 1,2,3 in addition to 4,5,6.

E.g. running [2] after running [1] causes following output:

RUNNING TASK 5
RUNNING TASK 4
RUNNING TASK 6
RUNNING TASK 3
RUNNING TASK 1
RUNNING TASK 2

Calling (cancel) cancels everything as expected.

If I replace ?= with ?>, then it works as expected (of course running only the first of each mbx value).

Is this expected behavior or a bug?

mjmeintjes commented 2 years ago

It seems to work properly if I wrap the inner flow in a task:

  (require '[missionary.core :as ms])
  (def cancel
    (do
      (when (bound? (resolve 'cancel))
        (cancel))
      (def mbx (ms/mbx))
      (let [sub-flow (fn [next]
                       (ms/reduce
                        (constantly nil)
                        (ms/ap
                         (let [task-id (ms/?= (ms/seed next))]
                           (loop []
                             (println "RUNNING TASK" (ms/? (ms/sleep 1000 task-id)))
                             (recur))))))
            mbx-flow (ms/ap
                      (->> (repeat mbx)
                           ms/seed
                           ms/?>
                           ms/?))
            flow (ms/ap
                  (let [next (ms/?< mbx-flow)]
                    (try
                      (ms/? (sub-flow next))
                      (catch Exception ex
                        (ms/?> ms/none)))))
            task (->> (ms/reduce (constantly nil) flow))]
        (task println println))))

  (cancel)

  (mbx [1 2 3])  ;; [1]
  (mbx [4 5 6])  ;; [2]
leonoel commented 2 years ago

I would expect that running [2] would stop printing 1,2,3 and instead start printing only 4,5,6. However, currently it continues printing 1,2,3 in addition to 4,5,6.

You're right, switching should cancel all child branches, including concurrent ones.