leonoel / missionary

A functional effect and streaming system for Clojure/Script
Eclipse Public License 2.0

`observe` - on JVM, consider parking the caller thread instead of throwing when pipeline is busy #93

Closed by leonoel 2 months ago

leonoel commented 2 months ago

implemented in b.38

leonoel commented 2 months ago

The main motivation for this change is to let relieved pipelines work with multiple writer threads. Previously, the following implementation of `watch` could fail when several threads wrote to the reference concurrently. Now, the later thread is blocked briefly while the earlier thread performs the transfer.

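```clojure
(require '[missionary.core :as m])

(def watch
  ;; Watch callback: the watch key is the observe callback `!` itself,
  ;; so call it with nil to signal that the reference changed.
  (let [w (fn [! _ _ _] (! nil))]
    (fn [!x]
      (->> (fn [!]
             (add-watch !x ! w)
             ;; Cleanup thunk: unregister the watch.
             #(remove-watch !x !))
        (m/observe)
        (m/relieve)
        ;; Ignore the signal itself and always read the current state.
        (m/reductions
          (fn ([] @!x)
            ([_ _] @!x)))))))
```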
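To illustrate the difference between throwing and blocking when a second writer arrives, here is a minimal sketch that uses plain JVM locks. It is not missionary's implementation. The names `make-sink`, `throwing-emit!`, `blocking-emit!` and `run-writers` are hypothetical, and the lock-guarded `ArrayList` is only a stand-in for a pipeline that can transfer one event at a time.

```clojure
(import '(java.util ArrayList)
        '(java.util.concurrent.locks ReentrantLock))

;; Hypothetical stand-in for a pipeline that accepts one event at a time.
(defn make-sink []
  {:lock (ReentrantLock.) :events (ArrayList.)})

;; Old behavior (sketch): a writer that finds the sink busy fails immediately.
(defn throwing-emit! [{:keys [lock events]} x]
  (if (.tryLock lock)
    (try (.add events x)
         (finally (.unlock lock)))
    (throw (IllegalStateException. "pipeline is busy"))))

;; New behavior (sketch): a writer that finds the sink busy waits its turn.
(defn blocking-emit! [{:keys [lock events]} x]
  (.lock lock)
  (try (.add events x)
       (finally (.unlock lock))))

;; Start n-threads writers, each emitting n-events values, wait for all of
;; them, and return how many events reached the sink.
(defn run-writers [emit! n-threads n-events]
  (let [sink    (make-sink)
        writers (doall (for [t (range n-threads)]
                         (future (dotimes [i n-events]
                                   (emit! sink [t i])))))]
    (run! deref writers)
    (.size (:events sink))))
```

With `blocking-emit!`, `(run-writers blocking-emit! 4 1000)` delivers all 4000 events, because each writer waits while another holds the lock. With `throwing-emit!`, the same call may fail whenever two writers collide, which mirrors the failure mode described above.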