Open evantianx opened 6 years ago
delay
延迟接受源 Observable 的时间。
参数可以为毫秒数,在 x 毫秒后接收源 Observable;或者指定一个具体日期,在到达该日期时接收。
const source = Rx.Observable.of(null)
const msg = Rx.Observable.merge(
source.mapTo('Hello'),
source.mapTo('From').delay(1000),
source.mapTo('Me').delay(2000)
)
const subscribe = msg.subscribe(val => console.log(val))
delayWhen
类似于 delay
,只不过延迟时间由参数来决定。
const source = Rx.Observable.interval(1000)
// 等同于:
// const example = source.delay(5000)
const example = source
.delayWhen(() => Rx.Observable.timer(5000))
const subscribe = example.subscribe(val => console.log(val))
let
let
不同于其他操作符,获取到的值不是数据源发出的数据,而是整个数据源 Observable。
这可以使我们可以更为灵活地操作数据源:
const source = Rx.Observable.from([1, 2, 3, 4, 5])
const obsThroughMyOperators = (myOperators) => {
return source
.map(val => val + 10)
.let(myOperators)
}
const myOperators = obs => obs.map(val => val + 50).map(val => val + 50)
// 111 112 113 114 115
const example = obsThroughMyOperators(myOperators).subscribe(val => console.log(val))
toPromise
将普通 Observable 转换为 Promise
const obs = val => Rx.Observable.of(val).delay(2000)
// Promise.all() 接受一个 Promise 数组作为参数
const source = () => {
return Promise.all([
obs('hello').toPromise(),
obs('world').toPromise()
])
}
// Promise.all result: hello,world
source().then(val => console.log(`Promise.all result: ${val}`))
buffer
buffer(closingNotifier: Observable): Observable
收集输出值,只有在给定的 Obserable 发出值时才发出由收集到的值所组成的数组。
// 每一秒发出值
const obs1 = Rx.Observable.interval(1000)
// 点击页面时发出值
const obs2 = Rx.Observable.fromEvent(document, 'click')
const example = obs1.buffer(obs2)
// 点击后才获取到值 [0, 1, 2, 3, 4]
const subscribe = example.subscribe(val => console.log(val))
bufferCount
bufferCount(bufferSize: number, startBufferEvery: number = null): Observable
收集输出值,只有达到给定的数量时才发出由收集到的值所组成的数组
关于第二个参数,是用来指定何时开启下一个缓冲区(即可以创造重叠的缓冲区,重叠部分由参数指定)
举例来说:
// 创建每1秒发出值的 observable const source = Rx.Observable.interval(1000);
如果第一个参数(bufferSize)是3,而第二个参数(startBufferEvery)是1: 第一次 interval 的值: buffer 1: [0] 第2次 interval 的值: buffer 1: [0,1] buffer 2: [1] 第3次 interval 的值: buffer 1: [0,1,2] 缓冲数量已达到3,发出缓冲区 buffer 2: [1,2] buffer 3: [2] 第4次 interval 的值: buffer 2: [1,2,3] 缓冲数量已达到3,发出缓冲区 buffer 3: [2, 3] buffer 4: [3]
bufferTime
bufferTime(bufferTimeSpan: number, bufferCreationInterval: number, scheduler: Scheduler): Observable
收集输出值,只有达到给定时间时才发出由收集到的值所组成的数组
类似于
bufferCount
也可以通过指定第二个参数来创建重叠缓冲区
bufferToggle
bufferToggle(openings: Observable, closingSelector: Function): Observable
开启开关时收集来自 Obsevable 的值,关闭开关后发出缓冲区的值
const sourceInterval = Rx.Observable.interval(1000)
// 意味着第五秒开启缓冲区接受值, 以后每隔五秒创建一个新的缓冲区
const startInterval = Rx.Observable.interval(5000)
// 三秒释放一次缓冲区的值
const closeInterval = val => {
console.log(`val ${val} was emitted`)
return Rx.Observable.interval(3000)
}
// [4, 5, 6] ... [9, 10, 11]
const subscribe = sourceInterval.bufferToggle(startInterval, closeInterval).subscribe(val => console.log(val))
bufferWhen
bufferWhen(closingSelector: function): Observable
收集来自 Observable 的值, 直到关闭选择器发出值才发出缓冲区的值
const source = Rx.Observable.interval(1000)
const closeInterval = () => Rx.Observable.interval(5000)
// [0, 1, 2, 3]... [4, 5, 6, 7, 8]
const subscribe = source.bufferWhen(closeInterval).subscribe(val => console.log(val))
优先关闭,其次接受 --0--1--2--3-- 经过了5秒 此时关闭选择器发出了第一个值,故缓冲区关闭(同时开启新的缓冲区,将 4 存为第一个值) 4--5--6--7--8-- 关闭旧缓冲区 开启新缓冲区 循环♻️
concatMap
concatMap(project: function, resultSelector: function): Observable
将值映射成内部的 Observable, 并按照顺序发出
此处的顺序指的是,只有前面的 Observable 发出值,才会执行后面的 Observable
// 注意 concatMap 和 mergeMap 的区别
const concatMapSub = Rx.Observable.of(2000, 1000)
.concatMap(v => Rx.Observable.of(v).delay(v))
// concatMap: 2000, concatMap: 1000
.subscribe(v => console.log('concatMap:', v))
const mergeMapSub = Rx.Observable.of(2000, 1000)
.mergeMap(v => Rx.Observable.of(v).delay(v))
// mergeMap: 1000, mergeMap: 2000
.subscribe(v => console.log('mergeMap:', v))
concatMapTo
concatMapTo(observable: Observable, resultSelector: function): Observable
当前一个 observable 完成时订阅提供的 observable 并发出值。
const interval = Rx.Observable.interval(2000)
const msg = Rx.Observable.of('second(s) elapsed!')
const example = interval.concatMapTo(msg, (time, msg) => `${time} ${msg}`)
// 0 second(s) elapsed!
// 1 second(s) elapsed!
const subscribe = example.subscribe(val => console.log(val))
expand
expand(project: function, concurrent: number, scheduler: Scheduler): Observable
递归调用传给的函数
const source = Rx.Observable.of(2)
const example = source
.expand(val => {
console.log(`Passed value: ${val}`)
return Rx.Observable.of(val + 1)
})
.take(5)
const subscription = example.subscribe(val => console.log(val)) // 2 3 4 5 6
groupBy(keySelector: Function, elementSelector: Function): Observable
根据所提供的函数来将值分组,返回多个 Observable
const cat = [
{
name: 'yellow',
age: 3,
},{
name: '1bean',
age: 1,
},{
name: '2bean',
age: 1
},{
name: '3bean',
age: 1,
},{
name: '3bean',
age: 1
},{
name: '4bean',
age: 2,
}
]
const source = Rx.Observable.from(cat)
const example = source
.groupBy(cat => cat.age)
.flatMap(group => group.reduce((acc, curr) => [...acc. curr], []))
const subscription = example.sunscribe(val => console.log(val))
map(project: Function, thisArg: any): Observable
对源 Observable 的每个值应用投射函数
const source = Rx.Observable.from([1, 2, 3, 4, 5])
const example = source.map(val => val + 10)
const subscription = example.subscribe(val => console.log(val)) // 11 12 13 14 15
mapTo(value: any): Observable
把 Observable 发出的每个值都映射为常量
const source = Rx.Observable.fromEvent(document, 'click')
const example = source.mapTo('Hello World! ')
const subscription = example.subscribe(val => console.log(val))
mergeMap(project: function: Observable, resultSelector: function: any, concurrent: number): Observable
不仅映射还要打平成一个 Observable
flatMap 是 mergeMap 的别名
partition(predicate: function: boolean, thisArg: any): [Observable, Observable]
将一个 Observable 分割成两个按照预定规则形成的 Observable
const source = Rx.Observable.from([1, 2, 3, 4, 5, 6])
const example = source
.map(val => {
if (val > 3) {
throw `${val} greater than 3! `
}
return {success: val}
})
.catch(val => Rx.Observable.of({error: val}))
const [success, error] = example.partition(res => res.success)
const subscription = Rx.Observable.merge(
success.map(val => `Success! ${val.success}`),
error.map(val => `Error! ${val.error}`)
).subscribe(val => console.log(val))
pluck(properties: ...args): Observable
选择属性来发出
// 提取属性
const people = [
{ name: 'Tim', age: 40},
{ name: 'Sarah', age: 37 }
]
const source = Rx.Observable.from(people)
const example = source.pluck('name')
// Tim, Sarah
const subscription = example.subscribe(val => console.log(val))
// 提取嵌套属性,不存在时返回 undefined
const source = Rx.Observable.from([
{name: 'Joe', age: 30, job: {title: 'Developer', language: 'JavaScript'}},
{name: 'Sarah', age:35}
]);
// 提取 job 中的 title 属性
const example = source.pluck('job', 'title');
// 输出: "Developer" , undefined
const subscribe = example.subscribe(val => console.log(val));
scan(accumulator: function, seed: any): Observable
归并并输出每次值 区别于 reduce
takeUntil(notifier: Observable): Observable
由给定的 Observable 确定前述 Observable 是否提前结束
如果后述 Observable 不发出值则前述 Observable 将一直执行下去直到结束。
const source = Rx.Observable.interval(1000)
const example = source.takeUntil(Rx.Observable.timer(5000))
const subscription = example.subscribe(val => console.log(val)) // 0 1 2 3
race(): Observable
提供的 Observable 中谁最快则发出谁的值
const example = Rx.Observable.race(
Rx.Observable.interval(1000),
Rx.Observable.interval(1500),
Rx.Observable.interval(2000),
Rx.Observable.interval(2500)
)
const subscribe = example.subscribe(val => console.log(val)) // 1 2 3 间隔为 1s
catch(project: function): Observable
处理 Observable 序列中的错误
const source = Rx.Observable.throw('This is an error!')
const example = source.catch(val => Rx.Observable.of(`Oops, ${val}`))
const subscription = example.subscribe(val => console.log(val))
do
返回与源相同的 Observable, 但为每一项都指定运行时观察者或回调函数。
一般用来打印日志或进行其他检查: