dart-lang / sdk

The Dart SDK, including the VM, JS and Wasm compilers, analysis, core libraries, and more.
https://dart.dev

[doc/async] Stream documentation should take multi-subscription streams into account #54265

Open sgrekhov opened 10 months ago

sgrekhov commented 10 months ago

According to the Stream documentation:

There are two kinds of streams: "Single-subscription" streams and "broadcast" streams.

A single-subscription stream allows only a single listener during the whole lifetime of the stream. It doesn't start generating events until it has a listener, and it stops sending events when the listener is unsubscribed, even if the source of events could still provide more. The stream created by an async* function is a single-subscription stream, but each call to the function creates a new such stream.

Listening twice on a single-subscription stream is not allowed, even after the first subscription has been canceled.

Single-subscription streams are generally used for streaming chunks of larger contiguous data, like file I/O.

A broadcast stream allows any number of listeners, and it fires its events when they are ready, whether there are listeners or not.

That isn't quite accurate. Multi-subscription streams, such as those created by Stream.multi() and Stream.fromIterable(), allow multiple listeners and can be listened to more than once, yet they aren't broadcast streams. The documentation's two-kind classification doesn't cover them, which leads to confusion.
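To make the distinction concrete, here is a minimal sketch of how a Stream.multi() stream behaves next to an ordinary single-subscription stream. The helper name countTo2 is hypothetical and exists only for this illustration. The expected output assumes Stream.multi() is called with its default isBroadcast value.

```dart
import 'dart:async';

// Hypothetical helper for illustration: a multi-subscription stream.
// Stream.multi runs this callback once per listener, so every
// subscription gets its own independent run of events.
Stream<int> countTo2() => Stream<int>.multi((controller) {
      controller
        ..add(1)
        ..add(2)
        ..close();
    });

Future<void> main() async {
  final multi = countTo2();
  print(multi.isBroadcast); // false
  print(await multi.toList()); // [1, 2]
  print(await multi.toList()); // [1, 2] again, no StateError

  // A plain single-subscription stream rejects a second listen.
  final single = StreamController<int>().stream;
  single.listen((_) {});
  try {
    single.listen((_) {});
  } on StateError catch (e) {
    print(e.message); // Stream has already been listened to.
  }
}
```

Both streams report isBroadcast as false, so that flag alone cannot tell a caller whether a stream can be listened to again.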

For example, see the documentation for Stream.first and Stream.firstWhere:

Stops listening to this stream after the first element has been received.

Internally the method cancels its subscription after the first element. This means that single-subscription (non-broadcast) streams are closed and cannot be reused after a call to this getter.

So, according to this statement, we should get an error when we try to listen to a non-broadcast stream after calling first:

import "dart:async";

void test(Stream<int> s) {
  s.first.then((_) {
    if (!s.isBroadcast) {
      // Per the documentation, listening again should throw here.
      s.listen((_) {});
    }
  });
}

void main() {
  // No exception: the stream can be listened to again.
  test(Stream.fromIterable([1, 2, 3]));
  // OK: throws "Stream has already been listened to."
  test(Stream.fromFutures([4, 5, 6].map((int x) => Future.value(x))));
}

cc @lrhn

lrhn commented 10 months ago

Yep.

I was working on that at some point. Found it: https://dart-review.googlesource.com/c/sdk/+/248121/2/sdk/lib/async/stream.dart

Does need to be updated.