leonoel / missionary

A functional effect and streaming system for Clojure/Script
Eclipse Public License 2.0
630 stars 26 forks source link

Equivalent of rxjs expand operator? #15

Closed tim-smart closed 4 years ago

tim-smart commented 4 years ago

https://rxjs-dev.firebaseapp.com/api/operators/expand

Is there an equivalent way of doing this in missionary right now?

leonoel commented 4 years ago

Recursive patterns are implemented with ap and clojure's loop/recur. The trick is to fork two times in the loop, the first time for each value of the flow, the second time to discriminate between value emission and recursion over derived flow.

The example from Rx documentation could look like this :

(defn expand [f flow]
  (m/ap
    (loop [flow flow]
      (let [x (m/?= flow)
            [k v] (m/?? (m/enumerate {:value x :recur (f x)}))]
        (case k
          :value v
          :recur (recur v))))))

(defn double-delay [x]
  (m/ap (m/? (m/sleep 1000 (* 2 x)))))

(m/? (->> (m/enumerate [1 2 3])
          (expand double-delay)
          (m/transform (take 10))
          (m/aggregate (fn [_ x] (prn x)) nil)))

1
2
3
2
6
4
4
12
8
8

I'm planning to write a piece of documentation about this pattern.

leonoel commented 4 years ago

This pattern is now explained here