dart-lang / sdk


async stream computation has stack unwinding before cancelled? #51241

Open phr34k opened 1 year ago

phr34k commented 1 year ago

Steps to Reproduce

Run the sample code I provided below. It's a small unit test, and the code only uses basic dependencies.

To give a brief description: I created a generator function that produces an infinite stream of websockets. When a socket is closed, it reconnects while respecting randomized timeouts. I then use the asyncExpand function to expand all of the websockets' data and expose it as a single Stream<dynamic>, so that the caller only has to listen once and then gets notifications when a socket is disconnected or opened.

Expected results:

In the code below I expect two connects and two disconnects to happen. The second (last) disconnect should happen because the StreamIterator is cancelled, which closes the websocket normally, so logically I'd expect it to close normally with code 1005.

Actual results:

However, the actual results seem to be that when I call iterator.cancel(), the onCancel function is the last function to be called. Instead, it first terminates the active _generate() computation, which is the source. What I observe is that it breaks out of the loop, unwinding the stack and executing any remaining finally clauses. The best analogy I can think of is as if an invisible exception were thrown; only after that is websocket.close() actually called.

The problem this causes is that _generate is unwound before websocket.close() has actually been called, so the closeCode and closeReason properties of the websocket are still null. This ordering of events feels counterintuitive, at least to me.

import 'dart:async';
import 'dart:io';
import 'package:async/async.dart';
import 'dart:developer' as developer;
import 'package:flutter_test/flutter_test.dart';

class WSocketSink extends DelegatingStreamSink {
  final WebSocket _webSocket;

  WSocketSink._(WebSocket webSocket)
      : _webSocket = webSocket,
        super(webSocket);
  @override
  Future close([int? closeCode, String? closeReason]) =>
      _webSocket.close(closeCode, closeReason);
}

class WSocket {
  late Stream<dynamic> _data;
  WSocketSink? _sink;
  WebSocket? innerWebsocket;

  Duration Function({required int attempts}) reconnect;
  String Function() uri;

  Function(Object e, {StackTrace stack}) onbeforeconnect; //expose callbacks
  Function() onconnect; //expose callbacks
  Function(int? closeCode, String? closeReason) ondisconnect; //expose callbacks
  Stream<dynamic> get stream => _data; //expose the stream
  WSocketSink? get sink => _sink; //expose the sink when websocket is alive

  Stream<WebSocket> _generate() async* {
    WebSocket? ret;

    try {
      int attempt = 0;
      while (true) {
        try {
          // Connect to the websocket; after a successful connection, reset attempts back to 0.
          ret = await WebSocket.connect(uri());
          attempt = 0;
        } catch (e, stack) {
          // absorb the error, and notify callbacks that the socket failed to connect
          onbeforeconnect(e, stack: stack);
          await Future.delayed(reconnect(attempts: attempt));
          attempt = attempt + 1;
          continue;
        }

        ret.pingInterval = const Duration(seconds: 30);

        // Notify callbacks that the socket has successfully connected.
        onconnect();
        yield ret;

        // When the websocket gets cancelled because the stream subscription detaches, this code is no longer reached.
        // Instead it breaks out of the loop altogether, but it still executes the finally clause.
        ondisconnect(ret.closeCode, ret.closeReason);
        developer.log(
            "randomized delay before reconnect, this isn't triggered when the iterator closes",
            name: 'wsocket');
        await Future.delayed(reconnect(attempts: attempt));
        attempt = attempt + 1;
      }
    } finally {
      if (ret != null) {
        developer.log("disconnecting because cancelled", name: 'wsocket');
        ondisconnect(ret.closeCode, ret.closeReason);
      }
    }
  }

  WSocket(
      {required this.reconnect,
      required this.uri,
      required this.onbeforeconnect,
      required this.onconnect,
      required this.ondisconnect}) {
    StreamController<dynamic> dataController =
        StreamController<dynamic>(sync: true);
    _data = dataController.stream;

    var s = _generate();

    late Future done;

    dataController.onPause = () {
      developer.log("onpause", name: 'wsocket');
    };

    dataController.onResume = () {
      developer.log("onresume", name: 'wsocket');
    };

    // If the listener gets cancelled, we need to close the socket to force the
    // existing asyncExpand to cancel.
    dataController.onCancel = () async {
      developer.log("perform iterator on cancel", name: 'wsocket');
      innerWebsocket!.close();
      return done;
    };

    // Use _generate() to generate a stream of websockets, and asyncExpand to merge all
    // their data into a new exposed stream.
    done = dataController.addStream(s.asyncExpand((event) {
      var s = event;
      innerWebsocket = s;
      _sink = WSocketSink._(s);
      return s;
    }), cancelOnError: true).then((value) => dataController.close());
  }
}

void main() {
  late HttpServer server;
  late StreamSubscription<WebSocket> subscription;
  setUp(() async {
    server = await HttpServer.bind(InternetAddress.anyIPv6, 0);
    subscription =
        server.transform(WebSocketTransformer()).listen((WebSocket webSocket) {
      developer.log('Server webSocket opened.', name: 'server');
      webSocket.add("hello");
      webSocket.listen((_) {
        webSocket.add("hello");
      }, onError: (err) {
        developer.log('Server webSocket error.', name: 'server');
      }, onDone: () {
        developer.log('Server webSocket closed.', name: 'server');
      });
    }, onError: (error, stack) {
      developer.log('Server webSocket error.', name: 'server');
    }, onDone: () {
      developer.log('Server webSocket error.', name: 'server');
    });
  });

  tearDown(() async {
    await subscription.cancel();
    await server.close(force: true);
  });

  test('accept websocket', () async {
    var errors = 0;
    var connects = 0;
    var disconnects = 0;

    var socks = WSocket(
        reconnect: ({required int attempts}) =>
            const Duration(milliseconds: 100),
        uri: () => "ws://127.0.0.1:${server.port}",
        onbeforeconnect: (Object error, {StackTrace? stack}) {
          developer.log("error connecting", name: 'client');
          errors = errors + 1;
        },
        onconnect: () {
          developer.log("onconnect", name: 'client');
          connects = connects + 1;
        },
        ondisconnect: (closeCode, closeReason) {
          developer.log("ondisconnect $closeCode $closeReason", name: 'client');
          disconnects = disconnects + 1;
          if (closeCode == WebSocketStatus.internalServerError) {
            throw "exit webserver";
          }
        });

    var iterator = StreamIterator(socks.stream);

    developer.log("start client websocket",
        time: DateTime.now(), name: 'client');

    // Read the first message from the websocket stream (the server websocket sends hello on connect).
    developer.log("wait for hello", name: 'client');
    await iterator.moveNext();
    expect(iterator.current, "hello");

    expect(errors, 0);
    expect(connects, 1);
    expect(disconnects, 0);

    // Emulate a disconnect on the websocket, e.g. an error through add/addStream, or a ping/pong frame not being answered.
    developer.log("disconnect", name: 'client');
    await socks.sink?.close();

    // read second message from the websocket stream (server websocket sends hello, on connect)
    developer.log("wait for hello", name: 'client');
    await iterator.moveNext();
    expect(iterator.current, "hello");

    expect(errors, 0);
    expect(connects, 2);
    expect(disconnects, 1);

    // Disconnect the socket permanently by cancelling the stream subscription, i.e. it won't reconnect.
    developer.log("disconnect perm", name: 'client');
    await iterator.cancel();

    //await iterator.moveNext();
    //expect(iterator.current, "hello");

    expect(errors, 0);
    expect(connects, 2);
    expect(disconnects, 2);
  });
}
Connecting to VM Service at http://127.0.0.1:55120/APCRwMp8xRU=/ws
[client] start client websocket
[client] wait for hello
[server] Server webSocket opened.
[client] onconnect
[client] disconnect
[wsocket] onpause
[client] wait for hello
[wsocket] onresume
[server] Server webSocket closed.
[client] ondisconnect 1005
[wsocket] randomized delay before reconnect, this isn't triggered when the iterator closes
[server] Server webSocket opened.
[client] onconnect
[client] disconnect perm
[wsocket] disconnecting because cancelled
[client] ondisconnect null null
[wsocket] perform iterator on cancel
[server] Server webSocket closed.
✓ accept websocket
Exited
> flutter analyze
Analyzing core...                                                       
No issues found! (ran in 16.4s)
> flutter doctor -v
[√] Flutter (Channel master, 3.6.0-4.0.pre.29, on Microsoft Windows [Version 10.0.19044.2486], locale en-NL)
    • Flutter version 3.6.0-4.0.pre.29 on channel master at D:\SDK\flutter
    • Upstream repository https://github.com/flutter/flutter.git
    • Framework revision bf6d0f3980 (3 months ago), 2022-11-17 02:48:25 -0500
    • Engine revision 5bbf070c3f
    • Dart version 2.19.0 (build 2.19.0-406.0.dev)
    • DevTools version 2.19.0

[√] Windows Version (Installed version of Windows is version 10 or higher)

[√] Android toolchain - develop for Android devices (Android SDK version 32.1.0-rc1)
    • Android SDK at D:\SDK\android-sdk
    • Platform android-33, build-tools 32.1.0-rc1
    • Java binary at: D:\SDK\Android\Android Studio 2021.1.1.21\jre\bin\java
    • Java version OpenJDK Runtime Environment (build 11.0.11+9-b60-7590822)
    • All Android licenses accepted.

[√] Chrome - develop for the web
    • Chrome at C:\Program Files\Google\Chrome\Application\chrome.exe

[√] Visual Studio - develop for Windows (Visual Studio Community 2022 17.4.0)
    • Visual Studio at C:\Program Files\Microsoft Visual Studio\2022\Community
    • Visual Studio Community 2022 version 17.4.33103.184
    • Windows 10 SDK version 10.0.19041.0

[√] Android Studio (version 2021.1)
    • Android Studio at D:\SDK\Android\Android Studio 2021.1.1.21
    • Flutter plugin can be installed from:
       https://plugins.jetbrains.com/plugin/9212-flutter
    • Dart plugin can be installed from:
       https://plugins.jetbrains.com/plugin/6351-dart
    • Java version OpenJDK Runtime Environment (build 11.0.11+9-b60-7590822)

[√] VS Code (version 1.74.3)
    • VS Code at C:\Users\phpfr\AppData\Local\Programs\Microsoft VS Code
    • Flutter extension version 3.50.0

[√] Connected device (3 available)
    • Windows (desktop) • windows • windows-x64    • Microsoft Windows [Version 10.0.19044.2486]
    • Chrome (web)      • chrome  • web-javascript • Google Chrome 109.0.5414.76
    • Edge (web)        • edge    • web-javascript • Microsoft Edge 109.0.1518.70

[√] HTTP Host Availability
    • All required HTTP hosts are available

• No issues found!

moved from here: https://github.com/flutter/flutter/issues/119790

lrhn commented 1 year ago

I have not grokked the code yet, but unwinding sounds like the correct and expected behavior.

When you cancel a stream subscription from a stream of an async* function, the effect is that all later yield statements behave as a return, going through every finally on the way out of the method body. That's deliberate, to allow the function to release resources that it no longer needs. The Future returned by StreamSubscription.cancel is completed when the method body is exited.
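A minimal sketch of the behavior described above, assuming only dart:async; `numbers`, `run`, and the `events` log are hypothetical names for illustration. Breaking out of an `await for` loop cancels the subscription and waits for the cancel future, so by the time the loop has exited, the generator's pending `yield` has acted as a return and its finally block has already run.

```dart
import 'dart:async';

// Records the order in which things happen (hypothetical helper).
final List<String> events = [];

// An infinite async* stream; the finally block stands in for resource cleanup.
Stream<int> numbers() async* {
  try {
    var i = 0;
    while (true) {
      // Once the subscription is cancelled, this yield behaves like `return`.
      yield i;
      i++;
    }
  } finally {
    events.add('finally');
  }
}

Future<List<String>> run() async {
  events.clear();
  await for (final n in numbers()) {
    events.add('got $n');
    // Leaving the loop cancels the subscription and awaits the cancel future.
    if (n == 2) break;
  }
  events.add('loop exited');
  return List.of(events);
}

Future<void> main() async {
  print(await run());
}
```

Because the cancel future only completes after the body has exited, 'finally' is always recorded before 'loop exited'.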

phr34k commented 1 year ago

If you don't get an opportunity to cancel resources gracefully, I'd argue that the order of events is incorrect. Isn't the whole point of having an onCancel callback that you can make whatever state changes are needed to break out of existing computations, and then clean up resources?

var f = StreamSubscription.cancel()
method body unwinding
StreamController.onCancel()
await f;

In this case, because the method body is unwound first, the websocket is unsubscribed (since it runs inside asyncExpand), which means it cannot process the close that is issued later, when onCancel is called. I feel it would be more intuitive for onCancel to be called first, because from what I can tell, in my situation the close would have worked as intended if the async computation weren't unwound first.

phr34k commented 1 year ago

@lrhn Eventually I made this variation of the code (https://gist.github.com/phr34k/620c9af014e763e6be5591eeb1d36868#file-websocket_test-dart-L78), which guarantees that the ondisconnect callback happens in the right semantic order by moving the finalizer body into its own deferred microtask.

But the only way to get the correct status codes is to add a delay of 5 seconds (a websocket implementation detail). When there is no listener anymore (the stack unwinding causes this), the websocket no longer processes any incoming data, including the frames sent in response to the close it issued. Instead, it uses a 5s timer to defer setting closeReason/closeCode.
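A minimal dart:io sketch of the contrast described above, assuming a loopback server built with `WebSocketTransformer` (as in the original test): if the client keeps a listener on the socket while it calls `close()`, the peer's close frame can be processed, so `closeCode` comes from the close handshake instead of the 5-second fallback. `closeWhileListening` is a hypothetical helper, not part of any API.

```dart
import 'dart:async';
import 'dart:io';

// Sketch: close a WebSocket while its stream still has a listener, so the
// server's close frame is processed and closeCode is set from the handshake.
Future<int?> closeWhileListening() async {
  final server = await HttpServer.bind(InternetAddress.loopbackIPv4, 0);
  server.transform(WebSocketTransformer()).listen((WebSocket ws) {
    // The server side must be listened to so its close handling runs.
    ws.listen(null);
  });

  final client = await WebSocket.connect('ws://127.0.0.1:${server.port}');
  // Keep the client subscription alive until the stream is done.
  final done = client.listen(null).asFuture<void>();

  // Initiate the close while still listening, then wait for both the close
  // and the end of the stream.
  await Future.wait<dynamic>([
    client.close(WebSocketStatus.normalClosure, 'bye'),
    done,
  ]);
  await server.close(force: true);
  return client.closeCode;
}

Future<void> main() async {
  print(await closeWhileListening());
}
```

The difference from the WSocket code is only ordering: the close is issued before the subscription goes away, not after.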

I would argue that most of this is simple cause and effect of the fact that it cannot attempt a graceful shutdown first.

lrhn commented 1 year ago

Can you point to the part that you think is not working as it should?

The entire system is a very complicated interleaving of multiple streams, some async*, some from controllers, some affected by stream transformers or asyncExpand.

The behavior of an async* function is already described. Cancelling its subscription makes it exit the body in an orderly manner, with the finally blocks of the body doing the equivalent of a stream controller's onCancel to clean up, and the returned future completes when the clean-up is complete.

The behavior of asyncExpand should be that when its subscription is cancelled, it cancels the current inner stream subscription, if one is active, and when that has completed, it cancels the outer stream subscription (which is paused while there is an inner stream subscription), and when that's done, its own cancel call future is completed.
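A small sketch of that cancellation order, assuming only dart:async; `inner`, `outer`, `run`, and `order` are hypothetical names. The inner stream is a plain controller that logs when cancelled, and the outer stream is an async* generator whose finally block logs when it exits.

```dart
import 'dart:async';

final List<String> order = [];

// Hypothetical inner stream: emits one event, then stays open until cancelled.
Stream<String> inner(int id) {
  final controller = StreamController<String>();
  controller.onListen = () => controller.add('inner $id event');
  controller.onCancel = () => order.add('inner $id cancelled');
  return controller.stream;
}

// Outer async* stream; asyncExpand pauses it while an inner stream is active.
Stream<int> outer() async* {
  try {
    var id = 0;
    while (true) {
      yield id++;
    }
  } finally {
    order.add('outer finally');
  }
}

Future<List<String>> run() async {
  order.clear();
  final firstEvent = Completer<void>();
  final sub = outer().asyncExpand(inner).listen((e) {
    order.add(e);
    if (!firstEvent.isCompleted) firstEvent.complete();
  });
  await firstEvent.future;
  // Cancels the active inner subscription first, then the outer one.
  await sub.cancel();
  order.add('cancel done');
  return List.of(order);
}

Future<void> main() async {
  print(await run());
}
```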

Both of these seem to be working as intended.

I'm not familiar enough with web sockets or WebSocketTransformer to know what they are supposed to do.

phr34k commented 1 year ago

@lrhn I believe my previous posts already described my observation that the current system does not allow a graceful shutdown of the socket.

facts:

The websocket stream [3] gets unsubscribed before onCancel [5] is executed, so there is no way to let it attempt a graceful shutdown, and the correct shutdown state of the websocket has to be obtained with implementation-detail trickery [6].

Just because the individual parts work as intended and are considered sound doesn't mean the sum of their parts is sound too. In this case you'd actually want onCancel to control the cancelling of the inner streams (the websocket), and only after onCancel's future completes, unwind the stack.

lrhn commented 1 year ago

So the problem is with WebSocket?

StreamIterator.cancel immediately cancels its subscription using StreamSubscription.cancel, and returns the result future. That's reasonable. (I recommend using StreamQueue over StreamIterator, because it's nicer, but StreamIterator does work as intended, when it's used correctly, and it is here.)

The StreamIterator is used on the stream of a synchronous StreamController, so calling the StreamIterator.cancel requests a cancel from the stream controller.

The StreamController is most likely executing an addStream of an asyncExpand. The stream controller cancels by first cancelling the addStream, and then calling its own onCancel callback to return a future saying that cancel is done.
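A minimal sketch that reproduces this ordering outside of WebSocket, assuming only dart:async; `source`, `run`, and `steps` are hypothetical names. As in the original WSocket class, a synchronous StreamController forwards an async* stream through addStream, and the cancellation order matches the one observed in the report: the source's finally block runs before the controller's onCancel.

```dart
import 'dart:async';

final List<String> steps = [];

// Stand-in for `_generate()`: an infinite async* source with cleanup in finally.
Stream<int> source() async* {
  try {
    var i = 0;
    while (true) {
      yield i++;
    }
  } finally {
    steps.add('source finally');
  }
}

Future<List<String>> run() async {
  steps.clear();
  final controller = StreamController<int>(sync: true);
  controller.onCancel = () => steps.add('controller onCancel');
  final firstEvent = Completer<void>();
  final sub = controller.stream.listen((_) {
    if (!firstEvent.isCompleted) firstEvent.complete();
  });
  // Forward the source into the controller.
  unawaited(controller.addStream(source()));
  await firstEvent.future;
  // Cancelling first cancels the addStream (so the source unwinds and runs its
  // finally), and only after that calls the controller's own onCancel.
  await sub.cancel();
  steps.add('cancel done');
  return List.of(steps);
}

Future<void> main() async {
  print(await run());
}
```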

The asyncExpand is internally using another stream controller and an addStream, so cancelling the expand-stream cancels the inner stream (if one is active), then cancels the outer stream, and then it's done.

The inner stream of asyncExpand is a web socket emitted as event by the stream created by _generate, so that gets cancelled. Then the outer stream is cancelled, which is the one of _generate, paused at the yield ret; statement. Cancelling an async* stream subscription makes all current and following yield statements act like a return. As you noticed, that means that the code immediately following that yield is not executed. If you need the code to be run even after the stream is cancelled, you should put the code into a finally block too, and you should expect every yield to be able to return.
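A minimal sketch of that advice, assuming only dart:async; `sessions`, `run`, and `log` are hypothetical names, and the "connect"/"disconnect" entries stand in for onconnect/ondisconnect. The code that must run after each item sits in a finally block around the yield, so it runs both when the consumer asks for the next item and when it cancels.

```dart
import 'dart:async';

final List<String> log = [];

// Hypothetical restructuring of a `_generate`-style loop: the per-item
// "disconnect" step lives in a finally block around the yield.
Stream<int> sessions() async* {
  var attempt = 0;
  while (true) {
    final session = attempt;
    log.add('connect $session');
    try {
      yield session;
    } finally {
      // Runs on normal resumption and when the yield acts as a return.
      log.add('disconnect $session');
    }
    attempt++;
  }
}

Future<List<String>> run() async {
  log.clear();
  await for (final s in sessions()) {
    // Cancels the subscription while the generator is suspended at `yield`.
    if (s == 1) break;
  }
  return List.of(log);
}

Future<void> main() async {
  print(await run());
}
```

Every "connect" entry is matched by a "disconnect" entry, including the one for the session that was active when the subscription was cancelled.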

I think everything here is working as intended.

phr34k commented 1 year ago

Sort of. WebSockets are a manifestation of the problem, but I'm not convinced they are its source.

With communication protocols (WebSockets included), it is common to coordinate the shutdown before finally closing the underlying socket.

That isn't an immediate action and can take some time. What happens here is that this process seems to get initiated, but without a subscription, the responses stop being processed.

If you read the websocket code, you'll see all kinds of odd workarounds with timers, inherently caused by the detached stream subscription.

I'd argue that if enforcing an idiom (streams) causes the code to misbehave, the idiom must be at fault.

Don't get me wrong: I admit what you said isn't unreasonable. Individually, each of these elements behaves as you'd expect.

So tell me your thoughts: did I do something unreasonable, is this a specific bug in WebSocket that needs solving, or does the async/stream mechanism have some shortcomings?


lrhn commented 1 year ago

Using a stream might have been the wrong approach. If you can't pause the source, and you get events that must be reacted to, then a stream is not a good choice, because calling pause and then cancel can leave some events unhandled in the subscription buffer.

The source should then buffer those events while the subscription is paused, so it knows to properly dispose them if the subscription cancels.
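A minimal sketch of such a source, assuming only dart:async; `BufferingSource`, `emit`, `disposed`, and `demo` are hypothetical names. The source never hands an event to the controller while the subscription is paused; it keeps those events itself, so when the subscription is cancelled it knows exactly which events were never delivered and can dispose of them.

```dart
import 'dart:async';

// Hypothetical source that cannot pause its producer: it buffers events while
// the subscription is paused and disposes of the undelivered ones on cancel.
class BufferingSource {
  final List<String> disposed = [];
  final List<String> _buffer = [];
  late final StreamController<String> _controller = StreamController<String>(
    sync: true,
    onListen: _scheduleFlush,
    onResume: _scheduleFlush,
    onCancel: _disposeBuffered,
  );

  Stream<String> get stream => _controller.stream;

  // Called by the underlying producer for every incoming event.
  void emit(String event) {
    if (_controller.isPaused) {
      // No listener yet, or the listener is paused: keep the event ourselves.
      _buffer.add(event);
    } else {
      _controller.add(event);
    }
  }

  // Deliver buffered events from a microtask, outside the controller callback.
  void _scheduleFlush() => scheduleMicrotask(_flush);

  void _flush() {
    while (_buffer.isNotEmpty &&
        _controller.hasListener &&
        !_controller.isPaused) {
      _controller.add(_buffer.removeAt(0));
    }
  }

  void _disposeBuffered() {
    disposed.addAll(_buffer);
    _buffer.clear();
  }
}

Future<List<String>> demo() async {
  final source = BufferingSource();
  final received = <String>[];
  final sub = source.stream.listen(received.add);
  source.emit('a'); // Delivered immediately (sync controller, not paused).
  sub.pause();
  source.emit('b'); // Buffered by the source.
  sub.resume();
  await Future<void>.delayed(Duration.zero); // Let the flush microtask run.
  sub.pause();
  source.emit('c'); // Buffered again.
  await sub.cancel(); // 'c' was never delivered, so it is disposed.
  return [...received, '|', ...source.disposed];
}

Future<void> main() async {
  print(await demo());
}
```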

I don't know the websocket code well enough to say whether that's even the problem, or how easy it is to fix. Let's rope in some dart:io people.