leonoel / missionary

A functional effect and streaming system for Clojure/Script
Eclipse Public License 2.0

topic-based flow dispatch #37

Closed leonoel closed 2 years ago

leonoel commented 3 years ago

Goal : split a flow across several branches.

Prior art :

mjmeintjes commented 3 years ago

This would be useful to have. Something like this:

;; Basic grouping
  (->> (m/seed [{:key 1 :val 11} {:key 2 :val 22} {:key 1 :val 33} {:key 3 :val 44}])
       (m/group-by-key
        :key
        (fn [key flow]
          (m/ap
           (println [key (m/?> flow)])))))

;; Allow grouping function to return a task
  (->> (m/seed [{:key 1 :val 11} {:key 2 :val 22} {:key 1 :val 33} {:key 3 :val 44}])
       (m/group-by
        (fn [{:keys [key]}]
          (m/sp key))
        (fn [key flow]
          (m/ap
           (println [key (m/?> flow)])))))

A few use cases:

With my imaginary group-by:

  (ms/ap
   (->> (observe-event-flow s "update" (fn [_ _ [min max]] [min max]))
        (m/group-by
         nth
         (fn [k flow]
           (ms/ap
            (.currentTime player (ms/?> flow)))))))

leonoel commented 3 years ago

This pattern can be split into two separate parts: the fan-out and the fan-in. ap already covers the fan-in, so it should be enough to implement the fan-out. The missing part is actually very close to Clojure's group-by: if we treat the resulting map as a sequence of entries, there is a natural translation to flows. Here is a possible implementation:

(defn group-by "Same as clojure's group-by, but takes a flow instead of a sequence as input,
and returns a flow of key-flow pairs instead of a map from keys to vectors."
  [kf >xs]
  (m/ap (let [x (m/?> (m/eduction
                        (fn [rf]
                          (let [topics (object-array 1)]
                            (fn
                              ([] (rf))
                              ([r]
                               (transduce (map #(% %)) rf r (vals (aget topics 0))))
                              ([r x]
                               (let [k->t (aget topics 0)
                                     k (kf x)]
                                 (if-some [t (get k->t k)]
                                   (rf r (t x))
                                   (let [t (m/rdv)]
                                     (aset topics 0 (assoc k->t k t))
                                     (rf r [k (m/eduction (take-while (complement #{t}))
                                                (m/ap (m/? (m/?> (m/seed (cons (m/sp x) (repeat t)))))))])))))))) >xs))]
          (if (vector? x) x (do (m/? x) (m/amb>))))))

(comment
  ;; Assumes (require '[missionary.core :as m] '[clojure.string :as str]).
  (def words ["Air" "Bud" "Cup" "Awake" "Break" "Chunk" "Ant" "Big" "Check"])
  (m/?
    (m/reduce conj
      (m/ap
        (let [[k >x] (m/?= (group-by (juxt first count) (m/seed words)))]
          (println "processing group" k)
          (let [word (m/?> >x)]
            (m/? (m/via m/cpu (str/upper-case word))))))))
  ;; processing group [A 3]
  ;; processing group [B 3]
  ;; processing group [C 3]
  ;; processing group [A 5]
  ;; processing group [B 5]
  ;; processing group [C 5]
  #_=> ["AIR" "BUD" "CUP" "AWAKE" "BREAK" "ANT" "BIG" "CHUNK" "CHECK"])
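For comparison, here is a minimal sketch of the eager counterpart on the same input, using only clojure.core. It illustrates the "map as a sequence of entries" shape that the flow version mirrors with [key flow] pairs. The name `groups` is introduced here for illustration only, and the core function is fully qualified so it does not clash with the group-by defined above.

```clojure
(def words ["Air" "Bud" "Cup" "Awake" "Break" "Chunk" "Ant" "Big" "Check"])

;; clojure.core/group-by returns a map from each key to a vector of items.
(def groups (clojure.core/group-by (juxt first count) words))

;; Walking the map as a sequence of [key items] entries gives the eager
;; analogue of the [key flow] pairs emitted by the flow-based version.
(doseq [[k xs] groups]
  (println "group" k "->" xs))

(get groups [\A 3]) ;; => ["Air" "Ant"]
(get groups [\C 5]) ;; => ["Chunk" "Check"]
```

The eager version has to consume the whole input before any group is available, whereas the flow version can hand out each group as soon as its first item arrives.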

Regarding API design, I think this is an interesting simplification compared to ZIO :

leonoel commented 3 years ago

Rx groupBy has well-defined semantics.

I like: inner branch cancellation can be used to prevent the group-by internal state from growing indefinitely.

I don't like :

Needs more thinking: only one run is allowed for a given inner flow.

leonoel commented 2 years ago

@mjmeintjes could you elaborate on the UI slider component use case? Using nth as the key function of group-by seems wrong to me.

mjmeintjes commented 2 years ago

The slider can have one or more handles. It fires an update event each time any handle changes, but it only passes along all the current values, with no indication of which one changed. For example, with 3 handles I would get something like [3 8 10], the current value of each handle. To figure out which handle actually changed, I have to compare the current list of values with the previous one (e.g. [3 8 10] followed by [3 8 13] indicates that the third handle changed from 10 to 13).

The alternative, group-by with nth as the key function, would give me a separate flow for each handle's changes.
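The manual comparison described above could be sketched as a pure function over two consecutive snapshots. The name `changed-handles` is hypothetical and not part of missionary or of the slider library.

```clojure
(defn changed-handles
  "Returns [index new-value] pairs for every position where curr differs
  from prev. Hypothetical helper, for illustration only."
  [prev curr]
  (keep-indexed (fn [i [p c]] (when (not= p c) [i c]))
                (map vector prev curr)))

(changed-handles [3 8 10] [3 8 13]) ;; => ([2 13])
(changed-handles [3 8 10] [3 8 10]) ;; => ()
```

This approach needs the previous snapshot to be kept around by the caller, which is the bookkeeping a per-handle flow would avoid.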

leonoel commented 2 years ago

Thanks, I understand better now. I think what's missing is an intermediate stage that decomposes the input vector into a succession of focused updates. Here is how I think group-by could be used to solve this problem:

  (def !slider (atom [3 8 10]))
  (def cancel
    ((m/ap
       (let [[i >x] (->> (m/watch !slider)
                      (m/eduction (mapcat (partial map-indexed vector)))
                      (m/group-by first)
                      (m/?=))]
         (println "handle" i "changed to"
           (m/?> (m/eduction (map second) (dedupe) >x)))
         (m/amb>))) #() #()))
  ;; handle 0 changed to 3
  ;; handle 1 changed to 8
  ;; handle 2 changed to 10
  (swap! !slider assoc 2 13)
  ;; handle 2 changed to 13
  (swap! !slider assoc 0 5)
  ;; handle 0 changed to 5
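
The transducers in the example above can be tried in isolation on a plain collection, without missionary. This is only a sketch of the data transformation, not of the flow semantics. `snapshots`, `updates` and `handle-values` are names assumed here for illustration, with `snapshots` standing in for the successive states of !slider.

```clojure
;; Successive slider states, as the watch would observe them.
(def snapshots [[3 8 10] [3 8 13] [5 8 13]])

;; Decompose each snapshot into [index value] pairs.
(def updates
  (sequence (mapcat (partial map-indexed vector)) snapshots))
;; => ([0 3] [1 8] [2 10] [0 3] [1 8] [2 13] [0 5] [1 8] [2 13])

;; What a single group sees: the values for one index, with repeats dropped.
(defn handle-values [i]
  (sequence (comp (filter #(= i (first %))) (map second) (dedupe)) updates))

(handle-values 0) ;; => (3 5)
(handle-values 2) ;; => (10 13)
```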

leonoel commented 2 years ago

As an aside, group-by is not needed if topics are known ahead of time.

(m/ap
  (let [i (m/?= (m/seed (range 3)))]
    (println "handle" i "changed to"
      (m/?> (m/eduction (map #(nth % i)) (dedupe) (m/watch !slider))))
    (m/amb>)))
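
The per-handle pipeline used above, `(map #(nth % i))` followed by `(dedupe)`, can be checked on a plain vector of snapshots. This is a sketch of the data transformation only. `snapshots` and `handle-history` are hypothetical names, with `snapshots` standing in for the successive values of !slider.

```clojure
(def snapshots [[3 8 10] [3 8 13] [5 8 13]])

;; Project every snapshot onto index i and drop consecutive repeats.
(defn handle-history [i]
  (sequence (comp (map #(nth % i)) (dedupe)) snapshots))

(mapv handle-history (range 3)) ;; => [(3 5) (8) (10 13)]
```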

leonoel commented 2 years ago

Implemented in b.23