0no-co / wonka

Inconsistent behaviour when adding buffer() inside concatMap() #102

Closed (laurisvan closed this 3 years ago)

laurisvan commented 3 years ago

First of all, my apologies: this might be a simple user error or a real bug. I am new to Wonka and cannot yet tell which one it is. :)

I am implementing my own bufferCount() operator using concatMap with a nested buffer inside. At some point I wondered why the buffers always emit with a size of 1, so I went back to the original buffering example. I suspect there is a bug somewhere or a concept I don't understand, so here is a simplified example, this time with interval timers instead of a counter.

Please help me figure out what I am missing. Alternatively, I would appreciate a bufferCount() implementation, as that is my goal anyway. :)

The Wonka example on buffering works fine:

pipe(
  interval(100),
  buffer(interval(1000)),
  // prints [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], [11, 12, ...], etc...
  subscribe(console.log)
)

However, when nested inside concatMap, the arrays are emitted one item at a time:

pipe(
  interval(100),
  concatMap(value =>
    pipe(
      fromValue(value),
      buffer(interval(1000))
    )
  ),
  // prints [1], [2], [3] etc...
  subscribe(console.log)
)

kitten commented 3 years ago

Sorry, I must've forgotten to come back to this ✌️

In the second code snippet you've applied buffer only to items inside the concatMap (which runs one mapped stream at a time in series). This means that only your fromValue output of one item will be buffered at a time.

I'm not quite sure what bufferCount does, but does it just collect a maximum number of items before emitting? If so, you may be able to implement it using scan to aggregate values into an array, followed by filter.
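
A minimal sketch of that scan-plus-filter idea, assuming bufferCount should emit fixed-size chunks and drop a trailing partial chunk. The names collectInto, isFull and bufferCountArray are hypothetical helpers, not part of Wonka. To keep the snippet runnable on its own, the array-based stand-in only mimics how scan would emit the accumulator after each value; the Wonka wiring is shown in a comment and is untested here.

```typescript
// Reducer for scan: grow the current chunk, or start a new one
// once the previous chunk has already reached `count` items.
const collectInto = <T>(count: number) =>
  (chunk: T[], value: T): T[] =>
    chunk.length >= count ? [value] : [...chunk, value];

// Predicate for filter: only let full chunks through.
const isFull = <T>(count: number) =>
  (chunk: T[]): boolean => chunk.length === count;

// With Wonka these would plug in roughly as (assumption, not executed here):
//   pipe(
//     source,
//     scan(collectInto<number>(3), [] as number[]),
//     filter(isFull<number>(3)),
//     subscribe(console.log)
//   )

// Array-based stand-in for scan + filter (hypothetical, for illustration only):
// it keeps the accumulator after every input value, as scan would emit it,
// and records only the accumulators that pass the predicate.
function bufferCountArray<T>(input: T[], count: number): T[][] {
  const step = collectInto<T>(count);
  const full = isFull<T>(count);
  const emitted: T[][] = [];
  let chunk: T[] = [];
  for (const value of input) {
    chunk = step(chunk, value);
    if (full(chunk)) emitted.push(chunk);
  }
  return emitted;
}

const chunks = bufferCountArray([1, 2, 3, 4, 5, 6, 7], 3);
console.log(chunks); // [[1, 2, 3], [4, 5, 6]]; the trailing 7 is never emitted
```

Because the reducer resets only after a chunk is full, the last incomplete chunk is silently dropped when the source ends. If those leftover items matter, this approach would need an extra step to flush them on completion.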