clj-commons / manifold

A compatibility layer for event-driven abstractions

Use a manifold stream as a core.async channel #31

Closed hadronzoo closed 9 years ago

hadronzoo commented 9 years ago

Core.async channels can be coerced into manifold streams, but are there plans to allow a manifold stream to be used as a core.async channel?

ztellman commented 9 years ago

Check out the final example at https://github.com/ztellman/manifold#streams.

hadronzoo commented 9 years ago

I was thinking that streams could implement ReadPort and WritePort and be used as channels directly, but connect works as well. Thank you for the response and for this library.
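For readers following along, the connect-based interop mentioned here can be sketched roughly as below. This is a minimal illustration, not necessarily the README example referred to above; the var names (`c`, `src`, `st`, `out`) are made up for the sketch, and it assumes both core.async and Manifold are on the classpath.

```clojure
(require '[clojure.core.async :as a]
         '[manifold.stream :as s])

;; Direction 1: treat a core.async channel as a Manifold source.
(def c (a/chan 1))
(def src (s/->source c))
(a/>!! c :from-channel)
(def coerced-result @(s/take! src))

;; Direction 2: pipe a Manifold stream into a core.async channel.
(def st (s/stream))
(def out (a/chan 1))
(s/connect st out)
(s/put! st :from-stream)
(def connected-result (a/<!! out))
```

In both directions the channel itself is left untouched; Manifold wraps it rather than the stream implementing the channel protocols.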

ztellman commented 9 years ago

I'd be willing to consider that once core.async escapes its eternal alpha status, but for now I'm sticking as much to the public API as I can for interop.

hadronzoo commented 9 years ago

Can connect be used to connect core.async channels to duplex streams like SplicedStream? It seems like this would require two separate channels (source and sink). The reason I'm using core.async is that I'm not sure how to reproduce the behavior of alts! using manifold.

ztellman commented 9 years ago

Sorry for the slow reply on this. Sure, you can just connect two different core.async channels like so:

(connect source-chan spliced-stream)
(connect spliced-stream sink-chan)
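
A slightly fuller sketch of the two calls above, assuming the duplex stream is built with `s/splice`. The names `sink-half` and `source-half`, and the buffer sizes, are illustrative assumptions, not taken from the thread.

```clojure
(require '[clojure.core.async :as a]
         '[manifold.stream :as s])

;; A spliced stream: put! goes to sink-half, take! comes from source-half.
(def sink-half (s/stream))
(def source-half (s/stream))
(def spliced-stream (s/splice sink-half source-half))

;; One core.async channel per direction.
(def source-chan (a/chan 1))
(def sink-chan (a/chan 1))

(s/connect source-chan spliced-stream) ; channel -> write side of the duplex stream
(s/connect spliced-stream sink-chan)   ; read side of the duplex stream -> channel

;; Inbound: a message put on source-chan should surface on sink-half.
(a/>!! source-chan :in)
(def inbound @(s/take! sink-half))

;; Outbound: a message put on source-half should surface on sink-chan.
(s/put! source-half :out)
(def outbound (a/<!! sink-chan))
```

With both directions exposed as plain channels, `alts!` can then be used on `sink-chan` alongside any other channels.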

vspinu commented 7 years ago

Any news on this?

In my tests, both connected and coerced channels perform about 20x slower than raw core.async operations.

ztellman commented 7 years ago

Can you share your benchmarks? Implementing ReadPort and WritePort is not only risky from an API change point-of-view, but it also means that Manifold will have a permanent dependency on core.async, which I'm not wild about.

vspinu commented 7 years ago

This is what I have tried:

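(require '[clojure.core.async :as async :refer [<!! >!!]]
         '[manifold.stream :as s])

(let [N 100000]

  (println "Raw IO:")
  (let [ch0 (async/chan 1)
        st0 (s/stream)]
    ;; Manifold stream: asynchronous put!, then block on the take! deferred
    (time (doseq [i (range N)]
            (s/put! st0 i)
            @(s/take! st0)))
    ;; core.async channel: blocking put and blocking take
    (time (doseq [i (range N)]
            (>!! ch0 i)
            (<!! ch0)))
    (async/close! ch0)
    (s/close! st0))

  (println "Chained IO:")
  ;; stream and channel connected to each other in both directions
  (let [ch     (async/chan 1)
        st     (s/stream)
        sink   (s/connect st ch)
        source (s/connect ch st)]
    (time (doseq [i (range N)]
            (s/put! st i)
            @(s/take! st)))
    (async/close! ch))

  (println "Coerced IO:")
  ;; a single channel coerced into a Manifold sink and source
  (let [ch     (async/chan)
        sink   (s/->sink ch)
        source (s/->source ch)]
    (time (doseq [i (range N)]
            (s/put! sink i)
            @(s/take! source)))
    (async/close! ch)))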
;; Raw IO:
;; "Elapsed time: 77.223922 msecs"
;; "Elapsed time: 97.548761 msecs"
;; Chained IO:
;; "Elapsed time: 2181.753988 msecs"
;; Coerced IO:
;; "Elapsed time: 1680.171187 msecs"

> which I'm not wild about

Are there any major flaws in core.async that you are not happy with?

From an end-user perspective, jumping back and forth and constantly re-weighing features between core.async and manifold hasn't been a very fulfilling process so far. So I wonder, wouldn't it actually be beneficial in the long run to make core.async a first-class, default citizen and thus reap all the associated benefits, such as cljs?

ztellman commented 7 years ago

core.async is one solution to a problem, and it makes a bunch of indelible choices w.r.t. the execution model that may be appropriate for a given application, but not for a library or other higher-order pieces of code. I expand on this here: http://aleph.io/manifold/rationale.html.

As for the performance difference, what Manifold is doing in put! and take! is not equivalent to >!! and <!!, which are blocking operations. A better analogue would be:

(time (doseq [i (range N)]
        (<!! (go (>! ch0 i)))
        (<!! (go (<! ch0)))))

This does an asynchronous operation on the channel, and then blocks on the result. This is, more or less, what Manifold is doing at the interop level when you call connect.

Since I wrote this integration, I see that core.async has added put! and take! methods of its own, which I could potentially use, and would likely improve the performance. If you'd like to open an issue describing the performance gap, I'll see what I can do.
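
To illustrate the idea, here is a rough sketch of how core.async's callback-based `put!` and `take!` could be bridged to Manifold deferreds without going through `go` blocks. The helpers `put-deferred` and `take-deferred` are hypothetical names invented for this sketch; this is not how Manifold's interop is actually implemented.

```clojure
(require '[clojure.core.async :as a]
         '[manifold.deferred :as d])

;; Hypothetical helper: returns a deferred that yields true once the
;; message is accepted, or false if the channel is already closed.
(defn put-deferred [ch msg]
  (let [result (d/deferred)]
    (a/put! ch msg (fn [accepted?] (d/success! result accepted?)))
    result))

;; Hypothetical helper: returns a deferred that yields the next message,
;; or nil if the channel is closed and drained.
(defn take-deferred [ch]
  (let [result (d/deferred)]
    (a/take! ch (fn [msg] (d/success! result msg)))
    result))

;; Usage: the same put-then-take round trip as in the benchmark above.
(def round-trip
  (let [ch (a/chan 1)]
    @(put-deferred ch 42)
    @(take-deferred ch)))
```

Each operation registers a single callback on the channel instead of spinning up a `go` block, which is where the performance improvement would be expected to come from.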