dart-lang / sdk

The Dart SDK, including the VM, JS and Wasm compilers, analysis, core libraries, and more.
https://dart.dev

`MultiStreamController.isPaused` remains `true` after `resume()` #56915

Open sgrekhov opened 1 month ago

sgrekhov commented 1 month ago

The following test fails. Why?

import "dart:async";
import "../../../Utils/expect.dart";

var theController;

main() {
  asyncStart();
  var stream = Stream<int>.multi((controller) {
    theController = controller;
    Expect.isFalse(controller.isPaused);
    controller.add(1);
    controller.add(2);
    controller.add(3);
  });
  listen(stream);
}

void listen(Stream<int> stream) {
  late StreamSubscription ss;
  ss = stream.listen((v) {
    if (v == 1) {
      ss.pause(Future.delayed(Duration(milliseconds: 100)));
    } else {
      Expect.isFalse(theController.isPaused); //Expect.isFalse(true) fails.
    }
  }, onDone: asyncEnd);
}

cc @lrhn

dart-github-bot commented 1 month ago

Summary: The MultiStreamController.isPaused property remains true after calling resume() on a paused stream subscription, even though the stream itself is no longer paused. This leads to incorrect behavior in the provided test case.

lrhn commented 1 month ago

TL;DR: Because stream-controllers lie about pause. It's deliberate.

In the optimal case, there are never any enqueued events, or at least never more than one. Events are delivered as soon as possible (first available microtask slot), and an event producer shouldn't produce events faster than they can be consumed. Producing faster than that leads to buffering, and consumers are expected to pause if they can't keep up with events.

When a stream subscription is paused, the consumer has said that it can't keep up with events, and the producer shouldn't add any events while it's paused. If the producer does so anyway (or over-delivered before the pause), then when the subscription is resumed, it keeps telling the controller that it's paused ("don't send more events yet") until it has delivered all its pending events, that is, until it has caught up.

That's not special to multi-stream controllers. You get the same effect using:

main() {
  asyncStart();
  var controller = StreamController<int>();
  controller.onListen = () {
    Expect.isFalse(controller.isPaused);
    controller.add(1);
    controller.add(2);
    controller.add(3);
  };
  theController = controller;
  var stream = controller.stream;
  listen(stream);
}

(It's not surprising that they behave the same way; they all use the same code. The StreamSubscription code doesn't care what kind of Stream it comes from. It just receives events and sends flow-control requests, pause/resume/cancel, the other way.)

If you add

    Timer(Duration(milliseconds: 200), () {
      controller.add(4);
    });

after the controller.add(3), you can see that this event is sent after the pending event queue has been processed and the controller has been told that it's OK to send more events.
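To try this outside the SDK test suite (without `asyncStart` and `Expect`), a minimal standalone sketch could look like the following. The `runExperiment` name and the log format are made up for illustration; the controller setup mirrors the example above, with a `close()` added after the late event so the stream ends.

```dart
import 'dart:async';

/// Hypothetical helper: runs the experiment and returns a log of what
/// `controller.isPaused` reported at each step.
Future<List<String>> runExperiment() async {
  final log = <String>[];
  final done = Completer<void>();
  final controller = StreamController<int>();

  controller.onListen = () {
    log.add('onListen: isPaused=${controller.isPaused}');
    // Fill the pending queue before anything has been delivered.
    controller.add(1);
    controller.add(2);
    controller.add(3);
    // A late event, added well after the 100 ms pause has ended.
    Timer(const Duration(milliseconds: 200), () {
      log.add('timer: isPaused=${controller.isPaused}');
      controller.add(4);
      controller.close();
    });
  };

  late StreamSubscription<int> ss;
  ss = controller.stream.listen((v) {
    log.add('event $v: isPaused=${controller.isPaused}');
    if (v == 1) {
      ss.pause(Future<void>.delayed(const Duration(milliseconds: 100)));
    }
  }, onDone: done.complete);

  await done.future;
  return log;
}

Future<void> main() async {
  for (final line in await runExperiment()) {
    print(line);
  }
}
```

Going by the behavior described in this thread, events 2 and 3 should log `isPaused=true` (they are delivered from the pending queue after the resume), while the timer callback and event 4 should log `isPaused=false`.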

A stream subscription has two pieces of pause state internally: "isPaused" (a counter, actually) and "input-paused", which is what it reports to the controller. When the subscription user's pause is resumed and the queue is not empty, the input-paused flag isn't cleared; the pending events are just scheduled for delivery. When the last pending event has been delivered, and the subscription isn't paused, the input-paused flag is cleared. The actual code is:

void resume() {
  if (_isCanceled) return;
  if (_isPaused) {
    _decrementPauseCount();
    if (!_isPaused) {
      if (_hasPending && !_pending!.isEmpty) {
        // Input is still paused.
        _pending!.schedule(this);
      } else {
        assert(_mayResumeInput);
        _state &= ~_STATE_INPUT_PAUSED;
        if (!_inCallback) _guardCallback(_onResume);
      }
    }
  }
}
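The interplay of the pause counter and the input-paused flag can be illustrated with a toy model. This is only a sketch of the idea, not the SDK implementation: `ToySubscription` and all its members are hypothetical names, delivery is driven by hand instead of by microtasks, and callbacks such as `onPause`/`onResume` are left out.

```dart
/// Toy model of the two pieces of pause state (hypothetical, not SDK code).
class ToySubscription {
  int _pauseCount = 0; // the "isPaused" counter
  bool inputPaused = false; // what the controller would report as isPaused
  final List<int> _pending = [];
  final List<int> delivered = [];

  bool get isPaused => _pauseCount > 0;

  /// Producer side: events are always buffered first.
  void add(int event) => _pending.add(event);

  void pause() {
    _pauseCount++;
    inputPaused = true;
  }

  void resume() {
    if (!isPaused) return;
    _pauseCount--;
    if (isPaused) return;
    // Only tell the producer "resumed" if nothing is waiting.
    if (_pending.isEmpty) inputPaused = false;
  }

  /// Delivers one pending event if allowed; returns whether it did.
  bool deliverNext() {
    if (isPaused || _pending.isEmpty) return false;
    delivered.add(_pending.removeAt(0));
    // Caught up: now the producer may send more.
    if (_pending.isEmpty) inputPaused = false;
    return true;
  }
}

void main() {
  final sub = ToySubscription()
    ..add(1)
    ..add(2)
    ..add(3);
  sub.deliverNext(); // delivers 1
  sub.pause();
  sub.resume();
  print(sub.inputPaused); // true: 2 and 3 are still pending
  sub.deliverNext(); // delivers 2
  print(sub.inputPaused); // true: 3 is still pending
  sub.deliverNext(); // delivers 3
  print(sub.inputPaused); // false: the queue is drained
}
```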

Testing a controller by filling up its pending event queue with everything during the onListen call doesn't cover all its behaviors. That's one kind of behavior. Another would be events occurring at fixed intervals, like incoming network packets, whether the subscription is listening or not, or a stream which only produces new events while the subscription isn't paused.

(Also remember to close the controller.)
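As an illustration of the last kind of producer, here is a rough sketch of a stream that only produces events while the subscription isn't paused, and that closes its controller when it is done. `politeCounter` and its parameters are hypothetical names chosen for this example; it uses the `onListen`, `onPause`, `onResume` and `onCancel` callbacks of `StreamController` to start and stop a periodic timer.

```dart
import 'dart:async';

/// Hypothetical helper: emits 0..count-1, one per tick, but only while the
/// controller is not paused. Closes the controller after the last event.
Stream<int> politeCounter(int count,
    {Duration interval = const Duration(milliseconds: 10)}) {
  late StreamController<int> controller;
  Timer? timer;
  var next = 0;

  void stop() {
    timer?.cancel();
    timer = null;
  }

  void start() {
    timer ??= Timer.periodic(interval, (_) {
      if (next < count) controller.add(next++);
      if (next >= count) {
        stop();
        controller.close(); // remember to close the controller
      }
    });
  }

  controller = StreamController<int>(
    onListen: start,
    onPause: stop, // no new events while the consumer can't keep up
    onResume: start,
    onCancel: stop,
  );
  return controller.stream;
}

Future<void> main() async {
  final seen = <int>[];
  final done = Completer<void>();
  late StreamSubscription<int> ss;
  ss = politeCounter(5).listen((v) {
    seen.add(v);
    if (v == 1) {
      ss.pause(Future<void>.delayed(const Duration(milliseconds: 50)));
    }
  }, onDone: done.complete);
  await done.future;
  print(seen); // [0, 1, 2, 3, 4]
}
```

Because this producer stops adding events as soon as it is told about the pause, nothing should be left in the pending queue when the subscription resumes, so the controller is told right away that it may send more events.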

sgrekhov commented 1 month ago

Thank you for the explanation. Then, I'm going to change the test to:

...
    if (v == 1) {
      ss.pause(Future.delayed(Duration(milliseconds: 100)));
    } else {
      Expect.isTrue(theController.isPaused); // It's Ok. See https://github.com/dart-lang/sdk/issues/56915 for more details.
    }
...

lrhn commented 1 month ago

That should be correct. If we ever change behavior (and we haven't so far, so we probably won't) then this test will catch it.