cisen / blog

Time waits for no one.
129 stars 20 forks source link

rxjs相关 #401

Open cisen opened 5 years ago

cisen commented 5 years ago

https://segmentfault.com/a/1190000012252368 https://segmentfault.com/a/1190000013300134 https://rxjs-cn.github.io/learn-rxjs-operators/ 接口控制:https://segmentfault.com/a/1190000010088631

总结

Observable可以认为是加强版的Promise

RxJS快速入门

2.1 初级核心概念

上面我们已经使用同步编程创建好了一个流的处理过程,但此时ob作为源并不会立刻发射数据,如果我们在map中打印n是不会得到任何输出的,因为ob作为Observable序列必须被“订阅”才能够触发上述过程,也就是subscribe(发布/订阅模式)。

const ob = Observable.interval(1000);
ob.take(3).map(n => n * 2).filter(n => n > 0).subscribe(n => console.log(n));

结果:

2 //第2秒
4 //第3秒

上面代码中我们给subscribe传入了一个函数,这其实是一种简写,subscribe完整的函数签名如下:

ob.subscribe({
    next: d => console.log(d),
    error: err => console.error(err),
    complete: () => console.log('end of the stream')
})

直接给subscribe传入一个函数会被当做是next函数。这个完整的包含3个函数的对象被称为observer(观察者),表示的是对序列结果的处理方式。next表示数据正常流动,没有出现异常;error表示流中出错,可能是运行出错,http报错等等;complete表示流结束,不再发射新的数据。在一个流的生命周期中,error和complete只会触发其中一个,可以有多个next(表示多次发射数据),直到complete或者error。

observer.next可以认为是Promise中then的第一个参数,observer.error对应第二个参数或者Promise的catch。

RxJS同样提供了catch操作符,err流入catch后,catch必须返回一个新的Observable。被catch后的错误流将不会进入observer的error函数,除非其返回的新observable出错。

Observable.of(1).map(n => n.undefinedMethod()).catch(err => {
    // 此处处理catch之前发生的错误
    return Observable.of(0); // 返回一个新的序列,该序列成为新的流。
});

2.2 创建可观察序列

创建一个序列有很多种方式,我们仅列举常用的几种:

2.3 合并序列

合并序列也属于创建序列的一种,例如有这样的需求:进入某个页面后拿到了一个列表,然后需要对列表每一项发出一个http请求来获取对应的详细信息,这里我们把每个http请求作为一个序列,然后我们希望合并它们。 合并有很多种方式,例如N个请求按顺序串行发出(前一个结束再发下一个);N个请求同时发出并且要求全部到达后合并为数组,触发一次回调;N个请求同时发出,对于每一个到达就触发一次回调。 如果不用RxJS,我们会比较难处理这么多情形,不仅实现麻烦,维护更麻烦,下面是使用RxJS对上述需求的解决方案:

const ob1 = Observable.ajax('api/detail/1');
const ob2 = Observable.ajax('api/detail/2');
...
const obs = [ob1, ob2...];
// 分别创建对应的HTTP请求。

1. N个请求按顺序串行发出(前一个结束再发下一个)
```js
Observable.concat(...obs).subscribe(detail => console.log('每个请求都触发回调'));
  1. N个请求同时并行发出,对于每一个到达就触发一次回调
    Observable.merge(...obs).subscribe(detail => console.log('每个请求都触发回调'));
  2. N个请求同时发出并且要求全部到达后合并为数组,触发一次回调
    Observable.forkJoin(...obs).subscribe(detailArray => console.log('触发一次回调'));

3. 使用RxJS实现搜索功能

搜索是前端开发中很常见的功能,一般是监听的keyup事件,然后将内容发送到后台,并展示后台返回的数据。

<input id="text"></input>
<script>
    var text = document.querySelector('#text');
    text.addEventListener('keyup', (e) =>{
        var searchText = e.target.value;
        // 发送输入内容到后台
        $.ajax({
            url: `/search/${searchText}`,
            success: data => {
              // 拿到后台返回数据,并展示搜索结果
              render(data);
            }
        });
    });
</script>

上面代码实现我们要的功能,但存在两个较大的问题:

多余的请求 当想搜索“爱迪生”时,输入框可能会存在三种情况,“爱”、“爱迪”、“爱迪生”。而这三种情况将会发起 3 次请求,存在 2 次多余的请求。

已无用的请求仍然执行 一开始搜了“爱迪生”,然后马上改搜索“达尔文”。结果后台返回了“爱迪生”的搜索结果,执行渲染逻辑后结果框展示了“爱迪生”的结果,而不是当前正在搜索的“达尔文”,这是不正确的。

减少多余请求数,可以用 setTimeout 函数节流的方式来处理,核心代码如下:

<input id="text"></input>
<script>
    var text = document.querySelector('#text'),
        timer = null;
    text.addEventListener('keyup', (e) =>{
        // 在 250 毫秒内进行其他输入,则清除上一个定时器
        clearTimeout(timer);
        // 定时器,在 250 毫秒后触发
        timer = setTimeout(() => {
            console.log('发起请求..');
        },250)
    })
</script>

已无用的请求仍然执行 的解决方式,可以在发起请求前声明一个当前搜索的状态变量,后台将搜索的内容及结果一起返回,前端判断返回数据与当前搜索是否一致,一致才走到渲染逻辑。最终代码为:

<input id="text"></input>
<script>
    var text = document.querySelector('#text'),
        timer = null,
        currentSearch = '';

    text.addEventListener('keyup', (e) =>{
        clearTimeout(timer)
        timer = setTimeout(() => {
            // 声明一个当前所搜的状态变量
            currentSearch = '书'; 

            var searchText = e.target.value;
            $.ajax({
                url: `/search/${searchText}`,
                success: data => {
                    // 判断后台返回的标志与我们存的当前搜索变量是否一致
                    if (data.search === currentSearch) {
                        // 渲染展示
                        render(data);
                    } else {
                        // ..
                    }
                }           
            });
        },250)
    })
</script>

上面代码基本满足需求,但代码开始显得乱糟糟。我们来使用RxJS实现上面代码功能,如下:

var text = document.querySelector('#text');
var inputStream = Rx.Observable.fromEvent(text, 'keyup') //为dom元素绑定'keyup'事件
                    .debounceTime(250) // 防抖动
                    .pluck('target', 'value') // 取值
                    .switchMap(url => Http.get(url)) // 将当前输入流替换为http请求
                    .subscribe(data => render(data)); // 接收数据

RxJS能简化你的代码,它将与流有关的内部状态封装在流中,而不需要在流外定义各种变量来以一种上帝视角控制流程。Rx的编程方式使你的业务逻辑流程清晰,易维护,并显著减少出bug的概率。

个人总结的常用操作符:

对于这些操作符的使用不再详细描述,请参阅网上资料。

中文官网 http://cn.rx.js.org/ 附上个人翻译的一些文章

RxJS:冷热模式的比较 RxJS: map, flatMap和flatMapLatest的区别 RxJS: 详解forkJoin, zip, combineLatest之间的区别 参考文章:构建流式应用:RxJS 详解

cisen commented 5 years ago

先后请求

Map 和 Subscribe

有些时候,当我们发送下一个请求时,需要依赖于上一个请求的数据。即我们在需要在上一个请求的回调函数中获取相应数据,然后在发起另一个 HTTP 请求。

import { Component, OnInit } from '@angular/core';
import { Http } from '@angular/http';
import 'rxjs/add/operator/map';

@Component({
  selector: 'app-root',
  template: `
    <p>{{username}} Detail Info</p>
    {{user | json}}
  `
})
export class AppComponent implements OnInit {
  constructor(private http: Http) { }

  apiUrl = 'https://jsonplaceholder.typicode.com/users';
  username: string = '';
  user: any;

  ngOnInit() {
    this.http.get(this.apiUrl)
      .map(res => res.json())
      .subscribe(users => {
        let username = users[6].username;
        this.http.get(`${this.apiUrl}?username=${username}`)
          .map(res => res.json())
          .subscribe(
            user => {
              this.username = username;
              this.user = user;
            });
      });
  }
}

在上面示例中,我们先从 https://jsonplaceholder.typicode.com/users 地址获取所有用户的信息,然后再根据指定用户的 username 进一步获取用户的详细信息。虽然功能实现了,但有没有更好的解决方案呢?答案是有的,可以通过 RxJS 库中提供的 mergeMap 操作符来优化上述的流程。

mergeMap

import { Component, OnInit } from '@angular/core';
import { Http } from '@angular/http';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/mergeMap';

@Component({
  selector: 'app-root',
  template: `
    <p>{{username}} Detail Info</p>
    {{user | json}}
  `
})
export class AppComponent implements OnInit {
  constructor(private http: Http) { }

  apiUrl = 'https://jsonplaceholder.typicode.com/users';

  username: string = '';

  user: any;

  ngOnInit() {
    this.http.get(this.apiUrl)
      .map(res => res.json())
      .mergeMap(users => {
        this.username = users[6].username;
        return this.http.get(`${this.apiUrl}?username=${this.username}`)
          .map(res => res.json())
      })
      .subscribe(user => this.user = user);
  }
}

在上面示例中,我们通过 mergeMap 操作符,解决了嵌套订阅的问题。最后我们来看一下如何处理多个并行的 Http 请求。 一个ajaxObserver只有一个pipe

import { map, mergeMap } from 'rxjs/operators';
getUserInfo().pipe(mergeMap((userInfo => getAddressList(userInfo.data.orgId))));
// 先获取用户信息再地址列表
export function getUserInfoAndAddressList() {
  return GET(`/organization-aggregation/my/customers`).pipe(map(res => res.data), mergeMap(getAddressList2))
}
// 获取地址列表
export function getAddressList(orgId: string | number) {
  return GET(`/organization-aggregation/organizations/${orgId}/delivery-addresses`).pipe(map(res => res.data));
}
// 获取用户信息
export function getUserInfo() {
  return GET(`/organization-aggregation/my/customers`).pipe(map(res => res.data));
}

操作符

合并操作符

combineLatest combineLatest与zip很相似,combineLatest一开始也会等待每个子流都发射完一次数据,但是在合并时,如果子流1在等待其他流发射数据期间又发射了新数据,则使用子流最新发射的数据进行合并,之后每当有某个流发射新数据,不再等待其他流同步发射数据,而是使用其他流之前的最近一次数据进行合并。

const ob1 = Rx.Observable.interval(1000).map(d => `ob1:${d}`).take(3);
const ob2 = Rx.Observable.interval(2000).map(d => `ob2:${d}`).take(2);

Rx.Observable.combineLatest(ob1, ob2).subscribe({
  next: (data) => console.log(data),
  complete: () => console.log('complete')
});
// ["ob1:1", "ob2:0"] ob1等待ob2发射,当ob2发射时ob1已经发射了第二次数据,使用ob1的第二次数据
// ["ob1:2", "ob2:0"] ob1继续发射第三次也是最后一次数据,ob2虽然还未发射,但是可以使用它上一次的数据
// ["ob1:2", "ob2:1"] ob2发射第二次也是最后一次数据,使ob1上一次的数据。
// "complete"

forkJoin

Rx.Observable.forkJoin(ob1, ob2).subscribe((data) => console.log(data)); // ["ob1:2", "ob2:1"]

ob1会在发射完第三个数据时停止发射,ob2会在发射完第二个数据时停止,而forkJoin合并后的流会等到ob1和ob2都结束时,发射一次数据,也就是触发一次subscribe里的回调,接收到的数据为ob1和ob2发射的最后一次数据的数组。

**zip**
- 每个ob都触发玩对应次数的next就会统一触发一次complete
zip工作原理如下,当每个传入zip的流都发射完毕第一次数据时,zip将这些数据合并为数组并发射出去;当这些流都发射完第二次数据时,zip再次将它们合并为数组并发射。以此类推直到其中某个流发出结束信号,整个被合并后的流结束,不再发射数据。
```js
const ob1 = Rx.Observable.interval(1000).map(d => `ob1:${d}`).take(3);
const ob2 = Rx.Observable.interval(2000).map(d => `ob2:${d}`).take(2);

Rx.Observable.zip(ob1, ob2).subscribe({
  next: (data) => console.log(data),
  complete: () => console.log('complete')
});
// ["ob1:0", "ob2:0"] ob1等待ob2发射数据,之后合并
// ["ob1:1", "ob2:1"] 此时ob2结束,整个合并的流也结束
// "complete"

zip和forkJoin的区别在于,forkJoin仅会合并各个子流最后发射的一次数据,触发一次回调;zip会等待每个子流都发射完一次数据然后合并发射,之后继续等待,直到其中某个流结束(因为此时不能使合并的数据包含每个子流的数据)。

cisen commented 5 years ago

总结

  1. 适合某一事件改变指定组件,过多模糊组件不适合
  2. 接口防抖功能很好用