dart-lang / core

This repository is home to core Dart packages.
https://pub.dev/publishers/dart.dev
BSD 3-Clause "New" or "Revised" License

Inconsistent behavior of `Stream.listen` on broadcast streams #370

Closed. talesbarreto closed this issue 8 months ago

talesbarreto commented 8 months ago

The `listen` method behaves differently depending on how the stream is accessed.

Case 1

import 'dart:async';

final streamController = StreamController<int>.broadcast();

Future<void> main(List<String> arguments) async {
  final subscription = streamController.stream.listen(print);
  streamController.add(1);
  streamController.add(2);
  streamController.add(3);
  await Future.delayed(Duration.zero);
  streamController.add(4);
  streamController.add(5);
  streamController.add(6);
  await Future.delayed(Duration.zero);
  subscription.cancel();
}

Output is 1 2 3 4 5 6

Case 2

Delegating to the stream from an `async*` function

import 'dart:async';

final streamController = StreamController<int>.broadcast();

Stream<int> yieldedStream() async* {
  yield 0;
  yield* streamController.stream;
}

Future<void> main(List<String> arguments) async {
  final subscription = yieldedStream().listen(print);
  streamController.add(1);
  streamController.add(2);
  streamController.add(3);
  await Future.delayed(Duration.zero);
  streamController.add(4);
  streamController.add(5);
  streamController.add(6);
  await Future.delayed(Duration.zero);
  subscription.cancel();
}

Output is 0 4 5 6
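One way to see where the missing events go is to trace when the body of an `async*` function starts relative to the `listen` call. The sketch below is only illustrative: `traceListen` and the log messages are hypothetical names, not part of the original report.

```dart
import 'dart:async';

// Hypothetical helper: records the order of events around `listen`
// on a stream produced by an async* function.
Future<List<String>> traceListen() async {
  final log = <String>[];

  Stream<int> generated() async* {
    log.add('body started');
    yield 0;
  }

  final subscription = generated().listen((v) => log.add('event $v'));
  log.add('listen returned');

  // Return to the event loop once so pending work can run.
  await Future<void>.delayed(Duration.zero);
  await subscription.cancel();
  return log;
}

Future<void> main() async {
  print(await traceListen());
}
```

On the Dart VM this should print `[listen returned, body started, event 0]`: the generator body has not run yet when `listen` returns, so in Case 2 the `yield*` has not subscribed to the broadcast stream by the time 1, 2 and 3 are added.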

Case 3

Creating another stream controller

import 'dart:async';

final streamController = StreamController<int>.broadcast();

Stream<int> streamUsingController() {
  final innerController = StreamController<int>();
  innerController.onListen = (() {
    innerController.add(0);
    innerController.addStream(streamController.stream);
  });
  innerController.onCancel = innerController.close;
  return innerController.stream;
}

Future<void> main(List<String> arguments) async {
  final subscription = streamUsingController().listen(print);
  streamController.add(1);
  streamController.add(2);
  streamController.add(3);
  await Future.delayed(Duration.zero);
  streamController.add(4);
  streamController.add(5);
  streamController.add(6);
  await Future.delayed(Duration.zero);
  subscription.cancel();
}

Output is 0 1 2 3 4 5 6

Thoughts

I think that cases 2 and 3 should produce the same output, but it seems that `yield*` doesn't get along with broadcast streams: control has to return to the event loop before it works properly.
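For reference, the underlying behavior can be reproduced without `yield*` at all: a broadcast stream does not buffer events for listeners that subscribe later. A minimal sketch, assuming nothing beyond `dart:async` (the `lateListener` helper is a hypothetical name used only here):

```dart
import 'dart:async';

// Hypothetical helper: adds one event before and one after subscribing
// to a broadcast stream, and returns what the listener received.
Future<List<int>> lateListener() async {
  final controller = StreamController<int>.broadcast();
  final received = <int>[];

  controller.add(1); // No listener yet, so this event is dropped.

  final subscription = controller.stream.listen(received.add);
  controller.add(2); // Delivered asynchronously to the listener.

  // Return to the event loop so the pending event is delivered.
  await Future<void>.delayed(Duration.zero);
  await subscription.cancel();
  await controller.close();
  return received;
}

Future<void> main() async {
  print(await lateListener()); // [2]
}
```

Any event added before a subscription exists is lost, which matches 1, 2 and 3 going missing in Case 2.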

As mentioned in this issue (via Stack Overflow), it's understandable that Dart may need to run another event loop to effectively listen to the stream.

The problem I see here is that `listen` does not return a `Future`, which would make it clear that this is expected behavior. In a purely synchronous codebase, the language should guarantee semantically that we don't need to concern ourselves with the event loop, especially when the documentation does not explicitly point it out.
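A possible workaround, sketched under the assumption that one trip through the event loop is enough for the generator to reach `yield*` (which the Case 2 output suggests, since 4, 5 and 6 do arrive after the first delay), is to yield control once after `listen` and before the first `add`. The `collect` helper is hypothetical:

```dart
import 'dart:async';

final streamController = StreamController<int>.broadcast();

Stream<int> yieldedStream() async* {
  yield 0;
  yield* streamController.stream;
}

// Hypothetical helper: same sequence as Case 2, plus one extra delay
// right after `listen`.
Future<List<int>> collect() async {
  final received = <int>[];
  final subscription = yieldedStream().listen(received.add);

  // Let the generator start and subscribe to the broadcast stream.
  await Future<void>.delayed(Duration.zero);

  streamController.add(1);
  streamController.add(2);
  streamController.add(3);
  await Future<void>.delayed(Duration.zero);
  streamController.add(4);
  streamController.add(5);
  streamController.add(6);
  await Future<void>.delayed(Duration.zero);

  subscription.cancel(); // Not awaited, as in the cases above.
  return received;
}

Future<void> main() async {
  print(await collect());
}
```

With the extra delay, the listener should receive 0 through 6, the same as Case 3. The cost is that the caller has to know the stream comes from an `async*` function, which is exactly the concern raised above.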

talesbarreto commented 8 months ago

I'm sorry. My bad