leonoel / cloroutine

Coroutine support for clojure
Eclipse Public License 2.0
228 stars 10 forks source link

how to write this macro using await/async example #17

Open zcaudate opened 3 years ago

zcaudate commented 3 years ago

I've been playing around with the cr macro looking to create a js-like syntax for async/await https://github.com/leonoel/cloroutine/blob/master/doc/02-async-await.md

There's a couple of issues I'm having with the example. One is that when there is a nested async block, the tags go missing and the compiler reports reflection errors:

This is fine:

(let [a (async 1)
        b (async (+ (await a) 2))
        c (async (+ (await a)
                    (await b) 3))]
    c)

but once it's put in an async block

(async
 (let [a (async 1)
       b (async (+ (await a) 2))
       c (async (+ (await a)
                   (await b) 3))]
   c))

this happens:

Screen Shot 2020-08-17 at 2 20 27 am

Also I'd like to have some sort of async scope where within the scope, await is automatically detected for async variables:

(defasync hello
  ([]
   (let [a (async 1)
         b (async (+ a 2))
         c (async (+ a b 3))]
     c)))

;; outside of async block, use await
(await (hello))
=> 

;; inside of async block, await is automatic
(async 
  (+ 1 2 3 (hello)))

This kind of mirrors the language that Js/python/go/kotlin and a bunch of other languages implement. I'd love to figure out how to get it working.

leonoel commented 3 years ago

compiler reflection errors : please file a separate issue with a minimal repro and compiler information.

inferred await : I've not ever tried to do that before, but I can share some thoughts here. I'm not familiar enough with python and go, and I'm not sure how js async/await relates to your problem, but kotlin coroutines seem close to what you describe indeed. However, note that kotlin coroutines are much higher level than what cr does. cr does code splitting only, whereas kotlin coroutines also carry a fully-fledged fiber system with structured concurrency and so on. Suspending functions are also first-class citizens in the type system. Automagic await inferring kind of makes sense in this framework because there's only one reasonable thing to do when calling a suspending function. It may not be as relevant with other coroutine flavors, e.g missionary's ap blocks support 4 different suspending operators and the type doesn't convey enough information to infer the operator so it must be explicit.

That being said, it may be possible to emulate kotlin-style coroutines on top of cr, but given clojure's dynamic nature it's probably hard to implement in a robust way. One possible approach could be to add an analysis pass before cr expansion, that pass could use tools.analyzer to check for tags in function calls, detect if the function is suspendable and wrap the call in an (await ,,,) form. Roughly :

(defn await []
;; unchanged
)

(defn thunk []
;; unchanged
)

(defn fiber ^CompletableFuture [coroutine]
;; unchanged
)

(defmacro infer-await [form]
  (let [ast (analyze form)]
    ;; walk ast, detect CompletableFuture tags, wrap them in (await ,,,), emit transformed code
))

(defmacro async [& body]
  `(fiber
    (cr {await thunk}
      (infer-await (do ~@body)))))
zcaudate commented 3 years ago

I managed to figure out how to do it without the cr macro. I didn't really know what ssa was, watched the Tim Baldridge talk on core.async and then compared the cloroutine code to the core.async code. It was really helpful but it struck me that the channels implementation can be done better with just a single block structure modelling the go block instead of a state machine.

Anyways, here's the gist of my implementation of async/await. There's an Async block type that does the switching using bindings.

(defmacro fn:supplier
  "creates a java supplier"
  {:added "3.0"}
  ([& body]
   `(reify Supplier
      (get [_] ~@body))))

(defn future:run [f]
   (CompletableFuture/supplyAsync (fn:supplier (f))))

;;
;; async/await
;;

(def ^:dynamic *async* nil)

(defn await
  "added for semanitic compatibility same as (.get future)"
  {:added "3.0"}
  ([future]
   (deref future)))

(defn async:resolve
  "resolves future if called within an async context"
  {:added "3.0"}
  ([future]
   (if *async* @future future)))

(defmacro async:gentype
  "generates the AsyncFn type"
  {:added "3.0"}
  ([]
   (let [async-fn (fn [arglist]
                    `(~'invoke [~'_ ~@arglist]
                      (async:resolve (future:run (fn [] (~'handler ~@arglist)) ~'options))))
         args (map #(symbol (str "a" %)) (range 21))
         arglists (for [i (range 21)]
                    (take i args))]
     `(deftype ~'AsyncFn [~'handler ~'options]
        clojure.lang.IFn
        ~@(map async-fn arglists)
        (applyTo [~'_ ~'args] (async:resolve (future:run (fn [] (apply ~'handler ~'args))
                                                         ~'options)))))))

(async:gentype)

(defmacro async
  "macro for defining async blocks"
  {:added "3.0"}
  ([opts? & body]
   (let [[opts body] (cond (map? opts?) [opts? body]
                           :else [{} (cons opts? body)])]
     `((AsyncFn. (fn []
                   (binding [*async* true]
                     ~@body))
                 ~opts?)))))

(defmacro defasync
  "macro for defining functions returning futures"
  {:added "3.0"}
  ([name bindings & body]
   `(defn ~name ~bindings
      (async
       (binding [*async* true]
         ~@body)))))

(defasync -test- []
     (let [a (async (Thread/sleep 10)
                    1)
           b (async (+ a 1))
           c (async (Thread/sleep 10)
                    (+ a b 2))]
       c))

@(-test-)
;;; => 5
leonoel commented 3 years ago

Your implementation changes the semantics of async. In an async block, a nested async will be immediately derefed, blocking current thread until result is available, so it effectively becomes a do.

zcaudate commented 3 years ago

okay. I see what you are saying. In that example, it does the expected thing (sleeps for 20ms) are there some counter examples of the behaviour not working?

zcaudate commented 3 years ago

as in, currently b runs after a instead of waiting within the async block. but when would this affect the actual time taken?

leonoel commented 3 years ago
@(async
  (+ (async (Thread/sleep 1000) 1)
     (async (Thread/sleep 1000) 2)))

I would definitely expect this to run in one second.

zcaudate commented 3 years ago

haha. gotcha. so is this the reason why the ssa transform is needed?

zcaudate commented 3 years ago

one other way I can see is that the async block walks through the code and wrap each non-macro form with an additional consume semantic so it becomes:

@(async
  ((wrap-consume +) 
     (async (Thread/sleep 1000) 1)
     (async (Thread/sleep 1000) 2)))

and the wrap-consume is then responsible for combining the arguments (which is not that trivial)

zcaudate commented 3 years ago

anyways, I'm just a bit overwhelmed with how much code to necessary to do the transforms. I'll wait for the reflection issue to be solved as i've spent way more time than I would like reading code.