ReactiveX / rxdart

The Reactive Extensions for Dart
http://reactivex.io
Apache License 2.0
3.37k stars 271 forks source link

Partition stream helper #436

Open EricHurt opened 4 years ago

EricHurt commented 4 years ago

RxJS implemented the partition operator which creates two streams from a source and a predicate. It's something I've been waiting for, so I thought it would be useful in RxDart as well.

I put together a crude example below. Unfortunately, without destructuring, RxDart needs its own class for returning the streams. The example could probably be improved some, but the general idea is there.

An alternative is converting the class to a helper function that is imported by the Rx class and used Rx.partition => PartitionedStreams.

class Partition<T> {

  static PartitionStreams<T> partition<T>(Stream<T> source, bool Function(T value) predicate) {
    if (source == null) {
      throw ArgumentError('source cannot be null');
    } else if (predicate == null) {
      throw ArgumentError('predicate cannot be null');
    }
    var sourceBroadcast = source.asBroadcastStream();

    StreamSubscription<T> trueSubscription;
    StreamController<T> trueController;

    StreamSubscription<T> falseSubscription;
    StreamController<T> falseController;

    trueController = StreamController(
      sync: true,
      onListen: () {
        trueSubscription =
            sourceBroadcast.listen((value) {
              if (predicate(value)) {
                trueController.add(value);
              }
            },
          onError: trueController.addError,
          onDone: () {
            if (!trueController.isClosed) {
              trueController.close();
            }
          },
        );
      },
      onPause: ([Future<dynamic> resumeSignal]) =>
          falseSubscription.pause(resumeSignal),
      onResume: () => trueSubscription.resume(),
      onCancel: () => trueSubscription.cancel(),
    );

    falseController = StreamController(
      sync: true,
      onListen: () {
        falseSubscription =
            sourceBroadcast.listen((value) {
              if (!predicate(value)) {
                falseController.add(value);
              }
            },
          onError: falseController.addError,
          onDone: () {
            if (!falseController.isClosed) {
              falseController.close();
            }
          },
        );
      },
      onPause: ([Future<dynamic> resumeSignal]) =>
          falseSubscription.pause(resumeSignal),
      onResume: () => falseSubscription.resume(),
      onCancel: () => falseSubscription.cancel(),
    );

    return PartitionStreams<T>(trueController, falseController);
  }
}

class PartitionStreams<T> {
  final StreamController<T> trueController;
  final StreamController<T> falseController;

  PartitionedStreams(this.trueController, this.falseController);
}
hoc081098 commented 4 years ago

I have mentioned it https://github.com/ReactiveX/rxdart/issues/382#issuecomment-567823906 👍. But Dart has not Destructuring assignment syntax(https://github.com/dart-lang/language/issues/207) like ES6, that makes the API not fluent