ReactiveX / rxdart

The Reactive Extensions for Dart
http://reactivex.io
Apache License 2.0
3.37k stars 271 forks source link

How can we combine multiple streams if they depends on each other? #730

Closed desmeit closed 1 year ago

desmeit commented 1 year ago

I have two streams. The first stream fetches an ID that I need for the second stream. How can I wait for the first stream and combine both?

I tried:

CombineLatestStream combineStream = CombineLatestStream.list([
    FirebaseService(uid: userData!.uid, category: userData!.currentCategory).getPostIdStream,
    FirebaseService(postId: postId).articleStream, // here I need the postId which I get from the first stream
]);
hoc081098 commented 1 year ago

You should use flatMap/asyncExpand, exhaustMap, switchMap

FirebaseService(uid: userData!.uid, category: userData!.currentCategory)
    .getPostIdStream
    .switchMap((postId) =>
         FirebaseService(postId: postId)
            .articleStream
            .map((article) => Tuple2(article, postId))
     );

Sent from my 2201117TG using FastHub

desmeit commented 1 year ago

Very nice. Just one question: Is that also possible to have two streams under the switchmap?

stream1
--> stream 2 (depends on stream 1)
--> stream 3 (depends on stream 1)

thanks a lot!

hoc081098 commented 1 year ago

Yes. Inside switchMap, you can use all operators such as Rx.zip, Rx.combineLatest, Rx.forkJoin, ... Keep in mind, streams are composable 🙏 Eg.

postIdStream.switchMap((postId) {
    final firstStream = getFirstStream(postId);
    final secondStream = getSecondStream(postId);

     return Rx.combineLatest(firstStream, secondStream, (a, b) => ...);
});
desmeit commented 1 year ago

Amazing! Thanks again for your help.

hoc081098 commented 1 year ago

I'm glad to help you. Feel free to create a new issue to discuss, report a bug, or request a feature.

desmeit commented 1 year ago

Now I tried with switchMap but the issue is, that I get only the result from the second stream.

stream 1
--> switchMap(data)
   --> setState data
      --> stream 2
         --> result of stream 2 (stream 1 is not there)

Same for flatMap. I can see no difference between switchMap and flatMap.

Because of that I tried with asyncMap. asyncMap should keep the order and should output both streams. I get

Unhandled Exception: type 'ZipStream<List<testModel>, List<testModel>>' is not a subtype of type 'List<dynamic>?'

How can I read the ZipStream? Or is there a better solution to my problem?