Christian-health / StudyNote2017

2017年学习笔记
0 stars 0 forks source link

RxJs操作符笔记 #2

Open Christian-health opened 6 years ago

Christian-health commented 6 years ago

RxJs操作符笔记,原文:http://reactivex.io/documentation/observable.html 屏幕尺子:http://www.chizi.cc/ RxJs给出的操作符名字变更和删除文档的地址:https://github.com/ReactiveX/RxJS/blob/master/MIGRATION.md#operators-renamed-or-removed

RxJs操作符汉语命名好的:http://cw.hubwiz.com/card/c/569d92e3acf9a45a69b05154/1/2/2/

RxJs操作符汉语命名好的另外一个:http://xc.hubwiz.com/course/569d92e3acf9a45a69b05154?affid=csdn

RxJs操作符笔记,原文:http://reactivex.io/documentation/observable.html中的每一个函数解释的时候,都有这样的一段话,这句话的意思是,这个函数timeInterval 在下面这些文件中有实现: rx.all.js rx.all.compat.js rx.time.js rx.lite.js rx.lite.compat.js

timeInterval is found in each of the following distributions:

rx.all.js
rx.all.compat.js
rx.time.js (requires rx.js or rx.compat.js)
rx.lite.js
rx.lite.compat.js

================================================================================

scan 操作符

apply a function to each item emitted by an Observable, sequentially, and emit each successive value. 对Observable发出的每个项目顺序应用一个函数,并发出每个连续的值。 The Scan operator applies a function to the first item emitted by the source Observable and then emits the result of that function as its own first emission. 扫描运算符对源Observable发出的第一个项目应用一个函数,然后将该函数的结果作为自己的第一个发射发出。 It also feeds the result of the function back into the function along with the second item emitted by the source Observable in order to generate its second emission. 它还将功能的结果与源Observable发射的第二个项目一起馈送到功能中,以产生其第二个发射。 It continues to feed back its own subsequent emissions along with the subsequent emissions from the source Observable in order to create the rest of its sequence. 它继续反馈自己的后续排放以及源Observable的后续排放,以创建其余的序列。 This sort of operator is sometimes called an “accumulator” in other contexts. 这种运算符有时在其他上下文中称为“累加器”。 scan-demo

ar source = Rx.Observable.range(1, 3)
    .scan(
        function (acc, x) {
            return acc + x;
        });

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 1
Next: 3
Next: 6
Completed

没有提供acc初始值的情况下,acc初始值为0,应该是。 qq1

var source = Rx.Observable.range(1, 3)
    .scan( function (acc, x) {
            return acc * x;
           }, 1 );

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 1
Next: 2
Next: 6
Completed

这里为acc提供了一个初始的值acc =1

demo code

<!DOCTYPE html>
<html>
<head>
    <title></title>
    <script type="text/javascript" src="http://apps.bdimg.com/libs/jquery/2.1.4/jquery.min.js"></script>

    <script type="text/javascript" src="./rxjs/dist/rx.lite.js"></script>
</head>
<body>
<script>
var source = Rx.Observable.range(1, 3)
    .scan(
        function (acc, x) {
            console.log('x value :',x);
            return acc + x;
        },10);

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);   
    },
    function () {
        console.log('Completed');   
    });

// => Next: 1
// => Next: 3
// => Next: 6
// => Completed 
</script>
</body>
</html>

console print

x value : 1
Next: 11
x value : 2
Next: 13
x value : 3
Next: 16
Christian-health commented 6 years ago

throttleTime操作符

throttle节流,节流阀,扼杀

only emit an item from an Observable if a particular timespan has passed without it emitting another item 只有一个特定的时间段已经过去,并且没有发出其他的item的情况下,才会发出一个从Observable中生成的item

The Debounce operator filters out items emitted by the source Observable that are rapidly followed by another emitted item. Debounce运算符过滤掉源Observable发出的项目,后面紧跟着另一个发送的项目。 (也就是说如果一个item1后面紧跟着另外的一个item2,那么item1就会被过滤掉) debouce 红球出现之后的指定时间之内没有任何新的球出现,所以红球留下。而黄球出现之后,在指定的时间还没有过去的情况下,绿球就出现了(因该是绿色,我是色盲分不清出),所以黄球被过滤掉了,绿球留了下来,接着在绿球出现的指定时间之内没有新的球出现了。那么绿球最终留下。 The first variant — called either debounce or throttleWithTimeout — accepts as its parameter a duration, defined as an integer number of milliseconds, and it suppresses any emitted items that are followed by other emitted items during that duration since the first item’s emission. 第一个变量 - 称为debounce(抖动)或throttleWithTimeout - 接受作为其参数的持续时间,定义为整数毫秒,并且在第一个项目发射之后的该持续时间内抑制其他发射项目后面的任何发射项目。 debounce 下面代码的解释,见上面图片:

var times = [
    { value: 0, time: 100 },
    { value: 1, time: 600 },
    { value: 2, time: 400 },
    { value: 3, time: 700 },
    { value: 4, time: 200 }
];

// Delay each item by time and project value;
var source = Rx.Observable.from(times)
  .flatMap(function (item) {
    return Rx.Observable
      .of(item.value)
      .delay(item.time);
  })
  .debounce(500 /* ms */);

var subscription = source.subscribe(
  function (x) {
    console.log('Next: %s', x);
  },
  function (err) {
    console.log('Error: %s', err);
  },
  function () {
    console.log('Completed');
  });
Next: 0
Next: 2
Next: 4
Completed
Christian-health commented 6 years ago

Delay 操作符

shift the emissions from an Observable forward in time by a particular amount

The Delay operator modifies its source Observable by pausing for a particular increment of time (that you specify) before emitting each of the source Observable’s items. This has the effect of shifting the entire sequence of items emitted by the Observable forward in time by that specified increment. 延迟运算符通过在发出每个源Observable的项目之前暂停一段特定的时间增量(您指定的)来修改其来源可观察值。 这具有将Observable发射的整个序列的时间向前移动指定的增量的效果 delay1 In RxJS you can set the per-item delay in two ways: by passing a number of milliseconds into the delay operator (which will delay each emission by that amount of time), or by passing in a Date object [which will delay the beginning of the sequence of emissions until that absolute point in time]. 在RxJS中,您可以通过两种方式设置每个项目的延迟:通过传递延迟运算符(将延迟每个发射的时间量),或通过传递一个Date对象[这将延迟队列排放开始时间,直到绝对时间点]。

This operator operates by default on the timeout Scheduler, but you can override this by passing in another Scheduler as an optional second parameter. 此运算符默认在timeout Scheduler (超时计划程序)上运行,但您可以通过将另一个调度程序作为可选的第二个参数传递来覆盖此操作。 [意思是说delay这个函数有第二个参数?这个参数可以覆盖掉默认的timeout Scheduler]

第二种使用方式:通过传递一个Date对象[这将延迟队列排放开始时间,直到绝对时间点

var source = Rx.Observable.range(0, 3)
    .delay(new Date(Date.now() + 1000));

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x.toString()); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); }); 
Next: 0
Next: 1
Next: 2
Completed

第一种使用方式:通过传递延迟运算符(将延迟每个发射的时间量)

var source = Rx.Observable.range(0, 3)
    .delay(1000);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x.toString()); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 0
Next: 1
Next: 2
Completed
Christian-health commented 6 years ago

FlatMap (mergeMap)操作符

transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable 将Observable发出的item转换为Observables,然后将其中的排放物平坦化为单个Observable (注意,这个意思是:会把原来的stream投射出的元素转换为另一个stream,然后最后 将这些stream合并为一个stream进行投射出来。flat的意思就是扁平,平面) [为了理解好这句话,下面转载的FlatMap详情非常有用:]https://juejin.im/entry/58be21af570c3500622a6a3c flatmap The FlatMap operator transforms an Observable by applying a function that you specify to each item emitted by the source Observable, where that function returns an Observable that itself emits items. FlatMap then merges the emissions of these resulting Observables, emitting these merged results as its own sequence. FlatMap操作符通过将您指定的函数应用于源Observable发出的每个item来转换Observable,该函数返回自身发出的Observable。 FlatMap然后合并这些结果的可观察物的排放,将这些合并结果作为自己的队列发布。 This method is useful, for example, when you have an Observable that emits a series of items that themselves have Observable members or are in other ways transformable into Observables, so that you can create a new Observable that emits the complete collection of items emitted by the sub-Observables of these items. 这个方法很有用,例如,当你有一个Observable发出一系列自己拥有Observable成员或者以其他方式转换成Observables的项目,这样你就可以创建一个新的Observable来发出完整的项目集合(通过合并)这些项目的子观察值。 Note that FlatMap merges the emissions of these Observables, so that they may interleave. 请注意,FlatMap会合并这些Observables的排放,所以它们可能会交错。 In several of the language-specific implementations there is also an operator that does not interleave the emissions from the transformed Observables, but instead emits these emissions in strict order, often called ConcatMap or something similar. 在一些特定语言的实现中,还有一个运算符不会交换已变换的“可观测值”的排放,而是以严格的顺序排放这些排放,通常称为ConcatMap或类似的。 (这里的意思可以通过下面的代码看出区别:)

<!DOCTYPE html>
<html>
<head>
    <title></title>
    <script type="text/javascript" src="http://apps.bdimg.com/libs/jquery/2.1.4/jquery.min.js"></script>

    <script type="text/javascript" src="./rxjs/dist/rx.lite.js"></script>
</head>
<body>
<script>
// flatMap example
var stream1 = Rx.Observable.from([10, 20, 30]);
var stream2 = function(n) {
  return Rx.Observable.from([n+1, n+2, n+3]);
};

stream1.flatMap(function(n) {
  console.log("(" + n + ")");
  return stream2(n);
}).subscribe(function(n) {
  console.log(n);
}, function () {
  console.log('err');
}, function () {
  console.log('finished');
});
</script>
</body>
</html>

flatmapresult

<!DOCTYPE html>
<html>
<head>
    <title></title>
    <script type="text/javascript" src="http://apps.bdimg.com/libs/jquery/2.1.4/jquery.min.js"></script>

    <script type="text/javascript" src="./rxjs/dist/rx.lite.js"></script>
</head>
<body>
<script>
// flatMap example
var stream1 = Rx.Observable.from([10, 20, 30]);
var stream2 = function(n) {
  return Rx.Observable.from([n+1, n+2, n+3]);
};

stream1.concatMap(function(n) {
  console.log("(" + n + ")");
  return stream2(n);
}).subscribe(function(n) {
  console.log(n);
}, function () {
  console.log('err');
}, function () {
  console.log('finished');
});
</script>
</body>
</html>

concatmapresult

FlatMap和Map的区别:

原文转自:http://blog.csdn.net/u012631731/article/details/71597541

let source1: string = Observable.range(1,4).map((item) => {
        return item;
});
source1.subscribe((itam) => {
       console.log('return value: ' + item);
});
return value:1
return value:2
return value:3
return value:4
let source2: string = Observable.range(1,4).flatMap((item) => {
        return Observable.of(item);
});
source2.subscribe((itam) => {
       console.log('return value: ' + item);
});
return value:1
return value:2
return value:3
return value:4

在RxJS中map和flatMap都可以对单个对象进行操作,最主要的区别在于返回的数据类型,map的返回就是原本的数据类型即可,而flatMap返回的是一个流数据,即要在数据返回是增加Observable的转换。 这里的return的单粒的,如果想直接返回一个数组,直接使用toArray()方法即可。

let source2: string = Observable.range(1,4).flatMap((item) => {
      return Observable.of(item);
}).toArray();
source2.subscribe((itam) => {
    console.log('return value: ' + item);
});
return value:[1,2,3,4]

关于FlatMap的详细的解释,转自:https://juejin.im/entry/58be21af570c3500622a6a3c

Note: stream <==> Observable sequence

看官网或者其他文档对flatMap的解释是,会把原来的stream投射出的元素转换为另一个stream,然后最后 将这些stream合并为一个stream进行投射出来。所以,对于下面的代码,我的直觉认为,输出结果应该是: 11, 12, 13, 21, 22, 23, 31, 32, 33。但是在jsbin上运行输出后,输出的结果 是:11, 12, 21, 13, 22, 31, 23, 32, 33,虽然这肯定是正确的输出,但是不明白。

// flatMap example
var stream1 = Rx.Observable.from([10, 20, 30]);
var stream2 = function(n) {
      return Rx.Observable.from([n+1, n+2, n+3]);
};

stream1.flatMap(function(n) {
       return stream2(n);
}).subscribe(function(n) {
       console.log(n);
}, function () {
      console.log('err');
}, function () {
     console.log('finished'); 
});

我绞尽脑汁想了想也不明白为什么12后面就是21了,而不是13。我去翻了翻 rxjs 的源码,想去 一探究竟,但是除了发现 flatMap 是 mergeMap 的别名外,似乎没其他收获。

后来我在代码里添加了一行输出:

stream1.flatMap(function(n) {
     console.log("(" + n + ")");
      return stream2(n);
}).subscribe(function(n) {
      console.log(n);
}, function () {
     console.log('err');
}, function () {
     console.log('finished');
});

现在输出结果变成了:(10), 11, (20), 12, 21, (30) 13, 22, 31, 23, 32, 33,从这个输出中 我立马发现了一些东西,然后明白这个flatMap后的stream投射方式是怎么样的了。

第一个stream1,在投射元素的时候,通过flatMap会将投射出的元素转换成stream,我们称为stream2-1, 因为它是stream2的第一部分。这个时候,投射图是这样的:

-10------------------> stream1时间线
---11-------------------> stream2-1时间线,比stream1差一个时间点。
// (10), 11

也就是说,10 转换出来的stream2-1也立马开始投射它自己的元素了,先紧跟 stream1投射出来的10, 投射出11,毕竟stream2-1是10转换出来的。10这时转换完成了,紧接着20开始转换,转换出的stream我们 称为stream2-2。在这个时间点上(20转换的点上),stream2-1已经开始投射12,然后stream2-2转换出来了 并投射21,所以有下面的图:

-10---20---------------> stream1时间线
---11-12-----------------> stream2-1时间线
--------21---------------> stream2-2时间线

// (10), 11, (20), 12, 21

这时stream1剩余最后一个元素,30,紧跟着20的投射转换后,也开始转换,这个点上,stream2-1开始投射 13,并且stream2-2开始投射22,这两个投射完后,转换完成的stream2-3开始投射元素31,所以有下面的图:

-10---20---30------------> stream1时间线
---11-12---13------------> stream2-1时间线
--------21-22------------> stream2-2时间线
-------------31-----------> stream2-3时间线
// (10), 11, (20), 12, 21, (30), 13, 22, 31

到这里应该就很容易理解了。flatMap最后flat的是stream2中投射出来的,stream2是由stream2-1, stream2-2,stream2-3投射出来的元素组成。 flatmaplatest

扩展理解 flatMapFirst 和 flatMapLatest

flatMapFirst instead propagates the first Observable exclusively until it completes before it begins subscribes to the next Observable.

从这里也可以很好的理解flatMapLatest了。假如stream1中的元素都是网络请求,10代表网络请求发出 去了,而由10转换出来的stream2-1代表着整个和这个网络请求相关操作,也就是可以认为stream2-1代表着收到网络 请求结果后,对数据处理,更新界面等等。然后这时,20开始投射并进行转换为stream2-2,这可能代表着 用户再次发送了一个网络请求,这个时候,从上面的图中可以看出,20转换出来的stream2-2开始投射的时候, stream2-1已经投射到了12了,这个12可以代表着stream2-1发出的网络请求已经返回结果,要对结果进行 处理。而stream2-2这时候的开始,明显是一个重复的网络请求操作,是应该被取消的,毕竟第一个网络请求 还没被处理完成呢。这个时候使用flatMapFirst,可以取消后续(stream2-2)的操作,除非第一个操作(stream2-1)已经完成了。 然后理解 flatMapLatest 也就不难了,就是要取消stream2-1(未完成),使用stream2-2(最新的)。这时如果用户 不小心又触发一个网络请求(stream2-3),那么flatMapLatest会取消steram2-2,使用stream2-3。 当然,在这样的场景下,我们是应该使用flatMapFirst的!

<!DOCTYPE html>
<html>
<head>
    <title></title>
    <script type="text/javascript" src="http://apps.bdimg.com/libs/jquery/2.1.4/jquery.min.js"></script>

    <script type="text/javascript" src="./rxjs/dist/rx.lite.js"></script>
</head>
<body>
<script>
// flatMap example
var stream1 = Rx.Observable.from([10, 20, 30]);
var stream2 = function(n) {
  return Rx.Observable.from([n+1, n+2, n+3]);
};

stream1.flatMapLatest(function(n) {
  console.log("(" + n + ")");
  return stream2(n);
}).subscribe(function(n) {
  console.log(n);
}, function () {
  console.log('err');
}, function () {
  console.log('finished');
});
</script>
</body>
</html>

flatmaplatest1

关于FlatMap的另外的一个讲解,转自:http://blog.csdn.net/tianjun2012/article/details/51253877

如果你有一个Observable它的结果是许多嵌套的Observable将则怎么做?大部分的时间,你想到的就是统一这些嵌套的Observable的元素到一个单个的序列中。这也是flatMap所要明确干的事。 (看下面的那个图就能明白这个句话的意思:先看这句,“如果你有一个Observable”,在图上就是A这个Observable这个东西,它就是一个Observable,接着看“它的结果是许多嵌套的Observable这句的意思是,Observable A这个可观察对象本身是由三个A1,A2,A3可观察对象组成的。”所以这个时候,A本身可以被订阅,这个时候,订阅A的人,可以获得A1,A2,A3这三个可观察对象。而A1,A2,A3,这三个东西本身也是可观察对象,所以他们也能被订阅) flatMap操作符需要一个Observable参数,这个参数的元素也是Observable,并且返回只有一个孩子Observable的平坦值的Observable。 (就是说flatMap把上面的三个Observable A1,A2,A3发射出的值,合并成了一个Observable) flatmap

flatMap是一个强大的操作符,但是它比起我们目前学到的其他操作符都要难理解些,把它认为是这些Observable的concatAll()函数。

contcatAll是需要一个数组的数组函数,并且返回一个平坦的单一的数组,这个数组包含所有子数组的值,而不是这些子数组本身。我们可以使用reduce去建一个这样的函数:

function concatAll(source) {
      return source.reduce(function(a, b) {
                 return a.concat(b);
      });
}

我们可以这样使用它:

concatAll([[0, 1, 2], [3, 4, 5], [6, 7, 8]]);
// [0, 1, 2, 3, 4, 5, 6, 7, 8]

flatMap做同样的事,但不是平坦的arrays而是observable…… qweqwe 理解flatMap操作符,可以这么想,有一根大水管,向外出水,但是当我们切开这个打水管,看到切面,原来这根大水管里面由三个小水管组成,flatMap相当于一个转换头,将三根小水管的水合并成为了大水管中的水。

那么这个时候,这个问题也就能想明白了:https://stackoverflow.com/questions/36280565/angular2-http-get-location-header-of-201-response/36283142 为什么下面的这段代码是错误的:

 return this._http.post(this._heroesUrl, body, options)
            // Hero is created, but unable to get URL from the Location header!
            .map((res:Response) => this._http.get(res.headers.get('Location')).map((res:Response) => res.json()))
            .catch(this.handleError)

而下面的这段代码是正确的:

return this._http.post(this._heroesUrl, body, options)
        .flatMap((res:Response) => {
          var location = res.headers.get('Location');
          return this._http.get(location);
        }).map((res:Response) => res.json()))
        .catch(this.handleError)

最里面用http.get.map()没有问题,返回的是Observable。但是外面不能用map而必须用flatMap。 因为get.map()返回的一系列都是Observable,而map不具有flat的能力,只有flatMap能将一系列的Observable合并成为一个Observable从而达到扁平的效果。

flatMap适用于Observable里面还是Observable的情况

 return this._http.get(location);
        }).map((res:Response) => res.json())

关于flatMap操作符,可以参考这篇文章:https://segmentfault.com/a/1190000010088631#articleHeader5

使用 RxJS 处理多个 Http 请求

有时候进入某个页面时,我们需要从多个 API 地址获取数据然后进行显示。管理多个异步数据请求会比较困难,但我们可以借助 Angular Http 服务和 RxJS 库提供的功能来实现上述的功能。处理多个请求有多种方式,使用串行或并行的方式。

基础知识

mergeMap mergeMap 操作符用于从内部的 Observable 对象中获取值,然后返回给父级流对象。

合并 Observable 对象 ( jsBin)

const source = Rx.Observable.of('Hello');
//map to inner observable and flatten
const example = source.mergeMap(val => Rx.Observable.of(`${val} World!`));

const subscribe = example.subscribe(val => console.log(val)); //output: 'Hello World!'

在上面示例中包含两种 Observable 类型:

  1. 源 Observable 对象 - 即 source 对象
  2. 内部 Observable 对象 - 即 Rx.Observable.of(${val} World!) 对象

仅当内部的 Observable 对象发出值后,才会合并源 Observable 对象输出的值,并最终输出合并的值。

Map 和 Subscribe

有些时候,当我们发送下一个请求时,需要依赖于上一个请求的数据。即我们在需要在上一个请求的回调函数中获取相应数据,然后在发起另一个 HTTP 请求。

import { Component, OnInit } from '@angular/core';
import { Http } from '@angular/http';
import 'rxjs/add/operator/map';

@Component({
  selector: 'app-root',
  template: `
    <p>{{username}} Detail Info</p>
    {{user | json}}
  `
})
export class AppComponent implements OnInit {
  constructor(private http: Http) { }

  apiUrl = 'https://jsonplaceholder.typicode.com/users';
  username: string = '';
  user: any;

  ngOnInit() {
    this.http.get(this.apiUrl)
      .map(res => res.json())
      .subscribe(users => {
        let username = users[6].username;
        this.http.get(`${this.apiUrl}?username=${username}`)
          .map(res => res.json())
          .subscribe(
            user => {
              this.username = username;
              this.user = user;
            });
      });
  }
}

在上面示例中,我们先从 https://jsonplaceholder.typicode.com/users 地址获取所有用户的信息,然后再根据指定用户的 username 进一步获取用户的详细信息。虽然功能实现了,但有没有更好的解决方案呢?答案是有的,可以通过 RxJS 库中提供的 mergeMap 操作符来优化上述的流程。 使用mergeMap

import { Component, OnInit } from '@angular/core';
import { Http } from '@angular/http';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/mergeMap';

@Component({
  selector: 'app-root',
  template: `
    <p>{{username}} Detail Info</p>
    {{user | json}}
  `
})
export class AppComponent implements OnInit {
  constructor(private http: Http) { }

  apiUrl = 'https://jsonplaceholder.typicode.com/users';

  username: string = '';

  user: any;

  ngOnInit() {
    this.http.get(this.apiUrl)
      .map(res => res.json())
      .mergeMap(users => {
        this.username = users[6].username;
        return this.http.get(`${this.apiUrl}?username=${this.username}`)
          .map(res => res.json())
      })
      .subscribe(user => this.user = user);
  }
}

在上面示例中,我们通过 mergeMap 操作符,解决了嵌套订阅的问题。最后我们来看一下如何处理多个并行的 Http 请求。

Christian-health commented 6 years ago

switchMap (flatMapLatest)操作符

注意两段代码:一段使用switchMap 另外一段使用flatMap

使用switchMap如下:

<!DOCTYPE html>
<html>
<head>
    <title></title>
    <script type="text/javascript" src="http://apps.bdimg.com/libs/jquery/2.1.4/jquery.min.js"></script>

    <script type="text/javascript" src="./rxjs/dist/rx.lite.js"></script>
</head>
<body>
<script>
// flatMap example
var stream1 = Rx.Observable.from([10, 20, 30]);
var stream2 = function(n) {
  return Rx.Observable.from([n+1, n+2, n+3]);
};

stream1.switchMap(function(n) {
  console.log("(" + n + ")");
  return stream2(n);
}).subscribe(function(n) {
  console.log(n);
}, function () {
  console.log('err');
}, function () {
  console.log('finished');
});
</script>
</body>
</html>

使用flatMapLatest如下:

<!DOCTYPE html>
<html>
<head>
    <title></title>
    <script type="text/javascript" src="http://apps.bdimg.com/libs/jquery/2.1.4/jquery.min.js"></script>

    <script type="text/javascript" src="./rxjs/dist/rx.lite.js"></script>
</head>
<body>
<script>
// flatMap example
var stream1 = Rx.Observable.from([10, 20, 30]);
var stream2 = function(n) {
  return Rx.Observable.from([n+1, n+2, n+3]);
};

stream1.flatMapLatest(function(n) {
  console.log("(" + n + ")");
  return stream2(n);
}).subscribe(function(n) {
  console.log(n);
}, function () {
  console.log('err');
}, function () {
  console.log('finished');
});
</script>
</body>
</html>

这两段代码的运行结果都是如下: switchmap

后来发现 flatMapLatest 被重命名为 switchMap

Github上一个讨论的帖子地址为:https://github.com/ReactiveX/rxjs/issues/1855

RxJs官方给出的名字变更地址和删除地址为:https://github.com/ReactiveX/RxJS/blob/master/MIGRATION.md#operators-renamed-or-removed

switchmap

Christian-health commented 6 years ago

From 操作符

convert various other objects and data types into Observables 将各种其他对象和数据类型转换为Observables

When you work with [Observables], it can be more convenient if all of the data you mean to work with can be represented as [Observables], rather than as a mixture of [Observables] and other types. This allows you to use a single set of operators to govern the entire lifespan of the data stream. 当您使用[Observables]时,如果您要使用的所有数据都可以表示为[Observables],而不是[Observables]和其他类型的混合,则可以更方便。 这允许您使用一组运算符来管理数据流的整个生命周期。

Iterables, for example, can be thought of as a sort of synchronous Observable; Futures, as a sort of Observable that always emits only a single item. By explicitly converting such objects to Observables, you allow them to interact as peers with other Observables. 例如,Iterables可以被认为是一种同步的Observable; Futures,作为一种可观察,总是只发出一个项目。 通过明确地将这些对象转换为Observables,您可以让它们与其他Observables进行交互。

For this reason, most ReactiveX implementations have methods that allow you to convert certain language-specific objects and data structures into Observables. 因此,大多数ReactiveX实现具有允许您将某些特定语言的对象和数据结构转换为Observables的方法。 from In RxJS, the from operator converts an array-like or iterable object into an Observable that emits the items in that array or iterable. A String, in this context, is treated as an array of characters.

在RxJS中,from操作符将数组或可迭代对象转换为Observable,该对象可发出该数组中的项或iterable(迭代器)。 在此上下文中,String被视为字符数组。

This operator also takes three additional, optional parameters: 该操作符还需要三个可选参数: 2:a transforming function that takes an item from the array or iterable as input and produces an item to be emitted by the resulting Observable as output 一个转换函数,它从数组中获取一个项,或者一个迭代器作为输入,并产生由Observable作为输出发出的项目 3:a second argument to pass into the transforming function as additional context information 第二个参数作为附加上下文信息传递给转换函数 4:a Scheduler on which this operator should operate 这个操作符应该操作的一个计划程序(Scheduler)

from(参数1,参数2,参数3,参数4)
            ||
            ||
            \/
from(参数1,转换函数,转换函数的另外一个参数,Scheduler)

代码举例:

这里的arguments是JavaScript arguments对象,可以看这里进行了解:http://www.cnblogs.com/lwbqqyumidi/archive/2012/12/03/2799833.html

 Array-like object (arguments) to Observable
类似Array的对象(arguments)到Observable
function f() {
// 这里的arguments是JavaScript arguments对象,可以看这里进行了解:

http://www.cnblogs.com/lwbqqyumidi/archive/2012/12/03/2799833.html

  return Rx.Observable.from(arguments);
}
f(1, 2, 3).subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
// Any iterable object...
// Set
var s = new Set(['foo', window]);
Rx.Observable.from(s).subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
Next: foo
Next: window
Completed
// Map
var m = new Map([[1, 2], [2, 4], [4, 8]]);
Rx.Observable.from(m).subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
Next: [1, 2]
Next: [2, 4]
Next: [4, 8]
Completed
// String
Rx.Observable.from("foo").subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
Next: f
Next: o
Next: o
Completed

使用from函数第二个参数的例子:from([1, 2, 3], function (x) { return x + x; })

// Using an arrow function as the map function to manipulate the elements
Rx.Observable.from([1, 2, 3], function (x) { return x + x; }).subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
Next: 2
Next: 4
Next: 6
Completed

Rx.Observable.from({length: 5}, function(v, k) { return k; })的工作原理:

array 原理就是,javascript使用鸭子类型,当你想调用一个Object中的foo方法的时候,如果这个Object是bar类型。也就是说typeof Object = bar 。它并不检查这个Object到底是否真是一个bar类型,javascript只检测这个Object中是否有这个foo方法。

To sum up: 总结一下:

let fakeArray = {length:5};
fakeArray.length //5
let realArray = [1,2,3,4,5];
realArray.length //5

第一个就像“假的”JavaScript数组(具有属性长度)。 当Array.from检查属性长度时,它返回5,因此它创建具有长度为5的真实数组。您可以想数组一样的调用它,并认为它是一个数组。然后会自动的使用Array.from()函数创建一个数组,Array.from()函数的信息: https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Global_Objects/Array/from 第二部分只是lamba,它使用索引值(第二个参数)填充数组。也就是调用Array.from这个函数来填充数组。 这种技术对于mock某些对象(仿真某些对象,假对象)进行测试非常有用。 例如:

let ourFileReader = {}
ourFileReader.result = "someResult"
//ourFileReader will mock real FileReader
我们的FileReader会模拟真正的FileReader

var arr1 = Array.from({ length: 5 // Create 5 indexes with undefined values }, function(v, k) { // Run a map function on said indexes using v(alue)[undefined] and k(ey)[0 to 4] return k; // Return k(ey) as value for this index } ); console.log(arr1);

https://stackoverflow.com/questions/40528557/how-does-array-fromlength-5-v-k-k-work

// Generate a sequence of numbers
Rx.Observable.from({length: 5}, function(v, k) { return k; }).subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
Next: 0
Next: 1
Next: 2
Next: 3
Next: 4
Completed
Christian-health commented 6 years ago

Create 操作符

create an Observable from scratch by means of a function. 通过一个函数从头创建一个Observable。

create

You can create an Observable from scratch by using the Create operator. You pass this operator a function that accepts the observer as its parameter. Write this function so that it behaves as an Observable — by calling the observer’s onNext, onError, and onCompleted methods appropriately.

您可以使用Create操作符从头创建一个Observable。 您通过此操作符接受观察者作为其参数的函数。 编写这个函数,使其表现为一个Observable - 通过适当地调用观察者的onNext,onError和onCompleted方法。 (也就是说这个create函数的参数应该像一个observer,这个observer里面应该有onNext,onError和onCompleted方法)

A well-formed finite Observable must attempt to call either the observer’s onCompleted method exactly once or its onError method exactly once, and must not thereafter attempt to call any of the observer’s other methods. 一个格式良好的Observable必须尝试一次调用观察者的onCompleted方法或者其onError方法一次,而且此后不能尝试调用任何观察者的其他方法。

/* Using a function  使用一个函数*/
var source = Rx.Observable.create(function (observer) {
    observer.onNext(42);
    observer.onCompleted();

    // Note that this is optional, you do not have to return this if you require no cleanup
    //请注意,这是可选的,如果不需要清理,则不需要返回
    return function () { console.log('disposed'); };
});

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });

运行结果

Next: 42
Completed
/* Using a disposable 使用一次性 */
var source = Rx.Observable.create(function (observer) {
    observer.onNext(42);
    observer.onCompleted();

    // Note that this is optional, you do not have to return this if you require no cleanup
   //请注意,这是可选的,如果不需要清理,则不需要返回
    return Rx.Disposable.create(function () {
        console.log('disposed');
    });
});

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });

Next: 42
Completed

Rx.Observable.create

var observable = Rx.Observable
    .create(function(observer) {
        observer.next('Semlinker'); // RxJS 4.x 以前的版本用 onNext
        observer.next('Lolo');
    });

// 订阅这个 Observable    
observable.subscribe(function(value) {
    console.log(value);
});

以上代码运行后,控制台会依次输出 'Semlinker' 和 'Lolo' 两个字符串。

需要注意的是,很多人认为 RxJS 中的所有操作都是异步的,但其实这个观念是错的。RxJS 的核心特性是它的异步处理能力,但它也是可以用来处理同步的行为。具体示例如下:

var observable = Rx.Observable
    .create(function(observer) {
        observer.next('Semlinker'); // RxJS 4.x 以前的版本用 onNext
        observer.next('Lolo');
    });

console.log('start');
observable.subscribe(function(value) {
    console.log(value);
});
console.log('end');

以上代码运行后,控制台的输出结果:

start
Semlinker
Lolo
end

当然我们也可以用它处理异步行为:

var observable = Rx.Observable
    .create(function(observer) {
        observer.next('Semlinker'); // RxJS 4.x 以前的版本用 onNext
        observer.next('Lolo');

        setTimeout(() => {
            observer.next('RxJS Observable');
        }, 300);
    })

console.log('start');
observable.subscribe(function(value) {
    console.log(value);
});
console.log('end');

以上代码运行后,控制台的输出结果:

start
Semlinker
Lolo
end
RxJS Observable

从以上例子中,我们可以得出一个结论 - Observable 可以应用于同步和异步的场合。

Christian-health commented 6 years ago

Distinct (distinctUntilChanged)

suppress duplicate items emitted by an Observable 抑制Observable发出的重复项目

dis1

The Distinct operator filters an Observable by only allowing items through that have not already been emitted.

Distinct操作符可以过滤一个Observable,只允许没有已经被发出的item通过。

In some implementations there are variants that allow you to adjust the criteria by which two items are considered “distinct.” 在一些RX的实现中,有一些变体,允许你调整标准,通过考虑两个item的区别。 In some, there is a variant of the operator that only compares an item against its immediate predecessor for distinctness, thereby filtering only consecutive duplicate items from the sequence. 在某些情况下,有一个操作符的变体,它只比较一个项目与其直接上一个元素的不同,从而仅从序列中过滤连续的重复项目。 dis2

In RxJS, the distinct operator has two optional parameters: 在RxJS中,distinct运算符有两个可选参数: a function that accepts an item emitted by the source Observable and returns a key which will be used instead of the item itself when comparing two items for distinctness 一个函数接受源Observable发出的item,并返回一个将用于替代item本身的key,当比较两个item的区别的时候。 a function that accepts two items (or two keys) and compares them for distinctness, returning false if they are distinct (an equality function is the default if you do not supply your own function here) 一个函数可以接受两个item(或两个key),并将它们区分开来,如果它们不同,则返回false(如果在这里不提供自己的函数,则默认为相等函数)

/* Without key selector */
var source = Rx.Observable.fromArray([
        42, 24, 42, 24
    ])
    .distinct();

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x.toString()); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 42
Next: 24
Completed
/* With key selector */
var source = Rx.Observable.fromArray([
        {value: 42}, {value: 24}, {value: 42}, {value: 24}
    ])
    .distinct(function (x) { return x.value; });

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x.toString());
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
Next: { value: 42 }
Next: { value: 24 }
Completed

dis3

RxJS also has a distinctUntilChanged operator. It only compares emitted items from the source Observable against their immediate predecessors in order to determine whether or not they are distinct. It takes the same two optional parameters as the distinct operator. RxJS有一个distinctUntilChanged操作符, 它只比较源Observable与其直接前辈的发射项目,以确定它们是否不同。 (只要与前一个不同就会被保存下来,而不管之前是否发出过)它采用与distinct运算符相同的两个可选参数。

/* Without key selector */
var source = Rx.Observable.fromArray([
        24, 42, 24, 24
    ])
    .distinctUntilChanged();

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 24
Next: 42
Next: 24
Completed
/* With key selector */
var source = Rx.Observable.fromArray([
        {value: 24}, {value: 42}, {value: 42}, {value: 24}
    ])
    .distinctUntilChanged(function (x) { return x.value; });

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x.toString()); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: { value: 24 }
Next: { value: 42 }
Next: { value: 24 }
Completed
Christian-health commented 6 years ago

forkJoin (zip)

在http://reactivex.io/documentation/operators.html页面点击forkJoin之后会自动跳转到zip页面。

forkJoin在rxjs中的实现是zip,页面上有一句话:RxJS implements this operator as zip and zipArray.

combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function 通过指定的函数将多个Observables的emit合并在一起,并根据此函数的结果为每个组合emit单个items

The Zip method returns an Observable that applies a function of your choosing to the combination of items emitted, in sequence, by two (or more) other Observables, with the results of this function becoming the items emitted by the returned Observable

Zip方法返回一个Observable,它将您选择的函数应用于由两个(或多个)其他Observables序列发射的项目的组合,该函数的结果将成为返回的Observable发出的项目。

It applies this function in strict sequence, so the first item emitted by the new Observable will be the result of the function applied to the first item emitted by Observable #1 and the first item emitted by Observable #2; the second item emitted by the new zip-Observable will be the result of the function applied to the second item emitted by Observable #1 and the second item emitted by Observable #2; and so forth. It will only emit as many items as the number of items emitted by the source Observable that emits the fewest items. 它以严格的顺序应用此功能,因此新的Observable发出的第一个项目将是应用于由Observable#1发出的第一个项目和Observable#2发出的第一个项目的函数的结果; 新的zip-Observable发出的第二个项目将是应用于Observable#1发出的第二个项目的函数和Observable#2发出的第二个项目的结果; 等等。 它只会发出与发出最少项目的源Observable发出的项目数量相同的项目。

image zip accepts a variable number of Observables or Promises as parameters, followed by a function that accepts one item emitted by each of those Observables or resolved by those Promises as input and produces a single item to be emitted by the resulting Observable. zip接受可变数量的Observables或Promises作为参数,后面接受一个函数,该函数接受这些Observables中的每一个发出的一个项目,或者由Promises作为输入解析,并生成由所得到的Observable发出的单个项目。

/* Using arguments */
var range = Rx.Observable.range(0, 5);

var source = Observable.zip(
    range,                                //0,1,2,3,4
    range.skip(1),                    //1,2,3,4
    range.skip(2),                    //2,3,4
    function (s1, s2, s3) {
        return s1 + ':' + s2 + ':' + s3;
    }
);

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
Next: 0:1:2
Next: 1:2:3
Next: 2:3:4
Completed
/* Using promises and Observables */
var range = Rx.Observable.range(0, 5);

var source = Observable.zip(
    RSVP.Promise.resolve(0), //参数为Observable和Promise的情况
    RSVP.Promise.resolve(1),
    Rx.Observable.return(2)
    function (s1, s2, s3) {
        return s1 + ':' + s2 + ':' + s3;
    }
);

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
Next: 0:1:2
Completed

image

zipArray accepts a variable number of Observables as parameters and returns an Observable that emits arrays, each one containing the nth item from each source Observable. zipArray接受可变数量的Observables作为参数,并返回一个发出数组的Observable,每个数组包含来自每个源Observable的第n个项目。

zipArray在rxjs中没有实现,在这个链接的github软件库中没有这个函数:https://github.com/Reactive-Extensions/RxJS,报出如下错误:index.html:14 Uncaught TypeError: Rx.Observable.zipArray is not a function。比如如下程序:

<!DOCTYPE html>
<html>
<head>
    <title></title>
    <script type="text/javascript" src="http://apps.bdimg.com/libs/jquery/2.1.4/jquery.min.js"></script>

    <script type="text/javascript" src="./RxJS-master/dist/rx.lite.js"></script>
</head>
<body>
<script>
/* Using arguments */
var range = Rx.Observable.range(0, 5);

var source = Rx.Observable.zipArray(
    range,
    range.skip(1), 
    range.skip(2)
);

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);   
    },
    function () {
        console.log('Completed');   
    });
</script>
</body>
</html>
Next: [0,1,2]
Next: [1,2,3]
Next: [2,3,4]
Completed

image

RxJS also implements a similar operator, forkJoin. There are two varieties of this operator. The first collects the last element emitted by each of the source Observables into an array and emits this array as its own sole emitted item. You can either pass a list of Observables to forkJoin as individual parameters or as an array of Observables. RxJS还实现了一个类似的操作符,forkJoin。 这个操作员有两个品种。 第一个将每个源Observables发出的最后一个元素收集到数组中,并将该数组作为其唯一发出的项发出。 您可以将一个Observables列表作为单独的参数传递给forkJoin,也可以作为一个Observables数组。

var source = Rx.Observable.forkJoin(
    Rx.Observable.return(42),
    Rx.Observable.range(0, 10),
    Rx.Observable.fromArray([1,2,3]),
    RSVP.Promise.resolve(56)
);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
取最后一个元素:
Next: [42, 9, 3, 56]
Completed

image

A second variant of forkJoin exists as a prototype function, and you call it on an instance of one source Observable, passing it another source Observable as a parameter. As a second parameter, you pass it a function that combines the final item emitted by the two source Observables into the sole item to be emitted by the resulting Observable. forkJoin的第二个变体作为原型函数存在,并且在一个源Observable的实例上调用它,将另一个源Observable作为参数传递。 作为第二个参数,您传递一个将由两个源观察器发出的最终项目合并到由所得到的Observable发出的唯一项目的函数。

var source1 = Rx.Observable.return(42);
var source2 = Rx.Observable.range(0, 3);

var source = source1.forkJoin(source2, function (s1, s2) {
    return s1 + s2;
});

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 44
Completed

关于forkJoin的使用方法,参考:https://segmentfault.com/a/1190000010088631#articleHeader5

接下来的示例,我们将使用 forkJoin 操作符。如果你熟悉 Promises 的话,该操作符与 Promise.all() 实现的功能类似。forkJoin 操作符接收一个 Observable 对象列表,然后并行地执行它们。一旦列表的 Observable 对象都发出值后,forkJoin 操作符返回的 Observable 对象会发出新的值,即包含所有 Observable 对象输出值的列表。具体示例如下:

import { Component, OnInit } from '@angular/core';
import { Http } from '@angular/http';

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/operator/map';
import 'rxjs/add/observable/forkJoin';

@Component({
  selector: 'app-root',
  template: `
    <p>Post Detail Info</p>
    <ul>
      <li>{{post1 | json}}</li>
      <li>{{post2 | json}}</li>
    </ul>
  `
})
export class AppComponent implements OnInit {
  constructor(private http: Http) { }

  apiUrl = 'https://jsonplaceholder.typicode.com/posts';

  post1: any;

  post2: any;

  ngOnInit() {
    let post1 = this.http.get(`${this.apiUrl}/1`);
    let post2 = this.http.get(`${this.apiUrl}/2`);

    Observable.forkJoin([post1, post2])
      .subscribe(results => {
        this.post1 = results[0];
        this.post2 = results[1];
      });
  }
}
Christian-health commented 6 years ago

Skip

suppress the first n items emitted by an Observable 抑制Observable发出的前n个项目

image

You can ignore the first n items emitted by an Observable and attend only to those items that come after, by modifying the Observable with the Skip operator.

您可以忽略Observable发出的前n个项目,仅通过使用“跳过”运算符修改“可观察”来仅参与那些后续项目。

image

var source = Rx.Observable.range(0, 5)
    .skip(3);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });

Next: 3
Next: 4
Completed

image

skipUntilWithTime

RxJS also implements a skipUntilWithTime operator that does not skip a particular quantity of items from the source Observable, but skips items based on chronology. You set this skip period by passing in a parameter to skipUntilWithTime, in either of these formats:

RxJS还实现了一个skipUntilWithTime运算符,它不会从源Observable中跳过特定数量的项目,但是会根据chronology(年表)跳过项目。 通过将参数传递给skipUntilWithTime来设置此跳过周期,以下列格式之一:

a number 一个数字 skips items from the source Observable until this many milliseconds have passed since the Observable was subscribed to 当这个Observable被订阅之后,经过指定的毫秒数,才开始输出数据,也就是过滤掉指定毫秒之内的数据,然后开始发送

a Date 一个日期 skips items from the source Observable until this absolute time 定制一个指定的日期,一个绝对时间,这个时间之内Observable发出的数据都被过滤掉了。

You may also, optionally, pass in a Scheduler as a second parameter, and the timer will operate on that Scheduler (skipUntilWithTime uses the timeout Scheduler by default). 你还可以传递一个Scheduler 作为第二个可选的参数,并且定时器将在该调度程序上运行(skipUntilWithTime默认使用超时计划程序)。

var source = Rx.Observable.timer(0, 1000)//0,1,2,3,4,5,6,7,8,9
    .skipUntilWithTime(5000);//从5秒中之后发出的数值。所以我们看到的第一个值是5,因为0-4被湮掉了

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 6
Next: 7
Next: 8
Completed
Christian-health commented 6 years ago

Timer

create an Observable that emits a particular item after a given delay 创建一个Observable,在给定的延迟后发出特定的项目

image

In RxJS there are two versions of the timer operator. 在RxJS中有两个版本的定时器运算符 image

定时器的第一个版本返回一个Observable,它在您指定的延迟时间后发出单个项目。 您可以将延迟指定为Date对象(这意味着延迟到绝对时刻)或作为整数(这意味着延迟许多毫秒)。

There is also a version of timer that returns an Observable that emits a single item after a specified delay, and then emits items periodically thereafter on a specified periodicity. In this way it behaves a bit more like the Interval operator. 还有一个定时器版本,返回一个Observable,在指定的延迟后发出一个项目,然后以指定的周期性周期性地周期性地发出项目。 这样它的行为更像Interval操作符。

var source = Rx.Observable.timer(200, 100) 
//第一个参数表示200毫秒之后,每隔100毫秒开始发出一个数值
    .timeInterval()
    .pluck('interval')
    .take(3);

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });

注意下面的这段代码生成的结构:

<!DOCTYPE html>
<html>
<head>
    <title></title>
    <script type="text/javascript" src="http://apps.bdimg.com/libs/jquery/2.1.4/jquery.min.js"></script>

    <script type="text/javascript" src="./RxJS-master/dist/rx.all.js"></script>
    <script type="text/javascript" src="./a/lib/rsvp.js"></script>
</head>
<body>
<script>
var source = Rx.Observable.timer(200, 100) 
//第一个参数表示200毫秒之后,每隔100毫秒开始发出一个数值
    .timeInterval()     
    .take(3);

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x.value,'+',x.interval);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
</script>
</body>
</html>

Next: 0 + 201
Next: 1 + 100
Next: 2 + 100
Completed
Christian-health commented 6 years ago

TimeInterval

convert an Observable that emits items into one that emits indications of the amount of time elapsed between those emissions

image

The TimeInterval operator intercepts the items from the source Observable and emits in their place objects that indicate the amount of time that elapsed between pairs of emissions.

image

The timeInterval operator converts a source Observable into an Observable that emits indications of the amount of time lapsed between consecutive emissions of the source Observable. The first emission from this new Observable indicates the amount of time lapsed between the time when the observer subscribed to the Observable and the time when the source Observable emitted its first item. There is no corresponding emission marking the amount of time lapsed between the last emission of the source Observable and the subsequent call to onCompleted.

timeInterval运算符将源Observable转换为Observable,它发出源Observable的连续排放之间的时间量的指示。 这个新的Observable的第一个排放表示观察者订阅Observable时间和源Observable发出第一个项目的时间之间的时间。 没有相应的排放标记在最后发射源Observable之间的时间量和随后的onCompleted调用之间的时间量。 【这段话的意思就是说:这个timeInterval操作符号的功能是,把原的Observable转换成为一个新的Observable,原Observable和新的Observable之间的区别是,新的Observable中有原来的Observable值,同时有Observable发出的两个item之间的时间间隔,第一个{值:时间间隔}表示订阅了这个Observable之后,到源Observable发出第一个值时,他们之间的时间间隔,而最后一个发出的元素和之后调用的onCompleted之间是没有时间间隔的】

var source = Rx.Observable.timer(0, 1000)//从时间0开始,每隔1000ms产生一个值
    .timeInterval()//把值和它的前一个值之间的时间间隔放到值里
    .map(function (x) { return x.value + ':' + x.interval; })
    .take(5);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });

Next: 0:0
Next: 1:1000// 注意冒号前面是值,冒号后面是时间间隔
Next: 2:1000
Next: 3:1000
Next: 4:1000
Completed
Christian-health commented 6 years ago

Take

emit only the first n items emitted by an Observable 仅发射由Observable发出的前n个项目 image 您只能发出一个Observable发出的前n个项目,然后完成,而忽略其余部分,通过使用Take操作符修改Observable。

RxJS implements the take operator. RxJS实现take操作符。

var source = Rx.Observable.range(0, 5)//0,1,2,3,4
    .take(3);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 0
Next: 1
Next: 2
Completed

For the special case of take(0) you can also pass as a second parameter a Scheduler that take will use to immediately schedule a call to onCompleted. 对于take(0)的特殊情况,您还可以作为第二个参数传递一个调度程序,该调度程序将用于立即调度对onCompleted的调度。

Christian-health commented 6 years ago

pluck

image

There is also an operator called pluck which is a simpler version of this operator. It transforms the elements emitted by the source Observable by extracting a single named property from those elements and emitting that property in their place.

还有一个叫做pluck的操作符,这个操作符是一个更简单的版本。 它通过从这些元素中提取单个命名属性并在其中放置该属性来转换源Observable发出的元素。

【这个根underscore中的plunk操作符一样,就是取一个结构体中的某个key的值】

var source = Rx.Observable
    .fromArray([
        { value: 0 },
        { value: 1 },
        { value: 2 }
    ])
    .pluck('value');//提取上面的每个Object中的value属性的值

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });

Next: 0
Next: 1
Next: 2
Completed
Christian-health commented 6 years ago

Defer

do not create the Observable until the observer subscribes, and create a fresh Observable for each observer 在观察者订阅之前不要创建Observable,并为每个观察者创建一个新的Observable image The Defer operator waits until an observer subscribes to it, and then it generates an Observable, typically with an Observable factory function. It does this afresh for each subscriber, so although each subscriber may think it is subscribing to the same Observable, in fact each subscriber gets its own individual sequence. Defer操作符等待观察者订阅它,然后生成Observable,通常具有Observable工厂功能。 它为每个用户重新进行,所以虽然每个用户可能认为它正在订阅相同的Observable,实际上每个用户都获得自己的个人序列。 In some circumstances, waiting until the last minute (that is, until subscription time) to generate the Observable can ensure that this Observable contains the freshest data. 在某些情况下,等待最后一分钟(即直到订阅时间)生成Observable可以确保该Observable包含最新数据。 image

RxJS implements this operator as defer. This operator takes as its sole parameter an Observable factory function of your choosing. This function takes no parameters and returns an Observable or a Promise. RxJs实现了一个叫做defer的操作符。这个操作符只有一个参数,这个参数是一个Observe的工厂函数,这个函数没有参数,并且返回一个Observable或者一个Promise

/* Using an observable sequence */
var source = Rx.Observable.defer(function () {
    return Rx.Observable.return(42); //返回一个Observable
});

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); } );

Next: 42
Completed
var source = Rx.Observable.defer(function () {
    return RSVP.Promise.resolve(42);//返回一个Promise
});

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); } );
Next: 42
Completed
Christian-health commented 6 years ago

if

RxJS also implements the if operator. It takes as parameters a function that returns a boolean, an Observable to mirror if that function returns a true value, and optionally a second Observable to mirror if that function returns a false value (if you omit this parameter, if will mirror an empty Observable in such a case). RxJS也提供一个叫做if的操作符号,这个操作符号有三个参数,第一个参数是一个函数,这个函数返回一个boolean值,第二个参数是一个Observable,如果第一个参数返回的是true,那么就返回这个Observable。 第三个参数是一个可选的参数,如果返回false那么就返回这个Observable,当然如果没有写第三个参数,那么就返回一个空的Observable。

var shouldRun = false;

var source = Rx.Observable.if(
    function () { return shouldRun; },
    Rx.Observable.return(42),  //var shouldRun = true;返回这个对象
    Rx.Observable.return(56)   //var shouldRun = false;返回这个对象
);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 56
Completed
Christian-health commented 6 years ago

case

image

RxJS implements a somewhat similar operator called case (or “switchCase”). This operator conditionally creates and returns one of a set of possible Observables. It takes the following parameters:

RxJS实现了一个类似的操作符,称为case(或“switchCase”)。 该运算符有条件地创建并返回一组可能的可观察值。 它需要以下参数:

  1. a function that returns the key that determines which Observable to emit 一个返回确定发出哪个Observable的键的函数(也就是说返回一个键,这个键决定是哪个Observable对象)
  2. an object that associates those keys with particular Observables 将这些键与特定Observables相关联的对象
  3. (optional) a Scheduler or an Observable: (可选)调度程序或可观察对象: Scheduler 调度 the Scheduler you want this operator to use 您希望此操作员使用的Scheduler 计划程序 Observable 可观察对象 the default Observable to emit if the key does not associate with any Observables 如果该键与任何Observables不相关,则使用这个默认的Observable
var sources = {//通过匹配不同的键“foo”和“bar”来决定到底返回这里面的两个Observable对象中的哪一个
    'foo': Rx.Observable.return(42),
    'bar': Rx.Observable.return(56)
};

var defaultSource = Rx.Observable.empty();//一个空的Observable对象

var source = Rx.Observable.case(
    function () {
        return 'foo';//这个函数返回一个“foo”那么在sources中匹配第一个,所以返回42
    },
    sources,
    defaultSource);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); } );

Next: 42
Completed
Christian-health commented 6 years ago

Empty Never Throw

Empty

create an Observable that emits no items but terminates normally 创建一个Observable,它不会发送任何东西但是正常终止 image

Never

create an Observable that emits no items and does not terminate 创建一个不发送任何项目的Observable,并且不会终止

image

Throw

create an Observable that emits no items and terminates with an error 创建一个Observable,它不会发出任何项目,并以错误的方式终止

image

The Empty, Never, and Throw operators generate Observables with very specific and limited behavior. These are useful for testing purposes, and sometimes also for combining with other Observables or as parameters to operators that expect other Observables as parameters.

var source = Rx.Observable.empty();

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Completed
// This will never produce a value, hence never calling any of the callbacks
var source = Rx.Observable.never();

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
var source = Rx.Observable.return(42)
    .selectMany(Rx.Observable.throw(new Error('error!')));

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Error: Error: error!
Christian-health commented 6 years ago

selectMany

selectMany在http://reactivex.io/documentation/operators/flatmap.html 查找实际上就是flatMap

Christian-health commented 6 years ago

Interval

create an Observable that emits a sequence of integers spaced by a given time interval 创建一个Observable,发出一个以给定时间间隔间隔的整数序列 image The Interval operator returns an Observable that emits an infinite sequence of ascending integers, with a constant interval of time of your choosing between emissions. Interval操作符返回一个Observable,它发出一个无限序列的上升整数,您可以选择排放间隔的时间间隔。

RxJS implements this operator as interval. It accepts as its parameter the number of milliseconds to wait between emissions. RxJS将此运算符实现为Interval。 它接受作为其参数的排放之间等待的毫秒数。 interval operates by default on the timeout Scheduler, or you can optionally pass in a different Scheduler as a second parameter, and interval will operate on that Scheduler instead. interval 默认在timeout Scheduler上运行,或者您可以选择作为第二个参数传入不同的调度程序,interval 将在该Scheduler程序上运行。

var source = Rx.Observable
    .interval(500 /* ms */)
    .timeInterval()
    .take(3);

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
Next: {value: 0, interval: 500}
Next: {value: 1, interval: 500}
Next: {value: 2, interval: 500}
Completed
Christian-health commented 6 years ago

Just

create an Observable that emits a particular item 创建一个发出特定项目的Observable

image

The Just operator converts an item into an Observable that emits that item. Just操作符将一个item转换为发出该item的Observable。

Just is similar to From, but note that From will dive into an array or an iterable or something of that sort to pull out items to emit, while Just will simply emit the array or iterable or what-have-you as it is, unchanged, as a single item. Just和from是有区别的,From中传递进来的是一个数组或者是一个可迭代的对象,或者是一个能够从里面拉出东西的东西,这样from会一个一个的把数据发射出来,但是Just则不是这样的,它只是简单的发射一个数组或者是一个迭代器你所传入的,没有任何改变。 (这段话的意思是Observable.from([1,2,3]会发出1,2,3。但是Observable.Just([1,2,3]))则会发出[1,2,3]这就是区别) Note that if you pass null to Just, it will return an Observable that emits null as an item. Do not make the mistake of assuming that this will return an empty Observable (one that emits no items at all). For that, you will need the Empty operator. 请注意,如果将null传递给Just,它将返回一个作为项目发出null的Observable。 不要假设这将返回一个空的Observable(根本不发出任何项)。 为此,您将需要Empty运算符。 ( just(null) ==> Observable会发出一个null。而不是产生一个Observable.empty)

RxJS implements this operator as return and as just (two names for the same operator with the same behavior). It accepts a single item as a parameter and returns an Observable that emits that single item as its sole emission. RxJS将此运算符实现为【return】和【Just】(同一运算符的两个名称具有相同的行为)。 它接受单个项目作为参数,并返回一个可发现单个项目作为唯一的排放的Observable。 return/just operates by default on the immediate Scheduler, but you can also pass in a Scheduler of your choosing as an optional second parameter, in which case it will operate on that Scheduler instead. 默认情况下,“return / just”将在 immediate Scheduler调度程序中运行,但您也可以将选择的计划程序作为可选的第二个参数传递,在这种情况下,它将在该调度程序上运行。

var source = Rx.Observable.just(42);

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });
Next: 42
Completed
Christian-health commented 6 years ago

do

register an action to take upon a variety of Observable lifecycle events 注册一个动作来处理各种可观察的生命周期事件

image 【看这个图,也能猜出这个图中的意思,就是注册了一个回调函数,这个回调函数,就是那个五角星和后面的红色的圆,这个回调函数的作用就是,如果发出了事件源发出了一个元素,那么我就把这个发出的元素上应用一个五角星操作,如果事件源没有发出任何的数据,那么我就应用红色圆这个操作】 You can register callbacks that ReactiveX will call when certain events take place on an Observable, where those callbacks will be called independently from the normal set of notifications associated with an Observable cascade. There are a variety of operators that various ReactiveX implementations have designed to allow for this.

你可以注册一个回调函数,当有事件发生在一个Observable上的时候,ReactiveX将会调用这个回调函数,这些回调函数将被调用,其中这些回调将独立于与可观察级联相关联的通常通知集进行调用,各种ReactiveX实现已经设计允许这样的操作。

RxJS implements the basic Do operator as do or tap (two names for the same operator). You have two choices for how to use this operator: RxJS实现了两种基本的Do操作符,一种是do 另外一个是tap,他们是同样的一个操作符的不同的名字。 你有以下的两种方式能使用这个操作符。

You can pass it an Observer, in which case do/tap will call that Observer’s methods as though that Observer had subscribed to the resulting Observable. 第一种方式你可以给它传递一个Observer,这种情况下do或者tap操作符能调用这个Observer的方法,通过Observer订阅,就像那个Observer订阅了Observable的结果一样。

You can pass in a set of 1–3 individual functions (onNext, onError, and onCompleted) that do/tap will call along with the similarly-named functions of any of its observers. 第二种方式你可以传递一个拥有三个独立函数的集合,这个集合中有三个函数(onNext, onError, and onCompleted) ,do或者tap将会其任何observers的类似命名的函数一起调用。

/* Using an observer  方式一 */
var observer = Rx.Observer.create(
  function (x) { console.log('Do Next: %s', x); },
  function (err) { console.log('Do Error: %s', err); },
  function () { console.log('Do Completed'); }
);

var source = Rx.Observable.range(0, 3)
    .do(observer);

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });

Do Next: 0
Next: 0
Do Next: 1
Next: 1
Do Next: 2
Next: 2
Do Completed
Completed
/* Using a function   方式二*/
var source = Rx.Observable.range(0, 3)
  .do(
    function (x)   { console.log('Do Next:', x); },
    function (err) { console.log('Do Error:', err); },
    function ()    { console.log('Do Completed'); }
  );

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });

Do Next: 0
Next: 0
Do Next: 1
Next: 1
Do Next: 2
Next: 2
Do Completed
Completed

doOnNext

image

RxJS also implements doOnNext or tapOnNext (two names for the same operator). It is a specialized form of Do that responds only to the onNext case, by calling a callback function you provide as a parameter. RxJS还实现了doOnNext或tapOnNext(同一操作符的两个名称)。 它是一种专门的Do形式,只响应onNext的情况,通过调用您以参数形式提供的回调函数。 You may also optionally pass a second parameter that will be the “this” object from the point of view of your callback function when it executes. 你还可以传递一个可选的第二个参数,就是你这个回调函数执行的时候的this。

var source = Rx.Observable.range(0, 3)
  .doOnNext(
    function () { this.log('Do Next: %s', x); },
    console
  );

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });
Do Next: 0
Next: 0
Do Next: 1
Next: 1
Do Next: 2
Next: 2
Completed

doOnError

image RxJS also implements doOnError or tapOnError (two names for the same operator). It is a specialized form of Do that responds only to the onError case, by calling a callback function you provide as a parameter. You may also optionally pass a second parameter that will be the “this” object from the point of view of your callback function when it executes. RxJS实现了doOnError和tapOnError。这是同样的一个操作符的两个不同的名字。它指定了一个特定形式的Do。只有当onError情况下才会被调用。这个函数接收一个函数参数,你可以传递一个可选的第二个参数,这个参数是这个函数执行时候的this对象。

var source = Rx.Observable.throw(new Error());
  .doOnError(
    function (err) { this.log('Do Error: %s', err); },
    console
  );

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });
Do Error: Error
Error: Error

doOnCompleted

image

RxJS also implements doOnCompleted or tapOnCompleted (two names for the same operator). It is a specialized form of Do that responds only to the onCompleted case, by calling a callback function you provide as a parameter. You may also optionally pass a second parameter that will be the “this” object from the point of view of your callback function when it executes. RxJS实现了一个doOnCompleted或者tapOnCompleted操作符,同样的一个操作符两个不同的名字。 这是一个特定的形式的Do函数,只有当onCompleted情况的时候,才会被调用。它接收一个函数参数。 你还可以传递一个可选的参数,这个参数是this。

var source = Rx.Observable.range(0, 3)
  .doOnCompleted(
    function () { this.log('Do Completed'); },
    console
  );

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });
Next: 0
Next: 1
Next: 2
Do Completed
Completed

finally

image

RxJS also implements a finally operator. It takes a function that will be called after the resulting Observable terminates, whether normally (onCompleted) or abnormally (onError). RxJS实现了一个finally操作符,如果Observable终结,那么这个函数将会被调用。不论Observable是正常结束 (onCompleted)还是异常结束 (onError)。

var source = Rx.Observable.throw(new Error())
    .finally(function () {
        console.log('Finally');
    });

var subscription = source.subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
Error: Error
Finally

【上面的各种操作符,可以看到do是针对Observable的各个事件都进行处理。而doNext,doError,doCompletet是针对各个具体的时间。】

Christian-health commented 6 years ago

Replay

ensure that all observers see the same sequence of emitted items, even if they subscribe after the Observable has begun emitting items

确保所有观察者看到相同的发射物品序列,即使它们在Observable已经开始排放物品之后订阅

image

A connectable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when the Connect operator is applied to it. In this way you can prompt an Observable to begin emitting items at a time of your choosing. 一个connectable(可连接的)Observable类似于一个普通的Observable,除了它不会在订阅时开始发出items项目,而只有当Connect操作符被应用时才会开始。 这样,您可以促使Observable在您选择的时候开始发送item。(也就是说可以把一个如果我们想做到不是订阅的时候就开始发送而是选择的时候才开始发送,那么我就写一个connectable Observable 什么时候我把connectable操作符应用于它那么什么时候它才开始发送数据) (这段话的意思是有一种Observable叫做connectable Observable。这个东西和普通的Observable是有区别的,普通的Observable订阅之后就会发出数据,但是这种connectable Observable 只有当connectable操作符号被应用到connectable Observable上的时候,那么connectable Observable才会发出数据) If you apply the Replay operator to an Observable before you convert it into a connectable Observable, the resulting connectable Observable will always emit the same complete sequence to any future observers, even those observers that subscribe after the connectable Observable has begun to emit items to other subscribed observers. 如果您将Replay操作符应用于Observable,然后再将其转换为connectable可连接的Observable,则所得到的可连接Observable将始终向任何将来的观察者发出相同的完整序列,即使是在可连接的Observable已经开始向其他方式发出项目之后订阅的观察者 订阅观察员。

Christian-health commented 6 years ago

http://reactivex.io/documentation/operators/replay.html

Christian-health commented 6 years ago

repeat

image

create an Observable that emits a particular item multiple times 创建一个可以发射多个特定项目的Observable

The Repeat operator emits an item repeatedly. Some implementations of this operator allow you to repeat a sequence of items, and some permit you to limit the number of repetitions. 重复操作符反复发出一个项目。 该操作符的一些实现允许您重复一系列的项目,有些允许您限制重复次数。

RxJS implements this operator as repeat. It accepts as its parameter the item to repeat, and optionally two other parameters: the number of times you want the item to repeat, and the Scheduler on which you want to perform this operation (it uses the immediate Scheduler by default).

RxJS实现了一个repeat操作符。 它接受要重复的项目的参数,以及可选的其他两个参数:您希望项目重复的次数以及要执行此操作的Scheduler 计划程序(默认情况下使用immediate 即时调度程序)。

var source = Rx.Observable.repeat(42, 3);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 42
Next: 42
Next: 42
Completed
Christian-health commented 6 years ago

doWhile

image

RxJS also implements the doWhile operator. It repeats the source Observable’s sequence of emissions only so long as a condition you specify remains true.

RxJS还实现了doWhile运算符。 只要您指定的条件保持为真,它就会重复Observable的排放序列。

var i = 0;

var source = Rx.Observable.return(42).doWhile(
    function (x) { return ++i < 2; });

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
   function () { console.log('Completed'); });
Next: 42
Next: 42
Completed
Christian-health commented 6 years ago

while

image

RxJS also implements the while operator. It repeats the source Observable’s sequence of emissions only if a condition you specify is true. RxJS还实现了while运算符。 只有当您指定的条件为真时,它才会重复Observable的排放序列。

var i = 0;

// Repeat until condition no longer holds
var source = Rx.Observable.while(
    function () { return i++ < 3 },
    Rx.Observable.return(42)
);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 42
Next: 42
Next: 42
Completed
Christian-health commented 6 years ago

Connect

instruct a connectable Observable to begin emitting items to its subscribers 指示connectable Observable(可连接的Observable)开始向其订阅者发送物品

image

A connectable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when the Connect operator is applied to it. In this way you can wait for all intended observers to subscribe to the Observable before the Observable begins emitting items.

一个可连接的Observable类似于一个普通的Observable,除了它不会在订阅时开始发出项目,而只有当Connect操作符被应用时才会开始。 这样,您可以等待所有预期的观察者在Observable开始发射物品之前订阅Observable。

【这段话的意思是,不同的Observable只要一个Observer订阅了它,那么这个Observable就开始发送数据了,那么其他的后面的新来的Observable订阅之后得到的就是全部的发出对象,比如现在有一个Observable,第一个Observer订阅了它,那么它就开始发送数据,0,1,2,3,4,5.............比如发送到了5的时候,有来了一个Observer,这个时候它也订阅了这个Observable,那么后来的这个Observer能接受到的数据就是从5开始,而前面到的的那些就已经收不到了,但是connectable Observable的优点就是你可以等到所有想要订阅这个Observable的Observer都到了,然后你把一个connect应用到这个Observable上,那么这个时候才开始发出数据,那么所有Observer都能从0开始接受数据,那么所有的Observer拿到的都是完整的数据】

In RxJS, the connect operator is a method of the ConnectableObservable prototype. You can use the publish operator to convert an ordinary Observable into a ConnectableObservable.

在RxJS中,connect操作符是一个ConnectableObservable 的原型方法。你可以使用publish操作符把一个普通的Observable转换成为一个ConnectableObservable

Ordinary Observable   ======> ConnectableObservable
  普通的Observable                           可连接的Observable

Call a ConnectableObservable’s connect method to instruct (通知)it to begin emitting the items from its underlying Observable to its Subscribers.

调用一个ConnectableObservable的connect方法来(通知)它从其基础的Observable开始发射item给其订阅者。

The connect method returns a Disposable. You can call that Disposable object’s dispose method to instruct the Observable to stop emitting items to its Subscribers.

connect方法返回一个Disposable。 您可以调用Disposable对象的dispose 方法来指示Observable停止向其订阅者发送项目。

可任意处理的; 一次性的,用后就抛弃的; 免洗的; 可供使用的;

Disposable.dispose

You can also use the connect method to instruct an Observable to begin emitting items (or, to begin generating items that would be emitted) even before any Subscriber has subscribed to it. In this way you can turn a cold Observable into a hot one.

您也可以使用connect方法来通知Observable开始发布item(或者开始生成将要发出的item),甚至在任何订阅者订阅之前(也就是说在没有任何的订阅者订阅到这个Observable之前就把要发送的数据准备好了,那么这个Observable就不是冷的了,而是热的)。 以这种方式你可以把一个冷的可观察变成一个热的。

//有两个订阅者
<!DOCTYPE html>
<html>
<head>
    <title></title>
    <script type="text/javascript" src="http://apps.bdimg.com/libs/jquery/2.1.4/jquery.min.js"></script>

    <script type="text/javascript" src="./RxJS-master/dist/rx.all.js"></script>
    <!-- <script type="text/javascript" src="./yang.js"></script> -->
</head>
<body>
<script>
var interval = Rx.Observable.interval(1000);
// [{0,1000},{1,1000}]

var source = interval
    .take(2)
    .do(function (x) { console.log('Side effect'); });

var published = source.publish();

published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));

// Connect the source
var connection = published.connect();

function createObserver(tag) {
    return Rx.Observer.create(
        function (x) { console.log('Next: ' + tag + x); },
        function (err) { console.log('Error: ' + err); },
        function () { console.log('Completed'); });
}
</script>
</body>
</html>

 Side effect
 Next: SourceA0
 Next: SourceB0
 Side effect
 Next: SourceA1
 Next: SourceB1
 Completed
 Completed
//有一个订阅者
<!DOCTYPE html>
<html>
<head>
    <title></title>
    <script type="text/javascript" src="http://apps.bdimg.com/libs/jquery/2.1.4/jquery.min.js"></script>

    <script type="text/javascript" src="./RxJS-master/dist/rx.all.js"></script>
    <!-- <script type="text/javascript" src="./yang.js"></script> -->
</head>
<body>
<script>
var interval = Rx.Observable.interval(1000);
// [{0,1000},{1,1000}]

var source = interval
    .take(2)
    .do(function (x) { console.log('Side effect'); });

var published = source.publish();

published.subscribe(createObserver('SourceA'));
// published.subscribe(createObserver('SourceB'));

// Connect the source
var connection = published.connect();

function createObserver(tag) {
    return Rx.Observer.create(
        function (x) { console.log('Next: ' + tag + x); },
        function (err) { console.log('Error: ' + err); },
        function () { console.log('Completed'); });
}
</script>
</body>
</html>

Side effect
Next: SourceA0
Side effect
Next: SourceA1
Completed

这个东西的使用方式就是:首先建立一个普通的Observable,然后把这个普通的Observable应用一下publish,那么这个时候它就变成了一个connect Observable,然后就可以有很多的Observer订阅它,最后,如果我们想让它开始发送数据的时候,只需要在这个connect Observable上应用connect操作符。

                 Ordinary  Observable          普通的Observable
                             ||
                             ||
                             ||
                            \|/
                 publish操作符号(这里就变成了一个connect  Observable)
                             ||
                             ||
                             ||
                            \|/
                各个Observer开始进行订阅
                             ||
                             ||
                             ||
                            \|/
                应用connect操作符号开始发送数据
                             ||
                             ||
                             ||
                            \|/
               各个Observer开始接收数据
Christian-health commented 6 years ago

publish

image

convert an ordinary Observable into a connectable Observable 将普通的Observable转换成可连接的Observable

         ordinary Observable           普通Observable
                  ||
                  ||
                  ||                 publish操作符
                 \|/
         connectable Observable    可连接的Observable

A connectable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when the Connect operator is applied to it. In this way you can prompt an Observable to begin emitting items at a time of your choosing. 一个可连接的Observable类似于一个普通的Observable,除了它不会在订阅时开始发出项目,而只有当Connect操作符被应用时才会开始。 这样,您可以提示Observable在您选择的时候开始发送物品。

var interval = Rx.Observable.interval(1000);

var source = interval
    .take(2)
    .doAction(function (x) {
        console.log('Side effect');
    });

var published = source.publish();

published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));

var connection = published.connect();

function createObserver(tag) {
    return Rx.Observer.create(
        function (x) { console.log('Next: ' + tag + x); },
        function (err) { console.log('Error: ' + err); },
        function () { console.log('Completed'); });
}
Side effect
Next: SourceA0
Next: SourceB0
Side effect
Next: SourceA1
Next: SourceB1
Completed
Completed

publishValue

image The publishValue operator takes, in addition to the function described above, an initial item to be emitted by the resulting ConnectableObservable at connection time before emitting the items from the source Observable. It will not, however, emit this initial item to observers that subscribe after the time of connection. 除了上述功能之外,publishValue运算符除了从源Observable中发出项之外,还将连接时连接的ConnectableObservable发出的初始项。 然而,不会将此初始项目发送给在连接时间之后订阅的观察者。

publishValue 操作符的功能除去上面的描述的那些之外。有一个初始的item将会被emit发出,什么时候发出?就是当这个ConnectableObservable 被connect之后,并且在源Observable发出item之前,发出一个初始的item。但是请注意,那些在connect之后才订阅的observer不会收到这个初始的item。

var interval = Rx.Observable.interval(1000);

var source = interval
    .take(2)
    .doAction(function (x) {
        console.log('Side effect');
    });

var published = source.publishValue(42);

published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));

var connection = published.connect();

function createObserver(tag) {
    return Rx.Observer.create(
        function (x) { console.log('Next: ' + tag + x); },
        function (err) { console.log('Error: ' + err); },
        function () { console.log('Completed'); });
}
Next: SourceA42
Next: SourceB42
Side effect
Next: SourceA0
Next: SourceB0
Side effect
Next: SourceA1
Next: SourceB1
Completed
Completed

publishLast

image

The publishLast operator is similar to publish, and takes a similarly-behaving function as its parameter. It differs from publish in that instead of applying that function to, and emitting an item for every item emitted by the source Observable subsequent to the connection, it only applies that function to and emits an item for the last item that was emitted by the source Observable, when that source Observable terminates normally. publishLast 操作符类似于publish操作符,并且有一个相同行为的函数作为其参数。这个函数与publish不同的是,publise会发出每一个源Observable发出的item,但是这个publishLast只会发出源操作符发出的最后一个item,当该源可观察终止正常。 【也就是说publishLast只会发出最后一个操作符,并且什么时候发出,就是在源Observable终止的时候】

var interval = Rx.Observable.interval(1000);

var source = interval
    .take(2)
    .doAction(function (x) {
        console.log('Side effect');
    });

var published = source.publishLast();

published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));

var connection = published.connect();

function createObserver(tag) {
    return Rx.Observer.create(
        function (x) { console.log('Next: ' + tag + x); },
        function (err) { console.log('Error: ' + err); },
        function () { console.log('Completed'); });
}
Side effect
Side effect
Next: SourceA1
Completed
Next: SourceB1
Completed

multicast

RxJS also has a multicast operator which operates on an ordinary Observable, multicasts that Observable by means of a particular Subject that you specify, applies a transformative function to each emission, and then emits those transformed values as its own ordinary Observable sequence. Each subscription to this new Observable will trigger a new subscription to the underlying multicast Observable.

var subject = new Rx.Subject();
var source = Rx.Observable.range(0, 3)
    .multicast(subject);

var observer = Rx.Observer.create(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); }
);

var subscription = source.subscribe(observer);
subject.subscribe(observer);

var connected = source.connect();

subscription.dispose();
Next: 0
Next: 0
Next: 1
Next: 1
Next: 2
Next: 2
Completed

let

There is also a let operator (the alias letBind is available for browsers such as Internet Explorer before IE9 where “let” is forbidden). It is similar to multicast but does not multicast the underlying Observable through a Subject:

var obs = Rx.Observable.range(1, 3);

var source = obs.let(function (o) { return o.concat(o); });

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 1
Next: 2
Next: 3
Next: 1
Next: 2
Next: 3
Completed
Christian-health commented 6 years ago

RefCount

make a Connectable Observable behave like an ordinary Observable image A connectable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when the Connect operator is applied to it. In this way you can prompt an Observable to begin emitting items at a time of your choosing.

The RefCount operator automates the process of connecting to and disconnecting from a connectable Observable. It operates on a connectable Observable and returns an ordinary Observable. When the first observer subscribes to this Observable, RefCount connects to the underlying connectable Observable. RefCount then keeps track of how many other observers subscribe to it and does not disconnect from the underlying connectable Observable until the last observer has done so.

RxJava implements this operator as refCount.

var interval = Rx.Observable.interval(1000);

var source = interval
    .take(2)
    .doAction(function (x) { console.log('Side effect'); });

var published = source.publish().refCount();

published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));

function createObserver(tag) {
    return Rx.Observer.create(
        function (x) { console.log('Next: ' + tag + x); },
        function (err) { console.log('Error: ' + err); },
        function () { console.log('Completed'); });
}
Side effect
Next: SourceA0
Next: SourceB0
Side effect
Next: SourceA1
Next: SourceB1
Completed
Completed

share

There is also a share operator, which is the equivalent of applying both the publish and refCount operators to an Observable, in that order. A variant called shareValue takes as a parameter a single item that it will emit to any subscribers before beginning to emit items from the source Observable.

var interval = Rx.Observable.interval(1000);

var source = interval
    .take(2)
    .do(
        function (x) { console.log('Side effect'); });

var published = source.share();

// When the number of observers subscribed to published observable goes from
// 0 to 1, we connect to the underlying observable sequence.
published.subscribe(createObserver('SourceA'));
// When the second subscriber is added, no additional subscriptions are added to the
// underlying observable sequence. As a result the operations that result in side
// effects are not repeated per subscriber.
published.subscribe(createObserver('SourceB'));

function createObserver(tag) {
    return Rx.Observer.create(
        function (x) { console.log('Next: ' + tag + x); },
        function (err) { console.log('Error: ' + err); },
        function () { console.log('Completed'); });
}
Side effect
Next: SourceA0
Next: SourceB0
Side effect
Next: SourceA1
Next: SourceB1
Completed
Christian-health commented 6 years ago

https://segmentfault.com/a/1190000008464065#articleHeader0

Christian-health commented 6 years ago

Start

create an Observable that emits the return value of a function-like directive image There are a number of ways that programming languages have for obtaining values as the result of calculations, with names like functions, futures, actions, callables, runnables, and so forth. The operators grouped here under the Start operator category make these things behave like Observables so that they can be chained with other Observables in an Observable cascade

RxJS implements the start operator. It takes as its parameters a function whose return value will be the emission from the resulting Observable, and, optionally, any additional parameter to that function and a Scheduler on which to run the function.

var context = { value: 42 };

var source = Rx.Observable.start(
    function () {
        return this.value;
    },
    context,
    Rx.Scheduler.timeout
);

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
Next: 42
Complete

startAsync

image RxJS also implements the startAsync operator. It takes as its parameters an asynchronous function whose return value will be the emission from the resulting Observable.

You can convert a function into an asynchronous function with the toAsync method. It takes a function, function parameter, and Scheduler as parameters, and returns an asynchronous function that will be invoked on the specified Scheduler. The last two parameters are optional; if you do not specify a Scheduler, the timeout Scheduler will be used by default.

var source = Rx.Observable.startAsync(function () {
    return RSVP.Promise.resolve(42);
});

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
Next: 42
Completed
Christian-health commented 6 years ago

doAction

doAction就是do操作符,在官网上查询doAction查到的结果就是do。

Christian-health commented 6 years ago

继续from 还有文章

https://segmentfault.com/a/1190000008464065

Christian-health commented 6 years ago

http://www.jianshu.com/p/504bde348956

Christian-health commented 6 years ago

BehaviorSubject

BehaviorSubject 能够保留最近的数据,使得当有subscribe的时候,立马发射出去。看代码:

var subject = new Rx.BehaviorSubject(0); // 0 is the initial value

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(3);

observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

image

注意看这张图,看这个BehaviorSubject函数有一个参数,如果从最开始订阅这个Subject的时候,首先会接收到这个Subject的参数,然后接收这个Subject发出的各个item。如果不是从最开始就订阅了这个Subject的话,那么订阅者会收到Observable被订阅之前发出的最后一个元素,然后继续接收新的元素。

Christian-health commented 6 years ago

TakeLast

emit only the final n items emitted by an Observable 只发出Observable发出的最后的N个item。

image

You can emit only the final n items emitted by an Observable and ignore those items that come before them, by modifying the Observable with the TakeLast operator. 取得一个Observable中的最后的几个item,忽略掉前面的所有的。

image

You can emit only the final n items emitted by an Observable and ignore those items that precede them, by modifying the Observable with the takeLast(n) operator. Note that this will delay the emission of any item from the source Observable until that Observable completes. 您只能发出一个Observable发出的最后n个项目,并忽略它们之前的那些项目,通过使用takeLast(n)运算符修改Observable。 请注意,这将延迟源Observable中任何项目的排放,直到该Observable完成。

var source = Rx.Observable.range(0, 5)
    .takeLast(3);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 2
Next: 3
Next: 4
Completed

takeLastWithTime

image

The takeLastWithTime operator takes a temporal duration rather than a quantity of items. It emits only those items that are emitted during that final duration of the source Observable’s lifespan. You set this duration by passing in a number of milliseconds as a parameter to takeLastWithTime. takeLastWithTime运算符需要一个持续时间而不是一个数量的项。 它仅发射在源可观测的寿命的最后持续时间内发射的那些物品。 通过传递几毫秒作为takeLastWithTime的参数来设置此持续时间。

Note that the mechanism by which this is implemented will delay the emission of any item from the source Observable until that Observable completes. 请注意,执行此操作的机制将会延迟源Observable中任何项目的排放,直到该Observable完成。

takeLastWithTime by default operates the timer on the timeout Scheduler and emits items on the currentThread Scheduler, but you may also pass in Schedulers of your choosing to override these, as an optional second and third parameters, respectively.

takeLastWithTime默认情况下会在超时计划程序上运行定时器,并在currentThread Scheduler上发出项目,但您也可以分别选择第二和第三参数,将您选择的计划程序传递给它们,以覆盖这些计划。

var source = Rx.Observable.timer(0, 1000)
    .take(10)
    .takeLastWithTime(5000);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 5
Next: 6
Next: 7
Next: 8
Next: 9
Completed

takeLastBuffer

image

There is also an operator called takeLastBuffer. It differs in behavior from takeLast by emitting its items not individually but collected into a single array of items that is emitted as a single item. 还有一个名为takeLastBuffer的运算符。 它的行为与takeLast的不同之处在于,它们不是单独发送,而是收集到单个作为单个项目排放的项目的数组中。 (就是说把指定个数的元素,组成一个数组,然后把这个数组发射出来)

There is also an operator called takeLastBuffer. It differs in behavior from takeLast by emitting its items not individually but collected into a single array of items that is emitted as a single item.

var source = Rx.Observable.range(0, 5)
    .takeLastBuffer(3);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 2,3,4
Completed

takeLastBufferWithTime

image

takeLastBuffer also has its duration-based variant, takeLastBufferWithTime, which is similar to takeLastWithTime except that it emits its items not individually but collected into a single array of items that is emitted as a single item.

takeLastBuffer还具有基于持续时间的变体takeLastBufferWithTime,它类似于takeLastWithTime,除了它不单独发送其项目,而是收集到单个项目中发出的单个数组中。

var source = Rx.Observable
    .timer(0, 1000)
    .take(10)
    .takeLastBufferWithTime(5000);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 5,6,7,8,9
Completed
Christian-health commented 6 years ago

Aggregate (reduce)

apply a function to each item emitted by an Observable, sequentially, and emit the final value 对Observable发出的每个项目顺序应用一个函数,并发出最终的值

image

The Reduce operator applies a function to the first item emitted by the source Observable and then feeds the result of the function back into the function along with the second item emitted by the source Observable, continuing this process until the source Observable emits its final item and completes, whereupon the Observable returned from Reduce emits the final value returned from the function. Reduce操作符对源Observable发出的第一个项目应用一个函数,然后将该函数的结果与源Observable发出的第二个项目一起反馈到该函数中,继续此过程,直到源Observable发出其最终项目, 完成,然后从Reduce返回的Observable发出从函数返回的最终值。

This sort of operation is sometimes called “accumulate,” “aggregate,” “compress,” “fold,” or “inject” in other contexts. 这种操作有时在其他上下文中被称为“累加”,“聚合”,“压缩”,“折叠”或“注入”。

image

RxJS implements the reduce operator. Pass it an accumulator function, and, optionally, a seed value to pass into the accumulator function with the first item emitted by the source Observable. RxJS实现了reduce操作符。 传递一个累加器函数,以及可选的种子值,以便通过源Observable发出的第一个项目传入累加器函数。

var source = Rx.Observable.range(1, 3)
    .reduce(function (acc, x) {
        return acc * x;
    }, 1)

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 6
Completed
Christian-health commented 6 years ago

All / every / jortSort / jortSortUntil

determine whether all items emitted by an Observable meet some criteria 确定Observable发出的所有项目是否符合某些标准 image

Pass a predicate function to the All operator that accepts an item emitted by the source Observable and returns a boolean value based on an evaluation of that item. All returns an Observable that emits a single boolean value: true if and only if the source Observable terminates normally and every item emitted by the source Observable evaluated as true according to this predicate; false if any item emitted by the source Observable evaluates as false according to this predicate.

将一个谓词函数传递给接受Source Observable发出的一个项目的All运算符,并根据该项目的评估返回一个布尔值。 全部返回一个可发出单个布尔值的Observable:当且仅当源Observable正常终止且源Observable发出的每个项目根据该谓词计算为true时为true。 如果源Observable发出的任何项目根据该谓词计算为false,则为false。

image

var source = Rx.Observable.of(1,2,3,4,5)
  .every(function (x) {
    return x < 6;
  });

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });
Next: true
Completed

There is also a jortSort operator that performs a test on the entire sequence of items emitted by the source Observable. If those items are emitted in sorted order, upon the successful completion of the source Observable, the Observable returned from jortSort will emit true and then complete. If any of the items emitted by the source Observable is out of sort order, upon the successful completion of the source Observable, the Observable returned from jortSort will emit false and then complete. 还有一个jortSort操作符,对源Observable发出的整个项目序列进行测试。 如果这些项目按排序顺序排列,则在成功完成源Observable后,从jortSort返回的Observable将会发出true,然后完成。 如果源Observable发出的任何项目不符合排序顺序,则在成功完成源Observable后,从jortSort返回的Observable将发出false并完成。 There is also a jortSortUntil operator. It does not wait until the source Observable completes to evaluate its sequence for sortedness, as jortSort does, but waits until a second Observable emits an item to do so.

var source = Rx.Observable.of(1,2,3,4) // already sorted
               .jortSort();

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (e) { console.log('Error: %s', e); },
  function ( ) { console.log('Completed'); });
Next: true
Completed
var source = Rx.Observable.of(3,1,2,4) // not sorted
               .jortSort();

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (e) { console.log('Error: %s', e); },
  function ( ) { console.log('Completed'); });
Next: false
Completed
var just = Rx.helpers.just;

var source = Rx.Observable.of(1,2,3,4) // already sorted          //这段代码根本直接跑不起来,没看懂
               .flatmap(function (x) {
                 return Rx.Observable.timer(1000).map(just(x));
               }).jortSortUntil(Rx.Observable.timer(3000);

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (e) { console.log('Error: %s', e); },
  function ( ) { console.log('Completed'); });
Next: true
Completed
var just = Rx.helpers.just; //这段代码根本直接跑不起来,没看懂

var source = Rx.Observable.of(3,1,2,4) // not sorted
               .flatmap(function (x) {
                 return Rx.Observable.timer(1000).map(just(x));
               }).jortSortUntil(Rx.Observable.timer(3000);

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (e) { console.log('Error: %s', e); },
  function ( ) { console.log('Completed'); });
Next: false
Completed
Christian-health commented 6 years ago

Amb ($q.race)

given two or more source Observables, emit all of the items from only the first of these Observables to emit an item or notification 从两个或者更多个Observable中,选择出第一个开始向往发射item的Observable然后,把这个Observable的所有东西全部输出。

image

When you pass a number of source Observables to Amb, it will pass through the emissions and notifications of exactly one of these Observables: the first one that sends a notification to Amb, either by emitting an item or sending an onError or onCompleted notification. Amb will ignore and discard the emissions and notifications of all of the other source Observables. 当你传递多个Observable到Amb中的时候,它将通过这些Observables中的一个的排放和通知: amb会读取并使用第一个发出item的Observable或者说,不是发出一个正常的item,onError,或者onCompleted也可以。

image RxJS implements this operator as amb. It takes a variable number of parameters, which may be either Observables or Promises (or combinations of the two). RxJS将此运算符实现为amb。 它需要可变数量的参数,它们可以是Observables或Promises(或两者的组合)。

/* Using Observable sequences */
var source = Rx.Observable.amb(
    Rx.Observable.timer(500).select(function () { return 'foo'; }),// 延迟500毫秒
    Rx.Observable.timer(200).select(function () { return 'bar'; })//延迟200毫秒
);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: bar
Completed
/* Using Promises and Observables */
var source = Rx.Observable.amb(
    RSVP.Promise.resolve('foo')
    Rx.Observable.timer(200).select(function () { return 'bar'; }) //select (alternate name of Filter)
);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: foo
Completed