clj-commons / manifold

A compatibility layer for event-driven abstractions
1.02k stars 108 forks source link

`consume` breaks on a source with an expanding(?) transducer #83

Closed DerGuteMoritz closed 8 years ago

DerGuteMoritz commented 8 years ago

When a transducer which may call the step function more than once (expanding transducer?) is attached to a source, consume callbacks seem to not be called anymore. For example, mapcat gives us such a transducer and it has the following effect:

user> (def s (s/stream 0 (mapcat #(range % (+ % 5)))))
#'user/s
user> (s/put! s 3)
<< … >>
user> (s/consume prn s) ; for items put into the stream before the callback was attached, it works fine
3
4
5
6
7
nil
user> (s/put! s 3) ; but further puts don't trigger the callback anymore
<< … >>
user> (s/take! s) ; note that the 3 got lost
<< 4 >>
user> 
ztellman commented 8 years ago

Thanks for the report, I'll look into it.

dm3 commented 8 years ago

This happens because the transducer is piped into manifold.stream.default/add!. In an "expanding" transducer this means add! will get called multiple times while only the last result of add! gets used in Stream.put.

ztellman commented 8 years ago

Yeah, that's definitely the problem. Sorry this fell off my radar so completely, I'm trying to figure out a reasonable way to fix this now.

ztellman commented 8 years ago

I have a fix for this, which adds about 10% overhead relative to the previous implementation, since we have to allocate an accumulator for the result. I should probably split out the transducer and non-transducer implementations, so we only have this overhead in the former case, but for now everything seems to work, at least.