ReactiveX / rxdart

The Reactive Extensions for Dart
http://reactivex.io
Apache License 2.0
3.37k stars 270 forks source link

Allow sync access to value of combined BehaviorSubjects #496

Open smkhalsa opened 4 years ago

smkhalsa commented 4 years ago

I'm composing multiple BehaviorSubjects using CombineLatestStream, and I'd like to be able to get sync access to the combined result immediately after adding an event to one of the origin BehaviorSubjects.

final subject1 = BehaviorSubject.seeded(0);
final subject2 = BehaviorSubject.seeded(0);
final combined =
    CombineLatestStream.combine2(subject1, subject2, (a, b) => a + b)
        .shareValue();

subject1.add(1);

expect(combined.value, equals(1));
hoc081098 commented 4 years ago

Currently not supported :(

HannibalKcc commented 3 years ago

@hoc081098 Hi, this issue has been opened so many days. Do you think we need support combine BehaviorSubjects?

I want to try to achieve it.

frankpepermans commented 3 years ago

It might be hard to implement, since rxdart builds upon plain Dart streams, however, if both BehaviorSubjects are sync=true, then it might be doable,

Did you test the example with that sync value set to true?

HannibalKcc commented 3 years ago

@frankpepermans
set BehaviorSubject.sync = true will not be doable.

ValueStream must be listened and wait nextTick e.g. await Future.delayed(Duration(seconds: 0), cb).

    final s1 = BehaviorSubject.seeded(0, sync: true);
    final s2 = BehaviorSubject.seeded(0, sync: true);

    final combined =
        Rx.combineLatest2(s1, s2, (int a, int b) => a + b).shareValue();
    s1.add(1);
    combined.listen(expectAsync1((res) => expect(res, 1)));

    final stream =  Stream.fromIterable([1, 2, 3]).shareValue();
    stream.listen((_) {});

    print('combined ${combined.value}'); // null
    print('stream ${stream.value}'); // null
    await Future.delayed(Duration(seconds: 0), () {
      print('combined ${combined.value}'); // 1
      print('stream ${stream.value}'); // 3
    });