ReactiveX / rxdart

The Reactive Extensions for Dart
http://reactivex.io
Apache License 2.0
3.36k stars 272 forks source link

BUG: non-broadcast Stream cannot be listened more than once when asyncMap is used #718

Open AlexDochioiu opened 1 year ago

AlexDochioiu commented 1 year ago

I attached a code snippet that crashes:

  final st = Stream.fromIterable([0, 1]).asyncMap((_) async => Future.delayed(Duration(seconds: 1), () => _));
  st.listen((event) => print("st: $event"), onError: (e) => print("st: error $e"), onDone: () => print("st: onDone"));
  await Future.delayed(Duration(seconds: 3));
  st.listen((event) => print("st2: $event"), onError: (e) => print("st2: error $e"), onDone: () => print("st2: onDone"));

Crash logs:

st: 0
st: 1
st: onDone
Unhandled exception:
Bad state: Stream has already been listened to.
#0      _StreamController._subscribe (dart:async/stream_controller.dart:676:7)
#1      _ControllerStream._createSubscription (dart:async/stream_controller.dart:827:19)
#2      _StreamImpl.listen (dart:async/stream_impl.dart:471:9)
#3      main (file:///Users/user/FlutterProjects/sample/example/streams_example.dart:55:6)
<asynchronous suspension>

Expected behaviour: A new stream should be created, and listened to.

hoc081098 commented 1 year ago

Use Rx.defer(() => ..., reusable: true)

AlexDochioiu commented 1 year ago

Hmm, I could have easily wrapped it inside a function too: st() => Stream.fromIterable([0, 1]).asyncMap((_) async => Future.delayed(Duration(seconds: 1), () => _));

which works relatively similar to defer (expect it doesn't wait on subscription to create the stream).

That's not very relevant though. The question is, is it correct that using asyncMap makes the stream non-reusable? Feels like a bug to me.

hoc081098 commented 1 year ago

Because Stream.fromIterable([0, 1]) is single-subscription stream, but allows listening to it multiple time, see also https://github.com/ReactiveX/rxdart/pull/694

hoc081098 commented 1 year ago
AlexDochioiu commented 1 year ago

I see your point. To me it still feels like unexpected behaviour. Wondering if it's worth trying to ask the people working on dart streams to expose some way of identifying MultiStreams?