ReactiveX / rxdart

The Reactive Extensions for Dart
http://reactivex.io
Apache License 2.0
3.36k stars 272 forks source link

`PublishSubject` doesn't work with `concatWith` operator #750

Closed haashem closed 4 months ago

haashem commented 5 months ago

In the below example, concatWith should emits the values added by stream 2:

void exampleOf(String of, {required void Function() action}) {
  print('\n———Example $of———');
  action();
}

exampleOf('concatWith(Iterable<Stream>)', action: () {
    final stream1 = Stream.fromIterable([2, 3, 4]);
    final stream2 = PublishSubject<int>();

    stream1.concatWith([stream2]).listen(
      (event) => print('append: $event'),
      onDone: () => print('Done'),
    );
    stream2.add(5);
    stream2.close();
  });

Expected:

———Example concatWith(Iterable<Stream>)———
append: 2
append: 3
append: 4
append: 5
Done

Actual:

———Example concatWith(Iterable<Stream>)———
append: 2
append: 3
append: 4
Done

But if I replace PublishSubject with BehaviorSubject it works as expected. I think it's a bug.

hoc081098 commented 5 months ago

all elements from stream1 will be delivered to the result stream with microtask delays (async by default).

At this time, adding value to PublishSubject is ignored, because the stream1 is listening.

stream1: --2--3--4--|
stream2: 5
result     : --2--3--4--|

Sent from my 2201117TG using FastHub

hoc081098 commented 4 months ago

See https://github.com/ReactiveX/rxdart?tab=readme-ov-file#rx-observables-vs-dart-streams, and https://api.flutter.dev/flutter/dart-async/Stream-class.html for more details

A source of asynchronous data events.