ReactiveX / rxjs

A reactive programming library for JavaScript
https://rxjs.dev
Apache License 2.0
30.7k stars 3k forks source link

shareReplay with refCount stops working #6760

Open ghostlytalamaur opened 2 years ago

ghostlytalamaur commented 2 years ago

Describe the bug

When subscribe to shared observable multiple times internal connection to the source observable gets broken. It stops emit values on subscribe.

Expected behavior

Expected output:

first: 7.5.1
second: 7.5.1

Actual output

first: 7.5.1

Reproduction code

import { BehaviorSubject, merge, of, Subject } from 'rxjs';
import {
  shareReplay,
  take,
  takeUntil,
  materialize,
  filter,
} from 'rxjs/operators';

const shared$ = new BehaviorSubject('7.5.1').pipe(
  shareReplay({ refCount: true, bufferSize: 1 })
);

const isCompleted$ = shared$.pipe(
  materialize(),
  filter((n) => n.kind === 'C'),
  take(1)
);

const work = new Subject().pipe(takeUntil(isCompleted$));

merge(work, shared$)
  .pipe(take(1))
  .subscribe((value) => console.log('first: ', value));

shared$.subscribe((value) => console.log('second:', value));

Reproduction URL

7.5.1 6.4.0

Version

7.5.1

Environment

No response

Additional context

No response

EarthyOrange commented 2 years ago

Removing .pipe(take(1)) makes the 7.5.1 snippet work. So it looks like take is somehow affecting the shareReplay operator.

I had run in to something similar in my own project. So I ended up writing a simple operator that works like take but doesn't complete.

import { filter, map, scan } from 'rxjs/operators';

/**
 * Similar to the take operator, as in, it lets the first few values through.
 * However, unlike 'take' this operator doesn't complete.
 *
 * @param {int} few The number of emissions to let through
 * @returns {function(Observable): Observable}
 */
export default function first(few = 1) {
  return (source) => source.pipe(
    scan(
      (acc, value) => {
        if (acc.count < few) {
          acc.count += 1;
          acc.lastValue = value;
          return acc;
        }
        acc.lastValue = undefined;
        return acc;
      },
      { count: 0, lastValue: undefined },
    ),
    map(({ lastValue }) => lastValue),
    filter((lastValue) => !!lastValue),
  );
}

Replace take with first.

lincolnthree commented 2 years ago

Confirmed I am also seeing behavior where shareReplay({ bufferSize: 1, refCount: true }) stops emitting. My theory is also that it is causing the source observable to somehow complete when it should not, and it is not reconnecting.

Using first() did not solve my issue. Only setting refCount: false gets things working again (which of course introduces risk for a memory leak)

I feel like I am missing something basic here, but something definitely changed in the functionality between 6.x and 7.current.

For me there seems to be a commonality of using combineLatest([x$, y$]) in combination with shareReplay() that isn't working. Still tracking it down.

lincolnthree commented 2 years ago

I think this might be related: https://github.com/ReactiveX/rxjs/pull/5634/files#diff-44fa2a928f593fdd8b981dac7737a65f6b0ddc844300af18a539c59b683b5467R130

Looks like shareReplay now calls complete() on the subscribed subject/connector when the source completes. I don't think this this was in the original shareReplay behavior?

Am I missing something?

henry-alakazhang commented 2 years ago

I'm experiencing this as well. It happens when you subscribe multiple times, then unsubscribe and resubscribe again (is this called a reentrant subscription?). The code in the original post is a good example since multiple things subscribe to shared$.

Specifically, it happens if the code emits and unsubscribes immediately. If it emits after the first subscription, things seem to work as expected. It looks like it's a race condition in the setup code and the reset at refcount 0 logic.

Excuse my atrocious console logging, but this is what I see when a subscription is made and the source emits after a delay:

image

Whereas this is what happens if the source emits immediately:

image

As you can see, if a value emits immediately and both subscriptions unsubscribe (as you might get with take(1) or firstValueFrom()), the reset happens before the logic for the second subscription finishes. The shareReplay then gets into a bad state - it maintains a subscription to the source, and doesn't resubscribe when new results come in.

This started happening in 7 because of this new code, which prevents excess subscriptions to the source observable. There was also some rearranging of the code which may have prevented it in 6, I'm not sure.

https://github.com/ReactiveX/rxjs/blob/47fa8d555754b18887baf15e22eb3dd91bf8bfea/src/internal/operators/share.ts#L214-L220

Flipping the order of the code so the source subscription check runs before the refCount decrement subscription seems to fix the issue for me, but there are some comments explicitly wanting to do it in the other order, so I don't know if that solution will work.

lincolnthree commented 2 years ago

@henry-alakazhang Do you have a code reproduction you can share that demonstrates the issue? It would probably help the devs make sure it gets tested and fixed! (I haven't been able to reproduce this outside of my app - sporadically.)

henry-alakazhang commented 2 years ago

I haven't tried to reproduce it outside my app either, but the reproduction in the issue itself should demonstrate the issue fine.

I can simplify the reproduction a little more:

// Apply `replay` to an observable which emits immediately once
const shared$ = new BehaviorSubject('6.4.0').pipe(
  shareReplay({ refCount: true, bufferSize: 1 })
);

// Subscribe to it multiple times simultaneously, then unsubscribe simultaneously
combineLatest([shared$, shared$]).pipe(
  take(1)
).toPromise();

// It doesn't emit again.
const lateSub = shared$.subscribe((value) => console.log('late:', value));

This example specifically is minimal to the point of silliness (combineLatest on the same observable) but if you imagine doing a .pipe() and some transformations and merging it back in, it can be a real use case.

Note that also:

// If we reset it (via refCount = 0), the state gets fixed.
lateSub.unsubscribe();

shared$.subscribe((value) => console.log('later:', value)); // emits on both 7.5.1 and 6.4.0

Stackblitz 6.4.0: https://stackblitz.com/edit/q7u9jg-xxev1z?file=index.ts Logs "late:" and "later:" Stackblitz 7.5.1: https://stackblitz.com/edit/q7u9jg-p16qzd?file=index.ts Logs only "later:"

magyargergo commented 2 years ago

Hey guys,

Do you have any update on this?

jkossis commented 2 years ago

@benlesh it seems like part of the issue (or a tangential issue) is, with the refactor of share, we lost some of the granularity previously provided.

For example, publishReplay + refCount used to unsubscribe from source when the ref count dropped to 0, while still preserving the underlying ReplaySubject. With the newly configurable share, those two things are coupled with resetOnRefCountZero. So you can have the behavior of unsubscribe from source + generate new ReplaySubject, or neither. But not the aforementioned combination.

tl;dr resetOnRefCountZero has become a proxy for shareReplay's behavior with { refCount: true }. We have lost the functionality previously provided by publishReplay + refCount.

jkossis commented 2 years ago

@benlesh bumping this ^