clj-commons / manifold

A compatibility layer for event-driven abstractions

Unclear how `:timeout` is supposed to work in `stream/connect` #130

Closed: dm3 closed this issue 7 years ago

dm3 commented 7 years ago

The docstring on stream/connect says:

| timeout | if defined, the maximum time, in milliseconds, that will be spent trying to put a message into the sink before closing it. Useful when there are multiple sinks downstream of a source, and you want to avoid a single backed up sink from blocking all the others.

I always read this as "the time, in milliseconds, that the source will spend trying to put an item into the sink; once that time has elapsed, the sink is closed". However, it doesn't seem to work that way: the items are dropped and the sinks remain open.
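To make that reading concrete, here is a minimal sketch of the per-message behaviour the docstring seems to describe. It is not how `connect` is implemented; `put-or-close!` is a hypothetical helper written only for illustration, and it assumes `s/try-put!` is called with a timeout in milliseconds plus a distinct timeout value.

```clojure
(require '[manifold.stream :as s]
         '[manifold.deferred :as d])

;; Hypothetical helper, not part of manifold: try to put `msg` into `sink`
;; for at most `timeout-ms` milliseconds, and close the sink if the put
;; times out. Returns a deferred yielding the result of the put attempt.
(defn put-or-close!
  [sink msg timeout-ms]
  (d/chain (s/try-put! sink msg timeout-ms ::timeout)
           (fn [result]
             ;; ::timeout is only yielded when the put did not complete in time
             (when (= result ::timeout)
               (s/close! sink))
             result)))
```

Under this reading, a sink that nobody takes from within the timeout would end up closed, rather than staying open while messages are silently dropped.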

On the latest 0.1.7-alpha3:

user=> (require '[manifold.stream :as s])
nil
user=> (def src (s/stream))
#'user/src
user=> (def sink (s/stream))
#'user/sink
user=> (s/connect src sink {:timeout 10})
nil
user=> (def consumer (future (loop [] (let [r (s/take! sink)] (println "GOT" @r) (Thread/sleep 20) (recur)))))
#'user/consumer
user=> (def producer (future (dotimes [i 5] @(s/put! src i)) (println "DONE")))
#'user/producer
GOT 0
GOT 2
DONE
GOT 4
user=> (s/closed? sink)
false

Is this a bug?

ztellman commented 7 years ago

Yes, it is. This is down to me not writing a damn test for this. I've pushed 0.1.7-alpha4 with a fix and a test. Thank you for catching this.

ztellman commented 7 years ago

Ugh, I left a debug print statement in there. Use 0.1.7-alpha5 instead.