// An observable that emits 10 multiples of 100 every 1 second
const source$ = Observable.interval(1000)
.take(10)
.map(x => x * 100);
/**
* returns a promise that waits `ms` milliseconds and emits "done"
*/
function promiseDelay(ms) {
return new Promise(resolve => {
setTimeout(() => resolve('done'), ms);
});
}
// using it in a switchMap
source$.switchMap(x => promiseDelay(x)) // works
.subscribe(x => console.log(x));
source$.switchMap(promiseDelay) // just a little more terse
.subscribe(x => console.log(x));
// or takeUntil
source$.takeUntil(doAsyncThing('hi')) // totally works
.subscribe(x => console.log(x))
// or weird stuff you want to do like
Observable.of(promiseDelay(100), promiseDelay(10000)).mergeAll()
.subscribe(x => console.log(x))
经验证确实工作的很好
使用defer函数可以让返回promise的函数可以重新执行化
如果你的函数返回一个promise你可以使用Observable.defer包裹他,就可以使得他在发生错误是可以进行重试jsbin。
Observable.defer: Returns an observable sequence that invokes the specified factory function whenever a new observer subscribes.
function getErroringPromise() {
console.log('getErroringPromise called');
return Promise.reject(new Error('sad'));
}
Observable.defer(getErroringPromise)
.retry(3)
.subscribe(x => console.log);
// logs "getErroringPromise called" 4 times (once + 3 retries), then errors
经验证确实工作的很好
使用defer来封装async-await
defer是一个强有力的工具,你也可以用它来封装async-await函数
Observable.defer(async function() {
const a = await promiseDelay(1000).then(() => 1);
const b = a + await promiseDelay(1000).then(() => 2);
return a + b + await promiseDelay(1000).then(() => 3);
})
.subscribe(x => console.log(x)) // logs 7
const click$ = Observable.fromEvent(button, 'clicks');
/**
* Waits for 10 clicks of the button
* then posts a timestamp of the tenth click to an endpoint
* using fetch
*/
async function doWork() {
await click$.take(10)
.forEach((_, i) => console.log(`click ${i + 1}`));
return await fetch(
'notify/tenclicks',
{ method: 'POST', body: Date.now() }
);
}
先上图:
原文地址(english, 需翻墙)
Rxjs observable和promise以及Async-Await的互相操作
无论何时我都会被问到一个问题那就是如何再使用rxjs的时候使用promise和async和await呢?或者什么时候不能混合使用?我也说过几个不要同时使用的栗子。rxjs从一开始就可以很好的和promise一起使用。希望这篇文章能够很好的阐述一下。
如果他接受Observable, 那么他就可以接受promise
举个栗子,假如你在使用switchMap, 那么你就可以像返回一个Observable一样返回一个Promise。就像下面这样 jsbin:
经验证确实工作的很好
使用defer函数可以让返回promise的函数可以重新执行化
如果你的函数返回一个promise你可以使用Observable.defer包裹他,就可以使得他在发生错误是可以进行重试jsbin。 Observable.defer: Returns an observable sequence that invokes the specified factory function whenever a new observer subscribes.
经验证确实工作的很好
使用defer来封装async-await
defer是一个强有力的工具,你也可以用它来封装async-await函数
这个没有jsbin尝试失败了呃,因为不支持async-await,估计是我的姿势错了,但我在本地试了ok。
用forEach订阅一个Observable, 然后来创建使用async-await的函数来并发执行任务。
forEach 介绍:
经本地验证很ok
使用toPromise()和async/await来把最后一个订阅值返回为一个Promise
事实上toPromise比较怪异因为他并不是rxjs规范所定义的操作符,只是我们提供了而已。而且toPromise只会把最后一个值使用promise进行包装,那就意味着,若是Observable一直不触发complete那么这个promise就永远不会resolve。
const source$ = Observable.interval(1000).take(3); // 0, 1, 2 // waits 3 seconds, then logs "2". // because the observable takes 3 seconds to complete, and // the interval emits incremented numbers starting at 0 async function test() { console.log(await source$.toPromise()); }