dart-lang / sdk

The Dart SDK, including the VM, JS and Wasm compilers, analysis, core libraries, and more.
https://dart.dev
BSD 3-Clause "New" or "Revised" License

Is a stream created by `Stream.multi()` always a multi-subscription stream? #56903

Open sgrekhov opened 1 month ago

sgrekhov commented 1 month ago

According to the Stream.multi documentation (https://github.com/dart-lang/sdk/blob/0342791d3374446b32be4ed7ec795be0c88c40d9/sdk/lib/async/stream.dart#L398), the constructor creates a multi-subscription stream.

This is not quite true when the source passed to MultiStreamController.addStream() is a single-subscription stream. For example:

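import 'dart:async';

// Assumption: Expect is the Dart SDK test helper from package:expect.
import 'package:expect/expect.dart';

main() {
  // A single-subscription controller: its stream accepts only one listener.
  var source = StreamController<int>();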
  source.add(1);
  source.add(2);
  source.add(3);
  source.close();

  var stream = Stream<int>.multi((controller) {
    controller.addStream(source.stream).then((_) {
      controller.close();
    });
  });
  listen(stream);
  listen(stream); // Bad state: Stream has already been listened to.
}

void listen(Stream<int> stream) {
  int eventsCounter = 1;
  stream.listen((v) {
    Expect.equals(eventsCounter++, v);
  }, onDone: () {
    Expect.equals(4, eventsCounter);
  });
}

Is this expected? If yes, then, probably, it makes sense to reflect it in the Stream.multi() constructor documentation.

cc @lrhn

dart-github-bot commented 1 month ago

Summary: The issue reports that a stream created by Stream.multi() is not always a multi-subscription stream when the source of MultiStreamController.addStream() is a single-subscription stream, leading to a "Bad state" error when attempting to listen to the stream multiple times. The user suggests updating the Stream.multi() documentation to reflect this behavior.

lrhn commented 1 month ago

The documentation should probably be expanded.

The Stream.multi stream can support a wide variety of behaviors, including being listened to multiple times, and the code that provides events can choose to limit what will work.

A "multi-subscription stream" here means a stream which you can call listen on multiple times. The stream itself doesn't prevent that.
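
As a minimal sketch of that point, the snippet below listens to one Stream.multi stream twice and counts how often the callback runs. The names `listenCalls` and `perListenerStream` are made up for illustration and do not come from the SDK.

```dart
import 'dart:async';

// Hypothetical counter: how many times the onListen callback has run.
int listenCalls = 0;

// Hypothetical helper: each listener gets one event naming its own number.
Stream<String> perListenerStream() => Stream<String>.multi((controller) {
      final id = ++listenCalls;
      controller.add('listener $id');
      controller.close();
    });

Future<void> main() async {
  final stream = perListenerStream();

  // toList() listens to the stream, so this listens twice in sequence.
  print(await stream.toList()); // [listener 1]
  print(await stream.toList()); // [listener 2]
  print(listenCalls); // 2
}
```

Each call to listen gets its own MultiStreamController, so nothing in the stream itself rejects the second listener.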

If the code that's supposed to provide events to the stream decides to throw during listen, it gets what it asks for. That makes the stream behave like a single-subscription stream. But by that measure, a StreamController(onListen: () { throw "Nah-ah!"; }) is not a single-subscription stream either, but a zero-subscription stream.
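
For contrast, here is a sketch of event-providing code that does not limit the stream: it builds a new single-subscription source for every listener instead of sharing one. The helper names `newSource` and `multiFromFreshSources` are hypothetical, and this assumes the source can be recreated on demand, which was not the case in the original report.

```dart
import 'dart:async';

// Hypothetical helper: returns a new single-subscription stream per call.
Stream<int> newSource() => Stream<int>.fromIterable([1, 2, 3]);

// Each listener forwards events from its own fresh source, then closes.
Stream<int> multiFromFreshSources() => Stream<int>.multi((controller) {
      controller.addStream(newSource()).then((_) => controller.close());
    });

Future<void> main() async {
  final stream = multiFromFreshSources();

  // Both listeners succeed because no source is listened to twice.
  print(await stream.toList()); // [1, 2, 3]
  print(await stream.toList()); // [1, 2, 3]
}
```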

More text is probably the way to go.

If nothing else then:

/// Creates a stream which can respond to multiple listeners.
///
/// Each call to [Stream.listen] on the created stream
/// will call [onListen] with a new
/// [MultiStreamController] which can be used to
/// send events to that one subscription.