redux-observable / redux-observable

RxJS middleware for action side effects in Redux using "Epics"
https://redux-observable.js.org
MIT License

Process Manager dispatch multiple actions #62

Closed beckend closed 8 years ago

beckend commented 8 years ago

I tried to return an array of action objects from the observable, but got this error: Uncaught Error: Actions must be plain objects. Use custom middleware for async actions.

It would be great if it worked, then I could dispatch to a logger or notification component, for success and errors.

martypenner commented 8 years ago

You can already do this with proper use of Rx operators. E.g.

const myCoordinator = actions$ =>
  actions$.ofType('MY_COOL_ACTION')
    .flatMap(action =>
      Observable.concat(
        // Fire 2 actions, one after the other
        Observable.of(someCoolAction),
        Observable.of(someOtherAction)
      )
    );

There are many other combinations possible.

beckend commented 8 years ago

What if I wanted to use the payload of the ajax.get? I would not be able to access it without tricks.

martypenner commented 8 years ago

Can you provide an example?

beckend commented 8 years ago

action$.ofType(LOGIN_REQUEST)
  .switchMap(() =>
    Observable.ajax.post(
      'http://www.endpoint.com',
      {
        username: 'user',
        password: 'password'
      })
      .map(payload => {
        // Do double jobs, to some notification component which listens to a reducer
        // And the session reducer which holds auth
        // I would need to save the payload to some temp variable to pass it to a concat
        // If this is not possible
        return [
          { type: LOGIN_SUCCESS, payload: payload.response },
          { type: NOTIFY_SUCCESS, payload: payload.response }
        ];
      })
      .takeUntil(action$.ofType(LOGIN_ABORT))
      .catch(({ xhr }) => Observable.of(loginFailed(xhr.response)))
  );

And to add: if I wanted to do conditional stuff based on failure or success in a concat, I would need a variable in the enclosing scope to check whether it was a success or not.

martypenner commented 8 years ago

Your example wouldn't have to change much at all:

action$.ofType(LOGIN_REQUEST)
  .switchMap(() =>
    Observable.ajax.post(
      'http://www.endpoint.com',
      {
        username: 'user',
        password: 'password'
      })
      // Note the different operator here
      .flatMap(payload =>
        // Concat 2 observables so they fire sequentially
        Observable.concat(
          Observable.of({ type: LOGIN_SUCCESS, payload: payload.response }),
          Observable.of({ type: NOTIFY_SUCCESS, payload: payload.response })
        )
      )
      .takeUntil(action$.ofType(LOGIN_ABORT))
      .catch(({ xhr }) => Observable.of(loginFailed(xhr.response)))
  );

For failure or success, you could likely encapsulate that in a catch operator, or alternatively, something like retryWhen.

beckend commented 8 years ago

Thanks, that works nicely, then I guess I will leave it to the mods to close if it's not going to be implemented.

martypenner commented 8 years ago

No problem. Rx has a big learning curve; I often find the solution is much simpler than I first thought.

beckend commented 8 years ago

Actually, I just tested the flatMap, it did not work, the second sequence did not fire. The observable went to the catch clause when adding the second sequence, removing the second sequence works fine.

beckend commented 8 years ago

Ok, sorry for the noise, it's just the normal trial and error. It actually works.

jayphelps commented 8 years ago

Thanks @martypenner! Indeed we defer to idiomatic Rx in situations like these and will not likely support dispatching an array of actions. (Unless of course Redux starts supporting it, in which case it would start working automatically.) Although I discourage it, you could create a pretty trivial Redux middleware to add support to Redux.
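
For readers wondering what such a middleware might look like, here is a minimal sketch. It is not part of Redux or redux-observable; the name arrayMiddleware is made up for illustration, and the fake next function only stands in for the rest of the store pipeline so the snippet runs without any dependencies. Note that the approach is explicitly discouraged in the comment above.

```javascript
// Hypothetical middleware (an assumption, not a Redux or redux-observable API):
// when an array is dispatched, forward each element on its own;
// anything else is passed through untouched.
const arrayMiddleware = () => next => action =>
  Array.isArray(action) ? action.map(item => next(item)) : next(action);

// Minimal stand-in for the rest of the middleware chain, only to show the behaviour.
const received = [];
const next = action => {
  received.push(action);
  return action;
};
const dispatch = arrayMiddleware()(next);

dispatch([{ type: 'LOGIN_SUCCESS' }, { type: 'NOTIFY_SUCCESS' }]);
dispatch({ type: 'LOGIN_ABORT' });

console.log(received.map(a => a.type));
```

In a real app this would presumably be registered with applyMiddleware alongside the epic middleware; the Rx operators shown earlier in the thread remain the recommended route.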

jayphelps commented 8 years ago

You can also use Rx's .switch() operator to basically say "whatever is emitted, treat it as something that can be consumed as an observable" (technically an ObservableInput). Arrays can be consumed as such.

http://jsbin.com/walopuh/edit?js,output

Observable.of([1, 2, 3])
  .switch() // comment me out to see it emit array
  .subscribe(
    value => console.log('value', value)
  );

So instead of emitting an array [1, 2, 3], it emits 1 then 2, then 3.

beckend commented 8 years ago

Thanks guys, wish there were some good docs of the millions of ways the operators could be used...

newtriks commented 8 years ago

@jayphelps to dispatch multiple actions outside of the flatMap in the catch operator would Observable.merge be appropriate?

For example:

.catch(({ xhr }) => 
  Observable.merge(
    Observable.of(someAction(xhr)),
    Observable.of(someOtherAction(xhr))
  )
)

jayphelps commented 8 years ago

@newtriks that absolutely works, though there are some ways you can simplify it:

Simple

Since your example wants to emit two actions, one after the other, you don't need a stream for each of them but instead can just return a single stream that emits both. Observable.of accepts any number of values to emit in order.

.catch(({ xhr }) => Observable.of(
  someAction(xhr),
  someOtherAction(xhr)
))

Most simple

This relies on the fact that in RxJS v5, arrays can be returned whenever an observable is expected and will be consumed as one. This is effectively identical to the previous example, but some find it too magical/terse because many people are still unfamiliar with this feature.

.catch(({ xhr }) => [
  someAction(xhr),
  someOtherAction(xhr)
])

newtriks commented 8 years ago

@jayphelps oooooh I like 😸

So what about when order matters?

jayphelps commented 8 years ago

@newtriks You also mentioned you'd be putting this outside of a flatMap, so I want to warn that if you mean on the top-level stream of a given epic, letting errors bubble up to there is often too late.

As an example:

const somethingEpic = action$ =>
  action$.ofType(WOW)
    .flatMap(action => something(action))
    .catch(({ xhr }) => Observable.of(
      someAction(xhr),
      someOtherAction(xhr)
    ));

If we catch the error here, this entire epic has already terminated and will no longer listen for future actions.

Instead, you almost always want to catch the errors where they occur, so they do not bubble out to the top.

const somethingEpic = action$ =>
  action$.ofType(WOW)
    .flatMap(action =>
      something(action)
        // Catch on the inner observable so the outer epic stays alive
        .catch(({ xhr }) => Observable.of(
          someAction(xhr),
          someOtherAction(xhr)
        ))
    );

This is also described a bit in the docs but I highly recommend a recent talk by @blesh which explains why this is the case very intuitively https://www.youtube.com/watch?v=3LKMwkuK0ZE

jayphelps commented 8 years ago

@newtriks

So what about when order matters?

Can you give me an example of what you mean? In my examples, the output order is the order in which you provided them to Observable.of. So the order does matter.

newtriks commented 8 years ago

@jayphelps my mistake, sorry I skipped through your answer too quick. S'all looks great thanks.

jakecraige commented 7 years ago

I had quite a difficult time converting this saga that looked pretty simple on the surface, to an epic. I figured I'd share since this is the issue I found when googling dispatching multiple actions.

Saga looked like so:

export function* getTrackedWinesSaga(): any {
  try {
    yield put(beginFetch())
    const wines = yield call(api.getTrackedWines)
    yield put(trackedWines(wines))
  } catch (error) {
    yield put(trackedWines(error))
  } finally {
    yield put(endFetch())
  }
}

Since I wasn't making any progress using operators to get it to work, I started writing my own observable implementation like so, which is gross, but works:

export const getTrackedWinesEpic = (action$, store) =>
  action$.ofType(TRACKED_WINES_REQUEST).switchMap(() => {
    return new Observable(observer => {
      observer.next(beginFetch())

      const token = store.getState().session.user.authentication_token
      const subscription = api.getTrackedWines$(token) // rx'd version of API request
        .subscribe({
          next: wines => observer.next(trackedWines(wines)),
          error: error => observer.error(trackedWines(error)),
          complete: () => {
            observer.next(endFetch())
            observer.complete()
          }
        })

      return function unsubscribe() {
        subscription.unsubscribe()
      }
    })
  })

The "shape" of that helped me get a better idea of what I might need to do, so I decided to carry on attempting to figure it out. I knew I had an observable that needed to emit multiple values in a sequence, so I went to the helpful operator finder at the bottom of http://reactivex.io/rxjs/ which led me to concat. (I had tried this earlier, but I think I kept running into issues, one of them being that I kept passing it an array rather than multiple params.)

After some experimentation, I ended up with this:

import { of as of$ } from 'rxjs/observable/of'
import { concat as concat$ } from 'rxjs/observable/concat'

export const getTrackedWinesEpic = (action$, store) =>
  action$.ofType(TRACKED_WINES_REQUEST).switchMap(() =>
    concat$(
      of$(beginFetch()),
      api.getTrackedWines$().map(trackedWines).catch(e => of$(trackedWines(e))),
      of$(endFetch()),
    )
  )

Hopefully this helps some fellow Rx-ers if they face a similar issue.

damassi commented 7 years ago

@jakecraige - Just what I was looking for

MistyKuu commented 7 years ago

@jakecraige I tried to call action like you do and it doesn't work unless I wrap it in dispatch. Is beginFetch() just a plain action or already wrapped in dispatch?

mkamranhamid commented 6 years ago

I'm late, I guess, but if any of you see this, I have a question. I'm new to Redux and coming from Angular and its services logic, so I roughly know how Redux works. I'm using only two epic listeners (one to call the GET method from the service and one for POST) plus services (for fetching data from an API). Whenever I have to call an API, I make an object with some API configuration, including the URL, the params, and whether to include the authorization header. This works fine when I dispatch one action at a time. But when I have to do multiple actions, meaning multiple GETs or POSTs, it stops in the epic after doing all the service and epic work, never reaches the reducer to store the data, and gives me the error: Actions must be plain objects. Use custom middleware for async actions. I know there's something wrong in my implementation. It would be great if it worked this way; otherwise I'll have to move to something else, like thunk.

Stackoverflow

jmarceli commented 6 years ago

Based on @jakecraige example and this SO question I've made my own example for redux actions chaining in redux-observable 1.0.x and rxjs 6.2.x:

import { ofType } from 'redux-observable';
import { of, from } from 'rxjs'; // the 'rxjs/observable/*' paths need rxjs-compat in RxJS v6
import { delay, merge, concat, map, flatMap, catchError } from 'rxjs/operators';
import axios from 'axios';

const apiGet = async () => {
  return await axios({ url: 'https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.1/rxjs.umd.min.js' });
  // to generate API response error (403) use e.g.:
  // return await axios({ url: 'https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.1/rxjs.umd.min.jsNONEXISTING' });
};

const testEpic = action$ => action$.pipe(
  ofType('CONNECT'), // this action will trigger epic start
  // delay(3000),
  flatMap(action =>
    of({ type: 'TEST_PREPARE' }).pipe( // dispatch TEST_PREPARE at the beginning
      concat( // concat emits actions one after another, use merge to emit all of them at once
        from(apiGet()).pipe( // convert async/await to Observable
          // delay(3000),
          map(response => {
            console.log(response);
            return { type: 'TEST_SUCCESS' }; // dispatch TEST_SUCCESS after receiving data
          }),
          // catchError(error => of({ type: 'TEST_ERROR' })), // catch ERROR here to always dispatch TEST_AFTER_SUCCESS action
        ),
        of({ type: 'TEST_AFTER_SUCCESS' }), // dispatch TEST_AFTER_SUCCESS, just because we can
      ),
      catchError(error => of({ type: 'TEST_ERROR' })), // catch ERROR here to avoid dispatching TEST_AFTER_SUCCESS on error
    ),
  ),
);

It emits a series of actions in response to the 'CONNECT' action.

CONNECT-->TEST_PREPARE-->TEST_SUCCESS-->TEST_AFTER_SUCCESS

You may add some arbitrary delay to 'TEST_AFTER_SUCCESS' by piping some delay() e.g.:

        of({ type: 'TEST_AFTER_SUCCESS' }).pipe(
          delay(3000)
        ),

DISCLAIMER: There is a high possibility that this piece of code could be written much better. I'm still learning Rx, so please comment if you see any obvious bugs/improvements.

jmarceli commented 6 years ago

This might be obvious for more experienced Rxers, but it wasn't for me. There are two kinds of concat import available: one is a pipeable operator (from 'rxjs/operators') and the other is a static function (from 'rxjs'). In my previous example I was using the concat operator, while the static function simplifies the code. Here it is:

import { ofType } from 'redux-observable';
import { of, from, concat } from 'rxjs'; // works for RxJS v6
import { delay, merge, map, flatMap, catchError } from 'rxjs/operators'; // line changed
import axios from 'axios';

const apiGet = async () => {
  return await axios({ url: 'https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.1/rxjs.umd.min.js' });
  // to generate API response error (403) use e.g.:
  // return await axios({ url: 'https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.1/rxjs.umd.min.jsNONEXISTING' });
};

const testEpic = action$ => action$.pipe(
  ofType('CONNECT'), // this action will trigger epic start
  // delay(3000),
  flatMap(action => concat(
    of({ type: 'TEST_PREPARE' }), // dispatch TEST_PREPARE at the beginning
    from(apiGet()).pipe( // convert async/await to Observable
      // delay(3000),
      map(response => {
        console.log(response);
        return { type: 'TEST_SUCCESS' }; // dispatch TEST_SUCCESS after receiving data
      }),
      // catchError(error => of({ type: 'TEST_ERROR' })), // catch ERROR here to always dispatch TEST_AFTER_SUCCESS action
    ),
    of({ type: 'TEST_AFTER_SUCCESS' }), // dispatch TEST_AFTER_SUCCESS, just because we can
  ).pipe(
    catchError(error => of({ type: 'TEST_ERROR' })), // catch ERROR here to avoid dispatching TEST_AFTER_SUCCESS on error
  )),
);

evertbouw commented 6 years ago

@jmarceli RxJS also exports the operators startWith and endWith. You can place these in your apiGet pipe. This is functionally the same as your concat, but it might be easier to read. It is harder to delay the after-success action this way, though.

flatMap(action => from(apiGet()).pipe(
    map(response => ({ type: 'TEST_SUCCESS' })),
    startWith({ type: 'TEST_PREPARE' }), // placed after map so the prepare action is not itself mapped
    endWith({ type: 'TEST_AFTER_SUCCESS' }),
)),

jmarceli commented 6 years ago

Your hint looks promising, but it doesn't work as expected when the API responds with an error. Please check https://rxviz.com/v/RObNbm9O

Pappa commented 6 years ago

Regarding the concat examples above. Just in case anyone else stumbles across the same problem as me, using the version of concat exported from "rxjs/operators" will produce the error TypeError: fn is not a function. It took me a day to realise I was using the wrong concat.

TheFullResolution commented 6 years ago

@Pappa thanks for saving me lots of misery...

ilionic commented 6 years ago

re. import { concat } from 'rxjs/observable/concat'; getting:

This dependency was not found:

* rxjs-compat/observable/concat in ./node_modules/rxjs/observable/concat.js

since not using rxjs-compat

jmarceli commented 6 years ago

You may try with (untested, found somewhere in my old code):

import { concat } from 'rxjs';

jayphelps commented 6 years ago

@jmarceli yep, that's the correct one. import { concat } from 'rxjs'; in rxjs v6

giacomocerquone commented 6 years ago

Can we add this somewhere in the docs? I may submit a PR :)

jayphelps commented 6 years ago

@giacomocerquone sorry--I'm not sure which thing you want to add to the redux-observable docs? import { concat } from 'rxjs'; ? That would be in RxJS's docs. 😃

giacomocerquone commented 6 years ago

the @beckend or @jakecraige answers, maybe in the recipes in order to let people know how to dispatch multiple actions effectively. I know that maybe someone that knows rxjs enough should already know how to do this, but at this point recipes like "error handling" and "cancellation" should be useless too!

jayphelps commented 6 years ago

@giacomocerquone oh! yes, a recipe sounds good.

I would suggest, however, adding a clarification at the top that sequentially dispatching multiple actions synchronously is sometimes a sign that you actually should just be using a single action, since multiple reducers can change state in response to receiving the same action. However, that's not a rule, and there are plenty of cases where having separate actions provides easier separation of concerns for testing and reusability; the major takeaway is just to stop and ask "is it cleaner to just use a single action and have my reducers respond to it?" If you're not sure how to word that, no worries at all! You can skip it and I can come up with something.
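
As a rough illustration of the "single action, several reducers" idea, the sketch below has two reducers reacting to the same LOGIN_SUCCESS action, so an epic would only need to emit one action rather than LOGIN_SUCCESS followed by NOTIFY_SUCCESS. The reducer names, state shapes and notification text are assumptions made up for this example, and the root reducer is combined by hand so the snippet has no dependencies.

```javascript
// Hypothetical reducers: both respond to the same LOGIN_SUCCESS action.
const session = (state = { user: null }, action) =>
  action.type === 'LOGIN_SUCCESS' ? { user: action.payload } : state;

const notifications = (state = [], action) =>
  action.type === 'LOGIN_SUCCESS' ? [...state, 'Logged in'] : state;

// Hand-rolled stand-in for combining the two reducers into one root reducer.
const rootReducer = (state = {}, action) => ({
  session: session(state.session, action),
  notifications: notifications(state.notifications, action),
});

// One dispatched action updates both slices of state.
const state = rootReducer(undefined, { type: 'LOGIN_SUCCESS', payload: 'user' });
console.log(state);
```

Whether this is cleaner than separate actions depends on the app; as the comment above says, it is a question worth asking, not a rule.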

giacomocerquone commented 6 years ago

Wait, the saga example that jakecraige tried to reproduce with observables wasn't synchronous, but asynchronous:

export function* getTrackedWinesSaga(): any {
  try {
    yield put(beginFetch())
    const wines = yield call(api.getTrackedWines)
    yield put(trackedWines(wines))
  } catch (error) {
    yield put(trackedWines(error))
  } finally {
    yield put(endFetch())
  }
}

In order to put() trackedWines() action, he needs the wines const that he obtains from a previous asynchronous call through call(). Am I getting confused somewhere?

Apart from this, I know of the "multiple sync dispatch" weirdness, abramov tweeted about it too. We can also add a sync and async recipe for dispatching multiple actions where I can include this.

Thanks for the time.

kidwm commented 5 years ago

@giacomocerquone have you submitted your PR for the docs?

ppalmeida commented 5 years ago

As @Pappa said, take care not to import operators from the wrong path. In my case the error was:

TypeError: Object(...) is not a function

Then, after 2h scrapping my code and hitting my head against the table, I found it:

// WRONG!!
import { map, mergeMap } from "rxjs/operators/map";

Jesusssss...

// Correct  ;-)
import { map } from "rxjs/operators/map";
import { mergeMap } from "rxjs/operators/mergeMap";