dart-lang / language

Design of the Dart language

await for ... and ... syntax #1440

Open lukepighetti opened 3 years ago

lukepighetti commented 3 years ago

I'm not aware of any way to subscribe to two streams at once using await for in an async* function. Stream-combining tools are available in rxdart, but I believe there is a lot of benefit to making this possible with the core async tools.

As it stands today, I am not aware of any way to consume these two streams concurrently in a single async* function; the second loop below does not start until the first stream closes.

Stream<BBQStatus> deviceStatus() async* {
  var status = BBQStatus.initial();

  /// Temperature probe events
  await for (var e in service.probeEvents()) {
    status = status.copyWith(probes: e);
    yield status;
  }

  print('This is not reachable until probeEvents stream closes');

  /// Battery voltage events
  await for (var e in service.batteryEvents()) {
    status = status.copyWith(battery: e);
    yield status;
  }

  print('This is not reachable until both streams close');
}

You can combine streams using solutions like rxdart's CombineLatestStream.combine2, but having dedicated syntax for this would be really nice.
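For comparison, here is a minimal sketch of how the same effect can be approximated today with only dart:async, by merging the sources into one stream and consuming it with a single await for. The names mergeStreams, Status, Update, and the parameters of deviceStatus are hypothetical stand-ins invented for this illustration; they are not part of the thread's BBQStatus API or of any library.

```dart
import 'dart:async';

/// Hypothetical helper: merges [sources] into one single-subscription stream
/// that closes once every source has closed. Errors are forwarded as-is.
Stream<T> mergeStreams<T>(List<Stream<T>> sources) {
  final subscriptions = <StreamSubscription<T>>[];
  late final StreamController<T> controller;
  var open = sources.length;

  controller = StreamController<T>(
    onListen: () {
      if (sources.isEmpty) {
        controller.close();
        return;
      }
      for (final source in sources) {
        subscriptions.add(source.listen(
          controller.add,
          onError: controller.addError,
          onDone: () {
            // Close the merged stream only after the last source is done.
            if (--open == 0) controller.close();
          },
        ));
      }
    },
    onPause: () {
      for (final s in subscriptions) {
        s.pause();
      }
    },
    onResume: () {
      for (final s in subscriptions) {
        s.resume();
      }
    },
    onCancel: () => Future.wait(subscriptions.map((s) => s.cancel())),
  );
  return controller.stream;
}

/// Hypothetical stand-in for the thread's BBQStatus.
class Status {
  final int? probes;
  final double? battery;
  const Status({this.probes, this.battery});

  Status copyWith({int? probes, double? battery}) =>
      Status(probes: probes ?? this.probes, battery: battery ?? this.battery);
}

/// An update knows how to apply one event to the current status.
typedef Update = Status Function(Status);

Stream<Status> deviceStatus(
  Stream<int> probeEvents,
  Stream<double> batteryEvents,
) async* {
  var status = const Status();

  // Map each event to an Update so the merged stream has one element type.
  final updates = mergeStreams<Update>([
    probeEvents.map<Update>((e) => (Status s) => s.copyWith(probes: e)),
    batteryEvents.map<Update>((e) => (Status s) => s.copyWith(battery: e)),
  ]);

  await for (final update in updates) {
    status = update(status);
    yield status;
  }
  // Reached only after both source streams have closed.
}
```

This keeps the yield inside the generator body, at the cost of the boilerplate that the proposed syntax is meant to remove.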

Here is a concept that is similar to syntax proposed by https://github.com/dart-lang/language/issues/1441

Stream<BBQStatus> deviceStatus() async* {
  var status = BBQStatus.initial();

  /// Temperature probe events and battery voltage events
  await for (var e in service.probeEvents()) {
    status = status.copyWith(probes: e);
    yield status;
  } and (var e in service.batteryEvents()) {
    status = status.copyWith(battery: e);
    yield status;
  }

  print('This is reachable when probeEvents and batteryEvents subscriptions complete');
}

lukepighetti commented 3 years ago

Combined with syntax proposed by https://github.com/dart-lang/language/issues/1441

Stream<BBQStatus> deviceStatus() async* {
  var status = BBQStatus.initial();

  /// Temperature probe events and battery voltage events
  await for (var e in service.probeEvents()) {
    status = status.copyWith(probes: e);
    yield status;
  } and (var e in service.batteryEvents()) {
    status = status.copyWith(battery: e);
    yield status;
  } on StreamException {
    ///
  } catch(e) {
    ///
  }

  print('This is reachable when probeEvents and batteryEvents subscriptions complete '
      'or either error handling block breaks');
}

lrhn commented 3 years ago

What you're asking for is a non-blocking await for loop. You start the loop, but you also continue executing code after the loop.

The big question is: When do you stop looping? Can you break it from the outside, or does it keep running until the stream ends (or the code inside the loop breaks out)?

What happens if your code inside the loop contains a return? Or a break of more than just that loop? It probably can't be allowed to do that, we can only have one flow of control at a time inside the same function invocation, so it's not a real loop, just a fancy way to write a callback function. (Fancy can be fine, it looks better in otherwise async code).

The code must keep running even if the main control flow reaches the end of the function. What if it reaches that by throwing? That would normally emit an error on the stream and close it, but now it can't close it any more. (That could be a way to have streams with embedded errors, if the loops can also emit errors and events later).

Basically, a way to do something in reaction to events, while staying inside the scope where you can still yield to the current stream. Could also work for futures, so you can await two futures in parallel.

It's not clear how to make consistent semantics for something like this.
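For reference, awaiting two futures in parallel is already expressible without new syntax. A minimal sketch, in which readProbe, readBattery, and snapshot are hypothetical names used only for illustration:

```dart
import 'dart:async';

/// Hypothetical async sources, used only for illustration.
Future<int> readProbe() async => 72;
Future<double> readBattery() async => 3.7;

Future<String> snapshot() async {
  // Start both futures before awaiting either, so they run concurrently.
  final probe = readProbe();
  final battery = readBattery();

  // Completes when both have completed, or with an error if either fails.
  await Future.wait([probe, battery]);

  // Both futures are already complete here, so these awaits do not wait long.
  return 'probe=${await probe} battery=${await battery}';
}
```

What this cannot do is interleave per-event handling for streams while staying in the scope of an enclosing generator, which is the part the proposal targets.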

lukepighetti commented 3 years ago

When I first posted this issue it was fairly broad, but I have since honed it via edits to a syntax which I believe handles the majority of the concerns raised.

> What you're asking for is a non-blocking await for loop. You start the loop, but you also continue executing code after the loop.

The currently proposed syntax is still blocking, but it uses the and keyword to link multiple await for loops together.

> The big question is: When do you stop looping? Can you break it from the outside, or does it keep running until the stream ends (or the code inside the loop breaks out)?

The proposed syntax "unblocks" when the await for ... and ... loops complete naturally (source stream closes) or break. It errors when any loop throws an error.
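In a plain async function (not a generator), roughly these completion rules can be approximated today by running each loop in its own local async function. This is a sketch under stated assumptions, not the proposed desugaring: consumeBoth, probeLoop, and batteryLoop are hypothetical names, and the negative-value break condition is invented for the example.

```dart
import 'dart:async';

/// Runs two await-for loops concurrently and returns a log of what they saw.
/// Completes when both loops have finished or broken out; completes with an
/// error if either stream or loop body throws.
Future<List<String>> consumeBoth(
  Stream<int> probes,
  Stream<double> battery,
) async {
  final log = <String>[];

  Future<void> probeLoop() async {
    await for (final e in probes) {
      log.add('probe $e');
    }
  }

  Future<void> batteryLoop() async {
    await for (final e in battery) {
      // Breaking ends only this loop; the other loop keeps running.
      if (e < 0) break;
      log.add('battery $e');
    }
  }

  // Both loops start here and are awaited together.
  await Future.wait([probeLoop(), batteryLoop()]);

  // Reached once both loops have completed or broken out.
  return log;
}
```

Two differences from the proposal are worth noting. The helper functions are separate function bodies, so they cannot yield to an enclosing async* generator. Also, Future.wait by default waits for the remaining loop to finish before reporting an error, rather than cancelling it.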

> What happens if your code inside the loop contains a return? Or a break of more than just that loop? It probably can't be allowed to do that, we can only have one flow of control at a time inside the same function invocation, so it's not a real loop, just a fancy way to write a callback function. (Fancy can be fine, it looks better in otherwise async code).

You can't return a value from a generator function (one using the 'async*' modifier). How this would work in a regular async function is not something I have spent time on.

> The code must keep running even if the main control flow reaches the end of the function. What if it reaches that by throwing? That would normally emit an error on the stream and close it, but now it can't close it any more. (That could be a way to have streams with embedded errors, if the loops can also emit errors and events later).

The currently proposed syntax is blocking.

> Basically, a way to do something in reaction to events, while staying inside the scope where you can still yield to the current stream. Could also work for futures, so you can await two futures in parallel.
>
> It's not clear how to make consistent semantics for something like this.

Please feel free to let me know if anything is still not handled.

lrhn commented 3 years ago

The change makes it two (or more) parallel loops.

I wouldn't just allow it in a generator function, it should work in plain async functions too. If it's useful, it's probably useful in more ways than one. It only works for async loops (await for), so at least it's async or async* only.

There are multiple ways to exit a loop:

Breaking the loop itself or completing normally are fine cases to just wait for the other loop. Control has only left one of the loops, and wants to continue right after the loop. We can wait for the other loop(s) and see if they get there too.

In all the other cases, it should probably break both/all loops when control flow exits the combined await for ( ) { } and ( ) { } statement. Control flow with a target shouldn't be ignored, but you can't do more than one of them.

If you combine it with #1441 (and even #171), I'd probably want individual exception handlers on each loop, so:

await for (var e in stream) {
  doSomething(e);
} catch (e) {
  doSomethingElse(e);
} else {
  doSomethingAtTheEnd();
} 
and for (var e in otherStream) {
  doWhatNot(e);
} catch (e) {
  doWhatNotElse(e);  
} else {
  doWhatNotAtTheEnd();
}

so the grammar would be:

<asyncLoop> ::= `await` <forInLoop> (`and` <forInLoop>)*

where it's a compile-time error if the expression of the for/in is not assignable to a Stream type.

The catch/on clauses apply to all for/in loops (it's less common to catch errors from get current in non-async loops than to get error events from streams, but there is no need to prevent it).

We'd probably have to make and a built-in identifier.

lukepighetti commented 3 years ago

Looks good to me! Is there anything left to figure out?