ReactiveX / rxdart

The Reactive Extensions for Dart
http://reactivex.io
Apache License 2.0
3.37k stars 270 forks source link

[Q] Some Question with Share(). #259

Closed Guang1234567 closed 5 years ago

Guang1234567 commented 5 years ago

Hello dev:

I have a bloc for flutter app below:

class XXXXPageBloc extends BlocBase {

    PublishSubject<_Request> _onLoadPagesData;

    Observable<ResultState> _stateRewardsPagesData;
    Observable<ResultState> _stateDinePagesData;
    Observable<ResultState> _stateTravelPagesData;
    Observable<ResultState> _stateStayPagesData;
    Observable<ResultState> _statePlayPagesData;

    @override
    void dispose() {
        super.dispose();
        _onLoadPagesData.close();
    }

    @override
    String toString() {
        return 'RewardsPageBloc@$hashCode{}';
    }

    RewardsPageBloc() :super() {
        _onLoadPagesData = PublishSubject<_Request>();

        Observable<ResultState> wrap(String kind) {
            return _onLoadPagesData
            //.transform(bindToLifecycle())
                    .startWith(_Request(kind))
                    .where((request) {
                        return kind == request.kind || "All" == request.kind;
                    })
                    .debounce(const Duration(milliseconds: 250))
                    .flatMap<ResultState>((request) => _loadPagesData(request))
                    .startWith(StateNoTerm());
        }

        Observable<ResultState> _getPageData(Observable<ResultState> upStream) {
            return upStream.map((resultState) {
                if (resultState is StatePopulated<RewardsDto>) {
                    final result = List<OfferListItemModel>();
                    resultState.result?.data?.forEach((item) {
                        result.add(OfferListItemModel.fromS(item));
                    });
                    return StatePopulated<List<OfferListItemModel>>(result: result);
                }
                return resultState;
            }).share();
        }

        _stateRewardsPagesData = _getPageData(wrap("Rewards"));

        print("${_stateRewardsPagesData.isBroadcast}");

        _stateDinePagesData = _getPageData(wrap("Dine"));

        _stateTravelPagesData = _getPageData(wrap("Travel"));

        _stateStayPagesData = _getPageData(wrap("Stay"));

        _statePlayPagesData = _getPageData(wrap("Play"));
    }

    Stream<ResultState> _loadPagesData(_Request request) async* {
        String kind = request.kind;
        if (kind.isEmpty) {
            yield StateNoTerm();
        } else {
            if (request.from == null || request.from == "") {
                yield StateLoading();
            }

            try {
                RewardsDto result = await httpManager.get<RewardsDto>(ApiAddress.getRewards(kind), null);
                if (result == null) {
                    yield StateEmpty();
                } else {
                    yield StatePopulated(result: result);
                }
            } catch (e, stackTrace) {
                yield StateError(error: e, stackTrace: stackTrace);
            }
        }
    }

    void pullToRefresh(String kind) {
        _onLoadPagesData.add(_Request(kind, from: "pullToRefresh"));
    }

    Observable<ResultState> getPageData(String kind) {
        switch (kind) {
            case "Rewards":
                return _stateRewardsPagesData;
            case "Dine":
                return _stateDinePagesData;
            case "Travel":
                return _stateTravelPagesData;
            case "Stay":
                return _stateStayPagesData;
            case "Play":
                return _statePlayPagesData;
            default:
                return Observable.just(StateError(error: "Rewards($kind) not exist!"));
        }
    }
}

class _Request {
    final String kind;
    final String from;

    _Request(this.kind, {this.from = ""});
}

And i use _bloc.getPageData in a page of PageView like below:

class _CatalogTabPageState extends State<CatalogTabPage> /*with AutomaticKeepAliveClientMixin*/ {

    /*@override
    bool get wantKeepAlive => true;*/

    @override
    void initState() {
        super.initState();
        print("initState ${widget.tabInfo.name}   @$hashCode");
    }

    @override
    void dispose() {
        super.dispose();
        print("dispose ${widget.tabInfo.name}   @$hashCode");
    }

    @override
    Widget build(BuildContext context) {
        //super.build(context);
        print("build ${widget.tabInfo.name}   @$hashCode");
        var stream2 = widget._bloc.getPageData(widget.tabInfo.id)
                .doOnData((state) => print("doOnData  $state ${widget.tabInfo.name}   @$hashCode"))
                .doOnCancel(() {
            print("doOnCancel ${widget.tabInfo.name}   @$hashCode  ${Trace.current()}");
        });

        print("build ${widget.tabInfo.name}   @$hashCode  ${stream2.isBroadcast}");

        return Container(
                color: Color(0xFFF5F5F5),
                child: StreamBuilder<ResultState>(
                        initialData: StateNoTerm(),
                        stream: stream2,

                        builder: (BuildContext context, AsyncSnapshot<ResultState> snapshot) {
                            final state = snapshot.data;
                            return stateSwitch(
                                    state,
                                    onStateNoTerm: (state) => null,
                                    onStateLoading: (state) => LoadingWidget(),
                                    onStateEmpty: (state) => EmptyWidget(),
                                    onStateError: (StateError<String> state) => result_state.ErrorWidget(title: state.error),
                                    onStatePopulated: (StatePopulated<List<OfferListItemModel>> state) =>
                                            ResultWidget(widget.tabInfo, widget._bloc, items: state.result)
                                    );
                        }
                        )
                );
    }
}

And console output below after the _CatalogTabPageState dispose and re-initState everytime.


I/flutter (16294): initState Traval   @112567120

I/flutter (16294): build Traval   @112567120

I/flutter ( 2770): doOnCancel Travel   @741815232  package:XXXX_app/page/offers/rewards_page.dart 265:76  _CatalogTabPageState.build.<fn>
I/flutter ( 2770): package:rxdart/src/transformers/do.dart 165:38           DoStreamTransformer._buildTransformer.<fn>.<fn>
I/flutter ( 2770): dart:async/stream_controller.dart 713:20                 _StreamController._recordCancel
I/flutter ( 2770): dart:async/stream_controller.dart 841:24                 _ControllerSubscription._onCancel
I/flutter ( 2770): dart:async/stream_impl.dart 242:21                       _BufferingStreamSubscription._cancel
I/flutter ( 2770): dart:async/stream_impl.dart 393:5                        _BufferingStreamSubscription._sendDone
I/flutter ( 2770): dart:async/stream_impl.dart 283:7                        _BufferingStreamSubscription._close
I/flutter ( 2770): dart:async/stream_controller.dart 772:19                 _SyncStreamController._sendDone
I/flutter ( 2770): dart:async/stream_controller.dart 629:7                  _StreamController._closeUnchecked
I/flutter ( 2770): dart:async/stream_controller.dart 622:5                  _StreamController.close
I/flutter ( 2770): package:rxdart/src/transformers/do.dart 153:28           DoStreamTransfor

Question:

The code above will call doOnCancel () after initState() and build(BuildContext) everytime when i jumpTo a new page? That make a new page did not have any result data to display because stream was canceled.

brianegan commented 5 years ago

Heya -- this is a common mistake. You're creating a Stream inside the build method. The build method can be run at any time -- from a screen orientation change to pushing a new route, etc.

Therefore, you should not be creating Streams inside the build method. If you do, the StreamBuilder will do the following:

  1. Create the Stream
  2. StreamBuilder subscribes
  3. Rebuild triggered by pushing a new route
  4. Recreate the Stream
  5. StreamBuilder sees there's a new stream and unsubscribes from the old one (triggering the onCancel handler)
  6. StreamBuilder subscribes to the new Stream.

You have two options to fix this:

  1. Create and store the Stream in initState, e.g _myStream = createTheStream() and use that in the build method.
  2. Ensure the Bloc returns the same Stream every time you request it.
Guang1234567 commented 5 years ago

@brianegan

Thanks your reply, but not working. ^_^

It could be something else cause this problem.

Guang1234567 commented 5 years ago

@brianegan

I write a simple demo to reappear this problem today.

https://github.com/Guang1234567/flutter_demo_navigator_with_ali_router/blob/a831f35dd830968de4f58518b3e2a475654406e7/lib/rxdart/rxdart_test_widget.dart#L1-L96

_bloc.getPageData("Play”) is implemented by share().

https://github.com/Guang1234567/flutter_demo_navigator_with_ali_router/blob/a831f35dd830968de4f58518b3e2a475654406e7/lib/rxdart/rewards_page_bloc.dart#L38

The ouput:


I/flutter ( 6457): onData1: Instance of 'StateNoTerm<dynamic>'
I/flutter ( 6457): onData2: Instance of 'StateNoTerm<dynamic>'
I/flutter ( 6457): onData1: Instance of 'StateLoading<dynamic>'
I/flutter ( 6457): onData2: Instance of 'StateLoading<dynamic>'
I/flutter ( 6457): onData1: StatePopulated@699752661{result: Response@750916912{info: 1234567 Play}}
I/flutter ( 6457): onData2: StatePopulated@699752661{result: Response@750916912{info: 1234567 Play}}

I/flutter ( 6457): onClik:  1     //<-------------
I/flutter ( 6457): onDone3

Question:

When i click a btn, it will cancel two existed Stream.

 _subscription1.cancel();
 _subscription2.cancel();

but onDone1 and onDone2 was not printed at this moment. Then StreamSubscription subscription3 = _bloc.getPageData("Play”)

                   _subscription1.cancel();
                    _subscription2.cancel();

                    StreamSubscription subscription3 = _bloc.getPageData("Play")
                            .listen(
                                    (resultState) {
                                print("onData3: ${resultState}");
                            },
                            onError: (error) {
                                print("onError3: ${error}");
                            },
                            onDone: () {
                                print("onDone3");
                            },
                            cancelOnError: true);

onDone3 was printed!

Why the onDone1 onDone2 didnot be printed until _subscription1 _subscription2 cancel?

Why the onDone event was triggered at the third time listening of the share Stream?

Guang1234567 commented 5 years ago

why onDone called until all StreamSubscription.cancel, but ouput onDone until subscribe again after StreamSubscription.cancel?

answer: https://github.com/dart-lang/sdk/issues/9026


How to cache data for ervery page widget in the pageview of flutter that didnot keep alive?

answer

using shareReplay(1) to instead of share().