ReactiveX / rxdart

The Reactive Extensions for Dart
http://reactivex.io
Apache License 2.0

Deadlocks in tests after upgrading rxdart #587

Closed Mike278 closed 3 years ago

Mike278 commented 3 years ago

Took another stab at upgrading off rxdart 0.23.1, but unfortunately still getting some failing tests. Here's a repro/comparison (could probably be minimized further but not sure which parts are relevant).

import 'package:meta/meta.dart';
import 'package:moor/ffi.dart';
import 'package:moor/moor.dart';
import 'package:rxdart/rxdart.dart';
import 'package:test/test.dart';

class TestDb extends GeneratedDatabase {
  TestDb() : super(SqlTypeSystem.withDefaults(), VmDatabase.memory());
  @override final List<TableInfo> allTables = const [];
  @override final int schemaVersion = 1;
}

Future<void> testCase({
  @required Stream<int> Function() createOuter,
  @required Stream<int> Function() createInner,
}) async {
  final log = <int>[];
  final timeout = Duration(milliseconds: 100);

  final a = createOuter();
  final b = a.switchMap((_) => createInner());

  b.listen(log.add);
  await b.first.then(log.add)
    .timeout(timeout, onTimeout: () => fail('1st should complete'));
  expect(log, [2, 2]);

  b.listen(log.add);
  await b.first.then(log.add)
    .timeout(timeout, onTimeout: () => fail('2nd should complete'));
  expect(log, [2, 2, 2, 2]);
}

void main() {
  group('rxdart upgrade', () {

    test("moor", () async {
      final db = TestDb();
      Stream<int> selectInt(int i) => db
        .customSelect('select $i a')
        .map((row) => row.read<int>('a'))
        .watchSingle();
      await testCase(
        createOuter: () => selectInt(1),
        createInner: () => selectInt(2),
      );
    });

    test("rxdart", () async {
      final outer = BehaviorSubject<int>();
      final tc = testCase(
        createOuter: () => outer,
        createInner: () {
          final inner = BehaviorSubject<int>();
          Future.delayed(Duration(milliseconds: 10)).then((_) => inner.add(2));
          return inner;
        }
      );
      await Future.delayed(Duration(milliseconds: 10));
      outer.add(1);
      await tc;
    });
  });
}

Since this began after upgrading rxdart I've started by creating an issue here, but @simolus3 might be able to help determine if this is a moor bug.

#477 and #500 might have some context from the last time we tried to upgrade off rxdart 0.23.1.

I looked around for similar issues since last time and came across #511, which looks similar. The 1st repro in that issue still times out with rxdart 0.27.0, even though the 2nd repro looks fixed.

simolus3 commented 3 years ago

I'm taking a look at this now. Interestingly both tests pass if you cancel the first subscription before the second run:

  final sub = b.listen(log.add);
  await b.first
      .then(log.add)
      .timeout(timeout, onTimeout: () => fail('1st should complete'));
  expect(log, [2, 2]);
  await sub.cancel();

simolus3 commented 3 years ago

My take is that RxDart is doing some understandable but questionable things here. First, it looks like the root of the failure is from forwardingStream:

https://github.com/ReactiveX/rxdart/blob/3975769b7d45a8b6e99aab56c319df3a8589bcaf/lib/src/utils/forwarding_stream.dart#L68-L74

This function is used by most transformers from RxDart, including switchMap. It maps broadcast streams to broadcast streams (which is reasonable), but the broadcast stream controller will only invoke the onListen callback for the first listener. This means that the "inner" stream will only get listened to once! Since moor streams emit their cached value in the onListen callback (where else would they do it), you don't get a value on the second subscription.
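The mechanism described above can be sketched with dart:async alone. This is a simplified stand-in, not RxDart's actual forwardingStream: forwardViaBroadcastController and CountingStream are hypothetical names introduced only for illustration, and the sketch assumes a null-safe Dart SDK.

```dart
import 'dart:async';

/// Hypothetical stand-in for a forwarding transformer: wraps [source] in a
/// broadcast controller and subscribes upstream from the onListen callback.
Stream<T> forwardViaBroadcastController<T>(Stream<T> source) {
  late final StreamController<T> controller;
  StreamSubscription<T>? upstream;
  controller = StreamController<T>.broadcast(
    // Runs only when the listener count goes from 0 to 1.
    onListen: () {
      upstream = source.listen(controller.add,
          onError: controller.addError, onDone: controller.close);
    },
    onCancel: () => upstream?.cancel(),
  );
  return controller.stream;
}

/// Hypothetical stream that counts calls to [listen]; it stands in for any
/// stream that does per-listener work when it is listened to.
class CountingStream<T> extends Stream<T> {
  final Stream<T> inner;
  int listenCalls = 0;

  CountingStream(this.inner);

  @override
  bool get isBroadcast => inner.isBroadcast;

  @override
  StreamSubscription<T> listen(void Function(T event)? onData,
      {Function? onError, void Function()? onDone, bool? cancelOnError}) {
    listenCalls++;
    return inner.listen(onData,
        onError: onError, onDone: onDone, cancelOnError: cancelOnError);
  }
}

/// Listens twice downstream and reports how often upstream was listened to.
int upstreamListenCalls() {
  final source = CountingStream(StreamController<int>.broadcast().stream);
  final forwarded = forwardViaBroadcastController(source);
  forwarded.listen((_) {});
  forwarded.listen((_) {}); // second downstream listener
  return source.listenCalls;
}

void main() {
  print(upstreamListenCalls()); // prints 1: upstream saw only one listen call
}
```

Two downstream listeners result in a single upstream listen call, so any per-listener work done by the source (such as replaying a cached value) happens only for the first one.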

The reason RxDart subjects don't suffer from the same behavior is that forwardingStream treats them specially. For example, we can use the following class to obtain a Stream that should behave exactly like a Subject with respect to the Stream API in dart:async (extracted to a gist due to length).

When one only uses the stream api from dart:async, a subject should be indistinguishable from _SyncProxyStream(subject). However, this change causes the rxdart test to fail as well:

      final outer = BehaviorSubject<int>();
      final tc = testCase(
          createOuter: () => _SyncProxyStream(outer), // Test fails because of this
          createInner: () {
            final inner = BehaviorSubject<int>();
            Future.delayed(Duration(milliseconds: 10))
                .then((_) => inner.add(2));
            return inner;
          });
      await Future.delayed(Duration(milliseconds: 10));
      outer.add(1);
      await tc;

This means that RxDart's transformers don't reliably work on every Stream, which is really unfortunate. At the same time, I understand the need to keep "broadcastability" through stream transformers which probably wouldn't work otherwise.

My suggestion would be to make forwardingStream return a single-subscription stream when the input is not a Subject, but I'm not sure if there are negative consequences of that as well. It's certainly a breaking change.

frankpepermans commented 3 years ago

To understand the issue correctly, do you expect that onListen fires on every new Subscription?

Because, that is not the case, for example:

import 'dart:async';

void main() {
  var count = 0;
  final controller = StreamController<int>.broadcast(onListen: () => print('listen to me! ${++count}'));

  controller.stream.listen((_) {});
  controller.stream.listen((_) {});
  controller.stream.listen((_) {});
  controller.stream.listen((_) {});
}

Here, the print is invoked only once, "listen to me! 1".

As I recall, this was actually wrong in earlier rxdart versions, where onListen would trigger on every subscription.

If you cancel them all, and then do a new subscription, then onListen is invoked again.
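A minimal sketch of that last point, using only dart:async. The function name countOnListenCalls is illustrative and not part of any library.

```dart
import 'dart:async';

/// Counts how often a broadcast controller's onListen callback runs.
Future<int> countOnListenCalls() async {
  var count = 0;
  final controller = StreamController<int>.broadcast(onListen: () => count++);

  final a = controller.stream.listen((_) {}); // 0 -> 1 listeners: onListen runs
  final b = controller.stream.listen((_) {}); // 1 -> 2 listeners: no callback

  await a.cancel();
  await b.cancel(); // listener count is back to 0

  final c = controller.stream.listen((_) {}); // 0 -> 1 again: onListen runs
  await c.cancel();
  return count;
}

Future<void> main() async {
  print(await countOnListenCalls()); // prints 2
}
```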

simolus3 commented 3 years ago

> do you expect that onListen fires on every new Subscription?

In general, yes! I know that the onListen callback from StreamController.broadcast is not necessarily invoked for every subscription, but I don't think that RxDart should make that assumption for every broadcast stream.

For instance, consider this example:

import 'dart:async';

import 'package:rxdart/rxdart.dart';

class DoOnSubscribeStream<T> extends Stream<T> {
  final Stream<T> inner;
  final void Function() onSubscribe;

  DoOnSubscribeStream(this.inner, this.onSubscribe);

  @override
  bool get isBroadcast => inner.isBroadcast;

  @override
  StreamSubscription<T> listen(void Function(T event)? onData,
      {Function? onError, void Function()? onDone, bool? cancelOnError}) {
    onSubscribe();
    return inner.listen(onData,
        onError: onError, onDone: onDone, cancelOnError: cancelOnError);
  }
}

void main() {
  final controller = StreamController.broadcast();
  final stream = DoOnSubscribeStream(controller.stream, () {
    print('new subscription');
  });

  stream.listen(null); // prints!
  stream.listen(null); // prints!
  stream.listen(null); // prints!

  final switched = stream.switchMap((_) => Stream.empty());
  switched.listen(null); // prints!
  switched.listen(null); // does not print -- why?
}

I think it's perfectly reasonable to expect the last line to print 'new subscription' as well.

In fact, there are less-contrived examples of this as well. For instance, let's say someone writes their own broadcast stream that emits a cached value for each new listener if one is available (moor is doing pretty much that). Those streams depend on new listens not being dropped by some transformer down the line.

In my opinion, the special case for subjects in forwardingStream shows that new subscriptions should propagate even for broadcast streams. If those weren't special-cased, switchMap wouldn't properly work for behavior subjects, for instance. I strongly feel that RxDart shouldn't give its own stream implementations any special treatment; it's fragile and breaks for transformations that may be implemented by third-party libraries.

frankpepermans commented 3 years ago

We only use forwardingStream for transformations that need to do something on special events, like onListen, onPause, onResume or onCancel. Not all our transformers need it, some use a normal Stream sink.

But these events fire just like they would on "normal" Dart streams, and we follow the normal Stream behavior with rxdart.

frankpepermans commented 3 years ago

OK, I get what you mean now.

You hook into the actual listen method, not the onListen event. In that case, yes, rxdart does behave differently...

I'll see how breaking a change that would be.

frankpepermans commented 3 years ago

See here for a PR which would fix your issue: https://github.com/ReactiveX/rxdart/pull/588

Mike278 commented 3 years ago

Thank you both for the quick responses! Glad to see it's a simple fix. I'll try to give that branch a try tomorrow.

frankpepermans commented 3 years ago

@Mike278 Sorry I was a bit too fast there, the fix would be a little more complex unfortunately, I'll keep you posted :/

frankpepermans commented 3 years ago

So that PR branch probably works for your listen issue, but we need a better solution of course, it's a bit hacky atm.

Do feel free to try it out in the meantime of course.

hoc081098 commented 3 years ago

> do you expect that onListen fires on every new Subscription?

> In general, yes! I know that the onListen callback from StreamController.broadcast is not necessarily invoked for every subscription, but I don't think that RxDart should make that assumption for every broadcast stream.

RxDart's forwarding stream behavior is the same as fromHandlers (stream_transform) https://github.com/dart-lang/stream_transform/blob/533e1af154629959545e3a3728312af6cb4f5619/lib/src/from_handlers.dart#L10. I think that is expected behavior.

frankpepermans commented 3 years ago

Yes, but the issue here is that they expect the listen handler to be invoked on each listen:

import 'dart:async';

import 'package:rxdart/rxdart.dart';

class DoOnSubscribeStream<T> extends Stream<T> {
  final Stream<T> inner;
  final void Function() onSubscribe;

  DoOnSubscribeStream(this.inner, this.onSubscribe);

  @override
  bool get isBroadcast => inner.isBroadcast;

  @override
  StreamSubscription<T> listen(void Function(T event)? onData,
      {Function? onError, void Function()? onDone, bool? cancelOnError}) {
    onSubscribe();
    return inner.listen(onData,
        onError: onError, onDone: onDone, cancelOnError: cancelOnError);
  }
}

void main() {
  final controller =
      StreamController<void>.broadcast(onListen: () => print('I start!'));
  final stream = DoOnSubscribeStream(controller.stream, () {
    print('new subscription');
  });

  var switched = stream.switchMap((event) => Stream.value(event));
  switched.listen(null); // prints!
  switched.listen(null); // does not print!

  var mapped = stream.map((event) => event);
  mapped.listen(null); // prints!
  mapped.listen(null); // prints!
}

It prints "I start!" once, as expected, but if you override the listen handler and expect it to run on each actual listen, then it is inconsistent.

hoc081098 commented 3 years ago

IMO DoOnSubscribeStream means calling the callback when controller.stream is listened to. Because the Stream is broadcast, it should be listened to once :))

hoc081098 commented 3 years ago

Only the built-in operators cause the inconsistency, but the stream_transform package (from the dart-lang org) does not follow that behavior 😕

simolus3 commented 3 years ago

> IMO DoOnSubscribeStream means calling the callback when controller.stream is listened to

I agree.

> because the Stream is broadcast, it should be listened to once

Isn't the whole point of broadcast streams that they can be listened to multiple times? :D

I wonder if package:stream_transform should change their behavior as well. I'll open an issue there; we may get more insight into whether the behavior is intentional or not then.

Mike278 commented 3 years ago

Hmm the repro still fails for me on that branch:

dependencies:
  # ...
  rxdart:

# ...
dependency_overrides:
  rxdart:
    git:
      url: https://github.com/ReactiveX/rxdart.git
      ref: d6a7761cb74761f4f8b6e3663c445f28a145a75b

  rxdart:
    dependency: "direct main"
    description:
      path: "."
      ref: d6a7761cb74761f4f8b6e3663c445f28a145a75b
      resolved-ref: d6a7761cb74761f4f8b6e3663c445f28a145a75b
      url: "https://github.com/ReactiveX/rxdart.git"
    source: git
    version: "0.27.0"

Mike278 commented 3 years ago

It looks like https://github.com/ReactiveX/rxdart/pull/588 fixes at least some of the problem. I tried to combine all the different examples from this issue into one runnable test: https://gist.github.com/Mike278/f21c92e562428af26af58128d0209b00

frankpepermans commented 3 years ago

Is it an option to move the code in the listen override into an onListen handler?

Mike278 commented 3 years ago

My understanding is that would mean the code is only invoked each time the listener count goes from 0 to 1, but the goal is to invoke the code each time the listener count is incremented.

I think I can reduce the remaining failures to this case:

import 'dart:async';

import 'package:rxdart/rxdart.dart';
import 'package:test/test.dart';

class WrappedStream<T> extends Stream<T> {
  final Stream<T> inner;

  WrappedStream(this.inner);

  @override
  bool get isBroadcast => inner.isBroadcast;

  @override
  StreamSubscription<T> listen(void Function(T event) onData,
      {Function onError, void Function() onDone, bool cancelOnError}) {
    return inner.listen(onData,
        onError: onError, onDone: onDone, cancelOnError: cancelOnError);
  }
}

void main() {
  test('rxdart upgrade', () async {
    final controller = BehaviorSubject.seeded('controller');
    final stream = WrappedStream(controller.stream);

    final switched = stream.switchMap((_) {
      return BehaviorSubject.seeded('switched');
    });

    final timeout = Duration(milliseconds: 100);

    switched.listen(null); // note: commenting this out makes the test pass on 0.27

    final value1 = await switched.first
        .timeout(timeout, onTimeout: () => fail('1st should complete'));
    expect(value1, 'switched');

    final value2 = await switched.first
        .timeout(timeout, onTimeout: () => fail('2nd should complete')); // timeout here with rxdart 0.27 and PR#588
    expect(value2, 'switched');
  });
}

Mike278 commented 3 years ago

@frankpepermans any idea what's up with that test failure above? Other than that it looks like the PR is really close!

frankpepermans commented 3 years ago

It's not really a good PR though, and will never be merged in the current state, but it was an effort to see if it could resolve your problem (which it apparently almost does then).

Can you maybe explain in more detail why you'd need every new subscription to invoke listen on all upstream targets?

...maybe we can also think of a different solution then?

simolus3 commented 3 years ago

> Can you maybe explain in more detail why you'd need every new subscription to invoke listen on all upstream targets?

In case of moor, it comes from three requirements basically:

Regarding the third point, there's also a philosophical argument to be made that RxDart should extend Dart's streams so I think it's unfortunate if moor has to use RxDart's subjects to be compatible with it.

frankpepermans commented 3 years ago

Ok I think I understand the issue now, so we use ForwardStream to add some hooks that we need for some transformers, and indeed, we have special cases for our own Subjects in there.

We do that indeed to maintain the behavior, i.e. to not suddenly switch from, say a BehaviorSubject, to a plain StreamController, because that would lose the "emit last event on subscribe" behavior.

Correct?

simolus3 commented 3 years ago

> We do that indeed to maintain the behavior, i.e. to not suddenly switch from, say a BehaviorSubject, to a plain StreamController, because that would lose the "emit last event on subscribe" behavior.

Yes exactly. I don't think it's bad to keep that behavior (it's essentially an optimization when the source stream is known to avoid duplicate computations of transformers). I would prefer new subscriptions to go through for non-subject broadcast streams though.

Lasse from the Dart team suggested using Stream.multi as a suitable implementation for the forwarding stream here. It doesn't have the same problem because it treats each downstream subscription individually. But I think that's essentially the ForwardingStream you've introduced in your PR. He also added some notes on how switchMap may be a special transformer (but the general problem applies to other forwarding transformations too).
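As a rough illustration of why Stream.multi treats each downstream subscription individually, here is a sketch of a source that replays its latest value to every new listener. It is not moor's or RxDart's implementation: LatestValueSource and firstTwice are hypothetical names, and the sketch assumes a null-safe SDK with Stream.multi available.

```dart
import 'dart:async';

/// Hypothetical helper: a broadcast-like source that replays its latest
/// value to every new listener.
class LatestValueSource<T> {
  final _listeners = <MultiStreamController<T>>{};
  bool _hasLatest = false;
  late T _latest;

  /// Stream.multi runs this callback once per listen call, with a
  /// controller dedicated to that one listener.
  late final Stream<T> stream = Stream<T>.multi((controller) {
    if (_hasLatest) controller.add(_latest); // replay for this listener only
    _listeners.add(controller);
    controller.onCancel = () {
      _listeners.remove(controller);
    };
  }, isBroadcast: true);

  /// Stores [value] and forwards it to all current listeners.
  void add(T value) {
    _latest = value;
    _hasLatest = true;
    for (final listener in _listeners.toList()) {
      listener.add(value);
    }
  }
}

/// Reads `first` twice while another listener stays subscribed.
Future<List<int>> firstTwice() async {
  final source = LatestValueSource<int>()..add(1);
  source.stream.listen((_) {}); // long-lived listener, never cancelled
  final first = await source.stream.first;
  final second = await source.stream.first;
  return [first, second];
}

Future<void> main() async {
  print(await firstTwice()); // prints [1, 1]
}
```

Because every listen call gets its own controller, the second `first` still receives the cached value even though an earlier listener is still active, which is the situation that timed out in the repro.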

Mike278 commented 3 years ago

Thank you both again for your collaboration here! @frankpepermans Do you think an approach based on Stream.multi would be a good way to move forward?

frankpepermans commented 3 years ago

@Mike278 not sure, but I'll try to make some time to investigate

frankpepermans commented 3 years ago

I made a few attempts to get Stream.multi working, but unfortunately it always breaks on the many use cases we have, most notably the doOnX transformers.

Also, a bit of an annoyance is that subscribing to a Stream.multi always yields a StreamSubscription, even if the underlying Stream is not a broadcast Stream for example, since the subscription is deferred internally and the StateError that we expect is thrown at a different point in time.

frankpepermans commented 3 years ago

...actually, bit more tinkering, might be able to get it up without breaking too much, @Mike278 could you try: https://github.com/ReactiveX/rxdart/tree/stream_multi ?

Mike278 commented 3 years ago

I haven't tried it out yet, but having a quick look at the code I think there might be a few issues. For example, controller.onListen = maybeListen; doesn't do anything according to the Stream.multi docs: "Setting its StreamController.onListen has no effect since the onListen callback is called instead, and the StreamController.onListen won't be called later."

> Also, a bit of an annoyance is that subscribing to a Stream.multi always yields a StreamSubscription, even if the underlying Stream is not a broadcast Stream

I think the idea for Stream.multi/MultiStreamController is to almost introduce a 3rd type of stream that can act like either/both/neither (in fact the PR that added it mentioned that it's "more like a StreamController2"):

> A multi-subscription stream can behave like any other stream. If the onListen callback throws on every call after the first, the stream behaves like a single-subscription stream. If the stream emits the same events to all current listeners, it behaves like a broadcast stream.

frankpepermans commented 3 years ago

maybeListen is being called. If setting onListen does nothing, then we can remove it.

Ignore that comment I made before, all 700+ tests now pass using Stream.multi.

Mike278 commented 3 years ago

I had a chance to try this out. The moor test from the repro in the OP no longer times out (woo! 🥳), but now both tests fail because an extra onData event is delivered. Here's a smaller repro:

test('duplicate events', () async {
  final source = BehaviorSubject.seeded('source');
  final switched = source.switchMap((value) => BehaviorSubject.seeded('switched'));
  int i = 0;
  switched.listen((_) => i++);
  expect(await switched.first, 'switched');
  expect(i, 1);
  expect(await switched.first, 'switched');
  expect(i, 1); // fails because i==2
});

rxdart:
  dependency: "direct main"
  description:
    path: "."
    ref: stream_multi
    resolved-ref: "2f465a4c5f86dc8a95359150efdda6aa2bc41c09"
    url: "https://github.com/ReactiveX/rxdart.git"
  source: git
  version: "0.27.1"

hoc081098 commented 3 years ago

@Mike278 Could you try https://github.com/ReactiveX/rxdart/pull/605?

Mike278 commented 3 years ago

Looks like that works, all our tests pass with #605 - no more deadlocks!