JTangming / blog

My repository on GitHub.
Other
53 stars 0 forks source link

理解 RxJS #23

Open JTangming opened 5 years ago

JTangming commented 5 years ago

基础概念了解

在理解 RxJS 之前,先了解一下 Observable 的相关核心概念:

来一个例子加深一下对上面概念的理解:

// declare a publishing operation
const observable = new Observable(observer => {
  setTimeout(() => {
    observe.next('test demo');
  });
});
// initiate execution
observe.subscribe({
  next: x => console.log(x),
  error: err => console.error(err),
  complete: () => console.log('done')
});

简单实现一个 Observable

根据前面的了解,我们可以大致实现一个 Observable 类。 初始化实例的时候传入 _observeFun,当调用类实例方法 subscribe 时执行传入的方法 _subscribeFun

class Observable {
  _observeFun: Function;

  constructor(observeFun) {
    this._observeFun = observeFun;
  }

  subscribe(observe: any) {
    this._observeFun(observe);
  }
}

这就是一个简易版的观察者模式的实现。

RxJS

RxJS(Reactive Extensions for JavaScript) 是基于 ReactiveX 在 JavaScript 层面上的实现,关于 ReactiveX(An API for asynchronous programming with observable streams) 请参考reactivex.io

RxJS 是一个响应式编程库,它让组合异步代码和基于回调的代码变得更简单,整个库的基础就是 Observable,注意和观察者对象 Observer 区别开。对异步数据如 Ajax、User Events、Animation、Sockets、Workers 提供了一种 Observable 类型的发布订阅实现,输出给开发者使用,参考RxJS Docs

举个例子,RxJS 提供了一个 fromEvent 用于监听相关 DOM 事件,通过 Observable 实现的这个 fromEvent 简单的代码示例如下:

function fromEvent(target, eventName) {
  return new Observable((observer) => {
    const handler = (e) => observer.next(e);

    // Add the event handler to the target
    target.addEventListener(eventName, handler);

    return () => {
      // Detach the event handler from the target
      target.removeEventListener(eventName, handler);
    };
  });
}

下面是对 fromEvent 的一个应用:

import { fromEvent } from 'rxjs';

const el = document.getElementById('my-element');
const mouseMoves = fromEvent(el, 'mousemove');

// Subscribe to start listening for mouse-move events
const subscription = mouseMoves.subscribe((evt: MouseEvent) => {
  console.log(`Coords: ${evt.clientX} X ${evt.clientY}`);
});

即当 Observeable 的实例 subscribe() 方法被调用后,实现了一个鼠标的事件监听,每当鼠标移动,通过发布者函数的 observer.next(e) 方法将数据流发布给订阅者。类似于:

el.addEventListener('mousemove', evt => {
   console.log(`Coords: ${evt.clientX} X ${evt.clientY}`);
}, false);

观察者模式在 Web 中最常见的应该是 DOM 事件的监听和触发。对于上面的例子,发布订阅相关信息如下:

fromEvent 和监听 DOM 事件的类比图效果如下: rxjs-1

RxJS 还有一个重要的概念:Operators,是对可操作的数据流进行一些中间处理的 API,在 subscribe 中的 observer 最终接收到的数据往往是经过 Operator 处理完后的数据。这些 API 是一些工具型函数,把现有的异步代码转换成可观察对象,对 observer stream 进行迭代处理,其中包括对不同类型的值进行例如 Mapping、过滤、组合等,把当前的转成(transformTo)另一个(更多时候配合 pipe 使用。

import { map } from 'rxjs/operators';

const nums = of(1, 3, 5, 7);

const squareValues = map((val: number) => val * 10);
const squaredNums = squareValues(nums);

squaredNums.subscribe(x => console.log(x));

可以将上面代码形象表示成

--1---3--5-----7------
   map(i => i * 10)
--10--30-50----70-----

以上是对 RxJS 的基础了解,RxJS 的核心大致理解成如下图: RXJS-2

下面我们更进一步。

RxJS 相对于 Promise

RxJS 的核心概念是 Observable(可观察对象),通过以上基础概念理解,我们对比一下 Promise:

除了可以通过 Observer 的 error 回调来处理外,RxJS 还提供了 catchError 操作符,它允许你在 pipe 中处理已知错误。更重要的是 catchError 提供了 retry 操作符让你可以尝试失败的请求。对于错误处理,由于可观察对象会异步生成值,所以用 try/catch 是无法捕获错误的。

举个例子:

import { ajax } from 'rxjs/ajax';
import { map, retry, catchError } from 'rxjs/operators';

const apiData = ajax('/api/data').pipe(
  retry(3), // Retry up to 3 times before failing
  map(res => {
    if (!res.response) {
      throw new Error('Value expected!');
    }
    return res.response;
  }),
  catchError(err => of([]))
);

apiData.subscribe({
  next(x) { console.log('data: ', x); },
  error(err) { console.log('errors already caught... will not run'); }
});

Promise 的错误处理方式可以如下:

somethingAync.then(function() {
    return somethingElseAsync();
}, function(err) {
    handleMyError(err);
});

这种方式的缺点就是,没法捕获 somethingElseAsync() 里边的错误,可以使用 Promise 的原型方法 catch。

用法实战

实现一个输入框搜索的功能,大致需要做这么些事情:

RxJS 一个简单的实现代码:

import { fromEvent } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { map, filter, debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';

const searchBox = document.getElementById('search-box');

const typeahead = fromEvent(searchBox, 'input').pipe(
  map((e: KeyboardEvent) => e.target.value),
  filter(text => text.length > 2),
  debounceTime(300),
  distinctUntilChanged(),
  switchMap(() => ajax('/api/endpoint'))
);

typeahead.subscribe(data => {
 // Handle the data from the API
});

Angular 原生集成了 RxJS,对它的应用可以说比较丰富了,如:

异步复杂度要到什么程度才需要用到Rxjs?

对于群聊,试想一下如果每收到一个 notification 都立刻渲染的话,会有什么问题。通常的做法是批量渲染,即收集一段时间的消息,然后把它们一起渲染出来,例如每一秒批量渲染一次。

用原生 JS 写的话,需要维护一个消息队列池、一个定时器,收到消息,先放进队列池,然后定时器负责把消息渲染出来。

使用 RxJS 的话,很简单:

Rx.Observable
    .fromEvent(ws, 'message')
    .bufferTime(1000)
    .subscribe(messages => render(messages))

其中最为关键的是 bufferTime 这个操作符,可以用下图来概括以上逻辑: bufferTime

RxJS 具备的 100 多个 Operators 提供了强大的业务支撑能力,使用恰当可是事半功倍。到底什么样复杂的异步操作才不算杀鸡焉用牛刀呢?

RxJS 的设计核心在于响应式(reactive)编程,如果你把一个场景下所有 component/service 的 input 和 ouput 都理解为 stream 之间的 publish 和 subscribe,那自然而然就会选择 RxJS。RxJS 的响应式编程另外一大好处在于扩展性好,复杂场景下添加一个具有更新数据能力的组件带来的代码改动可以做到最小,因为你只需要关注 stream 之间的 dependency。

最后不得不再提的是 Operators,Operator 仅仅是提供一种业务抽象能力,但我觉得模式和思路收益更大,抛去上层扩展,Rx 的内核也就 Observable 和 Subject。

还有两个没提的概念,即 subject 和 scheduler。可以参考Rx中的subject和scheduler怎么理解概念?

JTangming commented 4 years ago

观察者模式 vs 发布订阅模式

观察者模式

所谓观察者模式,其实就是为了实现松耦合(loosely coupled)

举个例子,当数据有更新,如 changed() 方法被调用,用来更新 state 数据,比如温度、气压等等。

这样写的问题是,如果想更新更多的信息,比如说湿度,那就要去修改 changed() 方法的代码,这就是紧耦合的坏处。

结合上面 RxJS 提到的 Observable 和 Subject,我们仅仅维护一个可观察者对象即可,即一个 Observable 实例,当有数据变更时,它只需维护一套观察者(Observer)的集合,这些 Observer 实现相同的接口,Subject 只需要知道,通知 Observer 时,需要调用哪个统一方法就好了。如下图: observable

发布订阅模式

在发布订阅模式里,发布者,并不会直接通知订阅者,换句话说,发布者和订阅者,彼此互不相识。

那他们之间如何交流?

答案是,通过第三者触发,也就是在消息队列里面,发布者和订阅者通过事件名来联系,匹配上后直接执行对应订阅者的方法即可。

如图: pub-sub

与观察者模式不一样,发布订阅模式里,发布者和订阅者,不是松耦合,而是完全解耦的。

最后,附一张图总结一下: summary

观察者模式里,只有两个角色 —— 观察者 + 被观察者 而发布订阅模式里,却不仅仅只有发布者和订阅者两个角色,还有一个管理并执行消息队列的“经纪人Broker”

再者

观察者和被观察者,是松耦合的关系 发布者和订阅者,则完全不存在耦合

Reference

JTangming commented 2 years ago

手写一个观察者模式

// 目标者类
class Subject {
  constructor() {
    this.observers = [];  // 观察者列表
  }
  // 添加
  add(observer) {
    this.observers.push(observer);
  }
  // 删除
  remove(observer) {
    let idx = this.observers.findIndex(item => item === observer);
    idx > -1 && this.observers.splice(idx, 1);
  }
  // 通知
  notify() {
    for (let observer of this.observers) {
      observer.update();
    }
  }
}

// 观察者类
class Observer {
  constructor(name) {
    this.name = name;
  }
  // 目标对象更新时触发的回调
  update() {
    console.log(`目标者通知我更新了,我是:${this.name}`);
  }
}

// 实例化目标者
let subject = new Subject();

// 实例化两个观察者
let obs1 = new Observer('前端开发者');
let obs2 = new Observer('后端开发者');

// 向目标者添加观察者
subject.add(obs1);
subject.add(obs2);

// 目标者通知更新
subject.notify();  
// 输出:
// 目标者通知我更新了,我是前端开发者
// 目标者通知我更新了,我是后端开发者