ReactiveX / rxjs

A reactive programming library for JavaScript
https://rxjs.dev
Apache License 2.0

bufferTime race condition drops events #5146

Closed kschaefe closed 4 years ago

kschaefe commented 4 years ago

Bug Report

Current Behavior: If you pass positive values for both bufferCreationInterval and maxBufferSize to bufferTime, it can drop events while it waits to create a new backing buffer.

Reproduction

import { interval } from 'rxjs';
import { bufferTime, map, take } from 'rxjs/operators';

// Emit 0..9, one value every 200 ms.
const example = interval(200).pipe(
  take(10),
  // bufferTime(1000, 1000, 1), // produces 0, 4, 9
  // bufferTime(1000, 500, 1),  // produces 0, 2, 4, 7, 9
  bufferTime(1000, 1000, 3),    // produces [0, 1, 2], [4, 5, 6], 9
  map(x => `${x}`)              // stringify each emitted buffer for logging
);

example.subscribe(x => console.log(x));

Expected behavior: bufferTime should produce buffers that contain all supplied events.

Environment

Additional context/Screenshots: Possibly related to #4847

cartant commented 4 years ago

TBH, I don't know what the expected behaviour is. The documentation says this:

When the optional argument maxBufferSize is specified, the buffer will be closed either after bufferTimeSpan milliseconds or when it contains maxBufferSize elements.

It doesn't say anything about opening a new buffer if the maximum size is reached. Reading the documentation, my understanding is that only bufferTimeSpan and (optionally) bufferCreationInterval control the opening of a buffer.

Whether this is correct, IDK. I note that it is possible to have overlapping buffers - by specifying a bufferCreationInterval less than the bufferTimeSpan - so perhaps it's not that strange that elements can be dropped if a maximum size is specified.
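To make the overlap concrete, here is a minimal sketch that lists which time windows contain a given event. It is plain arithmetic, not RxJS code: the helper name `windowsContaining` is hypothetical, and it assumes windows open at 0, interval, 2 * interval, ... and each stays open for `span` milliseconds.

```typescript
// Hypothetical helper, not part of RxJS.
// Returns the start times of every window [start, start + span) containing `time`,
// assuming windows open at 0, interval, 2 * interval, ...
function windowsContaining(time: number, span: number, interval: number): number[] {
  if (interval <= 0) throw new Error('interval must be positive');
  const starts: number[] = [];
  for (let start = 0; start <= time; start += interval) {
    if (time < start + span) starts.push(start);
  }
  return starts;
}

// Creation interval (500) shorter than the time span (1000): windows overlap,
// so an event at t = 600 belongs to two windows.
const overlapping = windowsContaining(600, 1000, 500);
console.log(overlapping); // [ 0, 500 ]

// Creation interval equal to the time span: each event belongs to one window.
const disjoint = windowsContaining(600, 1000, 1000);
console.log(disjoint); // [ 0 ]
```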

I did have a look at some other Rx implementations, but neither Rx.NET nor RxJava seems to have a similar operator - that is, a buffer operator that takes a creation interval and a count - so there is no guidance to be found there. Also, neither allows overlapping buffers, AFAICT.

🤷‍♂

I think #4847 is a bug - because it crashes - but I'm not so sure that the element-dropping behaviour is a bug.

benlesh commented 4 years ago

Is it confusing behavior? Yes. Is it expected? At the moment yes.

When this was created in RxJS 5, it was mimicking what could be done with the RxJS 2-4 version of the operator, which was a single buffer operator that did everything buffer, bufferWhen, bufferTime, and bufferWhile do today.

The expected behavior, TMK, is that we're saying: "If you don't specify an interval on which to create a new buffer, then we'll open one for you when the old one closes; otherwise, we'll assume you only want one to be opened at the interval you've specified." In this case it is dropping events because it thinks you only want a maximum of 3 events per buffer, and they arrive faster than the 1000ms buffer opening interval.
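The rule described above can be sketched as a small simulation. This is a simplified model written for illustration, not the RxJS implementation: `simulateBufferTime`, `TimedEvent` and `OpenBuffer` are hypothetical names, time is a plain number rather than a scheduler, and it assumes that a timer due at the same instant as an event fires before the event is delivered (chosen so the model matches the output reported in the reproduction).

```typescript
interface TimedEvent { time: number; value: number; }
interface OpenBuffer { closesAt: number; items: number[]; }

// Simplified model of the behaviour described above; NOT the RxJS source.
// Assumption: a timer due at the same instant as an event fires first.
function simulateBufferTime(
  events: TimedEvent[],
  span: number,
  creationInterval: number | null,
  maxSize: number,
  completeAt: number
): number[][] {
  if (creationInterval !== null && creationInterval <= 0) {
    throw new Error('creationInterval must be positive or null');
  }
  const emitted: number[][] = [];
  let open: OpenBuffer[] = [];
  let nextOpen = 0; // the first buffer opens at subscription time (t = 0)

  // Run every open/close timer that is due at or before `now`, in time order.
  const advanceTo = (now: number): void => {
    for (;;) {
      const nextClose = open.length > 0 ? Math.min(...open.map(b => b.closesAt)) : Infinity;
      if (Math.min(nextClose, nextOpen) > now) return;
      if (nextClose <= nextOpen) {
        // A buffer reached its time span: emit it.
        const expired = open.find(b => b.closesAt === nextClose)!;
        open = open.filter(b => b !== expired);
        emitted.push(expired.items);
        // No creation interval: reopen as soon as the old buffer closes.
        if (creationInterval === null) nextOpen = nextClose;
      } else {
        open.push({ closesAt: nextOpen + span, items: [] });
        nextOpen = creationInterval === null ? Infinity : nextOpen + creationInterval;
      }
    }
  };

  for (const event of events) {
    advanceTo(event.time);
    // With no open buffer, this loop does nothing and the event is dropped.
    for (const buffer of [...open]) {
      buffer.items.push(event.value);
      if (buffer.items.length >= maxSize) {
        open = open.filter(b => b !== buffer);
        emitted.push(buffer.items);
        // Only reopen immediately when no creation interval was given.
        if (creationInterval === null) {
          open.push({ closesAt: event.time + span, items: [] });
        }
      }
    }
  }

  advanceTo(completeAt);
  for (const buffer of open) emitted.push(buffer.items); // flush on completion
  return emitted;
}

// Values 0..9 arriving every 200 ms, like interval(200).pipe(take(10)).
const source: TimedEvent[] = Array.from({ length: 10 }, (_, i) => ({
  time: (i + 1) * 200,
  value: i,
}));

const withInterval = simulateBufferTime(source, 1000, 1000, 3, 2000);
const withoutInterval = simulateBufferTime(source, 1000, null, 3, 2000);
console.log(JSON.stringify(withInterval));    // [[0,1,2],[4,5,6],[9]]
console.log(JSON.stringify(withoutInterval)); // [[0,1,2],[3,4,5],[6,7,8],[9]]
```

In the model, values 3, 7 and 8 are lost in the first run because no buffer is open when they arrive. The second run corresponds to passing null for bufferCreationInterval, as in bufferTime(1000, null, 3); in this model nothing is dropped because a new buffer opens as soon as the full one closes.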

Happy to entertain other ideas of how this could work, or ways to improve the API (I for one think it should accept an options object that we validate), but this is expected behavior for this operator.

Closing this for now, but happy to check another thread with ideas if that's something people want to toss out there.

kschaefe commented 4 years ago

To be honest, I think the documentation could improve. We used bufferCreationInterval because we thought it was necessary when using maxBufferSize.

To me it seems likely that no one wants to use both of those parameters at the same time, and warning users that doing so can drop events seems like an important documentation task.