ReactiveX / rxdart

The Reactive Extensions for Dart
http://reactivex.io
Apache License 2.0
3.36k stars 272 forks source link

AnyLatestStream combiner #749

Closed equescodebelike closed 4 months ago

equescodebelike commented 5 months ago

So the problem was, that using CombineLatestStream the stream will not emit until all Streams have emitted at least one item. I found an answer on https://stackoverflow.com/questions/70361479/rx-dart-combine-multiple-streams-to-emit-value-whenever-any-of-the-streams-emit by Code Spirit user. And I think it is rather cool to add this feat by default in Rx lib. I have written the main idea in the documentation. No test coverage and no static method in Rx class, because I want to know if this is a good idea or i misunderstanding something.

hoc081098 commented 5 months ago

Hi @equescodebelike, IMO, CombineAnyLatestStream([a, b], combiner) is similar to CombineLatestStream([a.startWith(null), b.startWith(null)], combiner). Could you please point out the difference between them 🙏 ?

equescodebelike commented 5 months ago

Hi @equescodebelike, IMO, CombineAnyLatestStream([a, b], combiner) is similar to CombineLatestStream([a.startWith(null), b.startWith(null)], combiner). Could you please point out the difference between them 🙏 ?

Implementation equals the original CombineLatestStream implementation except that it does not wait for all streams to emit before emitting

hoc081098 commented 5 months ago

Hi @equescodebelike, IMO, CombineAnyLatestStream([a, b], combiner) is similar to CombineLatestStream([a.startWith(null), b.startWith(null)], combiner). Could you please point out the difference between them 🙏 ?

Implementation equals the original CombineLatestStream implementation except that it does not wait for all streams to emit before emitting

We can achieve this behavior of CombineAnyLatestStream by using Rx.combineLatestList + startWith(null) + skip(1).

void main() async {
  print('${DateTime.now()} -----');
  await CombineAnyLatestStream([
    Rx.timer(1, Duration(seconds: 1)),
    Rx.timer(2, Duration(seconds: 2)),
  ], (l) => l).forEach((v) => print('${DateTime.now()} $v'));

  print('${DateTime.now()} -----');

  await Rx.combineLatestList([
    Rx.timer(1, Duration(seconds: 1)).cast<int?>().startWith(null),
    Rx.timer(2, Duration(seconds: 2)).cast<int?>().startWith(null),
  ]).skip(1).forEach((v) => print('${DateTime.now()} $v'));

  print('${DateTime.now()} -----');
}

image

equescodebelike commented 5 months ago

Hi @equescodebelike, IMO, CombineAnyLatestStream([a, b], combiner) is similar to CombineLatestStream([a.startWith(null), b.startWith(null)], combiner). Could you please point out the difference between them 🙏 ?

Implementation equals the original CombineLatestStream implementation except that it does not wait for all streams to emit before emitting

We can achieve this behavior of CombineAnyLatestStream by using Rx.combineLatestList + startWith(null) + skip(1).

void main() async {
  print('${DateTime.now()} -----');
  await CombineAnyLatestStream([
    Rx.timer(1, Duration(seconds: 1)),
    Rx.timer(2, Duration(seconds: 2)),
  ], (l) => l).forEach((v) => print('${DateTime.now()} $v'));

  print('${DateTime.now()} -----');

  await Rx.combineLatestList([
    Rx.timer(1, Duration(seconds: 1)).cast<int?>().startWith(null),
    Rx.timer(2, Duration(seconds: 2)).cast<int?>().startWith(null),
  ]).skip(1).forEach((v) => print('${DateTime.now()} $v'));

  print('${DateTime.now()} -----');
}

image

I see, thank you for your response. It would be nice if you write this example in Rx.combineLatestList documentation. Or I can deal with it in pull request

hoc081098 commented 5 months ago

Hi @equescodebelike, IMO, CombineAnyLatestStream([a, b], combiner) is similar to CombineLatestStream([a.startWith(null), b.startWith(null)], combiner). Could you please point out the difference between them 🙏 ?

Implementation equals the original CombineLatestStream implementation except that it does not wait for all streams to emit before emitting

We can achieve this behavior of CombineAnyLatestStream by using Rx.combineLatestList + startWith(null) + skip(1).

void main() async {
  print('${DateTime.now()} -----');
  await CombineAnyLatestStream([
    Rx.timer(1, Duration(seconds: 1)),
    Rx.timer(2, Duration(seconds: 2)),
  ], (l) => l).forEach((v) => print('${DateTime.now()} $v'));

  print('${DateTime.now()} -----');

  await Rx.combineLatestList([
    Rx.timer(1, Duration(seconds: 1)).cast<int?>().startWith(null),
    Rx.timer(2, Duration(seconds: 2)).cast<int?>().startWith(null),
  ]).skip(1).forEach((v) => print('${DateTime.now()} $v'));

  print('${DateTime.now()} -----');
}

image

I see, thank you for your response. It would be nice if you write this example in Rx.combineLatestList documentation. Or I can deal with it in pull request

Feel free to add this example to docs 🙏