Christian-Yang / Translate-and-save

Translate and save for my self
1 stars 0 forks source link

Subject入门笔记 #4

Open Christian-Yang opened 7 years ago

Christian-Yang commented 7 years ago

RxJS 系列文章转载,原文:https://juejin.im/post/58e1b939da2f60005fd3122b

RxJS 系列之一 - Functional Programming 简介

什么是函数式编程

简单说,"函数式编程"是一种 "编程范式"(programming paradigm),也就是如何编写程序的方法论。

它属于 "结构化编程" 的一种,主要思想是把运算过程尽量写成一系列嵌套的函数调用(函数式编程的思想)。举例来说,现在有这样一个数学表达式:

(5+6) - 1 * 3

传统的过程式编程,可能这样写:

var a = 5 + 6;
var b = 1 * 3;
var c = a - b;

函数式编程要求使用函数,我们可以把运算定义成不同的函数:

const add = (a, b) => a + b;
const mul = (a, b) => a * b;
const sub = (a,b) => a - b;

sub(add(5,6), mul(1,3));

我们把每个运算包成一个个不同的函数,并且根据这些函数组合出我们要的结果,这就是最简单的函数式编程。

函数式编程基础条件

函数为一等公民 (First Class)

所谓 "一等公民"(first class),指的是函数与其他数据类型一样,处于平等地位,可以赋值给其他变量,也可以作为参数,传入另一个函数,或者作为其它函数的返回值。

函数赋值给变量:

const greet = function(msg) { console.log(`Hello ${msg}`); }
greet('Semlinker'); // Output: 'Hello Semlinker'

函数作为参数:

const logger = function(msg) { console.log(`Hello ${msg}`); };
const greet = function(msg, print) { print(msg); };
greet('Semlinker', logger);

函数作为返回值:

const a = function(a) {
  return function(b) {
    return a + b;
  };
};
const add5 = a(5);
add5(10); // Output: 15

函数式编程重要特性

只用表达式,不用语句(函数式编程重要特性)

"表达式"(expression)是一个单纯的运算过程,总是有返回值;"语句"(statement)是执行某种操作,没有返回值。函数式编程要求,只使用表达式,不使用语句。也就是说,每一步都是单纯的运算,而且都有返回值

原因是函数式编程的开发动机,一开始就是为了处理运算(computation),不考虑系统的读写(I/O)。"语句"属于对系统的读写操作,所以就被排斥在外。

Pure Function

Pure Function (纯函数) 的特点:

  1. 给定相同的输入参数,总是返回相同的结果

  2. 没有产生任何副作用

  3. 没有依赖外部变量的值

所谓 "副作用"(side effect),是指函数内做了与本身运算无关的事,比如修改某个全局变量的值,或发送 HTTP 请求,甚至函数体内执行 console.log 都算是副作用。函数式编程强调函数不能有副作用,也就是函数要保持纯粹,只执行相关运算并返回值,没有其他额外的行为。

前端中常见的产生副作用的场景:

  1. 发送 HTTP 请求
  2. 函数内调用 logger 函数,如 console.log、console.dir 等
  3. 修改外部变量的值
  4. 函数内执行 DOM 操作

接下来我们看一下纯函数与非纯函数的具体示例:

纯函数示例:

const double = (number) => number * 2;
double(5);

非纯函数示例:

Math.random(); // => 0.3384159509502669
Math.random(); // => 0.9498302571942787
Math.random(); // => 0.9860841663478281

不修改状态 - 利用参数保存状态

函数式编程只是返回新的值,不修改系统变量。因此,不修改变量,也是它的一个重要特点。

在其他类型的语言中,变量往往用来保存"状态"(state)。不修改变量,意味着状态不能保存在变量中。函数式编程使用参数保存状态,最好的例子就是递归,具体示例如下:

function findIndex(arr, predicate, start = 0) {
    if (0 <= start && start < arr.length) {
        if (predicate(arr[start])) {//predicate断言
            return start;
        }
        return findIndex(arr, predicate, start+1);
    }
}
findIndex(['a', 'b'], x => x === 'b'); // 查找数组中'b'的索引值

示例中的 findIndex 函数用于查找数组中某个元素的索引值,我们通过 start 参数来保存当前的索引值,这就是利用参数保存状态。

引用透明

引用透明(Referential transparency),指的是函数的运行不依赖于外部变量或 "状态",只依赖于输入的参数,任何时候只要参数相同,引用函数所得到的返回值总是相同的。

非引用透明的示例:

const FIVE = 5; const addFive = (num) => num + FIVE; addFive(10);

函数式编程的优势

1.代码简洁,开发快速

函数式编程大量使用函数,减少了代码的重复,因此程序比较短,开发速度较快。

2.接近自然语言,易于理解,可读性高

函数式编程的自由度很高,可以写出很接近自然语言的代码。我们可以通过一系列的函数,封装数据的处理过程,代码会变得非常简洁且可读性高,具体参考以下示例:

[1,2,3,4,5].map(x => x * 2).filter(x => x > 5).reduce((p,n) => p + n);

3.可维护性高、方便代码管理

函数式编程不依赖、也不会改变外界的状态,只要给定输入参数,返回的结果必定相同。因此,每一个函数都可以被看做独立单元,很有利于进行单元测试(unit testing)和除错(debugging),以及模块化组合。

4.易于"并发编程"

函数式编程不需要考虑"死锁"(deadlock),因为它不修改变量,所以根本不存在"锁"线程的问题。不必担心一个线程的数据,被另一个线程修改,所以可以很放心地把工作分摊到多个线程,部署"并发编程"(concurrency)。

函数式编程中常用方法

forEach

在 ES 5 版本之前,我们只能通过 for 循环遍历数组:

var heroes = ['Windstorm', 'Bombasto', 'Magneta', 'Tornado'];
for (var i =0, len = heroes.length; i < len; i++) {
  console.log(heroes[i]);
}

参数说明:

  1. callback - 对数组中每一项,进行处理的函数
    1.currentValue - 数组中正在处理的当前元素
    2. index - 数组中正在处理的当前元素的索引
    3. array - 处理的数组      
  2. thisArg (可选的) - 设置执行 callback 函数时,this 的值

以上示例 forEach 方法实现:

var heroes = ['Windstorm', 'Bombasto', 'Magneta', 'Tornado']; heroes.forEach(name => console.log(name));

map

在 ES 5 版本之前,对于上面的示例,如果我们想给每个英雄的名字添加一个前缀,但不改变原来的数组,我们可以这样实现:

var heroes = ['Windstorm', 'Bombasto', 'Magneta', 'Tornado'];
var prefixedHeroes = [];
for (var i =0, len = heroes.length; i < len; i++) {
  prefixedHeroes.push('Super_' + heroes[i]);
}

在 ES 5 版本之后,我们可以使用 map 方法,方便地实现上面的功能。

map 方法签名:

const new_array = arr.map(callback[, thisArg])

参数说明: 1.callback - 对数组中每一项,进行映射处理的函数

currentValue - 数组中正在处理的当前元素
index - 数组中正在处理的当前元素的索引
array - 处理的数组

2.thisArg (可选的) - 设置执行 callback 函数时,this 的值 以上示例 map 方法实现:

var heroes = ['Windstorm', 'Bombasto', 'Magneta', 'Tornado'];
var prefixedHeroes = heroes.map(name => 'Super_' + name);

filter

在 ES 5 版本之前,对于 heroes 数组,我们想获取名字中包含 m 字母的英雄,我们可以这样实现:

var heroes = ['Windstorm', 'Bombasto', 'Magneta', 'Tornado'];
var filterHeroes = [];
for (var i =0, len = heroes.length; i < len; i++) {
  if(/m/i.test(heroes[i])) {
    filterHeroes.push(heroes[i]);
  }
}

在 ES 5 版本之后,我们可以使用 filter 方法,方便地实现上面的功能。

filter 方法签名:

var new_array = arr.filter(callback[, thisArg])

参数说明:

1.callback - 用来测试数组的每个元素的函数。调用时使用参数 (element, index, array)。返回true表示保留该元素(通过测试),false则不保留。 2.thisArg (可选的) - 设置执行 callback 函数时,this 的值 以上示例 filter 方法实现:

var heroes = ['Windstorm', 'Bombasto', 'Magneta', 'Tornado'];
var filterRe = /m/i;
var filterHeroes = heroes.filter(name => filterRe.test(name));
Christian-Yang commented 7 years ago

RxJS 系列之二 - Observable 详解

Observable

在介绍 Observable 之前,我们要先了解两个设计模式:

Observer Pattern - (观察者模式) Iterator Pattern - (迭代器模式) 这两个模式是 Observable 的基础,下面我们先来介绍一下 Observer Pattern。

Observer Pattern

观察者模式定义

观察者模式是软件设计模式的一种。在此种模式中,一个目标对象管理所有相依于它的观察者对象,
并且在它本身的状态改变时主动发出通知。
这通常透过呼叫各观察者所提供的方法来实现。此种模式通常被用来实时事件处理系统。 — 维基百科

观察者模式又叫发布订阅模式(Publish/Subscribe),它定义了一种一对多的关系,让多个观察者对象同时监听某一个主题对象,这个主题对象的状态发生变化时就会通知所有的观察者对象,使得它们能够自动更新自己。

我们可以使用日常生活中,期刊订阅的例子来形象地解释一下上面的概念。期刊订阅包含两个主要的角色:

期刊出版方和订阅者,他们之间的关系如下:

  1. 期刊出版方 - 负责期刊的出版和发行工作
  2. 订阅者 - 只需执行订阅操作,新版的期刊发布后,就会主动收到通知,如果取消订阅,以后就不会再收到通知

在观察者模式中也有两个主要角色:Subject (主题) 和 Observer (观察者) 。它们分别对应例子中的期刊出版方和订阅者。接下来我们来看张图,从而加深对上面概念的理解。 zx1

观察者模式优缺点

观察者模式的优点:

  1. 支持简单的广播通信,自动通知所有已经订阅过的对象
  2. 目标对象与观察者之间的抽象耦合关系能够单独扩展以及重用

观察者模式的缺点:

  1. 如果一个被观察者对象有很多的直接和间接的观察者的话,将所有的观察者都通知到会花费很多时间
  2. 如果在观察者和观察目标之间有循环依赖的话,观察目标会触发它们之间进行循环调用,可能导致系统崩溃

观察者模式的应用

在前端领域,观察者模式被广泛地使用。最常见的例子就是为 DOM 对象添加事件监听,具体示例如下:

<button id="btn">确认</button>

function clickHandler(event) {
    console.log('用户已点击确认按钮!');
}
document.getElementById("btn").addEventListener('click', clickHandler);

上面代码中,我们通过addEventListener API 监听 button 对象上的点击事件,当用户点击按钮时,会自动执行我们的 clickHandler函数。

观察者模式实战

Subject 类定义:

class Subject {

    constructor() {
        this.observerCollection = []; // 观察者集合
    }

    registerObserver(observer) { // 注册观察者
        this.observerCollection.push(observer);
    }

    unregisterObserver(observer) { // 移除观察者
        let index = this.observerCollection.indexOf(observer);
        if(index >= 0) this.observerCollection.splice(index, 1);
    }

    notifyObservers() { // 通知所有观察者
        this.observerCollection.forEach((observer)=>observer.notify());
    }
}

Observer 类定义:

class Observer {

    constructor(name) {
        this.name = name;
    }

    notify() { 
        console.log(`${this.name} has been notified.`);
    }
}

使用示例:

let subject = new Subject(); // 创建主题对象

let observer1 = new Observer('semlinker'); // 创建观察者A - 'semlinker'
let observer2 = new Observer('lolo'); // 创建观察者B - 'lolo'

subject.registerObserver(observer1); // 注册观察者A
subject.registerObserver(observer2); // 注册观察者B

subject.notifyObservers(); // 通知观察者

subject.unregisterObserver(observer1); // 移除观察者A

subject.notifyObservers(); // 验证是否成功移除

以上代码成功运行后控制台的输出结果:

semlinker has been notified. # 输出一次
2(unknown) lolo has been notified. # 输出两次

需要注意的是,在观察者模式中,通常情况下调用注册观察者后,会返回一个函数,用于移除监听解除注册,有兴趣的读者,可以自己尝试一下。(备注:在 Angular 1.x 中调用 $scope.$on() 方法后,就会返回一个函数,用于移除监听)

Iterator Pattern

迭代器模式定义

迭代器(Iterator)模式,又叫做游标(Cursor)模式。它提供一种方法顺序访问一个聚合对象中的各个元素,而又不需要暴露该对象的内部表示。迭代器模式可以把迭代的过程从业务逻辑中分离出来,在使用迭代器模式之后,即使不关心对象的内部构造,也可以按顺序访问其中的每个元素。

迭代器模式的优缺点

迭代器模式的优点:

  1. 简化了遍历方式,对于对象集合的遍历,还是比较麻烦的,对于数组或者有序列表,我们尚可以通过游标取得,但用户需要在对集合了解的前提下,自行遍历对象,但是对于 hash 表来说,用户遍历起来就比较麻烦。而引入迭代器方法后,用户用起来就简单的多了。
  2. 封装性良好,用户只需要得到迭代器就可以遍历,而不用去关心遍历算法。

迭代器模式的缺点:

  1. 遍历过程是一个单向且不可逆的遍历

ECMAScript 迭代器

在 ECMAScript 中 Iterator 最早其实是要采用类似 Python 的 Iterator 规范,
就是 Iterator 在没有元素之后,执行 next会直接抛出错误;
但后来经过一段时间讨论后,决定采更 functional 的做法,
改成在取得最后一个元素之后执行 next 永远都回传 { done: true, value: undefined }

一个迭代器对象 ,知道如何每次访问集合中的一项, 并记录它的当前在序列中所在的位置。在 JavaScript 中迭代器是一个对象,它提供了一个 next() 方法,返回序列中的下一项。这个方法返回包含 done 和 value 两个属性的对象。对象的取值如下:

  1. 在最后一个元素前:{ done: false, value: elementValue }
  2. 在最后一个元素后:{ done: true, value: undefined }

详细信息可以参考 - 可迭代协议和迭代器协议

ES 5 迭代器

接下来我们来创建一个 makeIterator 函数,该函数的参数类型是数组,当调用该函数后,返回一个包含 next() 方法的 Iterator 对象, 其中 next() 方法是用来获取容器对象中下一个元素。具体示例如下:

function makeIterator(array){
    var nextIndex = 0;

    return {
       next: function(){
           return nextIndex < array.length ?
               {value: array[nextIndex++], done: false} :
               {done: true};
       }
    }
}

一旦初始化, next() 方法可以用来依次访问可迭代对象中的元素:

var it = makeIterator(['yo', 'ya']);
console.log(it.next().value); // 'yo'
console.log(it.next().value); // 'ya'
console.log(it.next().done);  // true

ES 6 迭代器

在 ES 6 中我们可以通过 Symbol.iterator 来创建可迭代对象的内部迭代器,具体示例如下:

let arr = ['a', 'b', 'c'];
let iter = arr[Symbol.iterator]();

调用 next() 方法来获取数组中的元素:

> iter.next()
{ value: 'a', done: false }
> iter.next()
{ value: 'b', done: false }
> iter.next()
{ value: 'c', done: false }
> iter.next()
{ value: undefined, done: true }

ES 6 中可迭代的对象:

Arrays
Strings
Maps
Sets
DOM data structures (work in progress)

Observable

RxJS 是基于观察者模式和迭代器模式以函数式编程思维来实现的。RxJS 中含有两个基本概念:Observables 与 Observer。Observables 作为被观察者,是一个值或事件的流集合;而 Observer 则作为观察者,根据 Observables 进行处理。

Observables 与 Observer 之间的订阅发布关系(观察者模式) 如下:

订阅:Observer 通过 Observable 提供的 subscribe() 方法订阅 Observable。 发布:Observable 通过回调 next 方法向 Observer 发布事件。

Proposal-Observable

1.Proposal-Observable 2.Proposal-Observable Implementations

RxJS 5
zen-observable

3.Observables for ECMAScript

自定义 Observable

如果你想真正了解 Observable,最好的方式就是自己写一个。其实 Observable 就是一个函数,它接受一个 Observer 作为参数然后返回另一个函数。

它的基本特征:

  1. 是一个函数
  2. 接受一个 Observer 对象 (包含 next、error、complete 方法的对象) 作为参数
  3. 返回一个 unsubscribe 函数,用于取消订阅

它的作用:

作为生产者与观察者之间的桥梁,并返回一种方法来解除生产者与观察者之间的联系,其中观察者用于处理时间序列上数据流。接下来我们来看一下 Observable 的基础实现:

DataSource - 数据源

class DataSource {
  constructor() {
    let i = 0;
    this._id = setInterval(() => this.emit(i++), 200); // 创建定时器
  }

  emit(n) {
    const limit = 10;  // 设置数据上限值
    if (this.ondata) {
      this.ondata(n);
    }
    if (n === limit) {
      if (this.oncomplete) {
        this.oncomplete();
      }
      this.destroy();
    }
  }

  destroy() { // 清除定时器
    clearInterval(this._id);
  }
}

myObservable

function myObservable(observer) {
    let datasource = new DataSource(); // 创建数据源
    datasource.ondata = (e) => observer.next(e); // 处理数据流
    datasource.onerror = (err) => observer.error(err); // 处理异常
    datasource.oncomplete = () => observer.complete(); // 处理数据流终止
    return () => { // 返回一个函数用于,销毁数据源
        datasource.destroy();
    };
}

【】【】【】【】【】【】未完成【】【】【】【】【】【】

Christian-Yang commented 7 years ago

RxJS 系列之三 - Operators 详解

Marble diagrams

我们把描绘 Observable 的图称为 Marble diagrams,我们用 - 来表示一小段时间,这些 - 串起来就表示一个 Observable 对象。

----------------

X 表示有错误发生

---------------X

| 表示 Observable 结束

----------------|

在时间序列中,我们可能会持续发出值,如果值是数字则直接用阿拉伯数字表示,其它数据类型使用相近的英文符号表示,接下来我们看一下 interval 操作符对应的 marble 图:

var source = Rx.Observable.interval(1000);

source 对应的 marble 图:

-----0-----1-----2-----3--...

当 observable 同步发送值时,如使用 of 操作符创建如下 Observable 对象:

var source = Rx.Observable.of(1,2,3,4);

source 对应的 marble 图:

(1234)|

小括号表示同步发生。 另外 marble 图也能够表示 operator 的前后转换关系,例如:

var source = Rx.Observable.interval(1000);
var newest = source.map(x => x + 1);

对应的 marble 图如下:

source: -----0-----1-----2-----3--...
            map(x => x + 1)
newest: -----1-----2-----3-----4--...

通过 marble 图,可以帮助我们更好地理解 operator。

详细的信息可以参考 - RxMarbles

Creation Operators

repeat

repeat 操作符签名:

public repeat(count: number): Observable

repeat 操作符作用:

重复 count 次,源 Observable 发出的值。

repeat 操作符示例:

var source = Rx.Observable.from(['a','b','c'])
               .zip(Rx.Observable.interval(500), (x,y) => x);

var example = source.repeat(2);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

.zip(Rx.Observable.interval(500), (x,y) => x); 这段代码的意思是:因为前面是from(['a','b','c'])所以产生的就是一个发出a,b,c的流。而后面的 Rx.Observable.interval(500) 表示的是把这个流中的数据都间隔500毫秒 (x,y) => x 表示的意思是(value,key) => (value)输出value .zip(Rx.Observable.interval(500), (x,y) => x);所以这句话的总的意思是,把原来的流数据之间的间隔拉大,然后输出key和value中的value

示例 marble 图:

source : ----a----b----c|
            repeat(2)
example: ----a----b----c----a----b----c|

以上代码运行后,控制台的输出结果:

a
b
c
a
b
c
complete

Transformation Operators

map

map 操作符签名:

public map(project: function(value: T, index: number): R, thisArg: any): Observable<R>

map 操作符作用:

对 Observable 对象发出的每个值,使用指定的 project 函数,进行映射处理。

map 操作符示例:

var source = Rx.Observable.interval(1000);
var newest = source.map(x => x + 2); 

newest.subscribe(console.log);

示例 marble 图:

source: -----0-----1-----2-----3--...
            map(x => x + 2)
newest: -----2-----3-----4-----5--...

以上代码运行后,控制台的输出结果:

2
3
4

mapTo

mapTo 操作符签名:

public mapTo(value: any): Observable

mapTo 操作符作用: 对 Observable 对象发出的每个值,映射成固定的值。

mapTo 操作符示例:

var source = Rx.Observable.interval(1000);
var newest = source.mapTo(2); 
newest.subscribe(console.log);

示例 marble 图:

source: -----0-----1-----2-----3--...
                mapTo(2)
newest: -----2-----2-----2-----2--...

以上代码运行后,控制台的输出结果:

2
2
2

scan

scan 操作符签名:

public scan(accumulator: function(acc: R, value: T, index: number): R,
    seed: T | R): Observable<R>

scan 操作符作用:

对 Observable 发出值,执行 accumulator 指定的运算,可以简单地认为是 Observable 版本的 Array.prototype.reduce 。

scan 操作符示例:

var source = Rx.Observable.from('hello')
             .zip(Rx.Observable.interval(600), (x, y) => x);

var example = source.scan((origin, next) => origin + next, '');

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

示例 marble 图:

source : ----h----e----l----l----o|
    scan((origin, next) => origin + next, '')
example: ----h----(he)----(hel)----(hell)----(hello)|

以上代码运行后,控制台的输出结果:

h
he
hel
hell
hello
complete

(备注:scan 与 reduce 最大的差别就是 scan 最终返回的一定是一个 Observable 对象,而 reduce 的返回类型不是固定的)

buffer

buffer 操作符签名:

public buffer(closingNotifier: Observable<any>): Observable<T[]>

buffer 操作符作用:

缓冲源 Observable 对象已发出的值,直到 closingNotifier 触发后,才统一输出缓存的元素。

buffer 操作符示例:

var source = Rx.Observable.interval(300);
var source2 = Rx.Observable.interval(1000);
var example = source.buffer(source2);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

示例 marble 图:

source : --0--1--2--3--4--5--6--7..
source2: ---------0---------1--------...
            buffer(source2)
example: ---------([0,1,2])---------([3,4,5])

以上代码运行后,控制台的输出结果: 【注意看上面的marble图,可以看到,一个小横线,就是100毫秒】 上面的,source间隔是300毫秒,所以每隔300毫秒就发出一个值,但是先不要发出这个值,而是把他们缓存到一起,直到source2中触发了值这个时候才将缓存的所有值全部都发出。所以这样一下就是发出了 ([0,1,2])等等

[0,1,2]
[3,4,5]
[6,7,8]
....

bufferTime

bufferTime 操作符签名:

public bufferTime(bufferTimeSpan: number, bufferCreationInterval: number, 
       maxBufferSize: number, scheduler: Scheduler): Observable<T[]>

bufferTime 操作符作用:

设定源 Observable 对象已发出的值的缓冲时间。

bufferTime 操作符示例:

var source = Rx.Observable.interval(300);
var example = source.bufferTime(1000);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

示例 marble 图:

source : --0--1--2--3--4--5--6--7..
            bufferTime(1000)
example: ---------([0,1,2])---------([3,4,5])

以上代码运行后,控制台的输出结果:

[0,1,2]
[3,4,5]
[6,7,8]
....

bufferCount

bufferCount 操作符签名:

public bufferCount(bufferSize: number, startBufferEvery: number):     
        Observable<T[]>

bufferCount 操作符作用:

缓冲源 Observable对象已发出的值,直到大小达到给定的最大 bufferSize 。

bufferCount 操作符示例:

var source = Rx.Observable.interval(300);
var example = source.bufferCount(3);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

示例 marble 图:

source : --0--1--2--3--4--5--6--7..
            bufferCount(3)
example: ---------([0,1,2])---------([3,4,5])

以上代码运行后,控制台的输出结果:

[0,1,2]
[3,4,5]
[6,7,8]
....

concatMap

concatMap 操作符签名:

public concatMap(project: function(value: T, ?index: number): ObservableInput, 
    resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, 
    innerIndex: number): any): Observable

concatMap 操作符作用:

对每个 Observable 对象发出的值,进行映射处理,并进行合并。该操作符也会先处理前一个 Observable 对象,在处理下一个 Observable 对象。

concatMap 操作符示例:

var source = Rx.Observable.fromEvent(document.body, 'click');

var example = source.concatMap(
                e => Rx.Observable.interval(100).take(3));

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

示例 marble 图:

source : -----------c--c------------------...
        concatMap(c => Rx.Observable.interval(100).take(3))
example: -------------0-1-2-0-1-2---------...

【上面代码的意思,最开始的时候一个“c”表示的意思是,click发生了,然后使用Rx.Observable.interval(100).take(3) 这个函数的作用是首先只要一个click发生了,那么就执行interval(100)这个就是把一个click转换成为0,1,2,3,4,5.......然后使用take(3)这个东西的意思就是,只要其中前面的三个元素】 以上代码运行后,控制台的输出结果:

0
1
2
0
1
2

concatMap 其实就是 map 加上 concatAll 的简化写法。

switchMap

switchMap 操作符签名:

public switchMap(project: function(value: T, ?index: number): ObservableInput, 
  resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, 
  innerIndex: number): any): Observable

switchMap 操作符作用:

对源 Observable 对象发出的值,做映射处理。若有新的 Observable 对象出现,会在新的 Observable 对象发出新值后,退订前一个未处理完的 Observable 对象。

switchMap 操作符示例:

var source = Rx.Observable.fromEvent(document.body, 'click');
var example = source.switchMap(
                    e => Rx.Observable.interval(100).take(3));

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

示例 marble 图:

source : -----------c--c-----------------...
        concatMap(c => Rx.Observable.interval(100).take(3))
example: -------------0--0-1-2-----------...

以上代码运行后,控制台的输出结果:

0
0
1
2

Filtering Operators

filter

filter 操作符签名:

public filter(predicate: function(value: T, index: number): boolean, 
    thisArg: any): Observable

filter 操作符作用:

对 Observable 对象发出的每个值,作为参数调用指定的 predicate 函数,若该函数的返回值为 true,则表示保留该项,若返回值为 false,则舍弃该值。

filter 操作符示例:

var source = Rx.Observable.interval(1000);
var newest = source.filter(x => x % 2 === 0); 

newest.subscribe(console.log);

示例 marble 图:

source: -----0-----1-----2-----3-----4-...
            filter(x => x % 2 === 0)
newest: -----0-----------2-----------4-...

以上代码运行后,控制台的输出结果:

0 
2
4
...

take

take 操作符签名:

public take(count: number): Observable<T>

take 操作符作用:

用于获取 Observable 对象发出的前 n 项值,取完后就结束。

take 操作符示例:

var source = Rx.Observable.interval(1000);
var example = source.take(3);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

示例 marble 图:

source : -----0-----1-----2-----3--..
                take(3)
example: -----0-----1-----2|

以上代码运行后,控制台的输出结果:

0
1
2
complete

first

first 操作符签名:

public first(predicate: function(value: T, index: number, source: Observable<T>): boolean,      resultSelector: function(value: T, index: number): R, 
  defaultValue: R): Observable<T | R>

first 操作符作用:

用于获取 Observable 对象发出的第一个元素,取完后就结束。

first 操作符示例:

var source = Rx.Observable.interval(1000);
var example = source.first();

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

示例 marble 图:

source : -----0-----1-----2-----3--..
                first()
example: -----0|

以上代码运行后,控制台的输出结果:

0
complete

takeUntil

takeUntil 操作符签名:

public takeUntil(notifier: Observable): Observable<T>

takeUntil 操作符作用:

当 takeUntil 传入的 notifier 发出值时,源 Observable 对象就会直接进入完成状态。

takeUntil 操作符示例:

var source = Rx.Observable.interval(1000);
var click = Rx.Observable.fromEvent(document.body, 'click');
var example = source.takeUntil(click);  

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

示例 marble 图:

source : -----0-----1-----2------3--
click  : ----------------------c----
                takeUntil(click)
example: -----0-----1-----2----|

以上代码运行后,控制台的输出结果:

0
1
2
complete

skip

skip 操作符签名:

public skip(count: Number): Observable

skip 操作符作用:

跳过源 Observable 对象前 count 项,并返回新的 Observable 对象。

skip 操作符示例:

var source = Rx.Observable.interval(1000);
var example = source.skip(3);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

示例 marble 图:

source : ----0----1----2----3----4----5--....
                    skip(3)
example: -------------------3----4----5--...

以上代码运行后,控制台的输出结果:

3
4
5
...

takeLast

takeLast 操作符签名:

public takeLast(count: number): Observable<T>

takeLast 操作符作用:

获取源 Observable 对象发出的,后面 count 项的值。

takeLast 操作符示例:

var source = Rx.Observable.interval(1000).take(6);
var example = source.takeLast(2);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

示例 marble 图:

source : ----0----1----2----3----4----5|
                takeLast(2)
example: ------------------------------(45)|

以上代码运行后,控制台的输出结果:

4
5
complete

last

last 操作符签名:

public last(predicate: function): Observable last 操作符作用:

获取源 Observable 对象发出的最后一项的值。

last 操作符示例:

var source = Rx.Observable.interval(1000).take(6);
var example = source.last();

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

示例 marble 图:

source : ----0----1----2----3----4----5|
                    last()
example: ------------------------------(5)|

以上代码运行后,控制台的输出结果:

5
complete

debounceTime

debounceTime 操作符签名:

public debounceTime(dueTime: number, scheduler: Scheduler): Observable

debounceTime 操作符作用:

在设定的时间跨度内,若源 Observable 对象没有再发出新值,则返回最近一次发出的值。

debounceTime 操作符示例:

var source = Rx.Observable.interval(300).take(5);
var example = source.debounceTime(1000);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

示例 marble 图:

source : --0--1--2--3--4|
        debounceTime(1000)
example: --------------4|

以上代码运行后,控制台的输出结果:

4 complete debounceTime 的工作方式是每次收到元素时,它会先把元素缓存住并等待给定的时间,如果等待时间内没有收到新的元素,则返回最新的缓存值。如果等待时间内,又收到新的元素,则会替换之前缓存的元素,并重新开始计时。

throttleTime

throttleTime 操作符签名:

public throttleTime(duration: number, scheduler: Scheduler): Observable<T> throttleTime 操作符作用:

从源 Observable 对象发出第一个值开始,忽略等待时间内发出的值,等待时间过后再发出新值。与 debounceTime 不同的是,throttleTime 一开始就会发出值,在等待时间内不会发出任何值,等待时间过后又会发出新的值。

throttleTime 示例:

var source = Rx.Observable.interval(300).take(5);
var example = source.throttleTime(1000);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

示例 marble 图:

source : --0--1--2--3--4|
        throttleTime(1000)
example: --0------------4|

以上代码运行后,控制台的输出结果:

0
4
complete

throttle 比较像是控制行为的最高频率,也就是说如果我们设定 1000 ms,那么该事件最大频率就是每秒触发一次而不会过快。debounce 则比较像是必须等待的时间,要等一定的时间过了才会收到元素。

distinct

distinct 操作符签名:

public distinct(keySelector: function, flushes: Observable): Observable

distinct 操作符的作用:

过滤源 Observable 发出的值,确保不会发出重复出现的值。

distinct 操作符示例:

var source = Rx.Observable.from(['a', 'b', 'c', 'a', 'b'])
                .zip(Rx.Observable.interval(300), (x, y) => x);
var example = source.distinct()

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

示例 marble 图:

source : --a--b--c--a--b|
            distinct()
example: --a--b--c------|

以上代码运行后,控制台的输出结果:

a
b
c
complete

distinct 内部会创建一个 Set 集合,当接收到元素时,会判断 Set 集合中,是否已存在相同的值,如果已存在的话,就不会发出值。若不存在的话,会把值存入到 Set 集合中并发出该值。所以尽量不要直接把 distinct 操作符应用在无限的 Observable 对象中,这样会导致 Set 集合越来越大。针对这种场景,大家可以设置 distinct 的第二个参数 (清除已保存的数据),或使用 distinctUntilChanged。

distinctUntilChanged

distinctUntilChanged 操作符签名:

public distinctUntilChanged(compare: function): Observable

distinctUntilChanged 操作符作用:

过滤源 Observable 发出的值,若当前发出的值与前一次值不一致,则发出该值。

distinctUntilChanged 操作符示例:

var source = Rx.Observable.from(['a', 'b', 'c', 'c', 'b'])
               .zip(Rx.Observable.interval(300), (x, y) => x);
var example = source.distinctUntilChanged()

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

示例 marble 图:

source : --a--b--c--c--b|
            distinctUntilChanged()
example: --a--b--c-----b|

以上代码运行后,控制台的输出结果:

a
b
c
b
complete

distinctUntilChanged 跟 distinct 一样会把相同的元素过滤掉,但 distinctUntilChanged 只会跟最后一次送出的元素比较,不会每个比较。

Combination Operators

concat

concat 操作符签名:

public concat(other: ObservableInput, scheduler: Scheduler): Observable

concat 操作符作用:

把多个 Observable 对象合并为一个 Observable 对象,Observable 对象会依次执行,即需等前一个 Observable 对象完成后,才会继续订阅下一个。

concat 操作符示例:

var source = Rx.Observable.interval(1000).take(3);
var source2 = Rx.Observable.of(3)
var source3 = Rx.Observable.of(4,5,6)
var example = source.concat(source2, source3);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

示例 marble 图:

source : ----0----1----2|
source2: (3)|
source3: (456)|
            concat()
example: ----0----1----2(3456)|

以上代码运行后,控制台的输出结果:

0 # source
1 # source
2 # source
3 # source2
4 # source3
5 # source3
6 # source3
complete 

concatAll

concatAll 操作符签名:

public concatAll(): Observable

concatAll 操作符作用:

合并多个 Observable 对象,并在上一个 Observable 对象完成后订阅下一个 Observable 对象。

concatAll 操作符示例:

var obs1 = Rx.Observable.interval(1000).take(5);
var obs2 = Rx.Observable.interval(500).take(2);
var obs3 = Rx.Observable.interval(2000).take(1);

var source = Rx.Observable.of(obs1, obs2, obs3);

var example = source.concatAll();

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

示例 marble 图:

source : (o1                 o2      o3)|
           \                  \       \
            --0--1--2--3--4|   -0-1|   ----0|

                concatAll()        

example: --0--1--2--3--4-0-1----0|

以上代码运行后,控制台的输出结果:

0 # o1
1 # o1
2 # o1
3 # o1
4 # o1
0 # o2
1 # o2
0 # o3
complete # o3

startWith

startWith 操作符签名:

public startWith(values: ...T, scheduler: Scheduler): Observable

startWith 操作符作用:

在开始发出源 Observable 数据之前发出已设置的参数值,并返回新的 Observable 对象。

startWith 操作符示例:

var source = Rx.Observable.interval(1000);
var example = source.startWith(0);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

示例 marble 图:

source : ----0----1----2----3--...
                startWith(0)
example: (0)----0----1----2----3--...

以上代码运行后,控制台的输出结果:

0
0
1
2
...

(备注:startWith 的值一开始是同步发出的,该操作符常用于保存程序的初始状态)

merge

merge 操作符签名:

public merge(other: ObservableInput, concurrent: number, scheduler: Scheduler): Observable

merge 操作符作用:

合并 Observable 对象,并按给定的时序发出对应值。

merge 操作符示例:

var source = Rx.Observable.interval(500).take(3);
var source2 = Rx.Observable.interval(300).take(6);
var example = source.merge(source2);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

示例 marble 图:

source : ----0----1----2|
source2: --0--1--2--3--4--5|
            merge()
example: --0-01--21-3--(24)--5|

以上代码运行后,控制台的输出结果:

0 # source2
0 # source
1 # source2
2 # source2
1 # source
3 # source2
2 # source
4 # source2
5 # source2
complete

(备注:注意与 concat 操作符的区别,concat 会在前一个 Observable 对象执行完后,再订阅下一个 Observable 对象)

mergeAll

mergeAll 操作符签名:

public mergeAll(concurrent: number): Observable

mergeAll 操作符作用:

将高阶 Observable 对象转换为一阶Observable 对象,并同时处理所有的 Observable 对象。

mergeAll 操作符示例:

var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.interval(1000));

var example = source.mergeAll();
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

示例 marble 图:

click  : ---------c-c------------------c--.. 
        map(e => Rx.Observable.interval(1000))
source : ---------o-o------------------o--..
                   \ \                  \----0----1--...
                    \ ----0----1----2----3----4--...
                     ----0----1----2----3----4--...
                     mergeAll()
example: ----------------00---11---22---33---(04)4--...

以上代码运行后,控制台的输出结果:

00
11
22
33
04
4

mergeAll 不会像 switch 那样退订原有的 Observable 对象,而是会并行处理多个 Observable 对象。

【这里为什么是这个输出结果??】 example: ----------------00---11---22---33---(04)4--...

combineLatest

combineLatest 操作符签名:

public combineLatest(other: ObservableInput, project: function): Observable

combineLatest 操作符作用:

用于合并输入的 Observable 对象,当源 Observable 对象和 other Observable 对象都发出值后,才会调用 project 函数。

combineLatest 操作符示例:

var source = Rx.Observable.interval(500).take(3);
var newest = Rx.Observable.interval(300).take(6);

var example = source.combineLatest(newest, (x, y) => x + y);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

示例 marble 图:

source : ----0----1----2|
newest : --0--1--2--3--4--5|

    combineLatest(newest, (x, y) => x + y);

example: ----01--23-4--(56)--7|

以上代码运行后,控制台的输出结果:

0
1
2
3
4
5
6
7
complete

combineLatest 示例执行过程 (project -> (x, y) => x + y):

newest 发出 0 ,但此时 source 并未发出任何值,所以不会调用 project 函数
source 发出 0 ,此时 newest 最后一次发出的值为 0 ,调用 project 函数,返回值为 0
newest 发出 1 ,此时 source 最后一次发出的值为 0,调用 project 函数,返回值为 1
newest 发出 2 ,此时 source 最后一次发出的值为 0,调用 project 函数,返回值为 2
source 发出 1 ,此时 newest 最后一次发出的值为 2 ,调用 project 函数,返回值为 3
newest 发出 3 ,此时 source 最后一次发出的值为 1,调用 project 函数,返回值为 4
source 发出 2 ,此时 newest 最后一次发出的值为 3 ,调用 project 函数,返回值为 5
newest 发出 4 ,此时 source 最后一次发出的值为 2,调用 project 函数,返回值为 6
newest 发出 5 ,此时 source 最后一次发出的值为 2,调用 project 函数,返回值为 7
newest 和 source 都结束了,所以 example 也结束了。
Christian-Yang commented 7 years ago

RxJS 系列之四 - Subject 详解

Observer Pattern

观察者模式定义

观察者模式又叫发布订阅模式(Publish/Subscribe),它定义了一种一对多的关系,让多个观察者对象同时监听某一个主题对象,这个主题对象的状态发生变化时就会通知所有的观察者对象,使得它们能够自动更新自己。

我们可以使用日常生活中,期刊订阅的例子来形象地解释一下上面的概念。期刊订阅包含两个主要的角色:期刊出版方和订阅者,他们之间的关系如下:

期刊出版方 - 负责期刊的出版和发行工作
订阅者 - 只需执行订阅操作,新版的期刊发布后,就会主动收到通知,如果取消订阅,以后就不会再收到通知

在观察者模式中也有两个主要角色:Subject (主题) 和 Observer (观察者) 。它们分别对应例子中的期刊出版方和订阅者。接下来我们来看张图,从而加深对上面概念的理解。 observable1

观察者模式结构

obser2

观察者模式实战

Subject 类定义

class Subject {

    constructor() {
        this.observerCollection = [];
    }

    addObserver(observer) { // 添加观察者
        this.observerCollection.push(observer);
    }

    deleteObserver(observer) { // 移除观察者
        let index = this.observerCollection.indexOf(observer);
        if(index >= 0) this.observerCollection.splice(index, 1);
    }

    notifyObservers() { // 通知观察者
        this.observerCollection.forEach((observer)=>observer.notify());
    }
}

Observer 类定义

class Observer {
    constructor(name) {
        this.name = name;
    }

    notify() {
        console.log(`${this.name} has been notified.`);
    }
}

使用示例

let subject = new Subject(); // 创建主题对象

let observer1 = new Observer('semlinker'); // 创建观察者A - 'semlinker'
let observer2 = new Observer('lolo'); // 创建观察者B - 'lolo'

subject.addObserver(observer1); // 注册观察者A
subject.addObserver(observer2); // 注册观察者B

subject.notifyObservers(); // 通知观察者

subject.deleteObserver(observer1); // 移除观察者A

subject.notifyObservers(); // 验证是否成功移除

以上代码成功运行后控制台的输出结果:

semlinker has been notified.
lolo has been notified.
lolo has been notified.

Observable subscribe

在介绍 RxJS - Subject 之前,我们先来看个示例:

const interval$ = Rx.Observable.interval(1000).take(3);

interval$.subscribe({
  next: value => console.log('Observer A get value: ' + value);
});

setTimeout(() => {
  interval$.subscribe({
      next: value => console.log('Observer B get value: ' + value);
  });
}, 1000);

以上代码运行后,控制台的输出结果:

Observer A get value: 0
Observer A get value: 1
Observer B get value: 0
Observer A get value: 2
Observer B get value: 1
Observer B get value: 2

通过以上示例,我们可以得出以下结论:

  1. Observable 对象可以被重复订阅
  2. Observable 对象每次被订阅后,都会重新执行

上面的示例,我们可以简单地认为两次调用普通的函数,具体参考以下代码:

function interval() {
  setInterval(() => console.log('..'), 1000);
}

interval();

setTimeout(() => {
  interval();
}, 1000);

Observable 对象的默认行为,适用于大部分场景。但有些时候,我们会希望在第二次订阅的时候,不会从头开始接收 Observable 发出的值,而是从第一次订阅当前正在处理的值开始发送,我们把这种处理方式成为组播 (multicast),那我们要怎么实现呢 ?回想一下我们刚才介绍过观察者模式,你脑海中是不是已经想到方案了。没错,我们可以通过自定义 Subject 来实现上述功能。

自定义 Subject

Subject 类定义

class Subject {   
    constructor() {
        this.observers = [];
    }

    addObserver(observer) { 
        this.observers.push(observer);
    }

    next(value) {  
        this.observers.forEach(o => o.next(value));    
    }

    error(error){ 
        this.observers.forEach(o => o.error(error));
    }

    complete() {
        this.observers.forEach(o => o.complete());
    }
}

使用示例

const interval$ = Rx.Observable.interval(1000).take(3);
let subject = new Subject();

let observerA = {
    next: value => console.log('Observer A get value: ' + value),
    error: error => console.log('Observer A error: ' + error),
    complete: () => console.log('Observer A complete!')
};

var observerB = {
    next: value => console.log('Observer B get value: ' + value),
    error: error => console.log('Observer B error: ' + error),
    complete: () => console.log('Observer B complete!')
};

subject.addObserver(observerA); // 添加观察者A
interval$.subscribe(subject); // 订阅interval$对象
setTimeout(() => {
   subject.addObserver(observerB); // 添加观察者B
}, 1000);

以上代码运行后,控制台的输出结果:

Observer A get value: 0
Observer A get value: 1
Observer B get value: 1
Observer A get value: 2
Observer B get value: 2
Observer A complete!
Observer B complete!

通过自定义 Subject,我们实现了前面提到的功能。接下来我们进入正题 - RxJS Subject。

RxJS Subject

首先我们通过 RxJS Subject 来重写一下上面的示例:

const interval$ = Rx.Observable.interval(1000).take(3);
let subject = new Rx.Subject();

let observerA = {
    next: value => console.log('Observer A get value: ' + value),
    error: error => console.log('Observer A error: ' + error),
    complete: () => console.log('Observer A complete!')
};

var observerB = {
    next: value => console.log('Observer B get value: ' + value),
    error: error => console.log('Observer B error: ' + error),
    complete: () => console.log('Observer B complete!')
};

subject.subscribe(observerA); // 添加观察者A
interval$.subscribe(subject); // 订阅interval$对象
setTimeout(() => {
   subject.subscribe(observerB); // 添加观察者B
}, 1000);

RxJS Subject 源码片段

/**
 * Suject继承于Observable 
 */
export class Subject extends Observable {
    constructor() {
        super();
        this.observers = []; // 观察者列表
        this.closed = false;
        this.isStopped = false;
        this.hasError = false;
        this.thrownError = null;
    }

    next(value) {
        if (this.closed) {
            throw new ObjectUnsubscribedError();
        }
        if (!this.isStopped) {
            const { observers } = this;
            const len = observers.length;
            const copy = observers.slice();
            for (let i = 0; i < len; i++) { // 循环调用观察者next方法,通知观察者
                copy[i].next(value);
            }
        }
    }

    error(err) {
        if (this.closed) {
            throw new ObjectUnsubscribedError();
        }
        this.hasError = true;
        this.thrownError = err;
        this.isStopped = true;
        const { observers } = this;
        const len = observers.length;
        const copy = observers.slice();
        for (let i = 0; i < len; i++) { // 循环调用观察者error方法
            copy[i].error(err);
        }
        this.observers.length = 0;
    }

    complete() {
        if (this.closed) {
            throw new ObjectUnsubscribedError();
        }
        this.isStopped = true;
        const { observers } = this;
        const len = observers.length;
        const copy = observers.slice();
        for (let i = 0; i < len; i++) { // 循环调用观察者complete方法
            copy[i].complete();
        }
        this.observers.length = 0; // 清空内部观察者列表
    }
}

通过 RxJS Subject 示例和源码片段,对于 Subject 我们可以得出以下结论:

  1. Subject 既是 Observable 对象,又是 Observer 对象
  2. 当有新消息时,Subject 会对内部的 observers 列表进行组播 (multicast)

Angular 2 RxJS Subject 应用

在 Angular 2 中,我们可以利用 RxJS Subject 来实现组件通信,具体示例如下:

message.service.ts

import { Injectable } from '@angular/core';
import {Observable} from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';

@Injectable()
export class MessageService {
    private subject = new Subject<any>();

    sendMessage(message: string) {
        this.subject.next({ text: message });
    }

    clearMessage() {
        this.subject.next();
    }

    getMessage(): Observable<any> {
        return this.subject.asObservable();
    }
}
home.component.ts

import { Component } from '@angular/core';

import { MessageService } from '../_services/index';

@Component({
    moduleId: module.id,
    templateUrl: 'home.component.html'
})

export class HomeComponent {
    constructor(private messageService: MessageService) {}

    sendMessage(): void { // 发送消息
        this.messageService.sendMessage('Message from Home Component to App Component!');
    }

    clearMessage(): void { // 清除消息
        this.messageService.clearMessage();
    }
}
app.component.ts

import { Component, OnDestroy } from '@angular/core';
import { Subscription } from 'rxjs/Subscription';

import { MessageService } from './_services/index';

@Component({
    moduleId: module.id,
    selector: 'app',
    templateUrl: 'app.component.html'
})

export class AppComponent implements OnDestroy {
    message: any;
    subscription: Subscription;

    constructor(private messageService: MessageService) {
        this.subscription = this.messageService.getMessage()
              .subscribe(message => { this.message = message; });
    }

    ngOnDestroy() {
        this.subscription.unsubscribe();
    }
}

以上示例实现的功能是组件之间消息通信,即 HomeComponent 子组件,向 AppComponent 父组件发送消息。代码运行后,浏览器的显示结果如下:

Plunker 示例

Subject 存在的问题

因为 Subject 在订阅时,是把 observer 存放到观察者列表中,并在接收到新值的时候,遍历观察者列表并调用观察者上的 next 方法,具体如下:

next(value) {
        if (this.closed) {
            throw new ObjectUnsubscribedError();
        }
        if (!this.isStopped) {
            const { observers } = this;
            const len = observers.length;
            const copy = observers.slice();
            for (let i = 0; i < len; i++) { // 循环调用观察者next方法,通知观察者
                copy[i].next(value);
            }
        }
}

这样会有一个大问题,如果某个 observer 在执行时出现异常,却没进行异常处理,就会影响到其它的订阅者,具体示例如下:

const source = Rx.Observable.interval(1000);
const subject = new Rx.Subject();

const example = subject.map(x => {
    if (x === 1) {
        throw new Error('oops');
    }
    return x;
});
subject.subscribe(x => console.log('A', x));
example.subscribe(x => console.log('B', x));
subject.subscribe(x => console.log('C', x));

source.subscribe(subject);

以上代码运行后,控制台的输出结果:

A 0
B 0
C 0
A 1
Rx.min.js:74 Uncaught Error: oops

JSBin - Subject Problem Demo

在代码运行前,大家会认为观察者B 会在接收到 1 值时抛出异常,观察者 A 和 C 仍会正常运行。但实际上,在当前的 RxJS 版本中若观察者 B 报错,观察者 A 和 C 也会停止运行。那么应该如何解决这个问题呢?目前最简单的方式就是为所有的观察者添加异常处理,更新后的代码如下:

const source = Rx.Observable.interval(1000);
const subject = new Rx.Subject();

const example = subject.map(x => {
    if (x === 1) {
        throw new Error('oops');
    }
    return x;
});

subject.subscribe(
    x => console.log('A', x),
    error => console.log('A Error:' + error)
);

example.subscribe(
    x => console.log('B', x),
    error => console.log('B Error:' + error)
);

subject.subscribe(
    x => console.log('C', x),
    error => console.log('C Error:' + error)
);

source.subscribe(subject);

Subject 其实是观察者模式的实现,所以当观察者订阅 Subject 对象时,Subject 对象会把订阅者添加到观察者列表中,每当有 subject 对象接收到新值时,它就会遍历观察者列表,依次调用观察者内部的 next() 方法,把值一一送出。

Subject 之所以具有 Observable 中的所有方法,是因为 Subject 类继承了 Observable 类,在 Subject 类中有五个重要的方法:

    next - 每当 Subject 对象接收到新值的时候,next 方法会被调用
    error - 运行中出现异常,error 方法会被调用
    complete - Subject 订阅的 Observable 对象结束后,complete 方法会被调用
    subscribe - 添加观察者
    unsubscribe - 取消订阅 (设置终止标识符、清空观察者列表)

BehaviorSubject

BehaviorSubject 定义

BehaviorSubject 源码片段

export class BehaviorSubject extends Subject {
    constructor(_value) { // 设置初始值
        super();
        this._value = _value;
    }
    get value() { // 获取当前值
        return this.getValue();
    }
    _subscribe(subscriber) {
        const subscription = super._subscribe(subscriber);
        if (subscription && !subscription.closed) {
            subscriber.next(this._value); // 为新的订阅者发送当前最新的值
        }
        return subscription;
    }
    getValue() {
        if (this.hasError) {
            throw this.thrownError;
        }
        else if (this.closed) {
            throw new ObjectUnsubscribedError();
        }
        else {
            return this._value;
        }
    }
    next(value) { // 调用父类Subject的next方法,同时更新当前值
        super.next(this._value = value);
    }
}

BehaviorSubject 应用

有些时候我们会希望 Subject 能保存当前的最新状态,而不是单纯的进行事件发送,也就是说每当新增一个观察者的时候,我们希望 Subject 能够立即发出当前最新的值,而不是没有任何响应。具体我们先看一下示例:

var subject = new Rx.Subject();

var observerA = {
    next: value => console.log('Observer A get value: ' + value),
    error: error => console.log('Observer A error: ' + error),
    complete: () => console.log('Observer A complete!')
};

var observerB = {
    next: value => console.log('Observer B get value: ' + value),
    error: error => console.log('Observer B error: ' + error),
    complete: () => console.log('Observer B complete!')
};

subject.subscribe(observerA);

subject.next(1);
subject.next(2);
subject.next(3);

setTimeout(() => {
  subject.subscribe(observerB); // 1秒后订阅
}, 1000);

以上代码运行后,控制台的输出结果:

Observer A get value: 1
Observer A get value: 2
Observer A get value: 3

通过输出结果,我们发现在 observerB 订阅 Subject 对象后,它再也没有收到任何值了。因为 Subject 对象没有再调用 next() 方法。但很多时候我们会希望 Subject 对象能够保存当前的状态,当新增订阅者的时候,自动把当前最新的值发送给订阅者。要实现这个功能,我们就需要使用 BehaviorSubject。

BehaviorSubject 跟 Subject 最大的不同就是 BehaviorSubject 是用来保存当前最新的值,而不是单纯的发送事件。BehaviorSubject 会记住最近一次发送的值,并把该值作为当前值保存在内部的属性中。接下来我们来使用 BehaviorSubject 重新一下上面的示例:

var subject = new Rx.BehaviorSubject(0); // 设定初始值

var observerA = {
    next: value => console.log('Observer A get value: ' + value),
    error: error => console.log('Observer A error: ' + error),
    complete: () => console.log('Observer A complete!')
};

var observerB = {
    next: value => console.log('Observer B get value: ' + value),
    error: error => console.log('Observer B error: ' + error),
    complete: () => console.log('Observer B complete!')
};

subject.subscribe(observerA);

subject.next(1);
subject.next(2);
subject.next(3);

setTimeout(() => {
  subject.subscribe(observerB); // 1秒后订阅
}, 1000);

以上代码运行后,控制台的输出结果:

Observer A get value: 0
Observer A get value: 1
Observer A get value: 2
Observer A get value: 3
Observer B get value: 3

JSBin - BehaviorSubject

ReplaySubject

ReplaySubject 定义

ReplaySubject 源码片段

export class ReplaySubject extends Subject {
    constructor(bufferSize = Number.POSITIVE_INFINITY, 
                windowTime = Number.POSITIVE_INFINITY, 
                scheduler) {
        super();
        this.scheduler = scheduler;
        this._events = []; // ReplayEvent对象列表
        this._bufferSize = bufferSize < 1 ? 1 : bufferSize; // 设置缓冲区大小
        this._windowTime = windowTime < 1 ? 1 : windowTime;
    }

    next(value) {
        const now = this._getNow();
        this._events.push(new ReplayEvent(now, value));
        this._trimBufferThenGetEvents();
        super.next(value);
    }

  _subscribe(subscriber) {
        const _events = this._trimBufferThenGetEvents(); // 过滤ReplayEvent对象列表
        let subscription;
        if (this.closed) {
            throw new ObjectUnsubscribedError();
        }
        ...
        else {
            this.observers.push(subscriber);
            subscription = new SubjectSubscription(this, subscriber);
        }
          ...
        const len = _events.length;
        // 重新发送设定的最后bufferSize个值
        for (let i = 0; i < len && !subscriber.closed; i++) {
            subscriber.next(_events[i].value);
        }
        ...
        return subscription;
    }
}

class ReplayEvent {
    constructor(time, value) {
        this.time = time;
        this.value = value;
    }
}

ReplaySubject 应用

有些时候我们希望在 Subject 新增订阅者后,能向新增的订阅者重新发送最后几个值,这时我们就可以使用 ReplaySubject ,具体示例如下:

var subject = new Rx.ReplaySubject(2); // 重新发送最后2个值

var observerA = {
    next: value => console.log('Observer A get value: ' + value),
    error: error => console.log('Observer A error: ' + error),
    complete: () => console.log('Observer A complete!')
};

var observerB = {
    next: value => console.log('Observer B get value: ' + value),
    error: error => console.log('Observer B error: ' + error),
    complete: () => console.log('Observer B complete!')
};

subject.subscribe(observerA);

subject.next(1);
subject.next(2);
subject.next(3);

setTimeout(() => {
  subject.subscribe(observerB); // 1秒后订阅
}, 1000);

以上代码运行后,控制台的输出结果:

Observer A get value: 1
Observer A get value: 2
Observer A get value: 3
Observer B get value: 2
Observer B get value: 3

可能会有人认为 ReplaySubject(1) 是不是等同于 BehaviorSubject,其实它们是不一样的。在创建BehaviorSubject 对象时,是设置初始值,它用于表示 Subject 对象当前的状态,而 ReplaySubject 只是事件的重放。

JSBin - ReplaySubject

AsyncSubject

AsyncSubject 定义

AsyncSubject 源码片段

export class AsyncSubject extends Subject {
    constructor() {
        super(...arguments);
        this.value = null;
        this.hasNext = false;
        this.hasCompleted = false; // 标识是否已完成
    }
    _subscribe(subscriber) {
        if (this.hasError) {
            subscriber.error(this.thrownError);
            return Subscription.EMPTY;
        }
        else if (this.hasCompleted && this.hasNext) { // 等到完成后,才发出最后的值
            subscriber.next(this.value);
            subscriber.complete();
            return Subscription.EMPTY;
        }
        return super._subscribe(subscriber);
    }
    next(value) {
        if (!this.hasCompleted) { // 若未完成,保存当前的值
            this.value = value;
            this.hasNext = true;
        }
    }
}

AsyncSubject 应用

AsyncSubject 类似于 last 操作符,它会在 Subject 结束后发出最后一个值,具体示例如下:

var subject = new Rx.AsyncSubject();

  var observerA = {
    next: value => console.log('Observer A get value: ' + value),
    error: error => console.log('Observer A error: ' + error),
    complete: () => console.log('Observer A complete!')
  };

  var observerB = {
    next: value => console.log('Observer B get value: ' + value),
    error: error => console.log('Observer B error: ' + error),
    complete: () => console.log('Observer B complete!')
  };

  subject.subscribe(observerA);

  subject.next(1);
  subject.next(2);
  subject.next(3);

  subject.complete();

  setTimeout(() => {
    subject.subscribe(observerB); // 1秒后订阅
  }, 1000);

以上代码运行后,控制台的输出结果:

Observer A get value: 3
Observer A complete!
Observer B get value: 3
Observer B complete!

参考资源

Understanding Subjects in RxJS
30 天精通 RxJS (22) - 什么是 Subject
Communicating Between Components with Observable & Subject
Christian-Yang commented 7 years ago

http://www.jianshu.com/p/e106a153e5dc

http://www.tuicool.com/articles/zaAVre3

https://juejin.im/post/58dca39861ff4b006b03b80c

Christian-Yang commented 7 years ago

RxJS 核心概念之Subject,原文地址:http://www.tuicool.com/articles/zaAVre3

什么是Subject?在RxJS中,Subject是一类特殊的Observable,它可以向多个Observer多路推送数值。普通的Observable并不具备多路推送的能力(每一个Observer都有自己独立的执行环境),而Subject可以共享一个执行环境。

Subject是一种可以多路推送的可观察对象。与EventEmitter类似,Subject维护着自己的Observer。

每一个Subject都是一个Observable(可观察对象)对于一个Subject,你可以订阅( subscribe )它,Observer会和往常一样接收到数据。从Observer的视角看,它并不能区分自己的执行环境是普通Observable的单路推送还是基于Subject的多路推送。

Subject的内部实现中,并不会在被订阅( subscribe )后创建新的执行环境。它仅仅会把新的Observer注册在由它本身维护的Observer列表中,这和其他语言、库中的 addListener 机制类似。

每一个Subject也可以作为Observer(观察者)Subject同样也是一个由 next(v) , error(e) ,和 complete() 这些方法组成的对象。调用 next(theValue) 方法后,Subject会向所有已经在其上注册的Observer多路推送 theValue 。

下面的例子中,我们在Subject上注册了两个Observer,并且多路推送了一些数值:

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(1);
subject.next(2);

控制台输出结果如下:

observerA: 1
observerB: 1
observerA: 2
observerB: 2

既然Subject是一个Observer,你可以把它作为 subscribe (订阅)普通Observable时的参数,如下面例子所示:

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

var observable = Rx.Observable.from([1, 2, 3]);

observable.subscribe(subject); // 你可以传递Subject来订阅observable

执行后结果如下:

observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

通过上面的实现:我们发现可以通过Subject将普通的Observable单路推送转换为多路推送。这说明了Subject的作用——作为单路Observable转变为多路Observable的桥梁。

还有几种特殊的 Subject 类型,分别是 BehaviorSubject , ReplaySubject ,和 AsyncSubject 。

多路推送的Observable

在以后的语境中,每当提到“多路推送的Observable”,我们特指通过Subject构建的Observable执行环境。否则“普通的Observable”只是一个不会共享执行环境并且被订阅后才生效的一系列值。

通过使用Subject可以创建拥有相同执行环境的多路的Observable。

下面展示了 多路 的运作方式:Subject从普通的Observable订阅了数据,然后其他Observer又订阅了这个Subject,示例如下:

var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);

// 通过`subject.subscribe({...})`订阅Subject的Observer:
multicasted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

// 让Subject从数据源订阅开始生效:
multicasted.connect();

multicast 方法返回一个类似于Observable的可观察对象,但是在其被订阅后,它会表现Subject的特性。 multicast 返回的对象同时是 ConnectableObservable 类型的,拥有 connect() 方法。

connect() 方法非常的重要,它决定Observable何时开始执行。由于调用 connect() 后,Observable开始执行,因此, connect() 会返回一个 Subscription 供调用者来终止执行。

引用计数

通过手动调用 connect() 返回的Subscription控制执行十分繁杂。通常,我们希望在有第一个Observer订阅Subject后 自动 connnect ,当所有Observer都取消订阅后终止这个Subject。

我们来分析一下下面例子中subscription的过程:

第一个Observer 订阅了多路推送的 Observable

多路Observable被连接

向第一个Observer发送 值为 0 的 next 通知

第二个Observer订阅了多路推送的 Observable

向第一个Observer发送 值为 1 的 next 通知

向第二个Observer发送 值为 1 的 next 通知

第一个Observer取消了对多路推送的Observable的订阅

向第二个Observer发送 值为 2 的 next 通知

第二个Observer取消了对多路推送的Observable的订阅

取消对多路推送的Observable的连接

通过显式地调用 connect() ,代码如下:

var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
var subscription1, subscription2, subscriptionConnect;

subscription1 = multicasted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subscriptionConnect = multicasted.connect();

setTimeout(() => {
  subscription2 = multicasted.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 600);

setTimeout(() => {
  subscription1.unsubscribe();
}, 1200);

setTimeout(() => {
  subscription2.unsubscribe();
  subscriptionConnect.unsubscribe(); 
}, 2000);

如果你不想显式地调用 connect() 方法,可以在ConnectableObservable类型的Observable上调用 refCount() 方法。方法会进行引用计数:记录Observable被订阅的行为。当订阅数从 0 到 1 时 refCount() 会调用 connect() 方法。到订阅数从 1 到 0 ,他会终止整个执行过程。

refCount 使得多路推送的Observable在被订阅后自动执行,在所有观察者取消订阅后,停止执行。

下面是示例:

var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();
var subscription1, subscription2, subscriptionConnect;

console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

setTimeout(() => {
  console.log('observerB subscribed');
  subscription2 = refCounted.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 600);

setTimeout(() => {
  console.log('observerA unsubscribed');
  subscription1.unsubscribe();
}, 1200);

setTimeout(() => {
  console.log('observerB unsubscribed');
  subscription2.unsubscribe();
}, 2000);

执行输出结果如下:

observerA subscribed
observerA: 0
observerB subscribed
observerA: 1
observerB: 1
observerA unsubscribed
observerB: 2
observerB unsubscribed

只有ConnectableObservables拥有 refCount() 方法,调用后会返回一个 Observable 而不是新的ConnectableObservable。

BehaviorSubject

BehaviorSubject 是Subject的一个衍生类,具有“最新的值”的概念。它总是保存最近向数据消费者发送的值,当一个Observer订阅后,它会即刻从 BehaviorSubject 收到“最新的值”。

BehaviorSubjects非常适于表示“随时间推移的值”。举一个形象的例子,Subject表示一个人的生日,而Behavior则表示一个人的岁数。(生日只是一天,一个人的岁数会保持到下一次生日之前。)

下面例子中,展示了如何用 0 初始化BehaviorSubject,当Observer订阅它时, 0 是第一个被推送的值。紧接着,在第二个Observer订阅BehaviorSubject之前,它推送了 2 ,虽然订阅在推送 2 之后,但是第二个Observer仍然能接受到 2 :

var subject = new Rx.BehaviorSubject(0 /* 初始值 */);

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(3);

输出结果如下:

observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

ReplaySubject

ReplaySubject 如同于 BehaviorSubject 是 Subject 的子类。通过 ReplaySubject 可以向新的订阅者推送旧数值,就像一个录像机 ReplaySubject 可以记录Observable的一部分状态(过去时间内推送的值)。

.一个 ReplaySubject 可以记录Observable执行过程中推送的多个值,并向新的订阅者回放它们。

你可以指定回放值的数量:

var subject = new Rx.ReplaySubject(3 /* 回放数量 */);

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);

输出如下:

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5

除了回放数量,你也可以以毫秒为单位去指定“窗口时间”,决定ReplaySubject记录多久以前Observable推送的数值。下面的例子中,我们把回放数量设置为 100 ,把窗口时间设置为 500 毫秒:

var subject = new Rx.ReplaySubject(100, 500 /* windowTime */);

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

var i = 1;
setInterval(() => subject.next(i++), 200);

setTimeout(() => {
  subject.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 1000);

第二个Observer接受到 3 (600ms), 4 (800ms) 和 5 (1000ms),这些值均在订阅之前的 500 毫秒内推送(窗口长度 1000ms – 600ms = 400ms < 500ms):

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerA: 5
observerB: 3
observerB: 4
observerB: 5
observerA: 6
observerB: 6
...

AsyncSubject

AsyncSubject是Subject的另外一个衍生类,Observable仅会在执行完成后,推送执行环境中的最后一个值。

var subject = new Rx.AsyncSubject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);
subject.complete();

输出结果如下:


observerA: 5
observerB: 5

AsyncSubject 与 last() 操作符相似,等待完成通知后推送执行过程的最后一个值。