ReactiveX / rxjs

A reactive programming library for JavaScript
https://rxjs.dev
Apache License 2.0

GroupBy subscriptions not disposed in due time - durationSelector #2660

Closed crunchie84 closed 7 years ago

crunchie84 commented 7 years ago

RxJS version: 5.4.0

Code to reproduce:

const ticker = Rx.Observable.interval(100)
  .map(tick => ({
    key: tick
  }))
;

ticker
  .groupBy(
    keySelector => keySelector.key % 2,
    null,
    durationSelector => ticker
      .filter(tick => tick.key > durationSelector.key + 10)
      .do(val => console.log(`[durationSelector] emitting value for group ${durationSelector.key}: ${val.key}`), null, () => console.log(`[durationSelector] completed for group ${durationSelector.key}`))
      .finally(() => console.log(`[durationSelector] unsubcribed for group ${durationSelector.key}`))
//    .take(1) // <--THIS FIXES IT
  )
  .take(3)
  .subscribe(
    val => console.log(`grouping: ${val.key}`),
    err => console.error(err),
    () => console.log('completed')
  );

A live version can be found at this jsbin: https://jsbin.com/vilacos/16/edit?js,console

Expected behavior:

When the durationSelector emits a value, the group is completed. This should also make the GroupBySubscriber unsubscribe from the durationSelector so that it can be garbage collected.

We would expect finally() to be invoked directly after the durationSelector emits a value, because that emission completes the group. The console output would then be:

grouping: 0
grouping: 1
[durationSelector] emitting value for group 0: 11
[durationSelector] unsubcribed for group 0

Instead, the durationSelector streams are only unsubscribed when the groupBy itself completes, which here happens because of the take(3) on the outer stream.

Manually appending .take(1) to the durationSelector stream fixes this behaviour. (Note: this is how it was implemented in RxJS 4's groupByUntil.)
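To illustrate why appending .take(1) releases the subscription, here is a minimal dependency-free sketch. It is not RxJS code: `makeTicker` and `takeOne` are hypothetical stand-ins for a hot source and for `.take(1)`, and the sketch assumes the source never emits synchronously during `subscribe`.

```javascript
// Hypothetical stand-in for a hot source such as the ticker; NOT the RxJS API.
function makeTicker() {
  const listeners = new Set();
  return {
    // Register a listener and return a function that removes it again.
    subscribe(listener) {
      listeners.add(listener);
      return () => listeners.delete(listener);
    },
    emit(value) {
      // Iterate over a copy so listeners may unsubscribe while being notified.
      for (const listener of Array.from(listeners)) listener(value);
    },
    get activeCount() {
      return listeners.size;
    },
  };
}

// Hypothetical equivalent of `.take(1)`: forward one value, then unsubscribe.
// Assumes `source.subscribe` does not emit before it returns.
function takeOne(source) {
  return {
    subscribe(listener) {
      const unsubscribe = source.subscribe(value => {
        unsubscribe(); // release the upstream subscription first
        listener(value);
      });
      return unsubscribe;
    },
  };
}

// Notifier without take(1): it stays registered after its first notification.
const plainTicker = makeTicker();
plainTicker.subscribe(() => {});
plainTicker.emit(11);
const leakedCount = plainTicker.activeCount;

// Notifier wrapped in takeOne: it is removed as soon as it has fired once.
const guardedTicker = makeTicker();
takeOne(guardedTicker).subscribe(() => {});
guardedTicker.emit(11);
const releasedCount = guardedTicker.activeCount;

console.log({ leakedCount, releasedCount });
```

In this model the plain notifier remains attached to the ticker after firing, while the wrapped one detaches itself. That detachment is the effect the .take(1) workaround has on the durationSelector stream.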

Actual behavior:

The durationSelector is not unsubscribed when the group completes. Only when the groupBy itself completes does it release all the durationSelector subscriptions it holds. As a result, memory is hoarded until the process runs out of memory (our case) or the parent stream completes.
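The accumulation can be modelled without RxJS. The following is a rough sketch under stated assumptions, not the library's implementation: `countLiveNotifiers` is a hypothetical helper that mimics the reproduction above (keys are `tick % 2`, a group closes once a tick greater than its opening tick plus 10 arrives) and counts how many notifier listeners remain attached to the shared ticker.

```javascript
// Toy model of "group by key, close each group via a duration notifier".
// Returns the number of notifier listeners still registered after the run.
function countLiveNotifiers(tickCount, releaseOnClose) {
  const notifiers = new Set();  // listeners attached to the shared ticker
  const openGroups = new Set(); // keys that currently have an open group

  for (let tick = 0; tick < tickCount; tick++) {
    // Deliver the tick to every registered notifier (copy: the set may shrink).
    for (const notify of Array.from(notifiers)) notify(tick);

    const key = tick % 2;
    if (!openGroups.has(key)) {
      // A new group opens and subscribes its own notifier to the ticker.
      openGroups.add(key);
      const openedAt = tick;
      let fired = false;
      const notify = current => {
        if (fired || current <= openedAt + 10) return;
        fired = true;            // only the first notification closes the group
        openGroups.delete(key);
        if (releaseOnClose) notifiers.delete(notify); // the expected behaviour
      };
      notifiers.add(notify);
    }
  }
  return notifiers.size;
}

const leaky = countLiveNotifiers(100, false);   // notifiers are never released
const released = countLiveNotifiers(100, true); // released when the group closes
console.log({ leaky, released });
```

Without the release step, the listener count in this model grows with every group that is re-created. With it, only the notifiers of the groups that are still open remain.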

hermanbanken commented 7 years ago

I added a test in 4bd3ea2aa to verify that the observables returned by the durationSelector were indeed not closed. A simple fix is something like 1a03cc29.
