leonoel / missionary

A functional effect and streaming system for Clojure/Script
Eclipse Public License 2.0

missionary.Cancelled not thrown #83

Closed. den1k closed this issue 1 year ago

den1k commented 1 year ago

This code properly throws Cancelled:

(require '[missionary.core :as m]) ; alias used by every snippet in this thread

(time
   (let [delays [{:id    1
                  :delay 1000}
                 {:id    2
                  :delay 2000}
                 {:id    1
                  :delay 500}
                 {:id    1
                  :delay 200}]]
     (m/?
       (->>
         (m/ap
           (let [[k vs] (m/?> ##Inf (m/group-by :id (m/seed delays)))
                 m (m/?< vs)]
             (try
               (m/? (m/sleep (:delay m) (assoc m :slept true)))
               (catch missionary.Cancelled c
                 (println :cancelled m)
                 (m/amb)))))
         (m/reduce
           (fn [out v]
             (println v)
             (conj out v))
           [])))))

This one does not:

(time
  (let [delays [{:id    1
                 :delay 1000}
                {:id    2
                 :delay 2000}
                {:id    1
                 :delay 500}
                {:id    1
                 :delay 200}]]
    (m/?
      (->>
        (m/ap
          (let [[k vs] (m/?> ##Inf (m/group-by :id (m/seed delays)))
                m (m/?< vs)]
            (try
              (m/? 
                (m/via m/blk (Thread/sleep (:delay m)) (assoc m :slept true))) ; <---- difference
              (catch missionary.Cancelled c
                (println :cancelled m)
                (m/amb))
              (catch Throwable t ; <----- added to catch exception
                (println :thrown)
                (m/amb)))))
        (m/reduce
          (fn [out v]
            (println v)
            (conj out v))
          [])))))

How can cancellation be detected in the latter case?

den1k commented 1 year ago

Fixed!

(time
  (let [delays [{:id    1
                 :delay 1000}
                {:id    2
                 :delay 2000}
                {:id    1
                 :delay 500}
                {:id    1
                 :delay 200}]]
    (m/?
      (->>
        (m/ap
          (let [[k vs] (m/?> ##Inf (m/group-by :id (m/seed delays)))
                m (m/?< vs)]
            (try
              (m/!) ; <---- fixed
              (m/?
                (m/via m/blk (Thread/sleep (:delay m)) (assoc m :slept true)))
              (catch missionary.Cancelled c
                (println :cancelled m)
                (m/amb)))))
        (m/reduce
          (fn [out v]
            (println v)
            (conj out v))
          [])))))

den1k commented 1 year ago

Reopening, as Cancelled is still not thrown when the code runs downstream from m/observe:

(declare put)
(def c
  (let [observe-flow (m/observe (fn [!]
                                  (defn put [x]
                                    (! x))
                                  #(do)))
        task         (->>
                       (m/ap
                         (let [[k vs] (m/?> ##Inf (m/group-by :id observe-flow))
                               m (m/?< vs)]
                           (try
                             (m/!)                          ; <---- fixed
                             (m/?
                               (m/via m/blk (Thread/sleep (:delay m)) (assoc m :slept true)))
                             (catch missionary.Cancelled c
                               (println :cancelled m)
                               (m/amb))
                             (catch Throwable t
                               (println :throwing-instead-of-cancelling)
                               (m/amb)))))
                       (m/reduce
                         (fn [out v]
                           (println v)
                           (conj out v))
                         []))
        cancel       (task (fn [s] (println :success s))
                           (fn [f] (println :fail f)))]
    cancel))

(put {:id    1
      :delay 200})
=> nil
{:id 1, :delay 200, :slept true}

(do (put {:id    1
          :delay 200})
    (put {:id    1
          :delay 500}))
:throwing-instead-of-cancelling
=> nil
{:id 1, :delay 500, :slept true}

leonoel commented 1 year ago

On cancellation, m/via interrupts the thread. The final result depends on how the body reacts to thread interruption; in this case, Thread/sleep stops and throws InterruptedException. The right fix is to catch InterruptedException. In the first example, m/! works by accident because the switch happens immediately, before the sleep even starts.
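
A minimal sketch of that suggestion, adapted from the second example in this thread (not code posted by the maintainer). The names `delays`, `slow-step`, and `result` are hypothetical and only serve the illustration, and the delays are shortened so the snippet finishes quickly. It assumes the interrupted m/via task fails with the InterruptedException raised by Thread/sleep, as described above, so the exception surfaces at the m/? call site inside the try.

```clojure
(require '[missionary.core :as m])

;; Hypothetical input, same shape as in the examples above but with shorter delays.
(def delays
  [{:id 1 :delay 100}
   {:id 2 :delay 200}
   {:id 1 :delay 50}
   {:id 1 :delay 20}])

;; Hypothetical helper: a blocking step run on the m/blk executor.
;; When the task is cancelled, m/via interrupts the thread and
;; Thread/sleep reacts by throwing InterruptedException.
(defn slow-step [m]
  (m/via m/blk
    (Thread/sleep (long (:delay m)))
    (assoc m :slept true)))

(def result
  (m/?
    (->> (m/ap
           (let [[_ vs] (m/?> ##Inf (m/group-by :id (m/seed delays)))
                 m      (m/?< vs)]
             (try
               (m/? (slow-step m))
               ;; Cancellation of the blocking body shows up as an interruption.
               (catch InterruptedException _
                 (println :interrupted m)
                 (m/amb))
               ;; Kept for steps that report cancellation as missionary.Cancelled.
               (catch missionary.Cancelled _
                 (println :cancelled m)
                 (m/amb)))))
         (m/reduce conj []))))
```

With both catch clauses in place, a branch that is switched away by m/?< emits nothing, so `result` should only contain maps that completed their sleep. Code that swallows or rewraps the interruption would need its own handling, which this sketch does not attempt.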

den1k commented 1 year ago

Hmm, the issue is that the user runs arbitrary sandboxed code, which can throw various exceptions. Is there any other way to detect cancellation by m/?<?

dustingetz commented 1 year ago

Hey, can you switch to the Slack for support, please? This emails 22 people.

leonoel commented 1 year ago

@den1k more details here #51