dart-lang / sdk

The Dart SDK, including the VM, JS and Wasm compilers, analysis, core libraries, and more.
https://dart.dev
BSD 3-Clause "New" or "Revised" License

Warn on using generator as void function #42717

Open ride4sun opened 4 years ago

ride4sun commented 4 years ago

I am using a stream to read location data in a bloc. I have a start and a stop event. In the stop method, I cancel the stream subscription. When I use listen on a stream and yield the state from inside the callback, the code where the yield statement sits never gets called. What really bugs me is that I don't get any error message, warning, or compiler error.

    _locationSubscription = location.onLocationChanged.listen(
      (location) async* {
        if (location.isNotNull) {
          yield LocationState.sendData(location: updateLocation(location));
        }
      },
    );

When I replace the listen with await for, I don't see any way to stop it from yielding events, because the subscription handle is gone. Any ideas? Any explanations?

   Stream<LocationState> _start() async* {

    await Location()
        .changeSettings(accuracy: LocationAccuracy.high, interval: 1);

    await for (LocationData location in location.onLocationChanged) {
      if (location.isNotNull) {
        yield LocationState.sendData(location: updateLocation(location));
      }
    }

    //send one initial update to change state
    yield LocationState.sendData(
        location: updateLocation(await Location().getLocation()));
  }

lrhn commented 4 years ago

To stop an await for iteration, you need to break out of the loop. You can't do that from the outside. The most you can do is to set a flag, and on the next event, the loop can check that flag and break out. It doesn't cancel it immediately.
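A minimal sketch of the flag approach described above. The helper name `forwardUntilStopped` and the `shouldStop` callback are hypothetical, introduced only for illustration; the point is that the flag is consulted only when the next event arrives, so the loop does not stop immediately.

```dart
import 'dart:async';

/// Hypothetical helper: forwards events from [source] until [shouldStop]
/// returns true. The flag is only checked when a new event arrives.
Stream<T> forwardUntilStopped<T>(
    Stream<T> source, bool Function() shouldStop) async* {
  await for (final event in source) {
    if (shouldStop()) break; // Leaving the loop cancels the subscription.
    yield event;
  }
}

Future<void> main() async {
  var stopped = false;
  final ticks =
      Stream<int>.periodic(const Duration(milliseconds: 10), (i) => i);
  final received = <int>[];
  await for (final tick in forwardUntilStopped(ticks, () => stopped)) {
    received.add(tick);
    if (tick == 2) stopped = true; // Only takes effect on the next tick.
  }
  print(received);
}
```

Note that if the source never emits another event after the flag is set, the loop stays suspended, which is the limitation mentioned above.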

It's probably possible to create some Stream wrapper which allows all subscription created from that stream to be cancelled from somewhere else, and would then send a done event to the listener, but I'm not aware of such a class having been created yet.

I'd probably just use non-async code for this, keep the listen and use a StreamController for the resulting stream.
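A rough sketch of that suggestion, assuming a generic nullable source stream rather than the location plugin from the original post. The class name `ForwardingSource` and its `start`/`stop` methods are made up for this example; pause and resume forwarding is left out to keep it short.

```dart
import 'dart:async';

/// Hypothetical wrapper: forwards non-null events from a source stream to
/// its own [stream], and can be stopped from the outside via [stop].
class ForwardingSource<T extends Object> {
  final StreamController<T> _controller = StreamController<T>();
  StreamSubscription<T?>? _subscription;

  Stream<T> get stream => _controller.stream;

  void start(Stream<T?> source) {
    _subscription = source.listen(
      (event) {
        // Plain callback: add to the controller instead of using `yield`.
        if (event != null) _controller.add(event);
      },
      onError: _controller.addError,
      onDone: _controller.close,
    );
  }

  Future<void> stop() async {
    await _subscription?.cancel();
    _subscription = null;
    // Not awaited: close() only completes once a listener has seen "done".
    if (!_controller.isClosed) unawaited(_controller.close());
  }
}

Future<void> main() async {
  final input = StreamController<int?>();
  final forwarder = ForwardingSource<int>()..start(input.stream);
  final received = <int>[];
  final done = forwarder.stream.forEach(received.add);
  input..add(1)..add(null)..add(2);
  await Future<void>.delayed(Duration.zero); // Let pending events arrive.
  await forwarder.stop();
  await done;
  print(received);
}
```

Because the subscription object is kept, stopping takes effect right away instead of waiting for the next event.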

This code:

   _locationSubscription = location.onLocationChanged.listen(
      (location) async* {
        if (location.isNotNull) {
          yield LocationState.sendData(location: updateLocation(location));
        }
      },
    );

does not do what you expect. Each call to the inner function creates a new stream. Nobody listens to that stream (the onData callback to listen expects a void function, not one which returns a stream). The yield of an async* function only works if it's directly inside the async* function. It cannot be put into a nested function.
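To make the point concrete, here is a small sketch (the function name `callbackBodyRan` is invented for this example) that passes an async* closure to listen and records whether its body ever executes. Since nothing listens to the stream each call returns, the body should never start.

```dart
import 'dart:async';

/// Hypothetical check: returns whether the body of an async* callback
/// passed to `listen` was ever executed.
Future<bool> callbackBodyRan() async {
  var bodyRan = false;
  final source = Stream<int>.fromIterable([1, 2, 3]);

  // Each event calls the closure, which only creates a new Stream<int>.
  // Nobody listens to that stream, so the body below never starts.
  final subscription = source.listen((event) async* {
    bodyRan = true;
    yield event;
  });

  await subscription.asFuture<void>(); // Wait for the source to finish.
  return bodyRan;
}

Future<void> main() async {
  print(await callbackBodyRan());
}
```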

ride4sun commented 4 years ago

OK, is it possible to get it working the way I expected? Right now this fails without any hint. Maybe Dart or the compiler/analyzer can catch that? Using listen on a stream is a pretty common use case...

lrhn commented 4 years ago

We won't make the yield inside a nested function do a yield in the outer function.

As for the "no warning", maybe the analyzer could issue a warning if you use a sync* or async* generator function in a void-returning function context. That would suggest that you are doing something useless since a generator function doesn't really run until the returned iterator/stream is iterated/listened to, and something expecting a void function won't do that. I'll keep this issue as a feature request for this warning from the analyzer.
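The same laziness applies to sync* generators, which is easier to show in a few lines. In this sketch the function `numbers` and its `log` parameter are hypothetical; the body only starts once the returned iterable is actually iterated.

```dart
/// Hypothetical generator that records when its body starts running.
Iterable<int> numbers(List<String> log) sync* {
  log.add('started');
  yield 1;
  yield 2;
}

void main() {
  final log = <String>[];

  // Calling the generator only creates the iterable; the body has not run.
  final iterable = numbers(log);
  print(log.isEmpty);

  // Iterating is what actually runs the body.
  final values = iterable.toList();
  print('$values $log');
}
```

A caller that treats the function as void never iterates the result, so the body is never executed, which is why such a call is a good candidate for a warning.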

If you just want your code to keep working, I'd simply not switch to await for. If you need to cancel the subscription from the side, that's a use-case for using an explicit subscription object. If you can't write the code with .forEach, but have to use listen, then it probably also won't work with await for.

(You should use forEach over listen when you can, and then consider using await for if possible. Using listen should not be as common a use-case for Stream as it actually is because people use that instead of forEach).
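As a small illustration of the forEach form (the helper `sumOf` is made up for this example): forEach returns a future that completes when the stream is done and completes with an error if the stream emits one, so it composes with await without a subscription object.

```dart
import 'dart:async';

/// Hypothetical helper: sums all events of [values] using forEach.
Future<int> sumOf(Stream<int> values) async {
  var total = 0;
  // The returned future completes on the done event, or with the
  // stream's first error.
  await values.forEach((value) => total += value);
  return total;
}

Future<void> main() async {
  print(await sumOf(Stream<int>.fromIterable([1, 2, 3])));
}
```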

Or you could write up a cancelable stream abstraction, like:

import "dart:async";

/// A stream which can be interrupted at any point.
///
/// Any subscription on [stream] can be cancelled at any
/// time using [interrupt].
abstract class InterruptibleStream<T> {
  /// Creates an interruptible stream which forwards to [source]
  ///
  /// Listening on the interruptible [stream] provides the same
  /// events as listening on [source] until either [source]
  /// is done or the [stream] is [interrupt]ed.
  factory InterruptibleStream(Stream<T> source) =>
      _InterruptibleStream<T>(() => source);

  /// Creates an interruptible stream which forwards to `source()`
  ///
  /// Listening on the interruptible [stream] creates a new
  /// stream by calling [source], then provides the same
  /// events as listening on that stream until either it
  /// is done or the [stream] is [interrupt]ed.
  factory InterruptibleStream.multi(Stream<T> source()) =>
      _InterruptibleStream<T>(source);

  /// The interruptible stream itself.
  ///
  /// All subscriptions on this stream can be interrupted using
  /// [interrupt].
  Stream<T> get stream;

  /// Interrupt all current subscriptions on [stream].
  ///
  /// An interrupted subscription cancels its underlying
  /// subscription, then emits a done event.
  ///
  /// If [error] is supplied, it is emitted as an error
  /// event after the cancel has completed, before
  /// emitting a done event.
  void interrupt([Object? error, StackTrace? stack]);
}

class _InterruptibleStream<T> implements InterruptibleStream<T> {
  final Stream<T> stream;

  /// Set of currently active subscriptions that [interrupt] should end.
  final Set<_InterruptibleSubscription<T>> _subscriptions;

  _InterruptibleStream(Stream<T> source()) : this._(source, {});

  _InterruptibleStream._(Stream<T> source(), this._subscriptions)
      : stream = Stream<T>.multi((StreamController<T> c) {
          _subscriptions.add(_InterruptibleSubscription<T>(
              _subscriptions, c, source().listen(null)));
        });

  void interrupt([Object? error, StackTrace? stack]) {
    for (var subscription in _subscriptions.toList()) {
      if (_subscriptions.remove(subscription)) {
        subscription.interrupt(error, stack);
      }
    }
  }
}

class _InterruptibleSubscription<T> {
  final StreamController<T> controller;
  final StreamSubscription<T> subscription;
  final Set<_InterruptibleSubscription<T>> owner;

  _InterruptibleSubscription(this.owner, this.controller, this.subscription) {
    controller.onPause = subscription.pause;
    controller.onResume = subscription.resume;
    controller.onCancel = _onCancel;
    subscription.onData(controller.add);
    subscription.onError(controller.addError);
    subscription.onDone(_onDone);
  }

  FutureOr<void> _onCancel() {
    owner.remove(this); // Can no longer be interrupted.
    return subscription.cancel();
  }

  void _onDone() {
    owner.remove(this); // Can no longer be interrupted.
    controller.close();
  }

  void interrupt(Object? error, StackTrace? stack) {
    var cancelResult = subscription.cancel();
    void end([_]) {
      if (error != null) controller.addError(error, stack);
      controller.close();
    }

    if (cancelResult is Future<void>) {
      cancelResult.then<void>(end, onError: (e, s) {
        controller.addError(e, s);
        end();
      });
    } else {
      scheduleMicrotask(end);
    }
  }
}

(The code as written requires a dev-release and null safety experiment for now).

petoknm commented 3 years ago

I was expecting this (cancelling a subscription from a stream using await-for in an async generator) to just work. Is this something that will be fixed in future versions of Dart?

lrhn commented 3 years ago

Not sure what "just work" means here. Cancelling the subscription of a stream being generated by an async* function running an await-for will cancel at the next yield. If there is no yield inside the await for, it will run to completion, just as a non-await for loop. There are no plans to change this.

idkq commented 3 years ago

Simply use break;

await for (final event in stream) {
  if (event.length == 0) {
    yield Object();

    // Breaks the await for
    break;
  }
  //...
}

unicomp21 commented 3 years ago

If a break is used, it looks like something prevents the enclosing task from exiting. I think this might be a bug.

However, if I use a StreamIterator instead of "await for", the "break" works as expected, and I don't get into trouble with the outer enclosing task exiting.

As a general rule, I "think" "await for" must always be allowed to run to completion, otherwise bad things happen.
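For readers unfamiliar with the StreamIterator form mentioned here, a minimal sketch follows. The helper `takeUntil` is hypothetical, and this only shows the shape of the loop; it does not confirm or reproduce the reported difference in behavior.

```dart
import 'dart:async';

/// Hypothetical helper: collects events from [source] until [stopAt] is seen.
Future<List<int>> takeUntil(Stream<int> source, int stopAt) async {
  final iterator = StreamIterator<int>(source);
  final seen = <int>[];
  try {
    while (await iterator.moveNext()) {
      final event = iterator.current;
      if (event == stopAt) break;
      seen.add(event);
    }
  } finally {
    // Cancels the underlying subscription if the stream is not done yet.
    await iterator.cancel();
  }
  return seen;
}

Future<void> main() async {
  final ticks =
      Stream<int>.periodic(const Duration(milliseconds: 10), (i) => i);
  print(await takeUntil(ticks, 3));
}
```

The explicit cancel in the finally block makes the cleanup step visible, whereas await for performs the equivalent cancellation implicitly when the loop is exited.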

lrhn commented 3 years ago

There might be a bug somewhere, but that's impossible to say without seeing the actual code which fails.

Breaking an await for is perfectly valid and the recommended way to stop listening to a stream in async code. (We are aware of exiting at a yield not always happening soon enough, #44616.)

idraper commented 2 years ago

I also ran into this problem (in a slightly more specific domain) and figured I'd share my work-around. I have a function which re-maps a never-ending stream (and cannot use .map since it cannot be built lazily). Then, if the listener closes their subscription, the stream doesn't close since it listens to (using await for) a never-ending stream. An example of where this could happen is with await getStream().first; internally the stream would never close. I came up with the following based on:

The most you can do is to set a flag, and on the next event, the loop can check that flag and break out. It doesn't cancel it immediately.

As long as it is not a broadcast stream, you can check to see if a controller has any subscribers, and if it doesn't you can break. For example:

Stream<S> getStream<S>() {
  final rtn = StreamController<S>(); // cannot be .broadcast()

  () async {
    await for (final val in <some never-ending stream, eg Stream.periodic>) {
      if (!rtn.hasListener) break;

      // add implementation here
      rtn.add(val);
    }
    rtn.close();
  }();

  return rtn.stream;
}

This wouldn't work for broadcast streams, since you could potentially cancel all listeners and then add a new one later. It also doesn't close immediately, but at least it exits cleanly eventually (assuming another event is raised). This has the advantage (for my specific use case) that the client doesn't need to explicitly set a flag to clean up.

wangkaibule commented 2 years ago

The stream returned from an async* function is always single-subscription (reference), which means we cannot reattach to the stream once we have cancelled the previous subscription on it. So why do we need a dangling await for stream listener that will probably iterate forever without any output?

hoc081098 commented 2 months ago

Hello,

When using await for with an infinite Stream and a yield inside the loop, the finally block is not called when we cancel the StreamSubscription (or use take(1) for the same purpose):

import 'dart:async';

import 'package:rxdart/rxdart.dart';

void main() async {
  late StreamSubscription<void> sub;
  sub = getStream().listen((v) {
    print('Cancelling...');
    sub.cancel().then((_) => print('Cancelled...'));
  });

  await Future<void>.delayed(const Duration(seconds: 1));
  print('---');

  getStream().take(1).forEach(print);
}

Stream<String> getStream() async* {
  final s = Stream.value("Hello").concatWith([Rx.never()]);
  // or:  final s = BehaviorSubject.seeded('Hello');
  try {
    await for (final innerValue in s) {
      yield innerValue;
    }
  } finally {
    print('Done');
  }
}

lrhn commented 2 months ago

@hoc081098 this sounds like an unrelated problem, not fitting this issue which is now an analyzer enhancement request. Do file a separate new issue instead. (Also, do try to reproduce without using Rx streams, otherwise the bug may be in those, and it should be filed in the Rx-stream repo if it is. Although I'm guessing the bug is a known issue with the current async* implementation.)

hoc081098 commented 2 months ago

I can reproduce it without rxdart lib 🙏

import 'dart:async';

void main() async {
  late StreamSubscription<void> sub;
  sub = getStream().listen((v) {
    print('Cancelling...');
    sub.cancel().then((_) => print('Cancelled...'));
  });

  await Future<void>.delayed(const Duration(seconds: 1));
  print('---');

  getStream().take(1).forEach(print);
}

Stream<String> getStream() async* {
  final controller = StreamController<String>();
  controller.onListen = () {
    controller.add('Hello');
  };

  try {
    await for (final innerValue in controller.stream) {
      yield innerValue;
    }
  } finally {
    print('Done');
  }
}

Console:

Cancelling...
---
Hello

lrhn commented 2 months ago

Cool. Do file as a new issue, it still isn't relevant to this issue. 😉 (Or maybe add it to https://github.com/dart-lang/sdk/issues/49621 or one of the issues linked from there, since it looks like the same issue if it only affects the dev-compiler.)

hoc081098 commented 2 months ago

I've just opened #56619