ReactiveX / rxdart

The Reactive Extensions for Dart
http://reactivex.io
Apache License 2.0

Converting a PublishSubject to a ValueObservable doesn't work if not listened to first #200

Closed enyo closed 5 years ago

enyo commented 5 years ago

It's best explained with code:

final startGameSubject = PublishSubject<String>();
final activeGame = startGameSubject.stream.shareValue();

startGameSubject.add('New Game');

await Future.delayed(Duration());

print(activeGame.value); // -> prints null

In this case, the value 'New Game' added to startGameSubject is simply lost, because no one was listening to activeGame.

I would expect, however, that a BehaviorSubject always keeps the last value and starts listening to the source stream immediately.


Here are a few workarounds to the issue:

// Listening to the stream *before* any input
final startGameSubject = PublishSubject<String>();
final activeGame = startGameSubject.stream.shareValue();

// Start listening to initialise the stream
activeGame.listen(null);

startGameSubject.add('New Game');

await Future.delayed(Duration());
print(activeGame.value); // Prints 'New Game'

// Listening to the stream with `.first`
final startGameSubject = PublishSubject<String>();
final activeGame = startGameSubject.stream.shareValue();

final first = activeGame.first;
startGameSubject.add('New Game');

print(await first); // Prints 'New Game'

// Using a BehaviorSubject as the source stream
final startGameSubject = BehaviorSubject<String>();
final activeGame = startGameSubject.stream; // Note that adding `.shareValue()` here doesn't work

startGameSubject.add('New Game');

print(activeGame.value); // Prints 'New Game'

frankpepermans commented 5 years ago

@brianegan Think the only solution here is to eagerly convert into a BehaviorSubject internally.

Currently, this only triggers on onListen, but we lose any added events on the parent Subject before that listener...
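A minimal sketch of the eager approach, using only `dart:async` rather than rxdart's internals. `EagerValue` and `eagerDemo` are hypothetical names invented for illustration, and a plain broadcast `StreamController` stands in for the PublishSubject:

```dart
import 'dart:async';

/// Hypothetical helper (not part of rxdart): subscribes to [source]
/// immediately and remembers the most recent event.
class EagerValue<T> {
  EagerValue(Stream<T> source, {T? seedValue}) : _value = seedValue {
    // Subscribing in the constructor is the "eager" part: events added
    // before any downstream listener exists are still captured.
    _subscription = source.listen((event) => _value = event);
  }

  T? _value;
  late final StreamSubscription<T> _subscription;

  /// Last event seen so far, or the seed if none has arrived yet.
  T? get value => _value;

  /// Nothing releases the subscription automatically, so the owner
  /// has to cancel it explicitly.
  Future<void> dispose() => _subscription.cancel();
}

/// Uses a plain broadcast controller as a stand-in for a PublishSubject.
Future<String?> eagerDemo() async {
  final startGame = StreamController<String>.broadcast();
  final activeGame = EagerValue<String>(startGame.stream);

  startGame.add('New Game');
  await Future<void>.delayed(Duration.zero); // let the event be delivered

  final captured = activeGame.value;
  await activeGame.dispose();
  await startGame.close();
  return captured;
}
```

The trade-off is visible in `dispose()`: an eager subscription has no listener count to tell it when to stop, so whoever creates it also has to cancel it.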

enyo commented 5 years ago

@frankpepermans from what I understand, this seems to be the right solution. The reason anybody would want to convert a PublishSubject into a BehaviorSubject is because they'd want the last value to be captured and available, so converting this immediately makes a lot of sense to me.

brianegan commented 5 years ago

Hey there -- thanks for writing in.

It sounds counterintuitive, but I really think shareValue is working as intended. Why do I say that? Because it's meant as a short-cut for the exact use-case you're describing!

shareValue is explicitly built to listen to a source Stream when it is first listened to, and cancel the source subscription when there are no more listeners! This is how share works across the Rx ecosystem. In fact, shareValue is just a shortcut for publishValue().refCount().

If you need to take full control over when you want to subscribe to the underlying PublishSubject, you need to use publishValue instead. That will return a ConnectableObservable, which allows you to connect to the underlying Stream when it makes sense for your use-case, and cancel the subscription returned from connect when you're done with it.

Your code would look like this instead:

final startGameSubject = PublishSubject<String>();
final activeGame = startGameSubject.stream.publishValue();
final connectionSubscription = activeGame.connect(); // Start listening to the PublishSubject
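The ref-counting behaviour described above (subscribe to the source on the first listener, cancel when the last one leaves) can be approximated with plain `dart:async`. This is only a sketch of the idea, not how rxdart implements `refCount`. `refCounted`, `refCountDemo` and the `log` parameter are hypothetical names added for illustration:

```dart
import 'dart:async';

/// Hypothetical stand-in for ref-counting: connect to [source] when the
/// first listener arrives, disconnect when the last one leaves.
/// [log] records each transition so the order can be inspected.
Stream<T> refCounted<T>(Stream<T> source, List<String> log) {
  StreamSubscription<T>? upstream;
  late final StreamController<T> controller;
  controller = StreamController<T>.broadcast(
    // Called when the first listener subscribes.
    onListen: () {
      log.add('connect');
      upstream = source.listen(
        controller.add,
        onError: controller.addError,
        onDone: controller.close,
      );
    },
    // Called when the last listener cancels.
    onCancel: () {
      log.add('disconnect');
      upstream?.cancel();
      upstream = null;
    },
  );
  return controller.stream;
}

Future<List<String>> refCountDemo() async {
  final subject = StreamController<String>.broadcast();
  final log = <String>[];
  final shared = refCounted(subject.stream, log);

  subject.add('before'); // nobody is connected yet, so this is dropped
  final subscription = shared.listen(log.add);
  subject.add('after');
  await Future<void>.delayed(Duration.zero); // let events be delivered

  await subscription.cancel();
  await subject.close();
  return log;
}
```

In this sketch `refCountDemo()` records `connect`, `after`, `disconnect`: the event added before the first listener never reaches the shared stream, which matches the behaviour reported in this issue.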

I'm not gonna say this is the most intuitive API -- in fact I'd also argue it's pretty confusing. However, it is the API that's supported around the Rx ecosystem.

That said, I guess I'm curious: What's the problem with using BehaviorSubject instead of PublishSubject if this is the functionality you'd like by default?

frankpepermans commented 5 years ago

@brianegan sounds right, but the value getter is very misleading though.

We allow a seed value when calling shareValue, but we do not reflect any further events that were added on the parent Stream until the listen occurs.

Assume:

var controller = new PublishSubject();
var shared = controller.stream.shareValue(seedValue: 1);

controller.add(2);

shared.listen(print); // prints 1

The above is a bit confusing, no?

Maybe we can just throw an RTE (runtime error) when trying to call value before a first listen?

brianegan commented 5 years ago

Yah, I can definitely see why that's confusing, but making that an RTE would negate the benefit of the seedValue, which often needs to be accessed synchronously as the default value before the Stream emits any items. E.g. in Flutter:

StreamBuilder(
  stream: myValueObservable,
  // This is really where seedValue is useful! Throwing an RTE would make it so this wouldn't work :/
  initialData: myValueObservable.value,
  builder: ()...);

Overall, I think the reason this issue feels awkward relates again to the purpose of these operators (I should have mentioned this as well): These operators are built to take a Single Subscription Stream and turn it into a "multicast" or Broadcast Stream, so that multiple listeners can listen to a single source of data. They aren't built to convert one type of Subject into another. Rather than throwing an RTE when you access value, I think it makes more sense to throw an AssertionError if the Stream that's passed into the publish or share operators is already a broadcast stream.
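A rough sketch of what such a guard could look like, written against `dart:async` only. `shareSingleSubscription` is a hypothetical name and is not part of rxdart. `Stream.isBroadcast` and `Stream.asBroadcastStream()` are standard Dart APIs:

```dart
import 'dart:async';

/// Hypothetical guard (not part of rxdart): refuses sources that are
/// already broadcast streams, then multicasts the single-subscription
/// source.
Stream<T> shareSingleSubscription<T>(Stream<T> source) {
  // Only checked when assertions are enabled (for example in debug
  // builds); in release mode this line is skipped.
  assert(
    !source.isBroadcast,
    'source is already a broadcast stream; sharing it again is likely a mistake',
  );
  return source.asBroadcastStream();
}
```

With assertions enabled, passing the stream of a broadcast controller (the closest plain-Dart analogue of a PublishSubject) would throw an AssertionError, while a single-subscription stream such as `Stream.fromIterable([1, 2])` would pass through.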

frankpepermans commented 5 years ago

Regarding that value property: we could also just refactor it to be a Future instead, which completes either immediately or, when seedValue is null, after the first event.
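A minimal sketch of that idea in plain `dart:async`, assuming a non-nullable event type. `valueAsFuture` is a hypothetical function, not an rxdart API:

```dart
import 'dart:async';

/// Hypothetical async variant of `value` (not part of rxdart):
/// resolves with the seed when one is given, otherwise with the first
/// event emitted by [source].
Future<T> valueAsFuture<T extends Object>(Stream<T> source, {T? seedValue}) {
  if (seedValue != null) return Future<T>.value(seedValue);
  return source.first;
}
```

Even in the seeded case the caller still has to `await` the result, so it could not be passed directly to a synchronous parameter.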

brianegan commented 5 years ago

Hm, I dunno. I kinda think the sync access is really important here -- if we make it async, the above code sample I posted wouldn't work (same problem would arise if you need sync access in an AngularDart app).

enyo commented 5 years ago

@frankpepermans If I'm not mistaken, the sync .value property was specifically added to make it more usable in flutter and the StreamBuilder. Otherwise, you would always need to draw the widget without data first, until the stream fires its first event.

@brianegan mh, ok so they are meant to convert cold streams into hot streams. But then I might not understand how I'm supposed to be working with rx — specifically with flutter (and BLoC)...

If I want to build a reactive app in flutter, I need to build streams that are going to be listened to later on in my app (when my app navigates, and uses a stream with StreamBuilder). Right now I constantly run into two problems with RX:

  1. My streams aren't connected, because they don't have a listener yet. As you said, I can bypass this issue with .connect, but if I understood correctly, that will, again, not deliver a ValueObservable, which is required in the context of flutter, and it's quite a lot of unintuitive code.
  2. I need ValueObservable streams, otherwise I can't properly use them in my widgets, and building them has resulted in a lot of problems for me (one is related to this issue at hand, for the other one I'm creating a separate issue to discuss it, since I'm still not sure if this is a bug in rx, or if it also stems from something I don't understand).

I'm a bit unsure whether it makes sense to continue to explore the reactive programming (with rx) direction with my flutter app. I have been trying to port a small app that I had written in redux to BLoC & reactive programming, and I constantly ran into these issues. I really like the concept of reactive programming though, so I'd really like to give it a try. So maybe it's also me trying to do things that are contrary to the nature of the library, in which case I would be incredibly thankful for anything you could point out in my approach that is problematic.

I have looked at other examples, and how they solve "reactive programming" and the BLoC pattern (for example the Flutter TODO BLoC example), but the way it's handled there is not something I find particularly elegant. The streams all get set up with controllers, and then routed to an Interactor which uses functions that return Futures, which is (the way I understand it) not really reactive anymore. My dream was to program nearly all business logic with stream transformers, and build my app as a simple "flow of data": streams connected to each other with transformers. But the more I try this approach, the more I run into limitations and have the feeling that using controllers for each sink/stream I want as input/output is the best/easiest/only way to get the behaviour I want...

Anyway.. thanks for reading! I would be incredibly thankful to get some input on this. I have now spent the whole week trying to get into reactive programming & BLoC, and am not giving up yet, but I need to know whether it makes sense long term to continue in that direction.

I'd also like to mention that I'm a dart veteran and have been working with dart exclusively since it was in its infancy, so I have a good grasp of the language in general. If I make it through this tunnel, I'll write up a blog post with my experience to help others avoid the same issues I had.

frankpepermans commented 5 years ago

Personally, I tend to never use the sync value. However, I do use BehaviorSubject a lot, because views are indeed lazily rendered, while the Stream may already have played some events.

I dunno about Flutter, but in Angular having an async property is very easily dealt with.

I do think we need to tighten things up a bit, the assert proposition is good for example :)

frankpepermans commented 5 years ago

Having had some extra thought on this, I don't think we even need to assert. i.e. the implementation is good as-is.

With broadcast Streams, you ought to always be aware that past events may have played; the same is true here.

The value getter will be out of sync with the actual last played item until the next event is played (and the shareValue target has been listened to, of course). Using seedValue helps to fill this "gap".

lukepighetti commented 5 years ago

My app would fall apart if .value were asynchronous. In fact, I cannot imagine a single scenario where the benefits of having it be async outweigh the benefits of it being synchronous. This is not specific to rxdart, rxjs also exhibits this behavior.

jaumard commented 5 years ago

> That said, I guess I'm curious: What's the problem with using BehaviorSubject instead of PublishSubject if this is the functionality you'd like by default?

On my side the use case is:

final BehaviorSubject<double> _amount = BehaviorSubject(sync: true, seedValue: 0);
ValueObservable<double> get amount => _amount.distinct();

The problem is that distinct now returns an Observable instead of a ValueObservable, so if shareValue can't do that, maybe another operator can?

lukepighetti commented 5 years ago

.distinct().shareValue() is how I would normally resolve this

brianegan commented 5 years ago

Thanks for the discussion, all! Closing since I think this is "working as intended" according to the spec and no further action is required after discussion.