clj-commons / manifold

A compatibility layer for event-driven abstractions

put! is blocking if connection is blocking #155

Closed by joeltio 6 years ago

joeltio commented 6 years ago

From the documentation, s/put! is stated to be non-blocking. So, I am guessing it spawns a separate thread (or adds to a thread pool's queue) to perform the s/put!. Then, it delivers the deferred.

However, I tried to experiment with s/put! in this manner and found my repl to be blocked:

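(require '[manifold.stream :as s])

(def x (s/stream))
(def a (s/stream))

;; the callback sleeps for 10 seconds before forwarding each message to a
(s/connect-via x #(do (Thread/sleep 10000) (s/put! a %)) a)

(s/put! x 3)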

After I call (s/put! x 3), the REPL pauses for 10 seconds before the deferred is returned.

Is this effect intended?

ztellman commented 6 years ago

All sources and sinks in Manifold are non-blocking, but in the default execution model Manifold tries to propagate messages on the same thread as the put! call, so a callback that blocks (such as one calling Thread/sleep) also blocks the caller of put!. This is because preemptively putting each new callback onto a thread pool adds significant overhead without any obvious value in most cases. If you want this behavior, you can use s/onto as described in the Manifold documentation.
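
As a rough sketch of the s/onto suggestion applied to the example from the question: the pool size of 4 and the names executor, x-async, and result are assumptions made for illustration, not part of the original thread. The expectation, which has not been verified here, is that the sleeping callback runs on the executor's threads, so the put! call itself returns promptly.

```clojure
(require '[manifold.stream :as s]
         '[manifold.deferred :as d]
         '[manifold.executor :as ex])

;; Assumption: a small fixed-size pool; the size 4 is arbitrary.
(def executor (ex/fixed-thread-executor 4))

(def x (s/stream))
(def a (s/stream))

;; Derive a source from x whose callbacks are executed on `executor`
;; rather than on the thread that calls put!.
(def x-async (s/onto executor x))

;; Same slow callback as in the question, now attached to the derived source.
(s/connect-via x-async
               (fn [msg]
                 (Thread/sleep 10000)
                 (s/put! a msg))
               a)

;; Expected (not guaranteed by this sketch) to return a deferred
;; without waiting for the 10-second sleep.
(def result (s/put! x 3))
```

Note that the blocking callback still occupies one executor thread for the full 10 seconds, so the pool should be sized for the amount of blocking work expected.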