felangel / bloc

A predictable state management library that helps implement the BLoC design pattern
https://bloclibrary.dev
MIT License
11.8k stars 3.39k forks

fix: Bloc Concurrency with restartable transformer does not cancel previous event #3349

Closed PhilipPurwoko closed 2 years ago

PhilipPurwoko commented 2 years ago

Description: Bloc Concurrency with the restartable transformer does not cancel the previous event.

Steps To Reproduce: The bloc in merchant_product_bloc.dart listens for the MerchantProductCartChanged event and emits the MerchantProductCartChange state:

class MerchantProductBloc extends Bloc<MerchantProductEvent, MerchantProductState> {
  MerchantProductBloc() : super(MerchantProductInitial()) {
    on<MerchantProductCartChanged>(_changeCart, transformer: restartable());
  }

  Future<void> _changeCart(
    MerchantProductCartChanged event,
    Emitter<MerchantProductState> emit,
  ) async {
    await Future.delayed(const Duration(milliseconds: 300));
    logInfo('Cart Changed | productId=${event.productId} qty=${event.qty}');
    emit(const MerchantProductCartChange.loading());
    try {
      await productRepository.changeCart(
        productId: event.productId,
        qty: event.qty,
        date: event.date,
      );
      emit(const MerchantProductCartChange.success());
    } catch (e) {
      emit(MerchantProductCartChange.error(msg: e.toString(), lastData: event));
    }
  }
}

The button handler I use to trigger the MerchantProductCartChanged event:

BlocProvider.of<MerchantProductBloc>(context).add(
  MerchantProductCartChanged(
    productId: product.id!,
    initialQty: 1,
  ),
);

When I press the button 4 times in quick succession, it logs 4 times and calls the API 4 times:

I/PLogger (30516): {PAX A920}  {Loket}  {Cart Changed | productId=104 qty=13}  {06 May 2022 08:35:58 PM}  {INFO}
I/PLogger (30516): {PAX A920}  {Loket}  {Cart Changed | productId=104 qty=14}  {06 May 2022 08:35:58 PM}  {INFO}
I/PLogger (30516): {PAX A920}  {Loket}  {Cart Changed | productId=104 qty=15}  {06 May 2022 08:35:58 PM}  {INFO}
I/PLogger (30516): {PAX A920}  {Loket}  {Cart Changed | productId=104 qty=16}  {06 May 2022 08:35:58 PM}  {INFO}

Expected Behavior: It should log only one line, for the last event:

I/PLogger (30516): {PAX A920}  {Loket}  {Cart Changed | productId=104 qty=16}  {06 May 2022 08:35:58 PM}  {INFO}

felangel commented 2 years ago

Hi @PhilipPurwoko 👋 Thanks for opening an issue!

Are you able to provide a minimal reproduction sample? Thanks 🙏

YStepiuk-DOIT commented 2 years ago

Hi @felangel! The issue is in the on<Event>() method:

        void handleEvent() async {
          void onDone() {
            emitter.complete();
            _emitters.remove(emitter);
            if (!controller.isClosed) controller.close();
          }

          try {
            _emitters.add(emitter);
            await handler(event as E, emitter);
          } catch (error, stackTrace) {
            onError(error, stackTrace);
            rethrow;
          } finally {
            onDone();
          }
        }

The execution of handleEvent() is never tracked: it is 'fire and forget'. Only the emitter is closed. As a result, every event handler that has already started keeps running to completion, just without the ability to emit a new state.

Minimal code to reproduce the issue:

import 'package:bloc/bloc.dart';
import 'package:bloc_concurrency/bloc_concurrency.dart';

class TestEvent {}

class TestBloc extends Bloc<TestEvent, Object> {
  TestBloc() : super(Object()) {
    on<TestEvent>(_onTestEvent, transformer: restartable());
  }

  var testCounter = 0;

  Future<void> _onTestEvent(event, emit) async {
    final counter = testCounter++;
    print('Starting test count: $counter, is emitter alive: ${!emit.isDone}');
    await Future.delayed(const Duration(seconds: 5));
    // We should reach here only for latest event
    print('Finished test count: $counter, is emitter alive: ${!emit.isDone}');
  }
}

void main() {
  final bloc = TestBloc();
  bloc.stream.listen(null);
  bloc.add(TestEvent());
  bloc.add(TestEvent());
  bloc.add(TestEvent());
}
// Expected to print:
// Starting test count: 0, is emitter alive: true
// Starting test count: 1, is emitter alive: true
// Starting test count: 2, is emitter alive: true
// Finished test count: 2, is emitter alive: true

// Actually prints:
// Starting test count: 0, is emitter alive: true
// Starting test count: 1, is emitter alive: true
// Starting test count: 2, is emitter alive: true
// Finished test count: 0, is emitter alive: false
// Finished test count: 1, is emitter alive: false
// Finished test count: 2, is emitter alive: true

YStepiuk-DOIT commented 2 years ago

The fix would be to keep a StreamSubscription for the await handler(event as E, emitter); call and cancel it from the StreamController.

felangel commented 2 years ago

Hi @YStepiuk, this is because Futures aren't truly cancelable. To get the behavior you're describing, you can check whether emit.isDone is true before performing any expensive computations:

import 'package:bloc/bloc.dart';
import 'package:bloc_concurrency/bloc_concurrency.dart';

class TestEvent {}

class TestBloc extends Bloc<TestEvent, Object> {
  TestBloc() : super(Object()) {
    on<TestEvent>(_onTestEvent, transformer: restartable());
  }

  var testCounter = 0;

  Future<void> _onTestEvent(event, emit) async {
    final counter = testCounter++;
    print('Starting test count: $counter, is emitter alive: ${!emit.isDone}');
    await Future.delayed(const Duration(seconds: 5));
    if (emit.isDone) return;
    // We should reach here only for latest event
    print('Finished test count: $counter, is emitter alive: ${!emit.isDone}');
  }
}

void main() {
  final bloc = TestBloc();
  bloc.stream.listen(null);
  bloc.add(TestEvent());
  bloc.add(TestEvent());
  bloc.add(TestEvent());
}
// Actually prints:
// Starting test count: 0, is emitter alive: true
// Starting test count: 1, is emitter alive: true
// Starting test count: 2, is emitter alive: true
// Finished test count: 2, is emitter alive: true

This is also related to https://github.com/felangel/bloc/issues/3069
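
The point that Futures aren't truly cancelable can be seen without bloc at all. A minimal sketch using only dart:async (the names work and runDemo are made up for illustration): cancelling the subscription stops the result from being delivered, but the body of the async function still runs to the end.

```dart
import 'dart:async';

// Hypothetical stand-in for the body of an event handler.
Future<void> work(List<String> log) async {
  log.add('started');
  await Future<void>.delayed(const Duration(milliseconds: 50));
  log.add('finished'); // still runs after the subscription is cancelled
}

Future<List<String>> runDemo() async {
  final log = <String>[];
  // Wrap the Future in a Stream so that there is something to cancel.
  final subscription =
      work(log).asStream().listen((_) => log.add('delivered'));
  await subscription.cancel();
  // Give the delayed Future time to complete.
  await Future<void>.delayed(const Duration(milliseconds: 100));
  return log;
}

Future<void> main() async {
  print(await runDemo()); // [started, finished]
}
```

'delivered' never shows up because the listener was cancelled, yet 'finished' does, which is why the handler itself has to check emit.isDone.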

felangel commented 2 years ago

Closing for now since there don't appear to be any actionable next steps -- this is working as expected. Feel free to comment with any outstanding questions and I'm happy to continue the conversation 👍
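
Applied to the handler from the original report, the same check would go after the delay and before the API call. This is only a sketch: CartChanged, FakeRepository, CartBloc and the String states are simplified, hypothetical stand-ins for the types in the report, not the reporter's actual code.

```dart
import 'package:bloc/bloc.dart';
import 'package:bloc_concurrency/bloc_concurrency.dart';

// Hypothetical, simplified event carrying only the quantity.
class CartChanged {
  CartChanged(this.qty);
  final int qty;
}

// Hypothetical repository that records which quantities reached the "API".
class FakeRepository {
  final calls = <int>[];

  Future<void> changeCart({required int qty}) async {
    calls.add(qty);
  }
}

class CartBloc extends Bloc<CartChanged, String> {
  CartBloc(this.repository) : super('initial') {
    on<CartChanged>(_changeCart, transformer: restartable());
  }

  final FakeRepository repository;

  Future<void> _changeCart(CartChanged event, Emitter<String> emit) async {
    // Newer events that arrive during this delay cancel this emitter.
    await Future<void>.delayed(const Duration(milliseconds: 300));
    // A superseded handler keeps running, so bail out before the API call.
    if (emit.isDone) return;
    emit('loading');
    await repository.changeCart(qty: event.qty);
    if (emit.isDone) return;
    emit('success');
  }
}

Future<List<int>> runDemo() async {
  final repository = FakeRepository();
  final bloc = CartBloc(repository);
  for (final qty in [13, 14, 15, 16]) {
    bloc.add(CartChanged(qty));
  }
  await Future<void>.delayed(const Duration(milliseconds: 500));
  await bloc.close();
  return repository.calls;
}

Future<void> main() async {
  print(await runDemo());
}
```

With four events added back to back, the expectation is a single repository call, for qty 16. The 300 ms delay then works as a debounce window.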

politebarista commented 2 years ago

Hello @felangel! First of all, thank you so much for everything you do for the Flutter community 💙 I recently ran into a similar problem and your solution helped, but your solution (checking emit.isDone) also works without specifying restartable. I'm sorry if the question seems trivial to you, but then why do we need a restartable in bloc_concurrency at all? How does it help?

felangel commented 2 years ago

Thanks for the kind words! The advantage of using restartable is you can disregard the results of previous events. This is important when you don't have expensive operations and you want the user to always see the results of their most recent actions. Hope that helps 👍
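
A small comparison may make the difference concrete. This is a sketch with a hypothetical EchoBloc whose events and states are plain ints. As far as I understand, with the default transformer emit.isDone stays false while a handler is running, so the check never fires and every event produces a state. With restartable, only the latest event does.

```dart
import 'package:bloc/bloc.dart';
import 'package:bloc_concurrency/bloc_concurrency.dart';

// Hypothetical bloc: each int event is echoed as the new state after a delay.
class EchoBloc extends Bloc<int, int> {
  EchoBloc({EventTransformer<int>? transformer}) : super(0) {
    on<int>(
      (event, emit) async {
        await Future<void>.delayed(const Duration(milliseconds: 50));
        // Only true here if a transformer cancelled this handler.
        if (emit.isDone) return;
        emit(event);
      },
      transformer: transformer,
    );
  }
}

// Adds three events back to back and collects the emitted states.
Future<List<int>> statesFor(EchoBloc bloc) async {
  final states = <int>[];
  final subscription = bloc.stream.listen(states.add);
  bloc
    ..add(1)
    ..add(2)
    ..add(3);
  await Future<void>.delayed(const Duration(milliseconds: 200));
  await subscription.cancel();
  await bloc.close();
  return states;
}

Future<void> main() async {
  print(await statesFor(EchoBloc())); // default transformer: [1, 2, 3]
  print(await statesFor(EchoBloc(transformer: restartable()))); // [3]
}
```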

vasilich6107 commented 1 year ago

Hi @felangel, could you clarify where to put the if (emit.isDone) return; check when using await emit.forEach?

vasilich6107 commented 1 year ago

I'm trying to implement the approach from this article: https://verygood.ventures/blog/how-to-use-bloc-with-streams-and-concurrency

but restartable still processes all calls.
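
For the emit.forEach case, here is a minimal sketch. It assumes that the emitter cancels the forEach subscription once restartable supersedes the handler, so the await returns early. Subscribe, TickerBloc and the finished list are hypothetical names used only for illustration. Under that assumption, no check is needed inside onData, and the isDone check belongs right after the await, guarding any follow-up work.

```dart
import 'dart:async';

import 'package:bloc/bloc.dart';
import 'package:bloc_concurrency/bloc_concurrency.dart';

// Hypothetical event: follow the ticker with the given id.
class Subscribe {
  Subscribe(this.id);
  final int id;
}

class TickerBloc extends Bloc<Subscribe, String> {
  TickerBloc() : super('idle') {
    on<Subscribe>(_onSubscribe, transformer: restartable());
  }

  // Ids of handlers that ran their follow-up work.
  final finished = <int>[];

  Future<void> _onSubscribe(Subscribe event, Emitter<String> emit) async {
    final ticks = Stream<int>.periodic(
      const Duration(milliseconds: 20),
      (i) => i,
    ).take(3);
    // Each tick becomes a state until the stream ends or the handler
    // is superseded by a newer event.
    await emit.forEach<int>(ticks, onData: (tick) => '${event.id}:$tick');
    // Code after the await still runs for a superseded handler, so guard it.
    if (emit.isDone) return;
    finished.add(event.id);
  }
}

Future<List<int>> runDemo() async {
  final bloc = TickerBloc();
  bloc.add(Subscribe(1));
  await Future<void>.delayed(const Duration(milliseconds: 50));
  bloc.add(Subscribe(2)); // restarts the handler
  await Future<void>.delayed(const Duration(milliseconds: 200));
  await bloc.close();
  return bloc.finished;
}

Future<void> main() async {
  print(await runDemo());
}
```

If the assumption holds, only the second handler reaches finished.add, because the first one returns at the isDone check.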
