ReactiveX / rxdart

The Reactive Extensions for Dart
http://reactivex.io
Apache License 2.0

Unexpected behaviour of BehaviorSubject under FakeAsync #395

Open zeekhuge opened 4 years ago

zeekhuge commented 4 years ago

The following test with BehaviorSubject fails, while the one with StreamController works fine. I have tried this with rxdart versions 0.22.4, 0.22.5 and 0.23.1; the test fails with all of them.

import 'dart:async';

import 'package:fake_async/fake_async.dart';
import 'package:flutter_test/flutter_test.dart';
import 'package:rxdart/rxdart.dart';

void main () {
    test("BehaviorSubject under fake-async", () => fakeAsync((async) {
        int output;
        methodUnderTest(BehaviorSubject())
            .then((val) => output = val);

        async.flushMicrotasks();

        expect(output, 1);
    }));

    test("StreamController under fake-async", () => fakeAsync((async) {
        int output;
        methodUnderTest(StreamController())
            .then((val) => output = val);

        async.flushMicrotasks();

        expect(output, 1);
    }));
}

Future<int> methodUnderTest (StreamController<int> controller) async {
    controller.add(1);
    return await controller.stream.first;
}

zeekhuge commented 4 years ago

Using fakeAsync as follows actually solves the issue: test("...", () { fakeAsync((async) async { ... }); })

zeekhuge commented 4 years ago

But I am still unable to get it working using WidgetTester. This comment does not work for me. In fact, that comment seems wrong since this test passes:

    testWidgets('again test', (WidgetTester tester) async {
        await Future.value(() async {
            throw "Exception";
        });
    });

zeekhuge commented 4 years ago

Okay, there's more. Even using test("...", () { fakeAsync((async) async { ... }); }) is not correct. The following test passes:

    test("test", () {fakeAsync((async) async {
        throw "Exception";
    });});

This basically means that the suggestion in this comment does not work either.

Also, the following test, which adds await Future.microtask(() => null), never completes (and thus times out):

    test("BehaviorSubject under fake-async", () => fakeAsync((async) async {
        int output;
        methodUnderTest(BehaviorSubject())
            .then((val) => output = val);

        async.flushMicrotasks();
        await Future.microtask(() => null);

        expect(output, 1);
    }));

Maybe #365 should be reopened and this one closed as a duplicate?

zeekhuge commented 4 years ago

This is becoming more troublesome. This test case passes, although it prints the stack trace. It should actually fail.

 testWidgets("Exception after future", (tester) async {
    /* set mocks and other */
    var m = () async {
        await methodUnderTest(BehaviorSubject());
//      If there is any future here, even the exception stack trace does not get printed
//      Future.delayed(Duration(microseconds: 10));
        throw "Exception";
    };

    m();

    await tester.pump(Duration(days: 1));
});

dmitryelagin commented 4 years ago

I believe this behavior is caused by Dart itself. BehaviorSubject uses StartWithStreamTransformer under the hood, which has the following line: https://github.com/ReactiveX/rxdart/blob/master/lib/src/transformers/start_with.dart#L43

Here subscription.cancel() returns a Future, and the next operation is started from its whenComplete method. In some cases this Future is the predefined Future._nullFuture, which is tied to the root zone, as you can see here: https://github.com/dart-lang/sdk/blob/master/sdk/lib/async/future.dart#L151

If a Future is already resolved, its whenComplete method schedules the callback with the scheduleMicrotask method of the zone in which the Future was resolved. For Future._nullFuture that is the root zone. So the fakeAsync zone doesn't know anything about this callback and can't call it.
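To make the explanation above concrete, here is a minimal sketch that uses only dart:async. It forks a zone that counts the microtasks it is asked to schedule and, from inside that zone, attaches whenComplete to an already completed future that was created in the root zone. The helper name microtasksSeenWhileListening and the stand-in rootFuture are made up for this illustration, and the exact counts depend on SDK internals. If the mechanism is as described, the callback still runs, but the forked zone is never asked to schedule the microtask that triggers it, which is the same blind spot a fakeAsync zone would have.

```dart
import 'dart:async';

/// Hypothetical helper for this illustration: returns how many microtasks a
/// forked zone was asked to schedule while a whenComplete callback was
/// attached to [future] from inside that zone.
Future<int> microtasksSeenWhileListening(Future<void> future) async {
  var seen = 0;
  final countingZone = Zone.current.fork(
    specification: ZoneSpecification(
      scheduleMicrotask: (self, parent, zone, task) {
        seen++; // record the request, then forward it unchanged
        parent.scheduleMicrotask(zone, task);
      },
    ),
  );

  final callbackRan = Completer<void>();
  countingZone.run(() {
    future.whenComplete(() {
      callbackRan.complete();
    });
  });

  // Wait until the callback has actually been invoked.
  await callbackRan.future;
  return seen;
}

Future<void> main() async {
  // Assumption: this stands in for Future._nullFuture, i.e. a completed
  // future that belongs to the root zone.
  final rootFuture = Zone.root.run(() => Future<void>.value());
  await rootFuture; // make sure it has completed before attaching callbacks

  final seen = await microtasksSeenWhileListening(rootFuture);
  print('microtasks seen by the forked zone: $seen');
}
```

A fake async zone can only flush what it has been asked to schedule, so any callback that is triggered through the root zone stays out of its reach.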

See the following issues for more information: https://github.com/dart-lang/sdk/issues/40131 https://github.com/google/quiver-dart/issues/583

I researched this because I have a similar problem in my tests, but with .debounce():

import 'dart:async';

import 'package:quiver/testing/async.dart';
import 'package:rxdart/rxdart.dart';
import 'package:test/test.dart';

void main() {
  test('Counter should be incremented', () {
    FakeAsync().run((fakeAsync) {
      var counter = 0;

      final _controller = StreamController<void>();

      Observable(_controller.stream)
        .debounceTime(const Duration(minutes: 1))
        .listen((_) {
          print('increment');
          counter += 1;
        });

      _controller.add(null);

      fakeAsync.flushTimers();

      expect(counter, 1);
    });
  });
}

This test fails with rxdart 0.22.6 (but works as expected with rxdart 0.22.4):

Expected: <1>
  Actual: <0>

package:test_api                                         expect
test/dart_test.dart 25:7                                 main.<fn>.<fn>
package:quiver/testing/src/async/fake_async.dart 196:24  _FakeAsync.run.<fn>
dart:async                                               _CustomZone.runGuarded
package:quiver/testing/src/async/fake_async.dart 195:11  _FakeAsync.run
test/dart_test.dart 9:17                                 main.<fn>

increment
✖ Counter should be incremented

I still don't have a proper workaround. It seems that the cheapest way is to change

onCancel: () => subscription.cancel()

to

onCancel: () => Future.value(subscription.cancel())

This rebinds the Future to a zone we can control. But it would have to be done in RxDart itself, which does not feel right since the problem is not in RxDart.
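The effect of that wrapper can be observed with a small dart:async-only sketch. It runs a future factory inside a forked zone that counts scheduleMicrotask requests, once with a plain cancel() and once with the result wrapped in Future.value. The helper names microtasksSeenByZone and cancelIdleSubscription are hypothetical, and the counts depend on SDK internals, so the sketch prints them instead of asserting specific values.

```dart
import 'dart:async';

/// Hypothetical helper: runs [makeFuture] inside a forked zone, attaches a
/// whenComplete callback there, and reports how many microtasks that zone
/// was asked to schedule.
Future<int> microtasksSeenByZone(Future<void> Function() makeFuture) async {
  var seen = 0;
  final countingZone = Zone.current.fork(
    specification: ZoneSpecification(
      scheduleMicrotask: (self, parent, zone, task) {
        seen++; // record the request, then forward it unchanged
        parent.scheduleMicrotask(zone, task);
      },
    ),
  );

  final callbackRan = Completer<void>();
  countingZone.run(() {
    makeFuture().whenComplete(() {
      callbackRan.complete();
    });
  });

  await callbackRan.future;
  return seen;
}

/// Cancels a subscription that never received events. According to the
/// discussion above, this is a case where cancel() returns the shared
/// root-zone future (an assumption about SDK internals).
Future<void> cancelIdleSubscription() =>
    StreamController<void>().stream.listen((_) {}).cancel();

Future<void> main() async {
  final plain = await microtasksSeenByZone(cancelIdleSubscription);
  final wrapped = await microtasksSeenByZone(
      () => Future<void>.value(cancelIdleSubscription()));
  print('plain cancel(): $plain, wrapped in Future.value: $wrapped');
}
```

If the wrapped variant reports a higher count than the plain one, the continuation is being routed through the forked zone, which is what a fake async zone needs in order to flush it.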

Still, I hope that somebody from the RxDart team will help us to find a good workaround.

IgorZubov commented 4 years ago

Me too. Waiting for any updates.

brianegan commented 4 years ago

Hi all -- thanks so much to @dmitryelagin -- that is an amazing investigation. I'll be honest, we've tried to use and make FakeAsync work in the past, but it does some odd things that have led me to really question whether it should ever be used to test Streams, since you can't guarantee the behavior under test will be the behavior you get when the app is actually running.

However, since Flutter uses FakeAsync everywhere, I think we need to support it.

Overall, this is a bigger change that will require a lot of additional tests. I'll try to find some time this week or early next to go through and add FakeAsync tests for, well, everything, to ensure that using a custom future instead of the constant nullFuture fixes everything up!

Thanks for the report and the great investigation.

brianegan commented 4 years ago

Update: I've taken some time to do a prototype of this change. However, when I think about it a bit more, making the suggested change fixes the bugs under FakeAsync conditions, but IMO would actually be a bad change under "normal" conditions.

The core problem: By returning a Future.value(subscription.cancel()), the inner cancellation subscription is essentially completely ignored, because the core Stream code only listens to when the outer Future.value is resolved:

if (_cancelFuture != null &&
    !identical(_cancelFuture, Future._nullFuture)) {
  _cancelFuture.whenComplete(sendDone);
}

This means that if the inner onCancel future should be properly awaited before emitting the done event to the Stream, that functionality would be lost.

Another issue: RxDart isn't the only library affected by this problem. For example, regular old Stream.periodic Streams also exhibit this issue with FakeAsync, because they also use Future._nullFuture under the hood -- and that problem has been reported a few different times in the past.

Therefore, it feels like while we could make this change to RxDart, it's actually not a good change, since it would fix the issues for a rather unique, heavily modified Test environment at the cost of risking unexpected behavior in the "normal" production environments. I think the proper solution is to request more help from the Dart and Quiver teams on how best to resolve this issue.

rbellens commented 4 years ago

I found the following solution/workaround for the problem. The problem is that when you have some asynchronous task with both microtasks that are registered in the fake async zone and microtasks that are registered in the root zone (because of the Future._nullFuture being used in e.g. StreamSubscription.cancel), the flushMicrotasks will stop on the first microtask in the root zone. This microtask will later be executed and might register additional microtasks in the fake async zone, but those will never be executed. A possible solution is to alternately flush microtasks in the fake async zone (with flushMicrotasks) and in the root zone (by awaiting a microtask registered in the root zone) until all the work is done.

A function creating a test case under fake async could then look like:

@isTest
void testUnderFakeAsyncFixed(String description, Function() body,
    {Timeout timeout}) {
  test(description, () async {
    var fakeAsync = FakeAsync();
    var f = fakeAsync.run((async) {
      return body();
    });

    var isDone = false;
    unawaited(f.whenComplete(
        () => isDone = true)); // check if all work in body has been done
    while (!isDone) {
      // flush the microtasks in real async zone
      await Future.microtask(() => null);
      // flush the microtasks in the fake async zone
      fakeAsync.flushMicrotasks();
    }

    return f;
  }, timeout: timeout); // use the caller's timeout instead of a hard-coded one
}

A fix for testWidgets could look like:

@isTest
void testWidgetsFixed(
    String description, Future<void> Function(WidgetTester) body,
    {Timeout timeout}) {
  testWidgets(description, (tester) async {
    var f = body(tester);

    var isDone = false;
    unawaited(f.whenComplete(() => isDone = true));
    // while not done register a microtask in the real async zone
    while (!isDone) {
      await tester.runAsync(() => Future.microtask(() => null));
    }

    return f;
  }, timeout: timeout);
}

A full example of this with some tests:

import 'dart:async';

import 'package:fake_async/fake_async.dart';
import 'package:flutter_test/flutter_test.dart';
import 'package:meta/meta.dart';
import 'package:pedantic/pedantic.dart';

void main() {
  Future Function() createFuture;

  Future<void> methodUnderTest() async {
    await Future.microtask(
        () => null); // creates a microtask in fake async zone
    await createFuture(); // creates a microtask in real async zone
    await Future.microtask(() => null);
    await createFuture();
    await Future.microtask(() => null);
  }

  group('StreamSubscription cancel on root Zone', () {
    createFuture = () {
      return StreamController()
          .stream
          .listen((value) {})
          .cancel(); // cancel returns the static value Future._nullFuture
    };

    test('under real async', methodUnderTest); // this succeeds
    testUnderFakeAsync('under fake async', methodUnderTest,
        timeout: Timeout(Duration(seconds: 1))); // this times out
    testUnderFakeAsyncFixed('under fake async', methodUnderTest,
        timeout: Timeout(Duration(seconds: 1))); // this succeeds
    testWidgets('under testWidgets', (tester) => methodUnderTest(),
        timeout: Timeout(Duration(seconds: 1))); // this times out
    testWidgetsFixed('under testWidgets fixed', (tester) => methodUnderTest(),
        timeout: Timeout(Duration(seconds: 1))); // this succeeds
  });

  group('Future from root zone', () {
    createFuture = () {
      return Zone.root.run(() => Future.value());
    };

    test('under real async', methodUnderTest); // this succeeds
    testUnderFakeAsync('under fake async', methodUnderTest,
        timeout: Timeout(Duration(seconds: 1))); // this times out
    testUnderFakeAsyncFixed('under fake async', methodUnderTest,
        timeout: Timeout(Duration(seconds: 1))); // this succeeds
    testWidgets('under testWidgets', (tester) => methodUnderTest(),
        timeout: Timeout(Duration(seconds: 1))); // this times out
    testWidgetsFixed('under testWidgets fixed', (tester) => methodUnderTest(),
        timeout: Timeout(Duration(seconds: 1))); // this succeeds
  });
}

@isTest
void testUnderFakeAsync(String description, Function() body,
    {Timeout timeout}) {
  test(description, () async {
    var fakeAsync = FakeAsync();
    var f = fakeAsync.run((async) {
      return body();
    });

    // flush the microtasks created in the fake async zone
    // this blocks on first microtask in real async zone
    fakeAsync.flushMicrotasks();

    return f;
  }, timeout: timeout); // use the caller's timeout instead of a hard-coded one
}

rbellens commented 14 hours ago

It seems that most of the time this issue is caused by StreamSubscription.cancel, which returns a Future._nullFuture created in the root zone instead of in the zone that calls the cancel method. See also these issues: https://github.com/google/quiver-dart/issues/583 and https://github.com/dart-lang/sdk/issues/40131#issuecomment-2335991349.

Although the cause is not within the rxdart package, the package could implement some workarounds. Currently, each StreamTransformer that uses the cancel method in its implementation might cause problems. For example, since 0.28.0, switchMap first cancels the subscription to the current stream before starting to listen to the next one. This could be fixed by checking whether the returned future is Future._nullFuture and, if so, using Future.value(null) instead. Although Future._nullFuture is private, we can obtain it by initializing a static variable: final _nullFuture = StreamController().stream.listen((_){}).cancel(). This might feel a bit hacky, but it should not break anything even if the internal implementation in the Dart SDK changes.
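A minimal sketch of that identity check, using only dart:async. The names _sharedNullFuture and cancelInCurrentZone are hypothetical and not part of rxdart, and the sketch assumes (as described above) that cancelling an idle subscription yields the SDK's shared null future.

```dart
import 'dart:async';

// Assumption taken from the comment above: cancelling a subscription that has
// no onCancel handler returns the SDK's shared, root-zone null future.
final Future<void> _sharedNullFuture =
    StreamController<void>().stream.listen((_) {}).cancel();

/// Hypothetical helper: cancels [subscription] and, if the result is the
/// shared null future, substitutes a future created in the current zone.
Future<void> cancelInCurrentZone(StreamSubscription<dynamic> subscription) {
  final result = subscription.cancel();
  return identical(result, _sharedNullFuture) ? Future<void>.value() : result;
}

Future<void> main() async {
  final subscription = StreamController<int>().stream.listen((_) {});
  final cancelled = cancelInCurrentZone(subscription);
  print('is shared null future: ${identical(cancelled, _sharedNullFuture)}');
  await cancelled;
}
```

If a future SDK version stops returning a shared instance, the identical check simply never matches and the original future is passed through unchanged, which is why the approach should degrade gracefully.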

Similarly, the streams returned or created by this package could be wrapped so that the cancel method on their subscriptions does not return Future._nullFuture.

If you want, I can try to implement these changes and do a pull request, but I am not sure that you will accept them. So, before spending time on it, I'd like to know if you would accept such an approach.