fluttercommunity / redux.dart

Redux for Dart
https://pub.dev/packages/redux
MIT License
515 stars, 61 forks

Use of RxDart BehaviorSubject instead of StreamController #50

Closed ftognetto closed 4 years ago

ftognetto commented 5 years ago

Hi @johnpryan, would it be possible to have a Redux implementation that uses RxDart in the Store class?

That way it would also be possible to choose between a PublishSubject and a BehaviorSubject for the _changeController.

Thank you,

Fabrizio

ftognetto commented 5 years ago

Something like this

class Store<State> {

...

final Subject<State> _changeController;

...

Store(
    this.reducer, {
    State initialState,
    List<Middleware<State>> middleware = const [],
    bool syncStream = false,
    // Proposed flag: back the store with a BehaviorSubject instead of a PublishSubject.
    bool behaviorSubject = false,
    /// If set to true, the Store will not emit onChange events if the new State
    /// that is returned from your [reducer] in response to an Action is equal
    /// to the previous state.
    ///
    /// Under the hood, it will use the `==` method from your State class to
    /// determine whether or not the two States are equal.
    bool distinct = false,
  })
      : _changeController = behaviorSubject
            ? BehaviorSubject<State>(sync: syncStream)
            : PublishSubject<State>(sync: syncStream) {
    _state = initialState;
    _dispatchers = _createDispatchers(
      middleware,
      _createReduceAndNotify(distinct),
    );
  }

...

Stream<State> get onChange => _changeController.stream.asBroadcastStream();

brianegan commented 5 years ago

Heya @quantosapplications -- I help out with Redux and RxDart -- could you describe in a bit more detail what benefit this would give the end user?

ftognetto commented 5 years ago

Hi, the main benefit comes from using a BehaviorSubject, because it behaves differently from a plain stream: it emits the latest value to each new subscriber as soon as it subscribes.

I'm implementing Redux in a BLoC architecture so that repositories can memoize data using Redux and also expose Observables that blocs can listen to.

When a bloc subscribes to a repository's observable, it is more efficient if that observable immediately emits the last value and then emits again whenever the value changes (as a BehaviorSubject does), so the bloc can load data just by listening to that observable.
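To make the difference concrete, here is a minimal sketch that uses only `dart:async`, with no RxDart dependency. `replayLatest` is a hypothetical helper, not part of redux.dart or RxDart; it only imitates the behavior described above by emitting the current value on subscription and then forwarding later changes.

```dart
import 'dart:async';

/// Hypothetical helper (not part of redux.dart or RxDart): returns a
/// single-subscription stream that first emits `current()` and then forwards
/// every event from `changes`.
Stream<T> replayLatest<T>(T Function() current, Stream<T> changes) {
  late final StreamController<T> controller;
  StreamSubscription<T>? upstream;

  controller = StreamController<T>(
    onListen: () {
      // Emit the value held at subscription time, then follow later changes.
      controller.add(current());
      upstream = changes.listen(
        controller.add,
        onError: controller.addError,
        onDone: controller.close,
      );
    },
    onPause: () => upstream?.pause(),
    onResume: () => upstream?.resume(),
    onCancel: () => upstream?.cancel(),
  );
  return controller.stream;
}

Future<void> main() async {
  var state = 1;
  final changes = StreamController<int>.broadcast();

  // A subscriber to the plain broadcast stream only sees future events.
  final plain = <int>[];
  final plainSub = changes.stream.listen(plain.add);

  // A subscriber to the replaying stream also receives the current value.
  final replayed = <int>[];
  final replaySub =
      replayLatest(() => state, changes.stream).listen(replayed.add);

  state = 2;
  changes.add(state);
  await Future<void>.delayed(Duration.zero);

  print(plain); // [2]
  print(replayed); // [1, 2]

  await plainSub.cancel();
  await replaySub.cancel();
}
```

The subscription to `changes` is made inside `onListen`, in the same synchronous step as reading the current value, so no change can slip in between the two.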

I can send you some code later, thank you

Fabrizio Tognetto

On 9 Sep 2019, at 17:14, Brian Egan notifications@github.com wrote:

Heya @quantosapplications -- I help out with Redux and RxDart -- could you describe in a bit more detail what benefit this would give the end user?


ftognetto commented 5 years ago

Hi @brianegan, thanks for the reply!

I'll try to give a simple example of what I mean:

bloc -> asks the repository to fetch data
repository -> fetches the data and dispatches it to the store
repository -> exposes an observable connected to the store
bloc -> listens to the repository's observable

bloc


/// Constructor
Bloc(){
   repository.entities$.listen((data){
      //do something
   });
}

/// Or else expose so ui can subscribe with StreamBuilder
Observable<List<Entity>> get entities$ => repository.entities$;

...

/// Load method called from ui
Future<void> load() async {
    try {
      await repository.getEntities();
    }
    catch(e) {
      //handle error
    } 
}

Repository

/// The repository exposes observables that are connected to the store
Observable<List<Entity>> get entities$ => store.state$.map((AppState state) => state.entities);

/// Fetch data from the API and save it to the store
Future<void> getEntities() async {
    // Await the API call so the action carries the data, not a pending Future.
    final List<Entity> entities = await getDataFromApi();
    store.dispatch(AddEntities(entities: entities));
}

This structure works fine with plain Streams, because the bloc asks for the data after it has subscribed.

But if I have another bloc, "bloc2", that only needs data already present in the store (for example, bloc1 manages a list of users and bloc2 manages the details of a user that is already in the store), I can do it in two ways:

  1. Bloc 2 asks the repository for the single entity; the repository looks in the store and, if the entity is not there, fetches it from the API.

  2. Bloc 2 subscribes to the repository and receives the data (with a BehaviorSubject).
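For comparison, the first option can be sketched without any stream at all. Everything below is hypothetical and only stands in for the types in this thread: `Entity`, `EntityRepository`, the in-memory `_cached` list (in place of `store.state.entities`) and the `_fetchFromApi` callback are illustrative names, not redux.dart APIs.

```dart
import 'dart:async';

// Hypothetical stand-in for the thread's Entity type.
class Entity {
  final int id;
  final String name;
  const Entity(this.id, this.name);
}

// Hypothetical repository: looks in a local cache first, then falls back to
// the API. In the thread, the cache would be the Redux store's state.
class EntityRepository {
  EntityRepository(this._cached, this._fetchFromApi);

  final List<Entity> _cached;
  final Future<Entity> Function(int id) _fetchFromApi;

  Future<Entity> getEntity(int id) async {
    for (final entity in _cached) {
      if (entity.id == id) return entity; // already present, no API call
    }
    final fetched = await _fetchFromApi(id);
    _cached.add(fetched); // in the thread this would be a store.dispatch(...)
    return fetched;
  }
}

Future<void> main() async {
  var apiCalls = 0;
  final repository = EntityRepository(
    [const Entity(1, 'Ada')],
    (id) async {
      apiCalls++;
      return Entity(id, 'fetched-$id');
    },
  );

  print((await repository.getEntity(1)).name); // Ada
  print((await repository.getEntity(2)).name); // fetched-2
  print((await repository.getEntity(2)).name); // fetched-2
  print(apiCalls); // 1
}
```

This works with a plain stream-based store, but every bloc has to go through an explicit lookup method instead of simply subscribing, which is the trade-off the second option avoids.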

bloc 2


/// Constructor
Bloc2(int id){
   // firstWhere takes a predicate function, so `e` must be declared as its parameter.
   repository.entities$.map((entities) => entities.firstWhere((e) => e.id == id)).listen((data){
      //do something
   });
}

/// Or else expose so ui can subscribe with StreamBuilder
Observable<Entity> get entity$ => repository.entities$.map((entities) => entities.firstWhere((e) => e.id == id));

This is one example of why it would be nice to let the user choose whether the store is backed by a PublishSubject or a BehaviorSubject, but there may be other use cases too!

Fabrizio

ftognetto commented 5 years ago

#51 would be the PR.

ftognetto commented 4 years ago

Hi @brianegan, any update on this? Do you think it could be merged?

brianegan commented 4 years ago

Hey there @quantosapplications -- Overall, I get what you're trying to achieve, but I'm not sure this fits in well with this Redux library specifically.

Since we're trying our best to emulate the default Redux.js functionality in a Dart world, I really think adding in Rx would be overkill for this little library. I think this might be better as a specialized fork for your particular needs!

MichaelMarner commented 4 years ago

This issue sometimes comes up in Epics or other async middleware where you want to do something, but not until the store is in a particular state. For example, wanting to make a request but only after another request has completed.

As a concrete example, if you want to do something once pending uploads are complete, you may have an epic something like this:

Stream call(dynamic actions, EpicStore store) {
    return Observable(actions)
        .whereType<RequestCreate>()
        .switchMap((action) {
          // Wait for the first state in which no files are uploading.
          return Observable(store.onChange)
              .map(areFilesUploading)
              .skipWhile((uploading) => uploading)
              .take(1)
              .switchMap((_) {
                return doMyAsyncTask();
              });
        });
}

The problem with the above code is that if there are no files uploading, the epic never completes, because onChange never emits anything. In rxjs this isn't a problem because the store is a BehaviorSubject, and so the current state is emitted on subscribing.
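The hang can be reproduced with `dart:async` alone. In this sketch the broadcast `StreamController` is only a stand-in for `store.onChange`, and the 100 ms timeout is an arbitrary value added so the example terminates; neither is taken from redux.dart.

```dart
import 'dart:async';

Future<void> main() async {
  // Stand-in for store.onChange: a broadcast stream that does not replay
  // anything to new subscribers.
  final onChange = StreamController<bool>.broadcast();

  // No upload is in progress and nothing is dispatched, so no event will
  // ever arrive after we subscribe.
  final waitForIdle = onChange.stream
      .skipWhile((uploading) => uploading)
      .first
      .timeout(const Duration(milliseconds: 100));

  try {
    await waitForIdle;
    print('completed');
  } on TimeoutException {
    print('timed out: onChange never emitted');
  }
}
```

Without the timeout, the awaited future would simply never complete, which is the behavior described for the epic.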

As a workaround, it is possible to merge store.onChange with a BehaviorSubject seeded with the current state. The code above becomes:

Stream call(dynamic actions, EpicStore store) {
    return Observable(actions)
        .whereType<RequestCreate>()
        .switchMap((action) {
          return Observable(
            MergeStream<AppState>(
              [
                // Emits the current state immediately on subscription.
                BehaviorSubject<AppState>.seeded(store.state),
                store.onChange,
              ],
            ),
          )
              .map(areFilesUploading)
              .skipWhile((uploading) => uploading)
              .take(1)
              .switchMap((_) {
                return doMyAsyncTask();
              });
        });
}

The downside here is it needs to be done per-epic, instead of once.
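One way to avoid repeating the merge in every epic might be a small shared helper. The sketch below is an assumption, not something from redux.dart, RxDart or redux_epics: `firstStateWhere` is a hypothetical function that checks the current state first and only waits on the change stream if the condition does not already hold. The state here is a plain `int` counting pending uploads, purely for illustration.

```dart
import 'dart:async';

/// Hypothetical helper: completes with the first state that satisfies [test].
/// The current state is checked before subscribing to [onChange], so the
/// future completes even if no further change is ever emitted.
Future<S> firstStateWhere<S>(
  S current,
  Stream<S> onChange,
  bool Function(S state) test,
) {
  if (test(current)) return Future<S>.value(current);
  return onChange.firstWhere(test);
}

Future<void> main() async {
  // Stand-in for store.onChange; the int state is the number of pending uploads.
  final onChange = StreamController<int>.broadcast();

  // Nothing is uploading: completes immediately from the current state.
  final idleNow = await firstStateWhere<int>(
      0, onChange.stream, (pending) => pending == 0);
  print(idleNow); // 0

  // Two uploads pending: completes once a later state reports zero.
  final idleLater =
      firstStateWhere<int>(2, onChange.stream, (pending) => pending == 0);
  onChange
    ..add(1)
    ..add(0);
  print(await idleLater); // 0

  await onChange.close();
}
```

In an epic, the call would presumably look like `firstStateWhere(store.state, store.onChange, (s) => !areFilesUploading(s))`, converted to a stream where needed.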

Posting here to give a possible workaround for others, but also to put my hand up and suggest that using a BehaviorSubject instead of a regular Stream would make Redux much more useful, especially for async operations.