ReactiveX / rxdart

The Reactive Extensions for Dart
http://reactivex.io
Apache License 2.0

[Question] How to stop or kill an Observable.periodic(Duration(seconds:1)) within a BLOC #270

Closed AlexandreRoba closed 5 years ago

AlexandreRoba commented 5 years ago

Dear All,

I have a widget that has a state. In this state I'm instantiating a bloc. This bloc has a private variable, scheduler, which is instantiated with an Observable.periodic. Basically, the bloc keeps polling a state from the backend. My issue is that the bloc closes all the streams it has when the widget is disposed, yet the scheduler in the bloc keeps pulling data every second and tries to push it to the different streams, which have already been closed.

I tried setting the scheduler to null in the dispose of my bloc, but it keeps pulling data from the backend. How do you clean up that Observable.periodic? Any help appreciated.

AlexandreRoba commented 5 years ago

I have created an example of what I'm talking about:

import 'package:flutter/material.dart';
import 'package:rxdart/rxdart.dart';
import 'dart:async';

void main() => runApp(MyApp());

class MyApp extends StatelessWidget {
  // This widget is the root of your application.
  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      title: 'Flutter Demo',
      theme: ThemeData(
        primarySwatch: Colors.blue,
      ),
      home: HomePage(),
    );
  }
}

class HomePage extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    return Scaffold(
      body: Container(
        child: Center(
          child: RaisedButton(
            child: Text('Show Timer'),
            onPressed: () {
              Navigator.of(context)
                  .push(MaterialPageRoute(builder: (context) => TimerPage()));
            },
          ),
        ),
      ),
    );
  }
}

class TimerPageBloc {
  Observable _scheduler;
  PublishSubject<String> _time;

  TimerPageBloc() {
    _scheduler = Observable.periodic(Duration(seconds: 1));
    _time = PublishSubject<String>();
    _scheduler.listen((_){
      print("I'm still kicking and alive....");
      var now = DateTime.now();
      _time.add('${now.hour}:${now.minute}:${now.second}');
    });
  }

  Stream<String> get time => _time.stream;

  void dispose() {
    _scheduler = null;
    _time.close();
  }
}

class TimerPage extends StatefulWidget {
  @override
  _TimerPageState createState() => _TimerPageState();
}

class _TimerPageState extends State<TimerPage> {

  TimerPageBloc _bloc;

  @override
  void initState() {
    super.initState();
    _bloc = TimerPageBloc();
  }

  @override
  void dispose() {
    _bloc.dispose();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: Text('Timer'),
      ),
      body: Center(
        child: Column(
          mainAxisAlignment: MainAxisAlignment.center,
          children: <Widget>[
            Text(
              'You have pushed the button this many times:',
            ),
            StreamBuilder<String>(
              stream: _bloc.time,
              builder: (context, snapshotTime) {
                return Text(
                  '${snapshotTime.data}',
                  style: Theme.of(context).textTheme.display1,
                );
              }
            ),
            RaisedButton(
              child: Text('Leave'),
              onPressed: () {
                Navigator.of(context).pop();
              },
            )
          ],
        ),
      ),
    );
  }
}

And this is what I get in the logs when I open the timer page:

flutter: I'm still kicking and alive....
flutter: I'm still kicking and alive....
flutter: I'm still kicking and alive....
flutter: I'm still kicking and alive....
flutter: I'm still kicking and alive....
flutter: I'm still kicking and alive....
flutter: I'm still kicking and alive....
flutter: I'm still kicking and alive....

Then when I leave the timer...

flutter: I'm still kicking and alive....
[VERBOSE-2:ui_dart_state.cc(148)] Unhandled Exception: Bad state: Cannot add new events after calling close
#0      _BroadcastStreamController.add (dart:async/broadcast_stream_controller.dart:249:24)
#1      Subject._add (package:rxdart/src/subjects/subject.dart:135:16)
#2      Subject.add (package:rxdart/src/subjects/subject.dart:129:5)
#3      new TimerPageBloc.<anonymous closure> (package:timer_issue/main.dart:43:13)
#4      _rootRunUnary (dart:async/zone.dart:1132:38)
#5      _CustomZone.runUnary (dart:async/zone.dart:1029:19)
#6      _CustomZone.runUnaryGuarded (dart:async/zone.dart:931:7)
#7      _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:336:11)
#8      _BufferingStreamSubscription._add (dart:async/stream_impl.dart:263:7)
#9      _SyncStreamController._sendData (dart:async/stream_controller.dart:764:19)
#10     _StreamController._add (dart:async/stream_controller.dart:640:7)
#11     _StreamController.add (dart:async/stream_controller.dart:586:5)
#12     new Stream.peri<…>
flutter: I'm still kicking and alive....
[VERBOSE-2:ui_dart_state.cc(148)] Unhandled Exception: Bad state: Cannot add new events after calling close
#0      _BroadcastStreamController.add (dart:async/broadcast_stream_controller.dart:249:24)
#1      Subject._add (package:rxdart/src/subjects/subject.dart:135:16)
#2      Subject.add (package:rxdart/src/subjects/subject.dart:129:5)
#3      new TimerPageBloc.<anonymous closure> (package:timer_issue/main.dart:43:13)
#4      _rootRunUnary (dart:async/zone.dart:1132:38)
#5      _CustomZone.runUnary (dart:async/zone.dart:1029:19)
#6      _CustomZone.runUnaryGuarded (dart:async/zone.dart:931:7)
#7      _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:336:11)
#8      _BufferingStreamSubscription._add (dart:async/stream_impl.dart:263:7)
#9      _SyncStreamController._sendData (dart:async/stream_controller.dart:764:19)
#10     _StreamController._add (dart:async/stream_controller.dart:640:7)
#11     _StreamController.add (dart:async/stream_controller.dart:586:5)
#12     new Stream.peri<…>
flutter: I'm still kicking and alive....

How do I kill this timer? :(

frankpepermans commented 5 years ago

There are a few ways to kill a periodic Stream. One would be to use the takeUntil operator; another would be to use sample instead, where the sampler is something that you can terminate.

Given the former approach:

// Create a StreamController that acts as the termination trigger
_onTerminate = StreamController<bool>();
// Create the periodic Stream
scheduler = Observable.periodic(Duration(seconds: 1));
// use takeUntil...
scheduler
    .takeUntil(_onTerminate.stream);

// inside dispose(), simply trigger an event for _onTerminate
_onTerminate.add(true);

AlexandreRoba commented 5 years ago

Hi @frankpepermans, thanks for the quick answer. I tried it on my sample and it does not kill the periodic Observable. I suspected a race condition and delayed closing the streams in the dispose method, but it still does not kill the Observable :(

My sample:

import 'package:flutter/material.dart';
import 'package:rxdart/rxdart.dart';
import 'dart:async';

void main() => runApp(MyApp());

class MyApp extends StatelessWidget {
  // This widget is the root of your application.
  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      title: 'Flutter Demo',
      theme: ThemeData(
        primarySwatch: Colors.blue,
      ),
      home: HomePage(),
    );
  }
}

class HomePage extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    return Scaffold(
      body: Container(
        child: Center(
          child: RaisedButton(
            child: Text('Show Timer'),
            onPressed: () {
              Navigator.of(context)
                  .push(MaterialPageRoute(builder: (context) => TimerPage()));
            },
          ),
        ),
      ),
    );
  }
}

class TimerPageBloc {
  //Added
  StreamController<bool> _onTerminate;
  Observable _scheduler;
  PublishSubject<String> _time;

  TimerPageBloc() {
    _onTerminate = StreamController<bool>();
    _scheduler = Observable.periodic(Duration(seconds: 1));
    //Added
    _scheduler.takeUntil(_onTerminate.stream);
    _time = PublishSubject<String>();
    _scheduler.listen((_) {
      print("I'm still kicking and alive....");
      var now = DateTime.now();
      _time.add('${now.hour}:${now.minute}:${now.second}');
    });
  }

  Stream<String> get time => _time.stream;

  void dispose() {
    //Added
    _onTerminate.add(true);
    //I tried with a delay here... Still does not kill the Observable
    _onTerminate.close();
    _time.close();
  }
}

class TimerPage extends StatefulWidget {
  @override
  _TimerPageState createState() => _TimerPageState();
}

class _TimerPageState extends State<TimerPage> {
  TimerPageBloc _bloc;

  @override
  void initState() {
    super.initState();
    _bloc = TimerPageBloc();
  }

  @override
  void dispose() {
    _bloc.dispose();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: Text('Timer'),
      ),
      body: Center(
        child: Column(
          mainAxisAlignment: MainAxisAlignment.center,
          children: <Widget>[
            Text(
              'You have pushed the button this many times:',
            ),
            StreamBuilder<String>(
                stream: _bloc.time,
                builder: (context, snapshotTime) {
                  return Text(
                    '${snapshotTime.data}',
                    style: Theme.of(context).textTheme.display1,
                  );
                }),
            RaisedButton(
              child: Text('Leave'),
              onPressed: () {
                Navigator.of(context).pop();
              },
            )
          ],
        ),
      ),
    );
  }
}

frankpepermans commented 5 years ago

You are subscribing to the untransformed Stream. Try:

  TimerPageBloc() {
    _onTerminate = StreamController<bool>();
    // Changed: keep the Stream returned by takeUntil and listen to that one
    _scheduler = Observable.periodic(Duration(seconds: 1)).takeUntil(_onTerminate.stream);
    _time = PublishSubject<String>();
    _scheduler.listen((_) {
      print("I'm still kicking and alive....");
      var now = DateTime.now();
      _time.add('${now.hour}:${now.minute}:${now.second}');
    });
  }

AlexandreRoba commented 5 years ago

Hi, great indeed... How did I miss that? :) I did try with:

_scheduler = Observable.periodic(Duration(seconds: 1))..takeUntil(_onTerminate.stream);

But it was not working. Your suggestion makes sense and works. Thanks for the help.
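
A note on why the cascade version did not work: in Dart, `a..b()` evaluates to `a`, not to the result of `b()`, so the Stream returned by the operator is discarded. The sketch below illustrates this with plain dart:async, using `take(3)` as a stand-in for rxdart's `takeUntil` (an assumption made only so the example runs without extra packages; the helper names `chained` and `cascadeKeepsOriginal` are hypothetical).

```dart
import 'dart:async';

const tick = Duration(milliseconds: 10);

/// Chained call: the result is the NEW stream returned by take(3).
Stream<int> chained() => Stream<int>.periodic(tick, (i) => i).take(3);

/// Cascade: the expression evaluates to the receiver, so the stream
/// returned by take(3) is thrown away and `cascaded` is still unlimited.
bool cascadeKeepsOriginal() {
  final source = Stream<int>.periodic(tick, (i) => i);
  final cascaded = source..take(3);
  return identical(source, cascaded);
}

Future<void> main() async {
  print(await chained().length); // prints 3: the limited stream completes
  print(cascadeKeepsOriginal()); // prints true: same object as the source
}
```

The same reasoning applies to calling the operator on its own line and then listening to the original variable: stream operators return a new Stream and leave the receiver untouched.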

So, for anyone interested, the solution was to transform the Observable with takeUntil and listen to the result:

_scheduler = Observable.periodic(Duration(seconds: 1)).takeUntil(_onTerminate.stream);
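
As an alternative to takeUntil, one could also keep the StreamSubscription returned by listen and cancel it in dispose. Setting the stream variable to null does not stop anything, because the active subscription still holds the timer. Below is a minimal sketch using only dart:async; `TickerBloc` is a hypothetical stand-in for the TimerPageBloc above, and it assumes a null-safe Dart SDK (2.12 or later) for `late final`.

```dart
import 'dart:async';

/// Hypothetical stand-in for TimerPageBloc, built on dart:async only.
class TickerBloc {
  final StreamController<String> _time = StreamController<String>.broadcast();
  late final StreamSubscription<int> _subscription;

  TickerBloc(Duration period) {
    // Keep the subscription so the periodic timer can be stopped later.
    _subscription = Stream<int>.periodic(period, (i) => i).listen((i) {
      _time.add('tick $i');
    });
  }

  Stream<String> get time => _time.stream;

  Future<void> dispose() async {
    // Cancel first so no event can reach the controller after it is closed.
    await _subscription.cancel();
    await _time.close();
  }
}

Future<void> main() async {
  final bloc = TickerBloc(const Duration(milliseconds: 10));
  final received = <String>[];
  bloc.time.listen(received.add);

  await Future<void>.delayed(const Duration(milliseconds: 55));
  await bloc.dispose();
  final countAtDispose = received.length;

  // Wait a little longer to check that nothing arrives after dispose.
  await Future<void>.delayed(const Duration(milliseconds: 50));
  print(received.length == countAtDispose); // prints true
}
```

Cancelling before closing is the important ordering here: it avoids the "Cannot add new events after calling close" error shown in the logs earlier in this thread.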