lei4519 / blog

记录、分享
4 stars 1 forks source link

Rxjs 操作符快速入门 #52

Open lei4519 opened 5 months ago

lei4519 commented 5 months ago

前言

好的程序员懂得如何从重复的工作中逃脱:

- 操作DOM时,发现了Jquery。  

- 操作JS时,发现了lodash。  

- 操作事件时,发现了Rx。  

Rxjs 本身的 概念 并不复杂,简单点说就是对观察者模式的封装,观察者模式在前端领域大行其道,不管是使用框架还是原生 JS,你一定都体验过。

在我看来,Rxjs 的强大和难点主要体现对其近 120 个操作符的灵活运用。

可惜官网中对这些操作符的介绍晦涩难懂,这就导致了很多人明明理解 Rxjs 的概念,却苦于不懂的使用操作符而黯然离场。

本文总结自《深入浅出 Rxjs》一书,旨在于用最简洁、通俗易懂的方式来说明 Rxjs 常用操作符的作用。学习的同时,也可以做为平时快速查阅的索引列表

阅读提醒

需要注意流的完成和订阅时间,某些操作符必须等待流完成之后才会触发。

其实根据操作符的功能我们也可以大致推断出结果:如果一个操作符需要拿到所有数据做操作、判断,那一定是需要等到流完成之后才能进行。

创建流操作符

创建流操作符最为流的起点,不存在复杂难懂的地方,这里只做简单的归类,具体使用查阅官网即可,不再赘述。

同步流

异步流

合并类操作符

订阅多条流,将接收到的数据向下吐出。

Concat

首尾连接

concat(source1$, source2$)  

Merge

先到先得

merge(source1$, source2$)  

Zip

一对一合并(像拉链一样)

zip(source1$, source2$)  

combineLatest

合并所有流的最后一个数据

combineLatest(source1$, source2$)  

withLatestFrom

合并所有流的最后一个数据,功能同 combineLatest,区别在于:

source1$.pipe(withLatesFrom(source2$, source3$))  

Race

胜者通吃

race(source1$, source2$)  

startWith

在流的前面填充数据

source1$.pipe(startWith(1))  

forkJoin

合并所有流的最后一个数据

forkJoin(source1$, source2$)  

辅助类操作符

Count

当前流完成之后,统计流一共发出了多少个数据。

source$.pipe(count())  

mix/max

当前流完成之后,计算 最小值/最大值。

source$.pipe(max())  

Reduce

同数组用法,当前流完成之后,将接受的所有数据依次传入计算。

source$.pipe(reduce(() => {}, 0))  

布尔类操作符

Every

同数组,需要注意的是:如果条件都为 true,也要等到流完成才会吐出结果。

原因也很简单,如果流没有完成,那怎么保证后面的数据条件也为 true 呢。

source$.pipe(every(() => true / false))  

find、findIndex

同数组,注意点同 every

source$.pipe(find(() => true / false))  

isEmpty

判断流是不是一个数据都没有吐出就完成了。

source$.pipe(isEmpty())  

defaultIfEmpty

如果流满足 isEmpty,吐出默认值。

source$.pipe(defaultIfEmpty(1))  

过滤类操作符

Filter

同数组

source$.pipe(filter(() => true / false))  

First

取第一个满足条件的数据,如果不传入条件,就取第一个

source$.pipe(first(() => true / false))  

Last

取第一个满足条件的数据,如果不传入条件,就取最后一个,流完成才会触发。

source$.pipe(last(() => true / false))  

Take

拿够前 N 个就完成

source$.pipe(take(N))  

takeLast

拿够后 N 个就结束,因为是后几个所以只有流完成了才会将数据一次发出。

source$.pipe(takeLast(N))  

takeWhile

给我传判断函数,什么时候结束你来定

source$.pipe(takeWhile(() => true / false))  

takeUntil

给我一个流 (A),什么时候这个流 (A) 吐出数据了,我就完成

source$.pipe(takeUntil(timer(1000)))  

Skip

跳过前 N 个数据

source$.pipe(skip(N))  

skipWhile

给我传函数,跳过前几个你来定

source$.pipe(skipWhile(() => true / false))  

skipUntil

给我一个流 (A),什么时候这个流 (A) 吐出数据了,我就不跳了

source$.pipe(skipUntil(timer(1000)))  

转化类操作符

Map

source$.pipe(map(() => {}))  

mapTo

source$.pipe(mapTo("a"))  

Pluck

source$.pipe(pluck("v"))  

有损回压控制

对防抖、节流不了解的请自行查阅相关说明。

Throttle

传入一个流 (A),对上游数据进行节流,直到流 (A) 吐出数据时结束节流向下传递数据,然后重复此过程

source$.pipe(throttle(interval(1000)))  

throttleTime

根据时间 (ms) 节流

source$.pipe(throttleTime(1000))  

Debounce

传入一个流 (A),对上游数据进行防抖,直到流 (A) 吐出数据时结束防抖向下传递数据,然后重复此过程

source$.pipe(debounce(interval(1000)))  

debounceTime

根据时间 (ms) 防抖

source$.pipe(debounceTime(1000))  

Audit

audit 同 throttle,区别在于:

source$.pipe(audit(interval(1000)))  

auditTime

同上,不再赘述

source$.pipe(auditTime(1000))  

Sample

传入一个流 (A),对上游数据吐出的最新数据进行缓存,直到流 (A) 吐出数据时从缓存中取出数据向下传递,然后重复此过程

source$.pipe(sample(interval(1000)))  

sampleTime

根据时间 (ms) 取数

source$.pipe(sampleTime(1000))  

Distinct

所有元素去重,返回当前流中从来没有出现过的数据。

传入函数时,根据函数的返回值分配唯一 key。

source$.pipe(distinct())  
Observable.of({ age: 4, name: "Foo" }).pipe(distinct((p) => p.name))  

distinctUntilChanged

相邻元素去重,只返回与上一个数据不同的数据。

传入函数时,根据函数的返回值分配唯一 key。

source$.pipe(distinctUntilChanged())  

distinctUntilKeyChanged

source$.pipe(distinctUntilKeyChanged("id"))  

ignoreElements

忽略上游的所有数据,当上游完成时,ignoreElements 也会完成。(我不关心你做了什么,只要告诉我完没完成就行)

source$.pipe(ignoreElements())  

elementAt

只获取上游数据发出的第 N 个数据。

第二个参数相当于默认值:当上游没发出第 N 个数据就结束时,发出这个参数给下游。

source$.pipe(elementAt(4, null))  

Single

source$.pipe(single(() => true / false))  

无损回压控制

bufferTime、windowTime

缓存上游吐出的数据,到指定时间后吐出,然后重复。

source$.pipe(bufferTime(1000))  

bufferCount、windowCount

缓存上游吐出的数据,到指定个数后吐出,然后重复。

第二个参数用来控制每隔几个数据开启一次缓存区,不传时可能更符合我们的认知。

source$.pipe(bufferCount(10))  

bufferWhen、windowWhen

传入一个返回流 (A) 的工厂函数

流程如下:

  1. 触发订阅时,调用工厂函数拿到流 (A),开始缓存
  2. 等待流 (A) 发出数据时,将缓存的值向下吐出
  3. 重新调用工厂函数,拿到一个新的流 (A),开启缓存,循环往复。
randomSeconds = () => timer((Math.random() * 10000) | 0)  
source$.pipe(bufferWhen(randomSeconds))  

bufferToggle、windowToggle

第一个参数为开启缓存流 (O),第二个参数为返回关闭缓存流 (C) 的工厂函数

流程如下:

  1. 当开启流 (O) 吐出数据时,调用工厂函数获取关闭流 (C),开始缓存
  2. 等待关闭流 (C) 吐出数据后,将缓存的值向下吐出
  3. 等待开启流 (O) 吐出数据,然后重复步骤 1
source$.pipe(bufferToggle(interval(1000), () => randomSeconds))  

buffer、window

传入一个关闭流 (C),区别与 bufferWhen:传入的是流,而不是返回流的工厂函数。

触发订阅时,开始缓存,当关闭流 (C) 吐出数据时,将缓存的值向下传递并重新开始缓存。

source$.pipe(buffer(interval(1000)))  

累计数据

Scan

scan 和 reduce 的区别在于:

区别于其他流,scan 拥有了保存、记忆状态的能力。

source$.pipe(scan(() => {}, 0))  

mergeScan

同 scan,但是返回的不是数据而是一个流。

source$.pipe(mergeScan(() => interval(1000)))  

错误处理

Catch

捕获错误

source$.pipe(catch(err => of('I', 'II', 'III', 'IV', 'V')))  

Retry

传入数字 N,遇到错误时,重新订阅上游,重试 N 次结束。

source$.pipe(retry(3))  

retryWhen

传入流 (A),遇到错误时,订阅流 (A),流 (A) 每吐出一次数据,就重试一次。流完成,retrywfhen 也完成。

source$.pipe(retryWhen((err) => interval(1000)))  

Finally

source$.pipe(finally())  

多播操作符

Multicast

接收返回 Subject 的工厂函数,返回一个 hot observable(HO)

当链接开始时,订阅上游获取数据,调用工厂函数拿到 Subject,上游吐出的数据通过 Subject 进行多播。

source$.pipe(multicast(() => new Subject()))  

Publish

source$.pipe(publish())  

Share

基于 publish 的封装,返回调用 refCount 后的结果(看代码)

source$.pipe(share())  
// 等同于  
source$.pipe(publish().refCount())  

publishLast

当上游完成后,多播上游的最后一个数据并完成当前流。

source$.pipe(publishLast())  

publishReplay

传入缓存数量 N,缓存上游最新的 N 个数据,当有新的订阅时,将缓存吐出。

source$.pipe(publishReplay(1))  

publishBehavior

缓存上游吐出的最新数据,当有新的订阅时,将最新值吐出。如果被订阅时上游从未吐出过数据,就吐出传入的默认值。

source$.pipe(publishBehavior(0))  

高阶合并类操作符

如下代码示例,顶层的流吐出的并不是普通的数据,而是两个会产生数据的流,那么此时下游在接受时,就需要对上游吐出的流进行订阅获取数据,如下:

of(of(1, 2, 3), of(4, 5, 6))  
    .subscribe(  
        ob => ob.subscribe((num) => {  
            console.log(num)  
        })  
    )  

上面的代码只是简单的将数据从流中取出,如果我想对吐出的流运用前面讲的操作符应该怎么办?

cache = []  
of(of(1, 2, 3), of(4, 5, 6))  
    .subscribe({  
        next: ob => cache.push(ob),  
        complete: {  
            concat(...cache).subscribe(console.log)  
            zip(...cache).subscribe(console.log)  
        }  
    })  

先不管上述实现是否合理,我们已经可以对上游吐出的流运用操作符了,但是这样实现未免也太过麻烦,所以 Rxjs 为我们封装了相关的操作符来帮我们实现上述的功能。

总结一下:高阶操作符操作的是流,普通操作符操作的是数据。

concatAll

对应 concat,缓存高阶流吐出的每一个流,依次订阅,当所有流全部完成,concatAll 随之完成。

source$.pipe(concatAll())  

mergeAll

对应 merge,订阅高阶流吐出的每一个流,任意流吐出数据,mergeAll 随之吐出数据。

source$.pipe(mergeAll())  

zipAll

对应 zip,订阅高阶流吐出的每一个流,合并这些流吐出的相同索引的数据向下传递。

source$.pipe(zipAll())  

combineAll

对应 combineLatest,订阅高阶流吐出的每一个流,合并所有流的最后值向下传递。

source$.pipe(combineAll())  

高阶切换类操作符

Switch

切换流 - 喜新厌旧

高阶流每吐出一个流时,就会退订上一个吐出的流,订阅最新吐出的流。

source$.pipe(switch())  

Exhaust

切换流 - 长相厮守

当高阶流吐出一个流时,订阅它。在这个流没有完成之前,忽略这期间高阶流吐出的所有的流。当这个流完成之后,等待订阅高阶流吐出的下一个流订阅,重复。

source$.pipe(exhaust())  

高阶 Map 操作符

看完例子,即知定义。

例子

实现如下功能:

普通实现

mousedown$ = formEvent(document, "mousedown")  
mousemove$ = formEvent(document, "mousemove")  

mousedown$.pipe(  
  map(() => mousemove$),  
  mergeAll()  
)  
  1. mousedown 事件触发后,使用 map 操作符,将向下吐出的数据转换成 mousemove 事件流。
  2. 由于返回的是流而非数据,所以需要使用 mergeAll 操作符帮我们将流中的数据展开。
  3. 这样我们最终接受到的就是 mousemoveevent 事件对象了。

注:由于只有一个事件流,所以使用上面介绍的任意高阶合并操作符都是一样的效果。

高阶 Map 实现

mousedown$.pipe(mergeMap(() => mousemove$))  

不难看出,所谓高阶 map,就是

concatMap   = map + concatAll  
mergeMap        = map + mergeAll  
switchMap   = map + switch  
exhaustMap  = map + exhaust  
concatMapTo = mapTo + concatAll  
mergeMapTo  = mapTo + mergeAll  
switchMapTo = mapTo + switch  

Expand

类似于 mergeMap,但是,所有传递给下游的数据,同时也会传递给自己,所以 expand 是一个递归操作符。

source$.pipe(expand((x) => (x === 8 ? EMPTY : x * 2)))  

数据分组

groupBy

输出流,将上游传递进来的数据,根据 key 值分类,为每一个分类创建一个流传递给下游。

key 值由第一个函数参数来控制。

source$.pipe(groupBy((i) => i % 2))  

Partition

groupBy 的简化版,传入判断条件,满足条件的放入第一个流中,不满足的放入第二个流中。

简单说:

source$.pipe(partition())  

结语

以上就是本文的全部内容了,希望你看了会有收获。

如果有不理解的部分,可以在评论区提出,大家一起成长进步。

祝大家早日拿下 Rxjs 这块难啃的骨头。

参考资料