leonoel / missionary

A functional effect and streaming system for Clojure/Script
Eclipse Public License 2.0
630 stars 26 forks source link

Exception from reactor source flow breaks missionary (b.21) #39

Closed mjmeintjes closed 2 years ago

mjmeintjes commented 2 years ago

Running the following code breaks missionary in some way. For example, after I run the code at [1] it throws a null exception, and from then on (m/? (m/sleep 1000 :abc)) freezes indefinitely until I restart the REPL.

(defn test-flow-breaks []
  (m/ap
   (let [r (m/?> (m/seed [(m/sleep 1 :res)]))]
     (m/? r)
     (m/? (m/sp (throw (ex-info "TEST" {})))))))

(defn test-flow-works []
  (m/ap
   (let [r (m/?> (m/seed [(m/sp :res)]))]
     (m/? r)
     (m/? (m/sp (throw (ex-info "TEST" {})))))))

(comment

  ;; [1]
  (m/? (m/reactor
         (let [state (m/stream! (test-flow-breaks))]
           (m/stream! (m/ap (println (m/?> state)))))))

  (m/? (m/sleep 1000 :a)))

Running this throws an unhandled exception:

Exception in thread "missionary scheduler" java.lang.NullPointerException
    at missionary.impl.Reactor.signal(Reactor.java:93)
    at missionary.impl.Reactor.emit(Reactor.java:187)
    at missionary.impl.Reactor.leave(Reactor.java:210)
    at missionary.impl.Reactor$1.invoke(Reactor.java:401)
    at missionary.impl.Ambiguous$3.invoke(Ambiguous.java:115)
    at missionary.impl.Ambiguous$4.invoke(Ambiguous.java:128)
    at missionary.impl.Sequential.step(Sequential.java:52)
    at missionary.impl.Sequential$1.invoke(Sequential.java:32)
    at missionary.impl.Sleep$Scheduler.trigger(Sleep.java:61)
    at missionary.impl.Sleep$Scheduler.run(Sleep.java:75)

This causes further operations like (m/? (m/sleep 1000 :test)) to hang indefinitely.

leonoel commented 2 years ago

minimal repro : ((m/reactor (m/stream! (m/stream! (m/ap (assert (m/? (m/sleep 0))))))) prn prn)

leonoel commented 2 years ago

Fixed in b.22