ReactiveX / rxjs

A reactive programming library for JavaScript
https://rxjs.dev
Apache License 2.0

Synchronous emit to `switchMap` during the processing of previous emit prevents the first from unsubscribing #7455

Closed atscott closed 7 months ago

atscott commented 8 months ago

Describe the bug

I think this may be a duplicate of #7230 but not entirely sure.

When a switchMap is still in the process of handling one next notification, it will not unsubscribe the inner subscription if another next happens synchronously.

Expected behavior

switchMap should always unsubscribe from previous subscriptions if a new value comes in.

Reproduction code

https://stackblitz.com/edit/rxjs-a21xi5?devtoolsheight=60&file=index.ts


import { switchMap, Subject, finalize, from, filter } from 'rxjs';

const sub = new Subject<number>();
const finalizations = new Subject<number>();

sub
  .pipe(
    switchMap((id) => {
      return from(Promise.resolve(id)).pipe(
        finalize(() => {
          console.log(`finalize for ${id}`);
          finalizations.next(id);
        })
      );
    })
  )
  .subscribe((id) => {
    console.log(`next emit for ${id}`);
  });

finalizations.pipe(filter((id) => id === 1)).subscribe(() => {
  sub.next(3);
});
sub.next(1);
sub.next(2);

This outputs the following:

finalize for 1
next emit for 3
finalize for 3
next emit for 2
finalize for 2

I would expect it not to produce "next emit for 2".

Reproduction URL

No response

Version

8.0.0-alpha-12

Environment

No response

Additional context

This is not a regression. I'm seeing this as far back as rxjs 6.

benlesh commented 7 months ago

This tricked me for a few minutes. This isn't a bug, it's expected behavior.

finalize will ALWAYS be called, whether the source completes, errors, or is unsubscribed. In this case, the finalize callback is called synchronously while switchMap is unsubscribing from the first inner observable, causing the value 3 to fall through.

What you're looking for is tap({ complete: () => { /* do stuff */ } }) not finalize... finalize is like a "finally" block. It's GUARANTEED to be called no matter what. This is useful for tearing things down or logging.
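
The difference between the two operators can be seen in isolation. The following is a minimal sketch, assuming RxJS 7.2 or later (where operators are exported from 'rxjs'); the `events` array and the label strings are made up for illustration. It records which callbacks run when a subscription is cancelled early versus when the source completes by itself.

```typescript
import { NEVER, of, tap, finalize } from 'rxjs';

// Illustrative log of which callbacks ran, in order.
const events: string[] = [];

// Case 1: the subscription is cancelled before the source completes.
// NEVER emits nothing and never completes, so only finalize can run.
NEVER.pipe(
  tap({ complete: () => events.push('tap complete (unsubscribed)') }),
  finalize(() => events.push('finalize (unsubscribed)'))
)
  .subscribe()
  .unsubscribe();

// Case 2: the source completes by itself, so both callbacks run.
of(1)
  .pipe(
    tap({ complete: () => events.push('tap complete (completed)') }),
    finalize(() => events.push('finalize (completed)'))
  )
  .subscribe();

console.log(events);
// ['finalize (unsubscribed)', 'tap complete (completed)', 'finalize (completed)']
```

In the reproduction, switchMap cancels the inner observable for 1 rather than letting it complete, so a tap complete handler would not have fired for it and sub.next(3) would not have been triggered.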

atscott commented 7 months ago

Hi @benlesh, thanks for the response. I think I'm a bit confused or not totally following the explanation. I do expect finalize for 2 to be logged. What I did not expect was for the value 2 to come out of the pipe to the subscriber and log next emit for 2. Shouldn't a new value coming into the switchMap always cause the previous inner subscription to be unsubscribed?

benlesh commented 7 months ago

const sub = new Subject<number>();
const finalizations = new Subject<number>();

sub
  .pipe(
    switchMap((id) => {
      return from(Promise.resolve(id)).pipe(
        finalize(() => {
          console.log(`finalize for ${id}`);
          finalizations.next(id);
        })
      );
    })
  )
  .subscribe((id) => {
    console.log(`next emit for ${id}`);
  });

finalizations.pipe(filter((id) => id === 1)).subscribe(() => {
  sub.next(3);
});
sub.next(1);
sub.next(2);

  1. sub.next(1)
  2. Synchronously call switchMap's mapping function with 1
  3. Which maps into the from(Promise.resolve(1)).pipe(finalize(STUFF)) observable, we'll call that innerObservable1. At this point the promise has already scheduled a microtask to emit on.
  4. switchMap synchronously subscribes to innerObservable1. Nothing else happens, as we have to wait for the promise to resolve.
  5. sub.next(2), again synchronously. (We don't get back to this point for a bit)
  6. We unsubscribe from the subscription to innerObservable1. This triggers the finalize callback!
  7. The finalize callback logs finalize for 1, then calls finalizations.next(1) synchronously.
  8. The value 1 passes the filter in our second observable, and then synchronously hits the next handler in the subscription, calling sub.next(3).
  9. We don't have to unsubscribe any inner subscriptions, because we already did that on 6.
  10. Synchronously call the switchMap's mapping function with 3.
  11. Which maps into the from(Promise.resolve(3)).pipe(finalize(STUFF)) observable, we'll call that innerObservable3. At this point the promise has already scheduled a microtask to emit on.
  12. switchMap synchronously subscribes to innerObservable3. Nothing is emitted yet, as we have to wait for the promise to resolve. sub.next(3) returns, and so do the finalize callback and the unsubscribe from 6.
  13. Back to the step after 5 above: the unsubscribe for this notification already ran in 6, so innerObservable3 is not unsubscribed, and we call switchMap's mapping function with the value 2.
  14. Which maps into the from(Promise.resolve(2)).pipe(finalize(STUFF)) observable, which we'll call innerObservable2.
  15. We synchronously subscribe to innerObservable2. innerObservable3 and innerObservable2 are now both subscribed.
  16. When the promise for 3 resolves, switchMap sends the value 3 from innerObservable3 to the consumer, logging next emit for 3.
  17. innerObservable3 completes, triggering the finalize, which logs finalize for 3, then calls finalizations.next(3).
  18. 3 doesn't pass the filter in the other subscription, so nothing is done there.
  19. When the promise for 2 resolves, we emit 2 to the consumer, logging next emit for 2.
  20. Then innerObservable2 completes, calling finalize's callback, which synchronously logs finalize for 2, then calls finalizations.next(2).
  21. 2 does not pass the filter so nothing happens in the other observable.
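
The re-entrant part of this walkthrough (sub.next(3) arriving while sub.next(2) is still being handled) can be sketched without RxJS. What follows is a simplified, hypothetical model and not the library's source: `onNext`, `subscribeInner`, `InnerSub`, `current` and `live` are invented names, and the model only assumes that a switchMap-style handler first unsubscribes the tracked inner subscription and then maps and subscribes. It leaves out promise timing and looks only at which inner subscriptions exist once the synchronous calls have returned.

```typescript
// Hypothetical model of a switchMap-style handler; NOT RxJS source code.
interface InnerSub {
  id: number;
  closed: boolean;
  unsubscribe(): void;
}

const log: string[] = [];
const live = new Set<number>(); // ids of inner subscriptions still open
let current: InnerSub | null = null; // the inner subscription being tracked

function subscribeInner(id: number): InnerSub {
  log.push(`subscribe ${id}`);
  live.add(id);
  return {
    id,
    closed: false,
    unsubscribe() {
      if (this.closed) return; // unsubscribing twice is a no-op
      this.closed = true;
      live.delete(id);
      log.push(`finalize ${id}`);
      // Mirrors the reproduction: finalizing 1 synchronously pushes 3.
      if (id === 1) onNext(3);
    },
  };
}

function onNext(id: number): void {
  // First, cancel the tracked inner subscription (may re-enter onNext).
  current?.unsubscribe();
  // Then map the value and subscribe to the new inner observable.
  current = subscribeInner(id);
}

onNext(1);
onNext(2);

console.log(log); // ['subscribe 1', 'finalize 1', 'subscribe 3', 'subscribe 2']
console.log(Array.from(live)); // [3, 2]
```

In this model the handler for 2 has already done its unsubscribe step by the time the nested call for 3 subscribes, so nothing cancels 3 before 2 is subscribed as well.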

benlesh commented 7 months ago

Moral of the story: Don't do too much synchronous stuff in observables, it's really weird. :P
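
One possible way to avoid the synchronous re-entry, offered as a sketch rather than a recommendation from this thread, is to defer the nested sub.next(3) to a microtask so that it runs after the current sub.next(2) call has returned. The example assumes RxJS 7.2 or later. As a hypothetical stand-in for the promise-based inner observable it uses one that never emits, so only the subscribe and finalize order is recorded; `trace` and `syncTrace` are illustrative names.

```typescript
import { Observable, Subject, filter, finalize, switchMap } from 'rxjs';

const trace: string[] = [];
const sub = new Subject<number>();
const finalizations = new Subject<number>();

sub
  .pipe(
    switchMap((id) =>
      // Stand-in inner observable (assumption): records the subscribe, never emits.
      new Observable<number>(() => {
        trace.push(`subscribe ${id}`);
      }).pipe(
        finalize(() => {
          trace.push(`finalize ${id}`);
          finalizations.next(id);
        })
      )
    )
  )
  .subscribe();

finalizations.pipe(filter((id) => id === 1)).subscribe(() => {
  // Deferred: runs after the sub.next(...) call that is currently in progress.
  Promise.resolve().then(() => sub.next(3));
});

sub.next(1);
sub.next(2);

// Snapshot taken before any microtask has run.
const syncTrace = [...trace]; // ['subscribe 1', 'finalize 1', 'subscribe 2']

Promise.resolve().then(() => {
  // The deferred sub.next(3) has run by now and switched away from 2.
  console.log(trace);
  // ['subscribe 1', 'finalize 1', 'subscribe 2', 'finalize 2', 'subscribe 3']
});
```

With the emission deferred, 2 is fully subscribed before 3 arrives, so 3 cancels it in the usual switchMap way.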