leonoel / missionary

A functional effect and streaming system for Clojure/Script
Eclipse Public License 2.0

Future interop #45

Open leonoel opened 2 years ago

leonoel commented 2 years ago

Trying to see if I've been paying attention in class, CompletableFuture to task should look something like:

(import '(java.util.concurrent Executor CompletableFuture)
        '(missionary.impl Thunk))

(defn cf->task
  [^CompletableFuture cf]
  (fn [success failure]
    (.handleAsync
     cf
     (reify java.util.function.BiFunction
       (apply [_ r e]
         (if (instance? Exception e)
           (failure e)
           (success r))))
     Thunk/cpu)
    (fn [] (.cancel cf true))))

(let [cf (CompletableFuture.)
      t (cf->task cf)
      success! (fn [res] (println 'yay! res))
      fail! (fn [e] (println 'Error! (ex-message e)))]
  (t success! fail!)
  (.complete cf 2))

(let [cf (CompletableFuture.)
      t (cf->task cf)
      success! (fn [res] (println 'yay! res))
      fail! (fn [e] (println 'Error! (ex-message e)))]
  (t success! fail!)
  (.completeExceptionally cf (Exception. "failed!")))

Originally posted by @bsless in https://github.com/leonoel/missionary/issues/41#issuecomment-940141828

leonoel commented 2 years ago

Your implementation is compliant with the task protocol. Some remarks, though:

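(require '[missionary.core :as m])

;; do-cf is a zero-argument function returning a CompletableFuture.
;; It is called each time the task is run, not when the task is built.
(defn cf->task [do-cf]
  (fn [success failure]
    (let [^java.util.concurrent.CompletableFuture cf (do-cf)]
      (.handleAsync cf
        (reify java.util.function.BiFunction
          (apply [_ r e]
            ;; any non-nil Throwable counts as a failure
            (if (some? e)
              (failure e)
              (success r))))
        ^java.util.concurrent.Executor m/cpu)
      ;; the canceller cancels the future itself
      (fn [] (.cancel cf true)))))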

(defmacro cf [& body]
  `(cf->task #(do ~@body)))

Also note that futures don't support graceful shutdown: the task fails immediately when cancelled, but the underlying process may still be alive at that point, and you have no way to know when it will actually terminate. There is no solution to this problem; it is a fundamental limitation of the design of futures.
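The limitation described above can be observed with plain JDK interop. The following is a minimal sketch, not part of the original discussion: `cancel-demo` is a hypothetical name, and the 200 ms sleep merely stands in for real work. It cancels a `CompletableFuture` while its supplier is running, then reports whether the future was cancelled, whether the work still ran to completion, and whether the worker thread was interrupted.

```clojure
(import '(java.util.concurrent CompletableFuture CountDownLatch TimeUnit)
        '(java.util.function Supplier))

(defn cancel-demo
  "Hypothetical helper: cancels a future whose work is already running and
  returns a map describing what happened to the future and to the work."
  []
  (let [started      (CountDownLatch. 1)
        finished     (CountDownLatch. 1)
        interrupted? (atom false)
        cf (CompletableFuture/supplyAsync
             (reify Supplier
               (get [_]
                 (.countDown started)
                 (try
                   (Thread/sleep 200)            ; stand-in for real work
                   (catch InterruptedException _
                     (reset! interrupted? true)))
                 (.countDown finished)
                 :done)))]
    (.await started)                             ; wait until the work is running
    (let [cancel-result (.cancel cf true)]
      {:cancel-returned    cancel-result
       :future-cancelled?  (.isCancelled cf)
       ;; true if the supplier reached its end within 2 seconds
       :work-finished?     (.await finished 2 TimeUnit/SECONDS)
       :work-interrupted?  @interrupted?})))

(cancel-demo)
```

The future reports itself as cancelled right away, yet the supplier keeps running to its end without ever being interrupted, and nothing in the future's API signals when that happens.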

Note also that this is not the only way to turn a future into a task. You could instead assume the underlying process is managed elsewhere and that cancelling it is not your responsibility; you just want to read the eventual value. In that case, cancelling the task should not cancel the process, only stop reading. You can represent the eventual result with dfv, and the error status with a thunk that either returns normally or throws. absolve then redirects thunk failures to the failure path.

(defn await-cf [^java.util.concurrent.CompletableFuture cf]
  (let [v (m/dfv)]
    (.handleAsync cf
      (reify java.util.function.BiFunction
        (apply [_ r e]
          (v #(if (some? e) (throw e) r)))))
    (m/absolve v)))

Cancellation is a tricky problem with futures because they are two things at once: a reference to a running process, and a single-assignment variable that supports an error status. Do you cancel the process, or do you just stop waiting for the result?
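To make the "just stop waiting" option concrete, here is a hedged sketch that runs the await-cf task by hand and then cancels it. `stop-waiting-demo` is a hypothetical name, await-cf is repeated only so the block is self-contained, and the sketch assumes missionary is on the classpath and that a cancelled dfv dereference fails immediately, as its documentation describes.

```clojure
(require '[missionary.core :as m])
(import '(java.util.concurrent CompletableFuture)
        '(java.util.function BiFunction))

;; Same definition as await-cf above, repeated for self-containment.
(defn await-cf [^CompletableFuture cf]
  (let [v (m/dfv)]
    (.handleAsync cf
      (reify BiFunction
        (apply [_ r e]
          (v #(if (some? e) (throw e) r)))))
    (m/absolve v)))

(defn stop-waiting-demo
  "Hypothetical helper: starts waiting on a future that never completes,
  cancels the wait, and reports the state of both the task and the future."
  []
  (let [cf      (CompletableFuture.)
        outcome (promise)
        ;; running a task means calling it with success and failure callbacks;
        ;; the returned function is its canceller
        cancel! ((await-cf cf)
                 #(deliver outcome [:success %])
                 #(deliver outcome [:failure %]))]
    (cancel!)                                    ; stop waiting, leave cf alone
    {:task-outcome      (first (deref outcome 1000 [:pending]))
     :future-cancelled? (.isCancelled cf)
     :future-done?      (.isDone cf)}))

(stop-waiting-demo)
```

Under those assumptions the task should end on its failure path while the future is neither cancelled nor completed, which is the behaviour you want when the process is owned by someone else.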

bsless commented 2 years ago

Thank you. First, in the wiki I turned the executor into a parameter: we don't know whether the task itself is blocking, so I should not have assumed anything about the pool. Second, I changed the error check from Exception to Throwable.

(import '(java.util.concurrent Executor CompletableFuture)
        '(missionary.impl Thunk))

(defn cf->task
  [^CompletableFuture cf ^Executor ex]
  (fn [success failure]
    (.handleAsync
     cf
     (reify java.util.function.BiFunction
       (apply [_ r e]
         (if (instance? Throwable e)
           (failure e)
           (success r))))
     ex)
    (fn [] (.cancel cf true))))

(defn success! [res] (println 'yay! res))
(defn fail! [e] (println 'Error! (ex-message e)))

(let [cf (CompletableFuture.)
      t (cf->task cf Thunk/cpu)]
  (t success! fail!)
  (.complete cf 2))

(let [cf (CompletableFuture.)
      t (cf->task cf Thunk/cpu)]
  (t success! fail!)
  (.completeExceptionally cf (Exception. "failed!")))

I also looked at the Javadoc for CompletableFuture and indeed, in the default implementation cancelling it is meaningless with respect to interruption, so your await solution is more correct and cleaner. Should I update the wiki with it?

leonoel commented 2 years ago

I have reworked the page and also added more examples, including the core.async helper we discussed with @mjmeintjes.