leonoel / missionary

A functional effect and streaming system for Clojure/Script
Eclipse Public License 2.0
630 stars 26 forks source link

Potential bugs w reactor in cljs #21

Closed lgrapenthin closed 3 years ago

lgrapenthin commented 3 years ago

While playing around with reactor I stumbled across a couple bugs (on CLJS)

;; Setup
(def my-interval
    (m/observe (fn [evt]
                 (println "Starting interval")
                 (let [ctr (let [ct (atom 0)]
                             #(swap! ct inc))
                       id
                       (js/setInterval (fn [t]
                                         (try
                                           (evt (ctr))
                                           (catch :default e
                                             (println "Can't put!")
                                             (.log js/console e))))
                                       3000)]
                   (fn []
                     (println "Cancelling interval")
                     (js/clearInterval id))))))

1

(def t
    (a/run-task
     (m/reactor
      (let [i (m/signal! my-interval)]
        (m/stream!
         (m/ap (println "pr1" (m/?? i))
               (println "pr2" (m/?? i))))

        ))))

(t)

Starting interval pr1 1 pr2 1 pr2 1 pr2 2 pr2 3 pr2 4 Cancelling interval pr1 4

Expected: "pr2 1" is printed only once.

2


(def t
    (a/run-task
     (m/reactor
      (let [i (m/signal! my-interval)
            i2 (m/signal! my-interval)]
        (m/stream!
         (m/ap (println "pr1" (m/?? i))
               ))

        ))))
(t)

Printout

Starting interval Starting interval pr1 1 pr1 2 Can't put! pr1 3 Can't put! pr1 4 Can't put! Cancelling interval Cancelling interval success #object[missionary.impl.Pub]

Expected: signal i2 wouldn't backpressure. N. b. core.asyncs mult doesn't backpressure when there are no takers - but independently I would expect i2 only to backpressure my-interval if it was a m/stream!

3

4


(def t
    (a/run-task
     (m/reactor
      (let [i (m/stream! my-interval)
            i2 (m/stream! my-interval)]
        (m/stream!
         (m/ap (println (m/?? (m/zip vector i i2)))
               ))

        ))))

Starting interval Starting interval Cancelling interval Cancelling interval error #object[Error Error: No such element.]

leonoel commented 3 years ago

Could you provide the implementation of run-task as well ?

lgrapenthin commented 3 years ago

Its just #(% (partial println "success") (partial println "error"))

On 12/7/20, Léo NOEL notifications@github.com wrote:

Could you provide the implementation of run-task as well ?

-- You are receiving this because you authored the thread. Reply to this email directly or view it on GitHub: https://github.com/leonoel/missionary/issues/21#issuecomment-739829303

leonoel commented 3 years ago

1 and 4 are certainly bugs, I'll investigate that. 2 is intended behavior. signal! propagates lazy sampling semantics to its subscribers, so a signal! with no subscriber won't consume anything at all. Assuming my-interval represents the discrete stream of successive states of an identity, you can turn it into a continuous behavior using (relieve {} my-interval), effectively disabling backpressure upstream and exposing only the latest value downstream.