leonoel / missionary

A functional effect and streaming system for Clojure/Script
Eclipse Public License 2.0
620 stars 25 forks source link

task succeeds on cancel when task body is wrapped in a try/catch #82

Closed den1k closed 1 year ago

den1k commented 1 year ago
(let [task   (m/via m/blk
                    (try
                      (Thread/sleep 1000)
                      (catch Throwable t
                        :cancelled-and-should-not-succeed-or-fail)))
      cancel (task #(println :success %)
                   #(println :fail %))]
  (println :cancelling)
  (cancel))

;; :cancelling
;; :success :cancelled-and-should-not-succeed-or-fail

Not sure if this is a bug. However, I'm looking to prevent callbacks of either success/fail after cancellation since they cause unwanted side effects in my application. Is there a way to achieve this?

dustingetz commented 1 year ago

Can we see the business situation you have, that has the unwanted side effects?

den1k commented 1 year ago

I can explain it simply: it's part of a spreadsheet app that evaluates code in cells. When a cell is still evaluating but the user has updated the underlying code, the current execution should be cancelled in favor of the next one.

Presently evaluation is cancelled but because the task calls success with the error, the error value becomes the cell value until the next evaluation is complete.

xificurC commented 1 year ago

If you need the catch-all then maybe catch missionary.Cancelled before and rethrow it?

den1k commented 1 year ago

Thanks. I have it working now with a version of

(let [task            (m/via m/blk
                             (try
                               (Thread/sleep 1000)
                               (catch Throwable t
                                 :cancelled-and-should-not-succeed-or-fail)))
      ignore-success? (volatile! false)
      cancel          (task #(when-not @ignore-success?
                               (println :success %))
                            #(println :fail %))]
  (println :cancelling)
  (vreset! ignore-success? true)
  (cancel))
leonoel commented 1 year ago

Not sure if this is a bug.

It is expected. The task protocol mandates calling either callback exactly once because the cancellation may not be immediate and the caller must be able to know when the process was completely shut down.

Is there a way to achieve this?

Sorry for this answer, but you should not do that. Calling tasks and flows directly is low-level and dangerous, it's OK at the app entrypoint but otherwise you should always prefer functional composition.

When a cell is still evaluating but the user has updated the underlying code, the current execution should be cancelled in favor of the next one.

Depending on the cell state you expect between the user update and the resolution of the next task, ?< with ap or cp looks like a good fit, you would need to turn the task into a flow first (cf electric's offload)

den1k commented 1 year ago

Interesting. This is starting to make sense. However, in my case evals are triggered on the backend as part of a transaction listener which means cells are always received as collections, e.g. similar to

(let [cells [{:cell/form-str ":foo"}
             {:cell/form-str ":bar"}]]
  (m/?
    (->>
      (m/ap
        ;; fork process for every cell
        ;; to exec in **parallel**
        (let [cell        (m/?> ##Inf (m/seed cells))
              evaled-cell (m/? (m/timeout
                                 (m/via m/blk
                                        (let [{:cell/keys [cell/form-str]} cell]
                                          (assoc cell :cell/ret (eval (read-string form-str)))))
                                 1000
                                 ::timeout))]
          (if (= evaled-cell ::timeout)
            (-> cell
                (dissoc :cell/ret-pending?)
                (assoc :cell/exception? true
                       :cell/ret evaled-cell
                       :cell/ret-str (pr-str evaled-cell)))
            evaled-cell)))
      (m/reduce
        (fn [out v]
          (println v))
        []))))

; prints
{:cell/form-str :bar, :cell/ret :bar}
{:cell/form-str :foo, :cell/ret :foo}

But code would need to be aware of each cell's identity (by :db/ids) so that only evaluations of the same cell cancels its previous evaluation if one exists. Any way to achieve this?

dustingetz commented 1 year ago

Electric for-by is m/group-by under the hood. Have you considered modeling the spreadsheet with Electric for loops? Each cell is a signal. e/offload lets you call side effects (managing the task, the bridge to flow, the cancellation when something upstream changes).

den1k commented 1 year ago

I haven’t considered electric because this needs to run regardless of a client viewing the spreadsheet. I’d like to keep backend flows separate. The frontend still updates when clients are connected thanks to a watch on the db.

On Sun, Mar 26, 2023 at 10:31 AM Dustin Getz @.***> wrote:

Electric for-by is m/group-by under the hood. Have you considered modeling the spreadsheet with Electric for loops? Each cell is a signal. e/offload lets you call side effects (managing the task, the bridge to flow, the cancellation when something upstream changes).

— Reply to this email directly, view it on GitHub https://github.com/leonoel/missionary/issues/82#issuecomment-1484114123, or unsubscribe https://github.com/notifications/unsubscribe-auth/ABDPWIWRUPTANDMZR4VRPILW6BHMRANCNFSM6AAAAAAWHWPK2A . You are receiving this because you authored the thread.Message ID: @.***>

den1k commented 1 year ago

So what would be an approach using functional composition that would allow cell evaluations while also cancelling evals of the same cells as new cell values transfer?

den1k commented 1 year ago

Got it!

(time
  (let [delays [{:id    1
                 :delay 1000}
                {:id    2
                 :delay 2000}
                {:id    1
                 :delay 500}
                {:id    1
                 :delay 200}]]
    (m/?
      (->>
        (m/ap
          (let [[k vs] (m/?> ##Inf (m/group-by :id (m/seed delays)))
                m (m/?< vs)]
            (try
              (m/? (m/sleep (:delay m) (assoc m :slept true)))
              (catch missionary.Cancelled c
                (println :cancelled m)
                (m/amb)))))
        (m/reduce
          (fn [out v]
            (println v)
            (conj out v))
          [])))))
:cancelled {:id 1, :delay 1000}
:cancelled {:id 1, :delay 500}
{:id 1, :delay 200, :slept true}
{:id 2, :delay 2000, :slept true}
"Elapsed time: 2004.552279 msecs"
=> [{:id 1, :delay 200, :slept true} {:id 2, :delay 2000, :slept true}]