ReactiveX / rxdart

The Reactive Extensions for Dart
http://reactivex.io
Apache License 2.0
3.37k stars 270 forks source link

Problems with Observables in debug mode in flutter #329

Closed rullyalves closed 5 years ago

rullyalves commented 5 years ago

While coding my Observables-based application, I noticed that in some cases when using the operators: merge or mergeWith, had problems like: "stream has already been listened to", Even though converting Observable to a ValueObservable, I also noticed that this problem seems to happen only in Flutter debug mode, I may be wrong, but is there any explanation for this?

frankpepermans commented 5 years ago

Hey there,

Could you share some code so I can reproduce the issue?

Thanks!

rullyalves commented 5 years ago

unfortunately my project is private, but I can try to separate some code for it, I also noticed that when debugging and having many breakpoints, streams seem to lose data, strangely.

rullyalves commented 5 years ago

Strangely enough, if you get the error "stream already heard", even when using publishValue and autoConnect (), here is a code that can reproduce this problem:

https://github.com/rullyalves/Ponyvie---Flutter-Movie-App/blob/patch-1/lib/blocs/favorite_bloc.dart


import 'package:bloc_pattern/bloc_pattern.dart';
import 'package:ponyvie/models/item_model.dart';
import 'package:rxdart/rxdart.dart';

class FavoriteBloc extends BlocBase {
  static FavoriteBloc _instance = FavoriteBloc._internal();

  factory FavoriteBloc.getInstance() {
    if (_instance == null) _instance = FavoriteBloc._internal();
    return _instance;
  }

  FavoriteBloc._internal() {
    final list = Observable.just(<MovieModel>[]).publishValue().autoConnect();

    final addItem = _add.withLatestFrom(
        list, (item, List<MovieModel> list) => list..add(item));

    final removeItem = _remove.withLatestFrom(
        list, (item, List<MovieModel> list) => list..remove(item));

    items = Observable.merge([addItem,removeItem]).publishValue().autoConnect();
  }

  final _add = BehaviorSubject<MovieModel>();
  final _remove = BehaviorSubject<MovieModel>();

  Observable<int> total;
  Observable<List<MovieModel>> items;

  void add(MovieModel i) => _add.add(i);
  void remove(MovieModel i) => _remove.add(i);

  @override
  void dispose() {
    _items.close();
    _add.close();
    _remove.close();
    _instance = null;
    super.dispose();
  }
}