ReactiveX / rxjs

A reactive programming library for JavaScript
https://rxjs.dev
Apache License 2.0

bufferTime and bufferCount will mutate current buffer if a source is a subject that emits in the subscription for that buffer #3001

Open MorleyDev opened 7 years ago

MorleyDev commented 7 years ago

RxJS version: 5.5.2

Code to reproduce: rxfiddle.net

const x = Rx.Observable.interval(1000);

const z = new Rx.Subject();
Rx.Observable.merge(x, z)
    .bufferTime(5000)
    .subscribe((value) => {
        console.log("Before", value);
        z.next("HELLO");
        console.log("After", value);
    });

The same problem with bufferCount (rxfiddle.net):

const x = Rx.Observable.interval(1000);

const z = new Rx.Subject();
Rx.Observable.merge(x, z)
    .bufferCount(3)
    .subscribe((value) => {
        console.log("Before", value);
        z.next("HELLO");
        console.log("After", value);
    });

Expected behavior: "HELLO" would be logged as one of the buffered values in every console.log(value) except the first, and each value array would stay the same for the whole of its subscribe callback.

Actual behavior: "HELLO" is pushed onto the end of the buffer array that was just emitted, mutating it while the subscriber is still holding it.

Additional information:

This came up whilst I was using RxJS to build simple physics simulations with a Redux-like scan/reducer. One step of that scan emits any follow-up events to be processed in the next 'frame', and I found those events were being silently thrown away by the bufferTime operator. I have since discovered they aren't thrown away but appended to the current buffer, which doesn't help when emitting them is the last thing the step does.

There are better ways to architect the code that avoid emitting on a subject like this, and I switched to one after confirming this was the current behaviour. Still, this seems like either a bug or a 'gotcha' that I could not find documented.

Jerry-Hong commented 7 years ago

@MorleyDev

This happens because your call to the subject's next("HELLO") is synchronous: it runs while the buffer is still being emitted.

You can try this code to see it:

const x = Rx.Observable.interval(1000);

const z = new Rx.Subject();
Rx.Observable.merge(x, z)
    .bufferTime(5000)
    .subscribe((value) => {
        console.log(JSON.stringify(value)); // [0,1,2,3]
        z.next("HELLO");
        console.log(JSON.stringify(value)); // [0,1,2,3,"HELLO"]
    });
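To see why the synchronous call matters, here is a simplified, hypothetical model of a count-based buffer operator. It is not RxJS source; it only assumes, consistently with the output above, that the operator hands the current array to the subscriber and starts a fresh array afterwards. `modelBufferCount` is a made-up name. A value pushed synchronously from inside the subscriber then lands in the array the subscriber is still holding.

```javascript
// Hypothetical, simplified model of a count-based buffer operator.
// Not RxJS source: it only assumes "emit the array, then start a new one".
function modelBufferCount(size, emit) {
  let buffer = [];
  return (value) => {
    buffer.push(value);
    if (buffer.length === size) {
      emit(buffer); // the subscriber runs here, before the reset below
      buffer = [];  // any re-entrant push has already hit the old array
    }
  };
}

const seen = [];
const push = modelBufferCount(3, (value) => {
  const before = JSON.stringify(value);
  push("HELLO"); // synchronous re-entry, like z.next("HELLO") in the issue
  seen.push([before, JSON.stringify(value)]);
});

[0, 1, 2].forEach((n) => push(n));
console.log(seen[0][0], seen[0][1]); // [0,1,2] [0,1,2,"HELLO"]
```

In this model "HELLO" also never reaches a later buffer, because the array it was pushed into is dropped by the reset.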

Maybe this is what you want:

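const x = Rx.Observable.interval(1000);

// observeOn(Rx.Scheduler.async) delivers z's values asynchronously, so "HELLO"
// arrives after the subscriber returns and goes into the next buffer.
// (In RxJS 5, applying an operator to a Subject returns a Subject-like object,
// so z.next still feeds the underlying Subject.)
const z = new Rx.Subject().observeOn(Rx.Scheduler.async);
Rx.Observable.merge(x, z)
    .bufferTime(5000)
    .subscribe((value) => {
        console.log(JSON.stringify(value));
        z.next("HELLO");
        console.log(JSON.stringify(value));
    });
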
MorleyDev commented 7 years ago

So if a synchronous update happens inside the buffered observable's subscription, it will mutate that buffer? That does not feel right to me. Is this really the intended and specified behaviour for Rx?

I'd expect that once the array has been passed to my subscriber, Rx won't touch it anymore, and any newly emitted values would go into the next buffer.

Mutating the buffer seems to break the 'functional' part of 'functional reactive programming'. Since a buffer emits at the end of its time window, I'd expect any new values to count as part of the next time window rather than mutate the emitted one, which I would have thought was 'finished' at the point it was emitted.

A key reason to use FRP and RX is to avoid that kind of stateful behaviour.

I've rewritten my actual code to recurse instead of emitting for the next 'frame' (which I think is cleaner anyway), but this definitely seems like a 'gotcha' that goes against what people would expect of the buffer operators.

I've updated the title and my original post to reflect this.

martinsik commented 7 years ago

I personally think this is expected behavior and it's a tradeoff to ensure performance consistency.

Of course, this doesn't matter if you have an array of 5 items, but with one million items it would mean copying the entire array on every emission from bufferCount/bufferTime. That would degrade performance very quickly, and you wouldn't be able to do anything about it. Passing just an array reference, however, is as fast with one million items as with 5.

Note that you can actually "fix" this for yourself by manually creating a copy of the buffer:

Rx.Observable.merge(x, z)
  .bufferCount(3)
  .map(buffer => Array.from(buffer)) // hand the subscriber its own copy
  .subscribe((value) => { /* ... */ });

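One caveat, sketched with a simplified, hypothetical model rather than RxJS itself: if the operator emits its array before starting a new one, copying in map keeps the subscriber's array constant, but a value pushed synchronously from the subscriber still goes into the operator's original array, which is then discarded. `modelBufferCount` is a made-up name, and `Array.from` here stands in for the `map` step above.

```javascript
// Hypothetical model (not RxJS source): emit the array, then start a new one.
function modelBufferCount(size, emit) {
  let buffer = [];
  return (value) => {
    buffer.push(value);
    if (buffer.length === size) {
      emit(buffer);
      buffer = [];
    }
  };
}

const received = [];
const push = modelBufferCount(3, (buffer) => {
  const value = Array.from(buffer); // stands in for .map(buffer => Array.from(buffer))
  if (received.length === 0) push("HELLO"); // re-enter once, synchronously
  received.push(value);
});

[0, 1, 2, 3, 4, 5].forEach((n) => push(n));
console.log(JSON.stringify(received)); // [[0,1,2],[3,4,5]]
```

The copy is never mutated, but in this model "HELLO" does not show up in the next buffer either.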
MorleyDev commented 7 years ago

I understand if it's a performance optimisation, though I think it goes against what users would expect, even if it's the behaviour the authors intended. If so, I think at least a mention in the docs would be valuable to stop people tripping up.

I actually ended up writing my own noddy little lettable operator for this. Rather than copying the buffer, it swaps the current buffer for an empty one and emits the old one, so any next calls go into the new, empty buffer. There's no copying cost, but it does create a new array per emission that needs to be garbage collected.
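The swap idea can be sketched against a simplified, hypothetical model of a count-based buffer: replace the internal array with a fresh one first, then emit the filled one, so a synchronous re-entrant push lands in the new buffer. This illustrates the idea only; `swapBufferCount` is a made-up name, and it is neither RxJS code nor the operator from this comment.

```javascript
// Hypothetical sketch of the "swap" idea (not the thread's actual operator):
// replace the internal array *before* emitting the filled one.
function swapBufferCount(size, emit) {
  let buffer = [];
  return (value) => {
    buffer.push(value);
    if (buffer.length === size) {
      const full = buffer; // hold on to the filled array
      buffer = [];         // swap in a fresh array first...
      emit(full);          // ...so re-entrant pushes land in the new buffer
    }
  };
}

const received = [];
let allUnchanged = true;
const push = swapBufferCount(3, (value) => {
  const before = JSON.stringify(value);
  push("HELLO"); // synchronous re-entry, like z.next("HELLO")
  allUnchanged = allUnchanged && before === JSON.stringify(value);
  received.push(value);
});

[0, 1, 2, 3, 4, 5].forEach((n) => push(n));
console.log(JSON.stringify(received)); // [[0,1,2],["HELLO",3,4]]
```

This gives the behaviour the issue expected: every emitted array stays unchanged, and "HELLO" appears in every buffer after the first. Each emission allocates one new array, but nothing is copied.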

benlesh commented 5 years ago

This is a bug, and is still a bug in 6.x and 7.x alpha. Should be an easy fix.

jdussouillez commented 1 year ago

Any updates?