dart-lang / sdk

The Dart SDK, including the VM, JS and Wasm compilers, analysis, core libraries, and more.
https://dart.dev
BSD 3-Clause "New" or "Revised" License

"await for" errors are handled too late #45645

Open nex3 opened 3 years ago

nex3 commented 3 years ago

Errors that are thrown in the body of an await for within an async function aren't forwarded through that function's future until the next event is emitted by the stream in question. For example:

Stream<int> generate() async* {
  yield 1;
  await Future.delayed(Duration(seconds: 5));
  yield 2;
}

Future<void> wrapper() async {
  await for (var value in generate()) {
    print("handling ${value}");
    throw "oh no";
  }
}

void main() {
  wrapper().catchError((error) {
    print("caught $error");
  });
}

This prints handling 1, and only after five seconds does it print caught oh no. This is a pretty substantial problem: certain streams may emit events separated by a large amount of time, and some (such as stdin) may never emit another event. In that case the error is never reported at all, which can lead to deadlocks.

Instead of this behavior, I would expect an error to immediately cancel the subscription to the stream and forward the error to the outer future.

nex3 commented 3 years ago

Note that this also happens when using Stream.forEach(), so the underlying issue may be in dart:async. I'm currently using the following extension to work around it:

import 'dart:async';

extension StreamExtension<T> on Stream<T> {
  /// Workaround for dart-lang/sdk#45645.
  ///
  /// Like [Stream.forEach], but completes with the error as soon as
  /// [callback] throws instead of waiting for the next stream event.
  Future<void> forEachFixed(FutureOr<void> Function(T element) callback) {
    var completer = Completer<void>();

    var subscription = listen(null,
        onError: completer.completeError, onDone: completer.complete);
    subscription.onData((event) async {
      // Hold back further events while the (possibly async) callback runs.
      subscription.pause();
      try {
        await callback(event);
        subscription.resume();
      } catch (error, stackTrace) {
        // Note: the future returned by cancel() is not awaited here.
        subscription.cancel();
        completer.completeError(error, stackTrace);
      }
    });

    return completer.future;
  }
}

lrhn commented 3 years ago

The issue with await for is likely an issue with async*. When you cancel the subscription, you wait for the future returned by cancel to complete. It does that when the async* function completes (after running through any finally blocks).

Our async* functions are implemented incorrectly on some platforms (maybe all, not sure any more). After a yield has synchronously delivered an event, they do not check whether they have been cancelled or paused; they just continue execution. That means the subscription.cancel call must wait until the next yield before the async* function can exit.
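
One way to observe this is to time the future returned by cancel(). The following is only a rough sketch: the generator mirrors generate() from the report with a shorter delay, and measureCancel is a hypothetical helper name, not part of dart:async. The measured time depends on the platform and SDK version, so no particular value is claimed.

```dart
import 'dart:async';

// Same shape as generate() in the report, with a shorter delay (assumption).
Stream<int> generate() async* {
  yield 1;
  await Future.delayed(Duration(milliseconds: 500));
  yield 2;
}

// Hypothetical helper: cancels the subscription from inside the first data
// event and reports how long the future returned by cancel() took to complete.
Future<Duration> measureCancel() {
  final result = Completer<Duration>();
  final stopwatch = Stopwatch();
  late StreamSubscription<int> subscription;
  subscription = generate().listen((value) {
    stopwatch.start();
    subscription.cancel().then((_) {
      stopwatch.stop();
      if (!result.isCompleted) result.complete(stopwatch.elapsed);
    });
  });
  return result.future;
}

Future<void> main() async {
  final elapsed = await measureCancel();
  print('cancel() completed after ${elapsed.inMilliseconds} ms');
}
```

If the async* function only notices the cancellation at its next yield, the reported time should be close to the delay between the two yields; if it checks right after delivering the event, it should be close to zero.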

The reason your "fixed" forEach appears to work is that it forgets to await the subscription.cancel(). If it did that, as it should, it too would be delayed until the cancel has properly completed and the async* method has terminated.
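
For comparison, here is a sketch of the same workaround with the cancel awaited. The names forEachAwaitingCancel and caughtError are hypothetical, and generate() is the generator from the report with a shorter delay. Under the behavior described above, the error would be expected to surface only once the async* function has terminated, but the actual timing depends on the platform and SDK version.

```dart
import 'dart:async';

// Same shape as generate() in the report, with a shorter delay (assumption).
Stream<int> generate() async* {
  yield 1;
  await Future.delayed(Duration(milliseconds: 500));
  yield 2;
}

extension AwaitedCancel<T> on Stream<T> {
  // Hypothetical variant of forEachFixed that waits for cancel() to finish
  // before forwarding the callback's error.
  Future<void> forEachAwaitingCancel(
      FutureOr<void> Function(T element) callback) {
    final completer = Completer<void>();
    final subscription = listen(null,
        onError: completer.completeError, onDone: completer.complete);
    subscription.onData((event) async {
      subscription.pause();
      try {
        await callback(event);
        subscription.resume();
      } catch (error, stackTrace) {
        // Wait until the stream has fully shut down before reporting.
        await subscription.cancel();
        completer.completeError(error, stackTrace);
      }
    });
    return completer.future;
  }
}

// Runs the loop with a callback that always throws and returns the error
// that reaches the caller, or null if none does.
Future<Object?> caughtError() async {
  try {
    await generate().forEachAwaitingCancel((value) {
      throw 'oh no';
    });
  } catch (error) {
    return error;
  }
  return null;
}

Future<void> main() async {
  final stopwatch = Stopwatch()..start();
  final error = await caughtError();
  print('caught $error after ${stopwatch.elapsedMilliseconds} ms');
}
```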

See: https://github.com/dart-lang/sdk/issues/34775

mraleph commented 3 years ago

/cc @mkustermann @cskau-g