felangel / bloc

A predictable state management library that helps implement the BLoC design pattern
https://bloclibrary.dev
MIT License
11.79k stars 3.39k forks source link

fix: forEach emits new states, after event handler canceled. #3586

Closed thipokch closed 1 year ago

thipokch commented 1 year ago

Description / Expected Behavior

When a new event with restartable() is added to a bloc, it is expected that ongoing asynchronous event will be canceled . However, it appears that emit.forEach within the same event is still emitting new states when it should be closed.

Steps To Reproduce

// pubspec.yaml

flutter_bloc: ^8.1.1
bloc_concurrency: ^0.2.0
bloc_test: ^9.1.0
mocktail: ^0.3.0
1. Create the following bloc. ```dart import 'package:bloc_concurrency/bloc_concurrency.dart'; import 'package:flutter_bloc/flutter_bloc.dart'; class DemoBloc extends Bloc { Source source; DemoBloc(this.source) : super(StateA()) { on( (event, emit) => emit(StateA()), transformer: restartable(), ); on( (event, emit) => emit.forEach(source.numbers, onData: (data) => StateB()), transformer: restartable()); } } // EVENTS abstract class DemoEvent {} class EventA extends DemoEvent {} class EventB extends DemoEvent {} // STATES abstract class DemoState {} class StateA extends DemoState {} class StateB extends DemoState {} abstract class Source { Stream get numbers; } ```
2. Create following the bloc test ```dart import 'dart:async'; import 'package:bloc_test/bloc_test.dart'; import 'package:demo/demo/demo_bloc.dart'; import 'package:flutter_test/flutter_test.dart'; import 'package:mocktail/mocktail.dart'; class _MockSource extends Mock implements Source {} void main() { group('SessionListBloc', () { late Source source; late StreamController sourceStream; late DemoBloc bloc; setUp(() { source = _MockSource(); sourceStream = StreamController( onListen: () => sourceStream.add(7), ); when(() => source.numbers).thenAnswer((_) => sourceStream.stream); bloc = DemoBloc(source); }); blocTest( "demo", build: () => bloc, // Constructed with StateA() act: (bloc) async { bloc.add(EventB()); // Emits StateB forEach number added to sourceStream bloc.add(EventA()); // Cancels EventB await Future.delayed(const Duration(seconds: 1)); sourceStream.add(6); // No EventB should be emited when 6 is added to sourceStream }, expect: () => [ StateA(), // Initial State StateB(), // streamSource.onListen StateA(), ], ); }); } ```
3. Run tests ``` Expected: [Instance of 'StateA', Instance of 'StateB', Instance of 'StateA'] Actual: [Instance of 'StateA', Instance of 'StateB', Instance of 'StateB'] ```

demo.zip

Additional Context

I'm a little unsure how concurrency transforms interact with each other. Here's the potential use case.

  1. For instance, I want to stream a list of menu from the repo (subscribe event - concurrent).
  2. At the same time, I want to remove a menu from the repo (delete event - concurrent). A list of menu should be updated with changes via (subscribe event).
  3. Then, I want to cancel the subscribe event using (unsubscribe event - restartable)
jacobtipp commented 1 year ago

@thipokch in your DemoBloc, you are using emit.forEach forEach is a method that returns a Future. You must await it inside of the event handler, so the event handler must also be marked as async.

refer to this https://github.com/felangel/bloc/issues/3623#issuecomment-1311005638

change this:

  on<EventB>(
        (event, emit) =>
            emit.forEach(source.numbers, onData: (data) => StateB()),
        transformer: restartable());

to this:

  on<EventB>((event, emit) async {
         await emit.forEach(source.numbers, onData: (data) => StateB())
   },
   transformer: restartable());
thipokch commented 1 year ago

@jacobtipp Thanks!

vasilich6107 commented 1 year ago

@jacobtipp @felangel @thipokch

This does not work

image
felangel commented 1 year ago

@vasilich6107 can you share a link to a minimal reproduction sample? From the screenshot it looks like you’re bucketing EventA and EventB separately so they won’t interfere with each other. I’m guessing you want to bucket them together instead:


on<DemoEvent>((event, emit) async {
  if (event is EventA) {…}
  else if (event is EventB) {…}
}, transformer: restartable());
vasilich6107 commented 1 year ago

hi @felangel I took the data from initial issue description https://github.com/felangel/bloc/issues/3586#issue-1418033166

felangel commented 1 year ago

hi @felangel I took the data from initial issue description #3586 (comment)

See my updated comment and let me know if that helps.

vasilich6107 commented 1 year ago

@felangel this was not my code) I have my own issues with emit.forEach and restartable. I was crawling through the issues and found this one.

I do not know why author closed it as far as suggested approach does not fix the issue.

I will try to find time and create my own example

talksik commented 1 year ago

Thanks for the context above all!

Just trying to fix an issue myself where I want to stop listening to a stream that I started listening to with emit.ForEach.

I tried restartable but that didn't work. The UI dispatches another event, and I want it to stop subscription there. Is this possible?

vasilich6107 commented 1 year ago

Hi @talksik

I've created separate issue with reproduction

https://github.com/felangel/bloc/issues/3888

You can add your own reproduction code to my issue.

talksik commented 1 year ago

@vasilich6107 thank you! will keep an eye out there.

I think my problem may have been that I instantiate multiple instances of the same provider. I need this because I want each child widget in a column of widgets to have their own provider. For some reason though, there is a clash and when one of the items in the list of widgets disposes and I call .close, it still continues to listen to the stream that was passed in. (i.e. emit.forEach doesn't stop listening to stream when the provider is closed.)