ReactiveX / rxdart

The Reactive Extensions for Dart
http://reactivex.io
Apache License 2.0

Add listener for Subject subscriptions (enhancement) #665

todoFixIt closed this issue 2 years ago

todoFixIt commented 2 years ago

There are many cases where it is useful to be notified when a Subject is first subscribed to and when its last subscription is removed, and to know the current number of subscriptions.

Something like subject.listenSuscriptions(void Function(int suscriptions))

A real-life example: a BehaviorSubject<bool> isLocationEnabled that, when first listened to, starts a task that checks whether the user has GPS enabled, and stops that task once the last subscription is removed.

Thanks in advance.

hoc081098 commented 2 years ago

You can use StreamController.onListen and StreamController.onCancel, which a Subject also exposes.

https://replit.com/@hoc081098/FatalLikableVirtualmemory#main.dart:4:30

import 'dart:async';

import 'package:rxdart/rxdart.dart';

Stream<bool> observeLocationEnabled() => Stream.periodic(const Duration(milliseconds: 400), (i) => i.isOdd);

class Service {
  final _isLocationEnabled = BehaviorSubject<bool>.seeded(false);

  ValueStream<bool> get isLocationEnabled$ => _isLocationEnabled;

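  Service() {
    // Subscription to the location source; null while nobody is listening.
    StreamSubscription<void>? subscription;
    // Called when the subject gets its first listener: start the task.
    _isLocationEnabled.onListen = () {
      print("onListen");
      subscription ??= observeLocationEnabled().listen(
        _isLocationEnabled.add,
        onError: _isLocationEnabled.addError,
      );
    };
    // Called when the last listener cancels: stop the task.
    _isLocationEnabled.onCancel = () {
      print("onCancel");
      final future = subscription?.cancel();
      subscription = null;
      return future;
    };
  }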
}

void main() async {
  final service = Service();
  print("Start...");

  final sub1 = service.isLocationEnabled$.listen((v) => print("[1] $v"));
  await Future<void>.delayed(const Duration(seconds: 1));
  final sub2 = service.isLocationEnabled$.listen((v) => print("[2] $v"));
  await Future<void>.delayed(const Duration(seconds: 1));

  await sub1.cancel();
  await sub2.cancel();
}

todoFixIt commented 2 years ago

@hoc081098 I hadn't noticed those callbacks. I think that is enough for what I want to achieve, thanks.
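
onListen and onCancel report the first listen and the last cancel, but not the number of listeners that the original request also mentions. Below is a minimal sketch of one way to track that count with dart:async alone. It assumes the source is a broadcast stream (such as a Subject). countListeners, onCountChanged and runDemo are hypothetical names used for illustration and are not part of rxdart.

```dart
import 'dart:async';

/// Hypothetical helper (not part of rxdart): wraps a broadcast [source] so
/// that [onCountChanged] receives the current number of active listeners.
Stream<T> countListeners<T>(
  Stream<T> source,
  void Function(int count) onCountChanged,
) {
  var count = 0;
  return Stream<T>.multi((controller) {
    // This callback runs once for every listen() on the returned stream.
    count++;
    onCountChanged(count);
    final subscription = source.listen(
      controller.addSync,
      onError: controller.addErrorSync,
      onDone: controller.closeSync,
    );
    // Runs when this particular listener goes away.
    controller.onCancel = () {
      count--;
      onCountChanged(count);
      return subscription.cancel();
    };
  }, isBroadcast: true);
}

/// Records every count reported while two listeners come and go.
Future<List<int>> runDemo() async {
  final source = StreamController<int>.broadcast();
  final counts = <int>[];
  final counted = countListeners<int>(source.stream, counts.add);

  final sub1 = counted.listen((_) {});
  final sub2 = counted.listen((_) {});
  await sub1.cancel();
  await sub2.cancel();
  await source.close();
  return counts;
}

Future<void> main() async {
  print(await runDemo());
}
```

The count lives in the wrapper rather than in the subject, so it only sees listeners that subscribe through the wrapped stream. Anything listening to the subject directly is not counted.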

  /// Invokes the given callback function when the stream subscription is
  /// cancelled. Often called doOnUnsubscribe or doOnDispose in other
  /// implementations.
  ///
  /// ### Example
  ///
  ///     final subscription = TimerStream(1, Duration(minutes: 1))
  ///       .doOnCancel(() => print('hi'))
  ///       .listen(null);
  ///
  ///     subscription.cancel(); // prints 'hi'
  Stream<T> doOnCancel(FutureOr<void> Function() onCancel)

  /// Invokes the given callback function when the stream is first listened to.
  ///
  /// ### Example
  ///
  ///     Stream.fromIterable([1])
  ///       .doOnListen(() => print('Is someone there?'))
  ///       .listen(null); // prints 'Is someone there?'
  Stream<T> doOnListen(void Function() onListen)
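
As a usage sketch of the two operators documented above, the snippet below chains doOnListen and doOnCancel on a single-subscription stream and records the order in which things happen. It assumes rxdart is available as a dependency. runDemo and the log list are hypothetical names for illustration. How these operators behave with several listeners on a broadcast stream is not shown here.

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

/// Listens to a periodic stream for a short while, then cancels,
/// logging the listen callback, the values, and the cancel callback.
Future<List<String>> runDemo() async {
  final log = <String>[];
  final subscription = Stream<int>.periodic(
    const Duration(milliseconds: 100),
    (i) => i,
  )
      .doOnListen(() => log.add('listen'))
      .doOnCancel(() => log.add('cancel'))
      .listen((v) => log.add('value $v'));

  // Let a few events through before cancelling.
  await Future<void>.delayed(const Duration(milliseconds: 350));
  await subscription.cancel();
  return log;
}

Future<void> main() async {
  print(await runDemo());
}
```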