clj-commons / manifold

A compatibility layer for event-driven abstractions
1.01k stars 106 forks source link

try-take! fills :pending-takes even after timeout #194

Closed dspearson closed 3 years ago

dspearson commented 3 years ago

When attempting to fetch from a stream, and expecting it to fail with a timeout with try-take!, :pending-takes is still incremented and results in an exception with excessive pending takes. Maybe I am using it incorrectly, but I would expect that timed-out takes would result in no mutated state of the stream since they would no longer be relevant.


enki.buffers> (def y (stream/stream))
#'enki.buffers/y
enki.buffers> (dotimes [n 16384] (stream/try-take! y 1))
nil
enki.buffers> y
<< stream: {:pending-puts 0, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 16384, :buffer-capacity 0, :source? true} >>
enki.buffers> (stream/try-take! y 1)
Jan 27, 2021 11:29:29 PM clojure.tools.logging$eval451$fn__455 invoke
WARNING: excessive pending takes (> 16384), closing stream
java.lang.IllegalStateException
    at manifold.stream.default.Stream.take(default.clj:234)
    at enki.buffers$eval38629.invokeStatic(form-init9188607286898067210.clj:17367)
    at enki.buffers$eval38629.invoke(form-init9188607286898067210.clj:17367)
    at clojure.lang.Compiler.eval(Compiler.java:7177)
    at clojure.lang.Compiler.eval(Compiler.java:7132)
    at clojure.core$eval.invokeStatic(core.clj:3214)
    at clojure.core$eval.invoke(core.clj:3210)
    at nrepl.middleware.interruptible_eval$evaluate$fn__29511$fn__29512.invoke(interruptible_eval.clj:87)
    at clojure.lang.AFn.applyToHelper(AFn.java:152)
    at clojure.lang.AFn.applyTo(AFn.java:144)
    at clojure.core$apply.invokeStatic(core.clj:665)
    at clojure.core$with_bindings_STAR_.invokeStatic(core.clj:1973)
    at clojure.core$with_bindings_STAR_.doInvoke(core.clj:1973)
    at clojure.lang.RestFn.invoke(RestFn.java:425)
    at nrepl.middleware.interruptible_eval$evaluate$fn__29511.invoke(interruptible_eval.clj:87)
    at clojure.main$repl$read_eval_print__9086$fn__9089.invoke(main.clj:437)
    at clojure.main$repl$read_eval_print__9086.invoke(main.clj:437)
    at clojure.main$repl$fn__9095.invoke(main.clj:458)
    at clojure.main$repl.invokeStatic(main.clj:458)
    at clojure.main$repl.doInvoke(main.clj:368)
    at clojure.lang.RestFn.invoke(RestFn.java:1523)
    at nrepl.middleware.interruptible_eval$evaluate.invokeStatic(interruptible_eval.clj:84)
    at nrepl.middleware.interruptible_eval$evaluate.invoke(interruptible_eval.clj:56)
    at nrepl.middleware.interruptible_eval$interruptible_eval$fn__29542$fn__29546.invoke(interruptible_eval.clj:152)
    at clojure.lang.AFn.run(AFn.java:22)
    at nrepl.middleware.session$session_exec$main_loop__29609$fn__29613.invoke(session.clj:202)
    at nrepl.middleware.session$session_exec$main_loop__29609.invoke(session.clj:201)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.base/java.lang.Thread.run(Thread.java:832)#<SuccessDeferred@11f12ad4: false>
enki.buffers> (def y (stream/stream))
#'enki.buffers/y
enki.buffers> (dotimes [n 16384] (stream/try-take! y 1))
nil
enki.buffers> (stream/put! y 1)
#<Deferred@50a88030: :not-delivered>
enki.buffers> (stream/try-take! y 1)
#<SuccessDeferred@6429f73e: 1>
enki.buffers> y
<< stream: {:pending-puts 0, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :source? true} >>```

My use case is fetching from the stream, and if I encounter a timeout, I pass the data I collected up until that point to a function and then loop (not blocking on fetches, but flushing what I have and then waiting for more.) Currently this behaviour would result in the stream getting closed.

Apologies if this is incorrect usage, but it seems like maybe a bug?
KingMob commented 3 years ago

I think expecting timed-out takes to cease being counted as "pending" is reasonable. I checked with the core.async code base, and they periodically clean out pending takes and puts.

I wrote a fix for the takes, but I realized I still need to double-check the puts as well. They're probably affected by the same issue. Should have something shortly, anyway.

KingMob commented 3 years ago

Hey @dspearson, check out #195 when you get a chance, and let me know if it fixes your issue.

dspearson commented 3 years ago

Sorry for the delay. That does indeed appear to solve the issue, thank you! Tested with (dotimes [n 300000] (try-take! y 1)), where y is a stream. It no longer throws an exception.