ReactiveX / rxdart

The Reactive Extensions for Dart
http://reactivex.io
Apache License 2.0
3.37k stars 270 forks source link

.skip() doesn't allow listeners #342

Closed gbaldeck closed 4 years ago

gbaldeck commented 5 years ago

I have a BehaviorSubject who's ValueObservable I am subscribing to and trying to skip the first value. Basically I don't want the value that's already there, I only want the values that come afterward. Here's what I have:

myBehaviorSubject.shareValue().skip(1).listen(myFunction);

When I do this, myFunction is never called no matter how many times I trigger it. The below works fine though:

myBehaviorSubject.shareValue().listen(myFunction); //omitted the skip(1)

Am I doing something wrong or is this a bug?

frankpepermans commented 5 years ago

Not sure if I completely understand the problem:

For me this works:

import 'package:rxdart/rxdart.dart';

void main() {
  final subject = BehaviorSubject<int>();

  subject.shareValue().skip(1).listen(print); // prints 2 3 4

  subject.add(1);
  subject.add(2);
  subject.add(3);
  subject.add(4);
}
gbaldeck commented 5 years ago

Ok, I have narrowed it down further. What I have is a page in Flutter that when created does the below. The bloc.snackObs is the ValueObservable for my BehaviorSubject.

if (bloc.snackObs.value == null) {
      _snackBarSubscription = bloc.snackObs.listen(showSnackBar);
} else {
      _snackBarSubscription = bloc.snackObs.skip(1).listen(showSnackBar); //todo: broken here, not listening
}

Then in the dispose of the page I have this:

void dispose() {
    _snackBarSubscription.cancel();
}

When I first open the page the if portion of the if-else block above is run. Then I do something on the page that triggers the subscription and my snackbar shows. When I leave the page the dispose method is called and the subscription canceled.

Then I go back into the page and the else portion of the if-else block is run. This is the subscription that uses .skip(1). At this point no matter what I do will trigger the snackbar. However, I went back and commented out the _snackBarSubscription.cancel(); in the dispose method and it works.

It seems that if I cancel the first subscription and use the .skip(1) the second time I open the page then the .skip(1) subscription doesn't work. I also verified that _snackBarSubscription.cancel(); wasn't getting called the second time I opened the page (just in case).

But if I don't cancel the first subscription then the .skip(1) subscription does work.

It only seems to do this with the .skip(1) subscription. If I get rid of the if-else block and only do:

_snackBarSubscription = bloc.snackObs.listen(showSnackBar);

And keep the _snackBarSubscription.cancel(); in the dispose function, then the subscription works.

So there seems to be something wrong with subscribing to a ValueObservable, canceling the subscription, then re-subscribing using .skip(1).

frankpepermans commented 5 years ago

I don't mind taking a look, but could we maybe reproduce the issue in a more isolated environment?

I.e. like above, just a main Function where we can build a BehaviorSubject and work with some subscriptions?

gbaldeck commented 5 years ago

Sorry if what I wrote is confusing. But what I've found is extremely weird behavior. Here it is in a more step-by-step form.

Steps that cause the problem

  1. Go to the page and subscribe with _snackBarSubscription = bloc.snackObs.listen(showSnackBar);
  2. Leave the page and cancel in the dispose function using _snackBarSubscription.cancel();.
  3. Go to the page again and subscribe again, but instead do _snackBarSubscription = bloc.snackObs.skip(1).listen(showSnackBar);.

Doing it that way the showSnackBar function never gets called by the second subscription even when I'm actively triggering it. This has to be an issue with Flutter because I was not able to replicate this in plain Dart.

Steps that fix the problem

  1. Go to the page and subscribe with _snackBarSubscription = bloc.snackObs.listen(showSnackBar);
  2. Leave the page, dispose function is called and a flag is set to true called _hasBeenDisposed, but we didn't cancel the subscription yet.
  3. Go back to page, a new subscription is created with _snackBarSubscription = bloc.snackObs.skip(1).listen(showSnackBar);.
  4. Trigger the subscriptions, since we didn't unsubscribe from the first subscription both the first listener and second listener are called. The listener functions check to see if _hasBeenDisposed is true. For the first subscription it is true so we cancel the first subscription. For the second subscription, it is false so the second subscription stays connected.

So what I've determined is, if I cancel the first subscription before creating the second subscription I get behavior where the second subscription is never called. When I cancel after the second subscription has been created then everything works fine.

I have no idea what it is doing this. Since I can't replicate it in plain Dart I assume something very weird is going on with Flutter, but I have no idea what it could be.

gbaldeck commented 5 years ago

@frankpepermans I have a question. If all the StreamSubscriptions on an observable are canceled, is the stream automatically closed and will not allow any more listeners to subscribe?

brianegan commented 5 years ago

@gbaldeck Yep, that's exactly what shareValue does! It will convert the current Stream into a Broadcast Stream that shuts down after you cancel all subscriptions.

In your case, however, you shouldn't need to call behaviorSubject.shareValue() -- the behaviorSubject is already a ValueStream! You can just write behaviorSubject.skip(1).

As Frank mentioned, it'd be great to get an isolated case, specifically a test, since it feels a bit hard to narrow this one down.