10081677wc / blog

78 stars 6 forks source link

基于 RxJS 构建流式应用 #8

Open 10081677wc opened 6 years ago

10081677wc commented 6 years ago

常规方式实现搜索功能

  1. 监听文本框的输入事件
  2. 发送输入内容至服务器端
  3. 将得到的响应数据进行处理并展示成结果

我们可以很快写出下面的代码来实现需求的功能:

$('input.text').on('keyup', function() {
    $.ajax({
        url: `${API}/${$(this).val()}`,
        success: (res) => {
            if (res.status === 1) {
                render(res.data);
            }
        }
    });
});

显然上述代码存在一个较大的问题(即存在多余的无效请求),我们可以使用 setTimeout 函数截流 的方式对其作出优化:

let t = null;
$('input.text').on('keyup', function() {
    clearTimeout(t);
    t = setTimeout(() => {
        $.ajax({
            url: `${API}/${$(this).val()}`,
            success: (res) => {
                if (res.status === 1) {
                    render(res.data);
                }
            }
        });
    }, 750);
});

基于 RxJS 实现搜索功能

let $input = document.querySelector('input.text');
Rx.Observable.fromEvent($input, 'keyup')
    .debounce(750)
    .pluck('target', 'value')
    .flatMapLatest(url => Http.get(url))
    .filter(r => r.status === 1)
    .map(r => r.data)
    .subscribe(d => render(d));

什么是 RxJS

RxJS

RxJS 是 Reactive Extensions for JavaScript 的缩写,起源于 Reactive Extensions,是一个基于可观测数据流在异步编程应用中的库,RxJS 是 Reactive Extensions 在 JavaScript 上的实现,其他语言也有相应的实现如 RxJava、RxAndroid、RxSwift 等。

适用场景

适用于多个复杂的异步或事件组合在一起,以及处理多个数据序列的情况,假如没有被复杂的异步、事件和数据序列困扰适用 Promise 即可。

Stream 流

流是在时间流逝的过程中产生的一系列事件,它具有时间与事件响应的概念。

RxJS 基础实现原理简析

RxJS 是基于 观察者模式迭代器模式 以函数式编程思维来实现的。

观察者模式

观察者模式 在前端中最常见的应该是 DOM 事件的监听和触发: 订阅:通过 addEventListener 订阅 document.body 的 click 事件 发布:当 body 节点被点击时便会向订阅者发布这个消息

迭代器模式

迭代器模式 可以用 JavaScript 提供的 Iterable Protocol 可迭代协议来表示,它不是具体的变量类型,而是一种可实现协议,例如 Array、Set 等都属于内置的可迭代类型,可以通过 iterator 方法来获取一个迭代对象,调用迭代对象的 next 方法将获取一个元素对象:

var iterable = [1, 2];
var iterator = iterable[Symbol.iterator]();
iterator.next(); // => { value: "1", done: false}
iterator.next(); // => { value: "2", done: false}
iterator.next(); // => { value: undefined, done: true}

可以使用下面的方法遍历迭代器:

var iterable = [1, 2];
var iterator = iterable[Symbol.iterator]();
var iterator = iterable();

while(true) {
    try {
        let result = iterator.next();  // <= 获取下一个值
    } catch (err) {
        handleError(err);  // <= 错误处理
    }

    if (result.done) {
        handleCompleted();  // <= 无更多值(已完成)
        break;
    }

    doSomething(result.value);
}

RxJS 中的观察者模式和迭代器模式

RxJS 中含有两个基本概念:ObservablesObserver:Observables 作为被观察者,是一个值或事件的流集合;而 Observer 则作为观察者,根据 Observables 进行处理,Observables 与 Observer 之间的订阅发布关系如下:

订阅:Observer 通过 Observable 提供的 subscribe 方法订阅 Observable 发布:Observable 通过回调 onNext 方法向 Observer 发布事件

RxJS 中的 Observer 不仅有 next 方法来接收 Observable 的事件,还提供有另外的两个方法:error 和 complete:

var Observer = {
    next(value) { /* 处理值*/ },
    error(error) { /* 处理异常 */ },
    complete() { /* 处理已完成态 */ }
};

RxJs 中的基本概念

Observable

Observer

Observer 就是当 Observable 返回值的时候接受该值的函数。

Subject

Subject 是一种能够发射数据给多个 observer 的 Observable。

Operators

Observable 上有很多方法比如 map,filter,merge etc. 它们基于调用它们的 observable 返回一个全新的 observable,Operators 分为两种:instance operators 和 static operators,instance operators 是存在于 observable 实例上的方法,而 static operators 是存在于 Observable 这个类型上的静态方法。

常用的 Operators 操作

range

使用 range 方法可以创建一个有限长度的整数序列:第一个参数声明序列的起始值,第二个参数 声明序列的元素数量。

// 从 30 开始生成 3 个数的序列
// 输出:30 31 32 33
Rx.Observable.range(30,4)  

flatMap

flatMap 方法包括两个步骤:首先将一个序列的各元素映射为序列,然后将各元素序列融合。

/* 序列:10 20 30 */
var source = Rx.Observable.of(10, 20, 30);

var mf = function(item) {
    return Rx.Observable.range(item, 3);
};

/* 序列:10 11 12 20 21 22 30 31 32 */
var target = source.mergeMap(mf);

flatMapLatest

flatMapLatest 与 flatMap的区别在于,它仅仅将最新激活的元素序列中的元素输出到目标序列中,与 flatMap 一样其参数需要指定一个映射函数,其返回值应当为一个序列。

flatMap 有一个很适用的场景就是搜索框,在用户输入一串字符后,将其发送到服务器并获取搜索结果,这里就涉及到两 个Observable:

Observable
    .fromEvent($input, 'keyup')
    .flatMap(url => Http.get(url))
    .subscribe(res => console.log(res))

使用 flatMap 就可以直接获取到新的 Observable 返回的数据,但是这里存在一个问题:如果用户有多次输入,由于网络原因可能会发生前一次响应时间比后一次长的情况,这时后一次的结果就会被覆盖。

flatMapLatest 可以解决这个问题:如果之前的 Observable 还没有未触发,而又收到新的 Observable,flatMapLatest 会取消之前的 Observable,只处理最新收到的 Observable,这样就可以保证处理请求的先后顺序。

使用 RxJS 一步步实现搜索示例

使用 RxJS 提供的 fromEvent 接口来监听输入框的 keyup 事件,触发 keyup 时将产生 Observable。

let $input = document.querySelector('input.text');
Rx.Observable.fromEvent($input, 'keyup')
             .subscribe(e => console.log(e));

这里我们并不仅仅想输出事件,而是需要拿到文本的输入值进行请求,最终渲染出搜索结果,这涉及到一个新的 Operators 操作:

pluck('target', 'value') 将输入的 event,输出成 event.target.value。

let $input = document.querySelector('input.text');
Rx.Observable.fromEvent($input, 'keyup')
    .pluck('target', 'value')
    .flatMapLatest(url => Http.get(url))
    .subscribe(d => render(d));

上面的代码可以实现简单的搜索呈现,但是仍然存在多余的无效请求问题。

debounce(TIMES) 表示经过 TIMES 毫秒后没有流入新值,那么才将值转入下一个环节,这个与前面使用 setTimeout 来实现函数节流的方式有一致效果。

flatMapLatest() 使用 flatMapLatest 替换 flatMap,将能取消上一个已无用的请求,只保留最后的请求结果流,这样就确保处理展示的是最后的搜索的结果。

最终实现如下,明显可以看出 RxJS 让代码变得十分简洁。

let $input = document.querySelector('input.text');
Rx.Observable.fromEvent($input, 'keyup')
    .debounce(750)
    .pluck('target', 'value')
    .flatMapLatest(url => Http.get(url))
    .filter(r => r.status === 1)
    .map(r => r.data)
    .subscribe(d => render(d));