dart-lang / stream_transform

Dart utility methods to create StreamTransformer instances to manipulate Streams
https://pub.dev/packages/stream_transform
BSD 3-Clause "New" or "Revised" License

Transformers swallow new subscriptions for broadcast streams #132

Open simolus3 opened 3 years ago

simolus3 commented 3 years ago

Consider a stream that does something when being listened to:

import 'dart:async';
import 'package:stream_transform/stream_transform.dart';

class DoOnSubscribeStream<T> extends Stream<T> {
  final Stream<T> inner;
  final void Function() onSubscribe;

  DoOnSubscribeStream(this.inner, this.onSubscribe);

  @override
  bool get isBroadcast => inner.isBroadcast;

  @override
  StreamSubscription<T> listen(void Function(T event)? onData,
      {Function? onError, void Function()? onDone, bool? cancelOnError}) {
    onSubscribe();
    return inner.listen(onData,
        onError: onError, onDone: onDone, cancelOnError: cancelOnError);
  }
}

Now, say this stream wraps a broadcast stream and prints something for each new subscription. When one of this package's transformers is applied, new downstream subscriptions do not trigger the callback:

void main() {
  final controller = StreamController<void>.broadcast(
      onListen: () => print('controller.onListen'));
  final stream = DoOnSubscribeStream(controller.stream, () {
    print('new subscription');
  });

  var switched = stream.switchMap((event) => Stream.value(event));
  switched.listen(null); // prints!
  switched.listen(null); // does not print!
}

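Running the full example prints "new subscription" only once, for the first listen call.

I would expect it to print "new subscription" once per listen call, i.e. twice.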

I know the reason for this behavior: the switchMap transformer uses a broadcast stream controller internally, and such a controller only calls onListen once. However, I disagree with this behavior, because it is only correct if every subscription to a broadcast stream receives the same events, and I couldn't find any official source backing that assumption. Is this the expected behavior? Another example where the assumption breaks is the Stream.multi constructor in the SDK:

void main() {
  var counter = 0;
  final stream = Stream<int>.multi((controller) {
    controller.add(++counter);
    controller.close();
  }, isBroadcast: true);

  var switched = stream.switchMap((event) => Stream.value(event));
  switched.listen(print); // prints 1
  switched.listen(print); // also prints 1 :(
}
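
To make the mechanism concrete, here is a minimal sketch. It assumes a hypothetical helper, forwardViaBroadcastController (not this package's actual code), that forwards a source through a StreamController.broadcast which subscribes to the source in its onListen callback. The second listener simply joins the existing broadcast subscription, so the Stream.multi callback runs only once and both listeners see the same value.

```dart
import 'dart:async';

/// Hypothetical helper (not part of stream_transform): forwards [source]
/// through a broadcast controller that listens to [source] in onListen.
Stream<T> forwardViaBroadcastController<T>(Stream<T> source) {
  final controller = StreamController<T>.broadcast();
  StreamSubscription<T>? subscription;
  controller
    // Called when the first listener arrives, not once per listener.
    ..onListen = () {
      subscription = source.listen(controller.add,
          onError: controller.addError, onDone: controller.close);
    }
    // Called when the last listener goes away.
    ..onCancel = () => subscription?.cancel();
  return controller.stream;
}

/// A broadcast Stream.multi that emits a fresh counter value per listen.
Stream<int> countingStream() {
  var counter = 0;
  return Stream<int>.multi((controller) {
    controller.add(++counter);
    controller.close();
  }, isBroadcast: true);
}

Future<void> main() async {
  final forwarded = forwardViaBroadcastController(countingStream());
  final first = forwarded.toList();
  final second = forwarded.toList();
  print([await first, await second]); // [[1], [1]]
}
```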

See also: The discussion at https://github.com/ReactiveX/rxdart/issues/587

natebosch commented 3 years ago

The choice to use a single listener on broadcast streams was intentional, even though it does conflict with the default behavior from the SDK. @lrhn - do you think this was a bad idea? The Stream.multi API certainly does push against it.

I'm not sure what might be broken if we change that decision.

hoc081098 commented 3 years ago

Built-in operators such as map, where, take, ... do not follow that behavior, while rxdart and stream_transform do.
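
For comparison, a small sketch of the SDK behavior mentioned above: Stream.map keeps the source's isBroadcast and forwards every listen call to the source, so on a broadcast Stream.multi the callback runs once per listener and each listener gets its own value. The helper names countingStream and collectMappedTwice are illustrative only.

```dart
import 'dart:async';

/// A broadcast Stream.multi that emits a fresh counter value per listen.
Stream<int> countingStream() {
  var counter = 0;
  return Stream<int>.multi((controller) {
    controller.add(++counter);
    controller.close();
  }, isBroadcast: true);
}

/// Listens twice to source.map(...) and returns what each listener received.
Future<List<List<int>>> collectMappedTwice(Stream<int> source) async {
  final mapped = source.map((x) => x * 10);
  // Stream.map forwards isBroadcast from its source...
  assert(mapped.isBroadcast);
  // ...and each listen (here via toList) triggers a separate source.listen.
  final first = mapped.toList();
  final second = mapped.toList();
  return [await first, await second];
}

Future<void> main() async {
  print(await collectMappedTwice(countingStream())); // [[10], [20]]
}
```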

lrhn commented 3 years ago

I think stream modifiers should generally be lazy. If you call stream.someSome() and it returns a stream, then you shouldn't start listening on stream yet, and you should lazily propagate the listen calls from the result each time they're made. (Not all current stream modifiers do that; it was something we wanted to fix for Dart 2.0 but didn't have time for.)

That's why it makes sense to forward the broadcast-status of the source stream through the modifier chain, because it determines whether you can listen more than once on the resulting modifier chain.

It's just not how broadcast stream controllers work, which is why we have a problem here. Multiple listen calls on a broadcast stream only end up as one onListen call, and any modifications earlier in the stream modifier chain only get done once. It's annoying, not how I'd prefer it to work, etc., but it's how broadcast stream controllers work.

It's also exactly the right tool for this particular job!

The current switchLatest is written exactly as it should be. I'd be tempted to use Stream.multi instead, but it wouldn't work.

If the source stream is a broadcast stream of single-subscription streams, then having two listen calls act independently on the Stream events will cause one of them to get an error when it tries to listen. The switchLatest function can only truly be "multicast" if it's a broadcast stream of broadcast streams, which we cannot possibly know from the types. That means that we must do a single listen on the element streams and broadcast it to all further listeners.

Or rather, it means that whether switchLatest should really be a broadcast stream depends on whether the inner streams are broadcast streams, not the outer stream, which is again something we can't know until we've seen each individual element stream. I'd consider always making it a non-broadcast stream.
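
A minimal sketch of the single-subscription problem, with hypothetical helper names (secondListenThrows, numbers): a stream from an async* generator can only be listened to once, so a second independent listen throws a StateError, while wrapping it with asBroadcastStream() allows several listeners. This is why a per-listener switchLatest would only work for a broadcast stream of broadcast streams.

```dart
import 'dart:async';

/// A single-subscription stream produced by an async* generator.
Stream<int> numbers() async* {
  yield 1;
  yield 2;
}

/// Returns true if a second listen on [inner] throws a StateError,
/// i.e. if [inner] only supports a single subscription.
bool secondListenThrows(Stream<int> inner) {
  final firstSubscription = inner.listen((_) {});
  try {
    final secondSubscription = inner.listen((_) {});
    secondSubscription.cancel();
    return false;
  } on StateError {
    return true;
  } finally {
    firstSubscription.cancel();
  }
}

void main() {
  print(secondListenThrows(numbers())); // true
  print(secondListenThrows(numbers().asBroadcastStream())); // false
}
```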


I tried implementing it using Stream.multi.

import "dart:async";

extension<T> on Stream<Stream<T>> {
  Stream<T> switchLatest() {
    var isBroadcast = this.isBroadcast;
    return Stream<T>.multi((controller) {
      // Cache tear-offs.
      void Function(T)? add;
      void Function(Object, StackTrace) addError = controller.addErrorSync;
      StreamSubscription<T>? innerSubscription;
      var outerStreamDone = false;

      var outerSubscription = listen(
          (innerStream) {
            innerSubscription?.cancel();
            innerSubscription = innerStream.listen(add ??= controller.addSync,
                onError: addError, onDone: () {
              innerSubscription = null;
              if (outerStreamDone) controller.close();
            });
          },
          onError: addError,
          onDone: () {
            outerStreamDone = true;
            if (innerSubscription == null) controller.close();
          });
      controller
        ..onPause = () {
          innerSubscription?.pause();
          outerSubscription.pause();
        }
        ..onResume = () {
          innerSubscription?.resume();
          outerSubscription.resume();
        }
        ..onCancel = () {
          var outerCancel =
              !outerStreamDone ? outerSubscription.cancel() : null;
          var innerCancel = innerSubscription?.cancel();
          if (innerCancel == null) {
            if (outerCancel == null) {
              return null;
            }
            return outerCancel;
          }
          if (outerCancel == null) {
            return innerCancel;
          }
          return Future.wait([outerCancel, innerCancel]).then((_) => null);
        };
    }, isBroadcast: isBroadcast);
  }
}

void main() {
  Stream<Stream<int>> s(int n) async* {
    Stream<int> s(int n) async* {
      for (var i = 0; i < 20; i++) {
        yield i * n;
        await Future.delayed(Duration(milliseconds: 100), () {});
      }
    }

    for (var i = 0; i < 10; i++) {
      yield s(n++);
      await Future.delayed(Duration(milliseconds: 500), () {});
    }
  }

  s(1).map((s) => s.asBroadcastStream()).asBroadcastStream().switchLatest()
    ..forEach(print)
    ..forEach(print);
}

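That works, but it breaks without the .map((s) => s.asBroadcastStream()): the second forEach then tries to listen to single-subscription inner streams that the first one is already listening to.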

simolus3 commented 3 years ago

I see how switchMap is special, since whether it can be a broadcast stream depends on the inner streams, but simpler transformers in this package behave the same way. The following script does not print anything:

import 'dart:async';
import 'package:stream_transform/stream_transform.dart';

void main() {
  final objects = [1, 'string'];
  var i = 0;
  final stream = Stream.multi((controller) {
    controller
      ..add(objects[i++])
      ..close();
  }, isBroadcast: true);

  final whereString = stream.whereType<String>();
  whereString.listen(print);
  whereString.listen(print);
}

whereString can reasonably be a broadcast stream, but then the implementation of transformByHandlers can't use a broadcast stream controller. I tried changing it to use Stream.multi as well but got some test failures (possibly because pause/resume events aren't propagated synchronously with Stream.multi?).

lrhn commented 3 years ago

I agree, and would definitely want to try writing some of these operations using Stream.multi.

Something like:

extension WhereTypeExtension<T> on Stream<T> {
  Stream<S> whereType<S extends T>() {
    return Stream<S>.multi((controller) {
      var subscription = this.listen((value) {
        if (value is S) controller.addSync(value);
      }, onError: controller.addErrorSync, onDone: controller.closeSync);
      controller
        ..onPause = subscription.pause
        ..onResume = subscription.resume
        ..onCancel = subscription.cancel;
    }, isBroadcast: this.isBroadcast);
  }
}
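
A self-contained usage sketch of the extension above, applied to the example from the previous comment. The extension is copied with the method renamed to whereTypeMulti (a hypothetical name, chosen only to avoid clashing with stream_transform's whereType), and alternatingSource is an illustrative helper. Because every listen re-runs the Stream.multi callback, the second listener gets its own event and sees "string".

```dart
import 'dart:async';

extension WhereTypeMultiExtension<T> on Stream<T> {
  // Same body as the sketch above; renamed to avoid a clash with
  // stream_transform's whereType.
  Stream<S> whereTypeMulti<S extends T>() {
    return Stream<S>.multi((controller) {
      // One source subscription per downstream listener.
      final subscription = listen((value) {
        if (value is S) controller.addSync(value);
      }, onError: controller.addErrorSync, onDone: controller.closeSync);
      controller
        ..onPause = subscription.pause
        ..onResume = subscription.resume
        ..onCancel = subscription.cancel;
    }, isBroadcast: isBroadcast);
  }
}

/// Emits 1 to the first listener and 'string' to the second.
Stream<Object> alternatingSource() {
  final objects = <Object>[1, 'string'];
  var i = 0;
  return Stream<Object>.multi((controller) {
    controller
      ..add(objects[i++])
      ..close();
  }, isBroadcast: true);
}

Future<void> main() async {
  final whereString = alternatingSource().whereTypeMulti<String>();
  final first = whereString.toList();
  final second = whereString.toList();
  print([await first, await second]); // [[], [string]]
}
```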

natebosch commented 2 years ago

asyncExpand in the core libraries has similar behavior. I do think we are doing the right thing for concurrentAsyncExpand and switchMap.
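
A small sketch of the asyncExpand behavior, reusing a counting Stream.multi (the helper names countingStream and collectExpandedTwice are illustrative). Unlike map, asyncExpand on a broadcast source builds its result from a broadcast controller that listens to the source only once, so both listeners see the same value.

```dart
import 'dart:async';

/// A broadcast Stream.multi that emits a fresh counter value per listen.
Stream<int> countingStream() {
  var counter = 0;
  return Stream<int>.multi((controller) {
    controller.add(++counter);
    controller.close();
  }, isBroadcast: true);
}

/// Listens twice to source.asyncExpand(...) and returns what each got.
Future<List<List<int>>> collectExpandedTwice(Stream<int> source) async {
  final expanded = source.asyncExpand((x) => Stream.value(x));
  final first = expanded.toList();
  final second = expanded.toList();
  return [await first, await second];
}

Future<void> main() async {
  // Both listeners share one subscription to the source.
  print(await collectExpandedTwice(countingStream())); // [[1], [1]]
}
```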

One downside of making whereType and others behave like map is the performance cost of repeated work when there are multiple listeners and the source isn't a Stream.multi.
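
A small sketch of that repeated work, assuming a plain broadcast StreamController as the source (countMapCalls is an illustrative name): with two listeners on one mapped stream, the mapping callback runs once per event per listener.

```dart
import 'dart:async';

/// Counts how often the map callback runs when two listeners share
/// one mapped broadcast stream that receives two events.
Future<int> countMapCalls() async {
  var calls = 0;
  final controller = StreamController<int>.broadcast();
  final mapped = controller.stream.map((x) {
    calls++; // Stand-in for expensive per-event work.
    return x * 2;
  });
  // Each listener gets its own forwarding subscription, and therefore
  // its own invocation of the map callback for every event.
  final first = mapped.toList();
  final second = mapped.toList();
  controller
    ..add(1)
    ..add(2);
  await controller.close();
  await Future.wait([first, second]);
  return calls;
}

Future<void> main() async {
  print(await countMapCalls()); // 4 (2 events x 2 listeners)
}
```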

I suppose since it's already the case that authors need to be careful about repeated work with APIs from the SDK, it's better to have the capability to use these transformations on multi streams than not.

I'm also not sure how best to document this. We don't currently document it on .map or .asyncExpand.