clj-commons / manifold

A compatibility layer for event-driven abstractions

Transduced streams not completing fully #135

Closed biiwide closed 7 years ago

biiwide commented 7 years ago

Closing a transduced stream doesn't trigger the completion step of transducers that accumulate state. partition-all provides a good example:

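(require '[manifold.stream :as s]
         '[manifold.deferred :as d])

;; Put 12 items through a stream that partitions into groups of 5,
;; then close the stream once all puts have completed.
(let [s (s/stream 0 (partition-all 5))]
  (-> (s/put-all! s (range 12))
      (d/finally (partial s/close! s)))
  (s/stream->seq s))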

Expected: ([0 1 2 3 4] [5 6 7 8 9] [10 11])
Actual: ([0 1 2 3 4] [5 6 7 8 9])

The documentation on creating transducers describes three arities: init (0), step (2), and completion (1). It looks like the close() method of the manifold.stream.default.Stream class is calling its add! function with the init arity (0) instead of the completion arity (1), so the partial partition still buffered inside the transducer is never flushed.
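To make the difference between the arities concrete, here is a minimal sketch using only clojure.core, with no Manifold involved. It does not reproduce Manifold's internals. It only shows that partition-all holds back its final partial group until the completion arity is called, and that the init arity does not flush it. The names rf, after-steps, after-init, and after-completion are made up for illustration.

```clojure
;; Apply the transducer to conj to get a stateful reducing function.
;; rf is a hypothetical name; the state lives inside this one instance.
(def rf ((partition-all 5) conj))

;; Step arity (2): feed 12 items. Only full groups of 5 are emitted;
;; 10 and 11 remain buffered inside the transducer.
(def after-steps (reduce rf [] (range 12)))
;; => [[0 1 2 3 4] [5 6 7 8 9]]

;; Init arity (0): delegates to (conj), which returns an empty vector.
;; It neither sees the accumulated result nor flushes the buffer.
(def after-init (rf))
;; => []

;; Completion arity (1): flushes the buffered partial group.
(def after-completion (rf after-steps))
;; => [[0 1 2 3 4] [5 6 7 8 9] [10 11]]

;; transduce calls the completion arity itself, so it yields the
;; full result directly.
(def via-transduce (transduce (partition-all 5) conj [] (range 12)))
;; => [[0 1 2 3 4] [5 6 7 8 9] [10 11]]
```

If the stream's close path invoked the completion arity the way transduce does, the trailing [10 11] would presumably be delivered before the stream is marked closed.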

ztellman commented 7 years ago

Oh, thank you for catching this. Please feel free to open a PR, or I can fix this myself within the next few days.