Closed by leonoel 1 year ago
(ns dustin.y2022.missionary-switch
  (:require [missionary.core :as m]
            [hyperfiddle.rcf :as rcf :refer [tests ! %]])
  (:import [missionary Cancelled]))

(hyperfiddle.rcf/enable!)
(tests
  ; concat (?>): each inner flow is fully consumed before the next outer value is pulled
  (def >x (m/ap
            (let [>a (m/?> (m/seed (range 5)))]
              (m/?> (m/seed (range >a))))))
  (def cancel ((m/reduce conj >x) ! !))
  % := [0
        0 1
        0 1 2
        0 1 2 3])
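(tests
  ; switch (?<): the current inner flow is cancelled when the outer flow has a new value
  (def >x (m/ap
            (try
              (let [>a (m/?< (m/seed (range 5)))]
                (m/?< (m/seed (range >a))))
              (catch Cancelled _))))
  (def cancel ((m/reduce conj >x) ! !))
  ; in the current design of switch, we can see the terminal emission
  ; of each cancelled subprocess (nil, from the catch clause)
  % := [nil
        nil
        nil
        0
        1
        2
        3])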
TLDR: two problems with current design:
Released in b.27
The current behavior of `?<` is to cancel the current subprocess when the input flow is ready to transfer, then wait for the subprocess to terminate, then transfer the value from the input flow and create a new subprocess. Consequently: … `amb` … in `ap` but not in `cp`)

Alternative design: `?<` spawns a new subprocess immediately when the input flow is ready to transfer. The previous subprocess is detached from the main process; it is cancelled and flushed (i.e. consumed as fast as possible, with values discarded and exceptions ignored). This implies the switch parent process must keep track of all of its concurrent stale subprocesses and wait for all of them to terminate before terminating itself.

Prior art: unknown. Rx has `switchMap`, but this subtlety is irrelevant in Rx due to the lack of graceful shutdown: each process is considered terminated immediately after cancellation.
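To make the bookkeeping of the alternative design concrete, here is a minimal sketch in plain Clojure. It is not missionary's implementation: it only models the parent's state as a pure function over events. The names `initial-state`, `step`, `terminated?`, the event shapes, and the subprocess ids `:a`/`:b` are all hypothetical, introduced for illustration.

```clojure
;; Hypothetical model of the switch parent's bookkeeping (not missionary code).
;; :current     id of the live subprocess, or nil
;; :stale       ids of detached subprocesses, cancelled but not yet terminated
;; :input-done? true once the input flow has terminated
(def initial-state
  {:current nil :stale #{} :input-done? false})

(defn step
  "Advance the parent state by one event."
  [state [event id]]
  (case event
    ;; Input is ready: spawn the new subprocess immediately; the previous
    ;; one (if any) is detached and tracked as stale while it is flushed.
    :input      (cond-> (assoc state :current id)
                  (:current state) (update :stale conj (:current state)))
    ;; A subprocess terminated: forget it, whether current or stale.
    :sub-done   (if (= id (:current state))
                  (assoc state :current nil)
                  (update state :stale disj id))
    ;; The input flow terminated.
    :input-done (assoc state :input-done? true)))

(defn terminated?
  "The parent may terminate only when nothing is left running."
  [{:keys [current stale input-done?]}]
  (and input-done? (nil? current) (empty? stale)))

;; Subprocess :a is replaced by :b, and the stale :a terminates last.
(def events
  [[:input :a] [:input :b] [:input-done] [:sub-done :b] [:sub-done :a]])

(map terminated? (rest (reductions step initial-state events)))
;; => (false false false false true)
```

In this trace the parent stays alive after `:b` finishes because the stale `:a` is still being flushed, which is the waiting requirement described above.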