Reactive-Extensions / RxJS

The Reactive Extensions for JavaScript
http://reactivex.io
Other
19.48k stars 2.1k forks source link

Strange bugs in zip #958

Open benlesh opened 9 years ago

benlesh commented 9 years ago

While checking into your (really) impressive perf gains, it seems I've found a couple of bugs in zip:

Here is a jsbin demonstrating the behavior

The code to replicate is follows

Rx.Observable.zip(
  Observable.interval(100).take(3),
  Observable.interval(100).take(4),
  Observable.interval(100).do(::console.log),
  Observable.interval(100).do(::console.log)
).subscribe(::console.log, null, () => console.log('done'));

The resulting observable should just take 3 zipped values, then complete and unsubscribe from all underlying observables, but instead it stop emitting, doesn't complete, and the inner observables keep chugging along.

With some experimentation, I found that zipping more than 3 observables, where the last one doesn't complete, it will do this strange behavior. However if the last one does complete, even if one prior to it does not, it will work as expected.

paulpdaniels commented 9 years ago

Looks like there was a scenario where some subscriptions could complete but the operator wasn't recognizing that their queues were empty as well so it just kept plugging along caching the values for the Observables that didn't complete.

tomlarkworthy commented 8 years ago

So I been investigating zip behaviour too (http://jsbin.com/xiyocozeta/1/edit?html,js,console,output)

take immediately fires the onCompleted when the sequence ends.

const source = Observable.from([1, 2])
  .tap(next => console.log("source.next", next))

Rx.Observable.zip(
  source.take(1).tapOnCompleted(
    () => console.log("completed")),
  source)
.subscribe(
  next => console.log("final next", next), 
  null, 
  () => console.log('final completed')
);
"source.next"
1
"completed"
"source.next"
1
"final next"
[1, 1]
"source.next"
2
"final completed"

My aim for zip was to lockstep drawing values until one sequence completed. However it overdraws in my opinion. It is fine that although zip receives an onCompleted it waits until the second stream to delivers it's next. However, I think that on that value, it should immediately fire the onCompleted for the zip, as it has received 1 value on each stream, and received an onCompleted for the second signal of one stream. However it draws another value from the second stream before realizing the first has called onCompleted and raises an onCompleted itself. Its a pointless call and wrecks some logic downstream in my application, as the completed now gets delayed out of order with what is happening before the zip.

benlesh commented 8 years ago

However it overdraws in my opinion.

Yeah, it should probably have completed when the second observable emits, clears the first observable's buffer, and notices that the first observable is already completed.

tomlarkworthy commented 8 years ago

I think it can only be solved if the "done" flags in the implementation has positional information relative to the buffer queue (i.e. they are numbers not booleans). Receipt of an onComplete would set the flag to the current buffer length (e.g. encoding that with when a buffer with 5 onNext objects queued, an onComplete indicates done would be live in 5 shifts time). These integer flags would decrement when the buffers are consumed, and the onComplete callback for the zip would fire if an onDone anywhere reaches zero. If you initialize them to a negative number you can avoid special casing the initialization.

paulpdaniels commented 8 years ago

I believe my PR would work in that case (though it is rather out of date at this point). A zip should always complete once it becomes impossible to emit a new value from all the source sequences. This would be the case as soon as at least one source completes and has an empty buffer.

@tomlarkworthy your implementation sounds more clear than mine, would you want to put together a PR for it?

tomlarkworthy commented 8 years ago

Oh I see, I did not see that logical path of (empty AND onDone) => onComplete, that works too and doesn't involve arithmetic (sounds better). Yeah I can open a PR on this.

tomlarkworthy commented 8 years ago

According to this (old) developer video, (https://channel9.msdn.com/Blogs/J.Van.Gogh/Reactive-Extensions-API-in-depth-Zip), this is not following the contract of zip. Weirdly, the tests seem to have been written such that zips on infinite length streams are enforced never to complete!?

https://github.com/Reactive-Extensions/RxJS/blob/master/tests/observable/zip.js#L205 (reminder, the onNext occurs before the subscription at 200, so the empty is just an onCompleted event).

Anyway its a change in semantics so not sure if its ok.

tomlarkworthy commented 8 years ago

OK so it is somewhat complicated for me to sign a CLA so I am am unwinding that PR.

mmcgahan commented 8 years ago

This issue appears to be fixed in RxJS v5