leonoel / missionary

A functional effect and streaming system for Clojure/Script
Eclipse Public License 2.0
636 stars 26 forks source link

observe must be guarded against subject/cleanup throwing #4

Closed lgrapenthin closed 5 years ago

lgrapenthin commented 5 years ago

In the following experiment,

(defn printer [t]
  (t (fn [res] (println "res" res))
     (fn [err] (println "err" err))))

(def inst
  (printer (->> (m/observe
                 (fn [event-fn]
                   (let [producer
                         (Thread.
                          (fn []
                            (dotimes [n 6]
                              (println "Producing" n)
                              (Thread/sleep 1000)
                              (event-fn n))))]
                     (.start producer)
                     (fn []
                       (println "cancelled")
                       (.stop producer)))))
                (m/transform (take 3))
                (m/aggregate conj))))
leonoel commented 5 years ago

The problem is the unsubscribe function throws, and observe doesn't expect this. Replace (.stop producer) with (.interrupt producer) and it should do what you expect (this the recommended way to kill a thread, Thread.stop is deprecated).

Intended or not : good question. The current behavior is obviously confusing, but at the time the unsubscribe function is called the process is already terminated (successfully). At first glance I'd say the whole process should fail, because proper cleanup of resources is part of the job.

leonoel commented 5 years ago

Post-hammock thoughts : no ambiguity here. Both subject and cleanup functions must be guarded against throwing, because they're user-provided. In both cases, observe must propagate error and terminate. In this example, cancellation is triggered by the early termination of transform so the error will be swallowed and this is intended behavior.

lgrapenthin commented 5 years ago

Interesting, I now understand that the problem is cancelled because a Thread Death is thrown by unsubscribe.

Wouldn't this example still have to print "err" ... Otherwise, how would I as a user ever understand that my unsubscribe function throwing is the reason?

leonoel commented 5 years ago

I acknowledge that swallowed errors is a source of frustration for users. The purpose of observe is to provide an easy way to interop with classic observer pattern where you register a callback on a event emitter, without backpressure concerns. The cleanup function must be provided to deregister the callback (forgetting it is a common source of memory leaks), and it is expected to do it synchronously in a non-blocking way. In practice, both subject and cleanup functions should be simple wrappers for a third-party observer framework. The example you provided is arguably contrived from this point of view.

Now if you need to be sure your subject/cleanup functions are correct, you can always test it in isolation. Another possibility could be to catch errors in an ap block :

(defn show-error [flow]
  (m/ap (try (m/?? flow)
             (catch Throwable e
                (prn e)
                (throw e)))))

Then you could insert this just before transform in your pipeline and the cleanup error would be guaranteed to be printed.

I considered two alternatives :