ReactiveX / rxdart

The Reactive Extensions for Dart
http://reactivex.io
Apache License 2.0

shareValue() not behaving as expected #452

Open feinstein opened 4 years ago

feinstein commented 4 years ago

Sorry if this is really a question, but I don't think shareValue() is behaving as expected (or at least not how I understood it was supposed to work, in which case it could be something to add to the docs).

I have this code (adapted here for simplicity) on my library:

static final StreamController<FirebaseUser> _userReloadedStreamController = 
    StreamController<FirebaseUser>.broadcast();
static Stream<FirebaseUser> get onUserReloaded => _userReloadedStreamController.stream;
...
onAuthStateChangedOrReloaded = 
    Rx.merge([FirebaseAuth.instance.onAuthStateChanged, onUserReloaded]).shareValue();

Here I am returning a Stream that merges 2 streams, and every time someone listens to it, they should receive the last emitted value. FirebaseAuth.instance.onAuthStateChanged has an implementation that resembles a BehaviorSubject: every time there's a new subscription it emits the last value, but it doesn't use RxDart.

I was testing this inside an app where there are only 1 or 0 subscribers at any given time. When the first subscriber is created things work fine, but once that subscriber is destroyed and a new subscriber comes along, the Stream stops behaving normally. Here are some logs to illustrate what's going on:

This is the Widget I am using for testing:

class Wrapper extends StatefulWidget {
  Wrapper() {
    print('created');
  }

  @override
  _WrapperState createState() => _WrapperState();
}

class _WrapperState extends State<Wrapper> {
  @override
  Widget build(BuildContext context) {
    return StreamBuilder<FirebaseUser>(
        stream: FirebaseAuth.instance.onAuthStateChanged,
        builder: (context, snapshot) {
          print(snapshot.connectionState);
          print('snapshot.data(user): ${snapshot.data}');
          return Container();
        });
  }

  @override
  void initState() {
    super.initState();
    print('init');
  }

  @override
  void dispose() {
    print('disposed');
    super.dispose();
  }
}

It just logs the different states of the Widget.

Using stream: FirebaseAuth.instance.onAuthStateChanged, these are the logs that I get when Wrapper is created and destroyed multiple times:

I/flutter (21763): created
I/flutter (21763): init
I/flutter (21763): ConnectionState.waiting
I/flutter (21763): snapshot.data(user): null
I/flutter (21763): ConnectionState.active
I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser')
I/flutter (21763): ConnectionState.waiting
I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser')
I/flutter (21763): ConnectionState.active
I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser')
I/flutter (21763): disposed
I/flutter (21763): init
I/flutter (21763): ConnectionState.waiting
I/flutter (21763): snapshot.data(user): null
I/flutter (21763): ConnectionState.active
I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser')
I/flutter (21763): disposed
I/flutter (21763): init
I/flutter (21763): ConnectionState.waiting
I/flutter (21763): snapshot.data(user): null
I/flutter (21763): ConnectionState.active
I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser')
I/flutter (21763): disposed
I/flutter (21763): init
I/flutter (21763): ConnectionState.waiting
I/flutter (21763): snapshot.data(user): null
I/flutter (21763): ConnectionState.active
I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser')

Basically, it goes from ConnectionState.waiting to ConnectionState.active as expected.

Now, these are the logs that I get when I use onAuthStateChangedOrReloaded (the one using shareValue()) instead:

I/flutter (21763): created
I/flutter (21763): init
I/flutter (21763): ConnectionState.waiting
I/flutter (21763): snapshot.data(user): null
I/flutter (21763): ConnectionState.active
I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser')
I/flutter (21763): disposed
I/flutter (21763): init
I/flutter (21763): ConnectionState.waiting
I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser')
I/flutter (21763): ConnectionState.done
I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser')
I/flutter (21763): disposed
I/flutter (21763): init
I/flutter (21763): ConnectionState.waiting
I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser')
I/flutter (21763): ConnectionState.done
I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser')
I/flutter (21763): disposed

As you can see, the stream created by shareValue() works as expected on the first subscription, but on later subscriptions it goes instantaneously from ConnectionState.waiting to ConnectionState.done.

Is this the expected behavior? I couldn't find it documented anywhere. Should I be doing something differently?

Azbesciak commented 9 months ago

@hoc081098 I did not mention refCount anywhere and that was on purpose. refCount is an implementation detail which means that there's no requirement that shareValue has to use it.

BTW after the cancel (I may have amnesia, maybe this was mentioned before) it does not start listening again when new listeners come.

So once all listeners have stopped listening, it is broken if you want to subscribe again.

In short, you would need to create a new stream.

AlexDochioiu commented 9 months ago

If anyone is interested, I wrote my own solution for a share stream that self-disposes if there are no listeners (with an option for a delay before disposal) but can be observed again with no problems after disposal.

P.S. I have been using this in production for months with no issues.

import 'dart:async';

import 'package:rxdart/rxdart.dart';

/// A [Stream] that converts a single-subscription Stream into a broadcast Stream that replays
/// the latest value to any new listener while subscribed (if [replayLastValueToNewListeners] is true).
/// Once all subscribers have canceled or the stream completes, the underlying subscription is canceled.
///
/// Adding listeners after the stream has completed/disposed and the [delayBeforeCancel] time has passed
/// will result in a new broadcast stream being created with [_factory] and shared to new listeners.
class RefCountReusableStream<T> extends Stream<T> {
  final bool replayLastValueToNewListeners;
  final Duration delayBeforeCancel;
  final void Function()? onCancel;
  final Stream<T> Function() _factory;
  StreamSubscription<T>? _subscription;
  StreamController<T>? _controller;

  RefCountReusableStream._(
    this._factory, {
    required this.replayLastValueToNewListeners,
    required this.delayBeforeCancel,
    this.onCancel, // called after delayBeforeCancel once all listeners have canceled (can be called more than once)
  });

  /// Creates a [RefCountReusableStream] that will create a new broadcast stream using [_factory] and
  /// share it to new listeners. New listeners WILL NOT receive the last value emitted by the stream.
  RefCountReusableStream.publish(
    Stream<T> Function() factory, {
    Duration delayBeforeCancel = const Duration(),
    void Function()? onCancel,
  }) : this._(
          factory,
          replayLastValueToNewListeners: false,
          delayBeforeCancel: delayBeforeCancel,
          onCancel: onCancel,
        );

  /// Creates a [RefCountReusableStream] that will create a new broadcast stream using [_factory] and
  /// share it to new listeners. New listeners WILL receive the last value emitted by the stream.
  RefCountReusableStream.behaviour(
    Stream<T> Function() factory, {
    Duration delayBeforeCancel = const Duration(),
    void Function()? onCancel,
  }) : this._(
          factory,
          replayLastValueToNewListeners: true,
          delayBeforeCancel: delayBeforeCancel,
          onCancel: onCancel,
        );

  Timer? _disposeTask;

  void _requestDispose() {
    _disposeTask ??= Timer(delayBeforeCancel, () {
      _subscription?.cancel();
      onCancel?.call();
      _subscription = null;
      _controller = null;
      _disposeTask = null;
    });
  }

  @override
  StreamSubscription<T> listen(void Function(T)? onData,
      {Function? onError, void Function()? onDone, bool? cancelOnError}) {
    _disposeTask?.cancel();
    _disposeTask = null;

    if (_controller == null) {
      _controller = replayLastValueToNewListeners
          ? BehaviorSubject(onCancel: () => _requestDispose(), sync: true)
          : PublishSubject(onCancel: () => _requestDispose(), sync: true);
      _subscription = _factory().listen(_controller!.add, onError: _controller!.addError, onDone: _controller!.close);
    }

    return _controller!.stream.listen(onData, onError: onError, onDone: onDone, cancelOnError: cancelOnError);
  }
}

Azbesciak commented 9 months ago

Basically, is not that the source of our problem?

/// A special [StreamSubscription] that not only cancels the connection to
/// the source [Stream], but also closes down a subject that drives the Stream.
class ConnectableStreamSubscription<T> extends StreamSubscription<T> {
  final StreamSubscription<T> _source;
  final Subject<T> _subject;

  /// Constructs a special [StreamSubscription], which will close the provided subject
  /// when [cancel] is called.
  ConnectableStreamSubscription(this._source, this._subject);

  @override
  Future<dynamic> cancel() =>
      _source.cancel(); //.then<void>((_) => _subject.close()); <------ there

As I went through the code base, I saw multiple issues: the input is closed based on the receivers, so even if it emits something later, that is ignored even when there is a subscriber (I made some partial changes in connectable_stream, in AbstractConnectableStream [_connection is late final] and in ConnectableStreamSubscription mentioned above, but I have no more patience).

Close does not mean "no listeners" for now; it just cuts the stream off, like cutting off a head. It should not be irreversible. It should only signal to the emitter that there are no receivers for now, not kill it.

Do you kill your bus driver when you get off as the last one?

Azbesciak commented 9 months ago

@AlexDochioiu Thank you for that, I just wanted to write the same stuff on my own :)

hoc081098 commented 9 months ago

Quoting @Azbesciak: "Basically, is not that the source of our problem?"

Yes, I think we can expand the current API with more options like rxjs: resetOnComplete, ...

https://github.com/ReactiveX/rxjs/blob/master/packages/rxjs/src/internal/operators/share.ts#L5-L47

Azbesciak commented 9 months ago

@hoc081098 I am not sure if it makes sense to expand - that is logical behavior and natural to other RX env users.

Contrary question - there were no more listeners, it was closed. But you want to listen to the behavior state once again.

What is the benefit of having it closed/not usable? You even cannot check if that was closed.

Whether I get messages or not should not depend on the fact that somebody else stopped listening.

hoc081098 commented 9 months ago

TL;DR: Dart Streams do not match the Rx spec 100%.

The root cause shows up when using share with a single-subscription Stream.

Stream<int> singleSubscriptionStream = ...;

var shared = singleSubscriptionStream.share();

var sub = shared.listen(null);
// After that ...
sub.cancel();
// After that...

shared.listen(null); // <-- singleSubscriptionStream is listened to twice -> throws an error
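
The underlying Dart rule can be checked without rxdart. The following is a minimal sketch that uses only dart:async; the helper name secondListenThrows is made up for illustration and is not part of any library. It listens to a single-subscription stream, cancels, and then tries to listen again, which is the situation a shared stream would run into if it re-subscribed to such a source.

```dart
import 'dart:async';

/// Returns true if listening a second time to a single-subscription
/// stream throws a StateError. (Hypothetical helper for illustration.)
Future<bool> secondListenThrows() async {
  // A plain StreamController is single-subscription.
  final controller = StreamController<int>();

  final first = controller.stream.listen((_) {});
  await first.cancel();

  try {
    // Even though the first subscription was canceled, the stream
    // cannot be listened to again.
    controller.stream.listen((_) {});
    return false;
  } on StateError {
    // "Bad state: Stream has already been listened to."
    return true;
  }
}
```

Calling secondListenThrows() should complete with true, since a single-subscription stream accepts only one listener over its whole lifetime.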

Azbesciak commented 9 months ago

OK, but is not that the reason for broadcast?

hoc081098 commented 9 months ago

Because of that, ConnectableStream will close the internal Subject when onCancel happens, to avoid this error.

But there is an edge case, for something like this:

var shared = Rx.fromCallable(lambda, reusable: true).share();

We still want to re-listen to shared, and re-execute lambda again. So the best way is adding more options and keeping the old behavior as default.
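
To illustrate the current default, here is a rough model of the closing behaviour using only dart:async. A broadcast StreamController stands in for the internal Subject, which is an assumption made for illustration and not the actual rxdart code; listenAfterClose is a hypothetical helper name. Once the controller has been closed, a new listener receives nothing but a done event, which would be consistent with the StreamBuilder logs above jumping from ConnectionState.waiting straight to ConnectionState.done.

```dart
import 'dart:async';

/// Listens, cancels, closes the controller, then listens again and
/// records what each listener receives. (Hypothetical helper.)
Future<List<String>> listenAfterClose() async {
  final controller = StreamController<int>.broadcast();
  final log = <String>[];

  final first = controller.stream.listen((v) => log.add('first: $v'));
  controller.add(1);
  // Give the event loop a chance to deliver the event.
  await Future<void>.delayed(Duration.zero);
  await first.cancel();

  // Stand-in for "close the internal Subject when the last listener cancels".
  await controller.close();

  final done = Completer<void>();
  controller.stream.listen(
    (v) => log.add('second: $v'),
    onDone: () {
      log.add('second: done');
      done.complete();
    },
  );
  await done.future;
  return log;
}
```

In this model the second listener only ever sees the done event, so an option that keeps the old behaviour as the default would have to skip the close step and re-subscribe to the source instead.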

rorystephenson commented 3 months ago

@AlexDochioiu I'm looking for the behaviour that I think your RefCountReusableStream.behaviour() implements but I'm running in to unexpected results. I've written a couple of test cases to demonstrate it (the first test throws, the second test passes but the printed output is not as expected).

  1. When using RefCountReusableStream.behaviour() with a non-broadcast stream as the source I get an exception when I close all listeners and then add a new listener. This is perhaps expected behaviour given how Dart streams work.
  2. When using RefCountReusableStream.behaviour() with a broadcast stream it doesn't seem to replay the value that was added to the stream before a listener is added (see printed output when running it). That means values added before the first listener don't get replayed and if we add a value, close all listeners and then add another listener the new listener doesn't get the last value.

import 'dart:async';

import 'package:flutter/foundation.dart'; // debugPrint
import 'package:flutter_test/flutter_test.dart'; // test

// RefCountReusableStream is the class posted by @AlexDochioiu above and must be in scope.

void main() {
  test('RefCountReusableStream.behaviour() from non-broadcast', () async {
    final controller = StreamController<int>();

    final shareStream =
        RefCountReusableStream.behaviour(() => controller.stream);
    controller.add(1);
    controller.add(2);
    debugPrint('Start listening');
    final sub1 = shareStream.listen((event) => debugPrint('l1: e$event'));
    final sub2 = shareStream.listen((event) => debugPrint('l2: e$event'));
    controller.add(3);
    // Give the event loop a chance to pass the event to the listeners.
    await Future.delayed(Duration.zero);
    await sub1.cancel();
    await sub2.cancel();

    // Give the event loop time to perform the auto-close logic.
    await Future.delayed(Duration.zero);

    // Fails when it tries to listen here with:
    // Bad state: Stream has already been listened to.
    final sub3 = shareStream.listen((event) => debugPrint('l3: e$event'));
    final sub4 = shareStream.listen((event) => debugPrint('l4: e$event'));
    controller.add(4);
    await Future.delayed(Duration.zero);
    await sub3.cancel();
    await sub4.cancel();
  });

  test('RefCountReusableStream.behaviour() from broadcast', () async {
    final controller = StreamController<int>.broadcast();

    final shareStream =
        RefCountReusableStream.behaviour(() => controller.stream);
    controller.add(1);
    controller.add(2); // Does not get replayed to the listeners
    debugPrint('Start listening');
    final sub1 = shareStream.listen((event) => debugPrint('l1: e$event'));
    final sub2 = shareStream.listen((event) => debugPrint('l2: e$event'));
    controller.add(3);
    // Give the event loop a chance to pass the event to the listeners.
    await Future.delayed(Duration.zero);
    await sub1.cancel();
    await sub2.cancel();

    // Give the event loop time to perform the auto-close logic.
    await Future.delayed(Duration.zero);

    // Does not throw with a broadcast source, but l3 and l4 do not get the
    // last value replayed.
    final sub3 = shareStream.listen((event) => debugPrint('l3: e$event'));
    final sub4 = shareStream.listen((event) => debugPrint('l4: e$event'));
    controller.add(4);
    // Give the event loop a chance to pass the event to the listeners.
    await Future.delayed(Duration.zero);
    await sub3.cancel();
    await sub4.cancel();
  });
}
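
Regarding point 2, part of what I'm seeing may simply be how broadcast controllers behave: events added while nobody is listening are not buffered. A minimal sketch with only dart:async (lateListenerEvents is a made-up helper name, and this does not involve RefCountReusableStream at all):

```dart
import 'dart:async';

/// Adds two events before anyone listens and one after, then returns
/// what the listener actually received. (Hypothetical helper.)
Future<List<int>> lateListenerEvents() async {
  final controller = StreamController<int>.broadcast();
  final received = <int>[];

  controller.add(1); // no listener yet, so the event is dropped
  controller.add(2); // dropped as well

  final sub = controller.stream.listen(received.add);
  controller.add(3);

  // Give the event loop a chance to pass the event to the listener.
  await Future<void>.delayed(Duration.zero);
  await sub.cancel();
  await controller.close();
  return received;
}
```

Only the event added after listen() arrives, so a wrapper that subscribes to the source lazily has no way of knowing about the values 1 and 2.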