Christian-Yang / Translate-and-save

Translate and save for my self
1 stars 0 forks source link

RxJs操作符笔记 #36

Open Christian-Yang opened 7 years ago

Christian-Yang commented 7 years ago

RxJs操作符笔记,原文:http://reactivex.io/documentation/observable.html 屏幕尺子:http://www.chizi.cc/ RxJs给出的操作符名字变更和删除文档的地址:https://github.com/ReactiveX/RxJS/blob/master/MIGRATION.md#operators-renamed-or-removed

================================================================================

scan 操作符

apply a function to each item emitted by an Observable, sequentially, and emit each successive value. 对Observable发出的每个项目顺序应用一个函数,并发出每个连续的值。 The Scan operator applies a function to the first item emitted by the source Observable and then emits the result of that function as its own first emission. 扫描运算符对源Observable发出的第一个项目应用一个函数,然后将该函数的结果作为自己的第一个发射发出。 It also feeds the result of the function back into the function along with the second item emitted by the source Observable in order to generate its second emission. 它还将功能的结果与源Observable发射的第二个项目一起馈送到功能中,以产生其第二个发射。 It continues to feed back its own subsequent emissions along with the subsequent emissions from the source Observable in order to create the rest of its sequence. 它继续反馈自己的后续排放以及源Observable的后续排放,以创建其余的序列。 This sort of operator is sometimes called an “accumulator” in other contexts. 这种运算符有时在其他上下文中称为“累加器”。 scan-demo

ar source = Rx.Observable.range(1, 3)
    .scan(
        function (acc, x) {
            return acc + x;
        });

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 1
Next: 3
Next: 6
Completed

没有提供acc初始值的情况下,acc初始值为0,应该是。 qq1

var source = Rx.Observable.range(1, 3)
    .scan( function (acc, x) {
            return acc * x;
           }, 1 );

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 1
Next: 2
Next: 6
Completed

这里为acc提供了一个初始的值acc =1

demo code

<!DOCTYPE html>
<html>
<head>
    <title></title>
    <script type="text/javascript" src="http://apps.bdimg.com/libs/jquery/2.1.4/jquery.min.js"></script>

    <script type="text/javascript" src="./rxjs/dist/rx.lite.js"></script>
</head>
<body>
<script>
var source = Rx.Observable.range(1, 3)
    .scan(
        function (acc, x) {
            console.log('x value :',x);
            return acc + x;
        },10);

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);   
    },
    function () {
        console.log('Completed');   
    });

// => Next: 1
// => Next: 3
// => Next: 6
// => Completed 
</script>
</body>
</html>

console print

x value : 1
Next: 11
x value : 2
Next: 13
x value : 3
Next: 16
Christian-Yang commented 7 years ago

throttleTime操作符

throttle节流,节流阀,扼杀

only emit an item from an Observable if a particular timespan has passed without it emitting another item 只有一个特定的时间段已经过去,并且没有发出其他的item的情况下,才会发出一个从Observable中生成的item

The Debounce operator filters out items emitted by the source Observable that are rapidly followed by another emitted item. Debounce运算符过滤掉源Observable发出的项目,后面紧跟着另一个发送的项目。 (也就是说如果一个item1后面紧跟着另外的一个item2,那么item1就会被过滤掉) debouce 红球出现之后的指定时间之内没有任何新的球出现,所以红球留下。而黄球出现之后,在指定的时间还没有过去的情况下,绿球就出现了(因该是绿色,我是色盲分不清出),所以黄球被过滤掉了,绿球留了下来,接着在绿球出现的指定时间之内没有新的球出现了。那么绿球最终留下。 The first variant — called either debounce or throttleWithTimeout — accepts as its parameter a duration, defined as an integer number of milliseconds, and it suppresses any emitted items that are followed by other emitted items during that duration since the first item’s emission. 第一个变量 - 称为debounce(抖动)或throttleWithTimeout - 接受作为其参数的持续时间,定义为整数毫秒,并且在第一个项目发射之后的该持续时间内抑制其他发射项目后面的任何发射项目。 debounce 下面代码的解释,见上面图片:

var times = [
    { value: 0, time: 100 },
    { value: 1, time: 600 },
    { value: 2, time: 400 },
    { value: 3, time: 700 },
    { value: 4, time: 200 }
];

// Delay each item by time and project value;
var source = Rx.Observable.from(times)
  .flatMap(function (item) {
    return Rx.Observable
      .of(item.value)
      .delay(item.time);
  })
  .debounce(500 /* ms */);

var subscription = source.subscribe(
  function (x) {
    console.log('Next: %s', x);
  },
  function (err) {
    console.log('Error: %s', err);
  },
  function () {
    console.log('Completed');
  });
Next: 0
Next: 2
Next: 4
Completed
Christian-Yang commented 7 years ago

Delay 操作符

shift the emissions from an Observable forward in time by a particular amount

The Delay operator modifies its source Observable by pausing for a particular increment of time (that you specify) before emitting each of the source Observable’s items. This has the effect of shifting the entire sequence of items emitted by the Observable forward in time by that specified increment. 延迟运算符通过在发出每个源Observable的项目之前暂停一段特定的时间增量(您指定的)来修改其来源可观察值。 这具有将Observable发射的整个序列的时间向前移动指定的增量的效果 delay1 In RxJS you can set the per-item delay in two ways: by passing a number of milliseconds into the delay operator (which will delay each emission by that amount of time), or by passing in a Date object [which will delay the beginning of the sequence of emissions until that absolute point in time]. 在RxJS中,您可以通过两种方式设置每个项目的延迟:通过传递延迟运算符(将延迟每个发射的时间量),或通过传递一个Date对象[这将延迟队列排放开始时间,直到绝对时间点]。

This operator operates by default on the timeout Scheduler, but you can override this by passing in another Scheduler as an optional second parameter. 此运算符默认在timeout Scheduler (超时计划程序)上运行,但您可以通过将另一个调度程序作为可选的第二个参数传递来覆盖此操作。 [意思是说delay这个函数有第二个参数?这个参数可以覆盖掉默认的timeout Scheduler]

第二种使用方式:通过传递一个Date对象[这将延迟队列排放开始时间,直到绝对时间点

var source = Rx.Observable.range(0, 3)
    .delay(new Date(Date.now() + 1000));

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x.toString()); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); }); 
Next: 0
Next: 1
Next: 2
Completed

第一种使用方式:通过传递延迟运算符(将延迟每个发射的时间量)

var source = Rx.Observable.range(0, 3)
    .delay(1000);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x.toString()); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 0
Next: 1
Next: 2
Completed
Christian-Yang commented 7 years ago

FlatMap (mergeMap)操作符

transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable 将Observable发出的item转换为Observables,然后将其中的排放物平坦化为单个Observable (注意,这个意思是:会把原来的stream投射出的元素转换为另一个stream,然后最后 将这些stream合并为一个stream进行投射出来。flat的意思就是扁平,平面) [为了理解好这句话,下面转载的FlatMap详情非常有用:]https://juejin.im/entry/58be21af570c3500622a6a3c flatmap The FlatMap operator transforms an Observable by applying a function that you specify to each item emitted by the source Observable, where that function returns an Observable that itself emits items. FlatMap then merges the emissions of these resulting Observables, emitting these merged results as its own sequence. FlatMap操作符通过将您指定的函数应用于源Observable发出的每个item来转换Observable,该函数返回自身发出的Observable。 FlatMap然后合并这些结果的可观察物的排放,将这些合并结果作为自己的队列发布。 This method is useful, for example, when you have an Observable that emits a series of items that themselves have Observable members or are in other ways transformable into Observables, so that you can create a new Observable that emits the complete collection of items emitted by the sub-Observables of these items. 这个方法很有用,例如,当你有一个Observable发出一系列自己拥有Observable成员或者以其他方式转换成Observables的项目,这样你就可以创建一个新的Observable来发出完整的项目集合(通过合并)这些项目的子观察值。 Note that FlatMap merges the emissions of these Observables, so that they may interleave. 请注意,FlatMap会合并这些Observables的排放,所以它们可能会交错。 In several of the language-specific implementations there is also an operator that does not interleave the emissions from the transformed Observables, but instead emits these emissions in strict order, often called ConcatMap or something similar. 在一些特定语言的实现中,还有一个运算符不会交换已变换的“可观测值”的排放,而是以严格的顺序排放这些排放,通常称为ConcatMap或类似的。 (这里的意思可以通过下面的代码看出区别:)

<!DOCTYPE html>
<html>
<head>
    <title></title>
    <script type="text/javascript" src="http://apps.bdimg.com/libs/jquery/2.1.4/jquery.min.js"></script>

    <script type="text/javascript" src="./rxjs/dist/rx.lite.js"></script>
</head>
<body>
<script>
// flatMap example
var stream1 = Rx.Observable.from([10, 20, 30]);
var stream2 = function(n) {
  return Rx.Observable.from([n+1, n+2, n+3]);
};

stream1.flatMap(function(n) {
  console.log("(" + n + ")");
  return stream2(n);
}).subscribe(function(n) {
  console.log(n);
}, function () {
  console.log('err');
}, function () {
  console.log('finished');
});
</script>
</body>
</html>

flatmapresult

<!DOCTYPE html>
<html>
<head>
    <title></title>
    <script type="text/javascript" src="http://apps.bdimg.com/libs/jquery/2.1.4/jquery.min.js"></script>

    <script type="text/javascript" src="./rxjs/dist/rx.lite.js"></script>
</head>
<body>
<script>
// flatMap example
var stream1 = Rx.Observable.from([10, 20, 30]);
var stream2 = function(n) {
  return Rx.Observable.from([n+1, n+2, n+3]);
};

stream1.concatMap(function(n) {
  console.log("(" + n + ")");
  return stream2(n);
}).subscribe(function(n) {
  console.log(n);
}, function () {
  console.log('err');
}, function () {
  console.log('finished');
});
</script>
</body>
</html>

concatmapresult

FlatMap和Map的区别:

原文转自:http://blog.csdn.net/u012631731/article/details/71597541

let source1: string = Observable.range(1,4).map((item) => {
        return item;
});
source1.subscribe((itam) => {
       console.log('return value: ' + item);
});
return value:1
return value:2
return value:3
return value:4
let source2: string = Observable.range(1,4).flatMap((item) => {
        return Observable.of(item);
});
source2.subscribe((itam) => {
       console.log('return value: ' + item);
});
return value:1
return value:2
return value:3
return value:4

在RxJS中map和flatMap都可以对单个对象进行操作,最主要的区别在于返回的数据类型,map的返回就是原本的数据类型即可,而flatMap返回的是一个流数据,即要在数据返回是增加Observable的转换。 这里的return的单粒的,如果想直接返回一个数组,直接使用toArray()方法即可。

let source2: string = Observable.range(1,4).flatMap((item) => {
      return Observable.of(item);
}).toArray();
source2.subscribe((itam) => {
    console.log('return value: ' + item);
});
return value:[1,2,3,4]

关于FlatMap的详细的解释,转自:https://juejin.im/entry/58be21af570c3500622a6a3c

Note: stream <==> Observable sequence

看官网或者其他文档对flatMap的解释是,会把原来的stream投射出的元素转换为另一个stream,然后最后 将这些stream合并为一个stream进行投射出来。所以,对于下面的代码,我的直觉认为,输出结果应该是: 11, 12, 13, 21, 22, 23, 31, 32, 33。但是在jsbin上运行输出后,输出的结果 是:11, 12, 21, 13, 22, 31, 23, 32, 33,虽然这肯定是正确的输出,但是不明白。

// flatMap example
var stream1 = Rx.Observable.from([10, 20, 30]);
var stream2 = function(n) {
      return Rx.Observable.from([n+1, n+2, n+3]);
};

stream1.flatMap(function(n) {
       return stream2(n);
}).subscribe(function(n) {
       console.log(n);
}, function () {
      console.log('err');
}, function () {
     console.log('finished'); 
});

我绞尽脑汁想了想也不明白为什么12后面就是21了,而不是13。我去翻了翻 rxjs 的源码,想去 一探究竟,但是除了发现 flatMap 是 mergeMap 的别名外,似乎没其他收获。

后来我在代码里添加了一行输出:

stream1.flatMap(function(n) {
     console.log("(" + n + ")");
      return stream2(n);
}).subscribe(function(n) {
      console.log(n);
}, function () {
     console.log('err');
}, function () {
     console.log('finished');
});

现在输出结果变成了:(10), 11, (20), 12, 21, (30) 13, 22, 31, 23, 32, 33,从这个输出中 我立马发现了一些东西,然后明白这个flatMap后的stream投射方式是怎么样的了。

第一个stream1,在投射元素的时候,通过flatMap会将投射出的元素转换成stream,我们称为stream2-1, 因为它是stream2的第一部分。这个时候,投射图是这样的:

-10------------------> stream1时间线
---11-------------------> stream2-1时间线,比stream1差一个时间点。
// (10), 11

也就是说,10 转换出来的stream2-1也立马开始投射它自己的元素了,先紧跟 stream1投射出来的10, 投射出11,毕竟stream2-1是10转换出来的。10这时转换完成了,紧接着20开始转换,转换出的stream我们 称为stream2-2。在这个时间点上(20转换的点上),stream2-1已经开始投射12,然后stream2-2转换出来了 并投射21,所以有下面的图:

-10---20---------------> stream1时间线
---11-12-----------------> stream2-1时间线
--------21---------------> stream2-2时间线

// (10), 11, (20), 12, 21

这时stream1剩余最后一个元素,30,紧跟着20的投射转换后,也开始转换,这个点上,stream2-1开始投射 13,并且stream2-2开始投射22,这两个投射完后,转换完成的stream2-3开始投射元素31,所以有下面的图:

-10---20---30------------> stream1时间线
---11-12---13------------> stream2-1时间线
--------21-22------------> stream2-2时间线
-------------31-----------> stream2-3时间线
// (10), 11, (20), 12, 21, (30), 13, 22, 31

到这里应该就很容易理解了。flatMap最后flat的是stream2中投射出来的,stream2是由stream2-1, stream2-2,stream2-3投射出来的元素组成。 flatmaplatest

扩展理解 flatMapFirst 和 flatMapLatest

flatMapFirst instead propagates the first Observable exclusively until it completes before it begins subscribes to the next Observable.

从这里也可以很好的理解flatMapLatest了。假如stream1中的元素都是网络请求,10代表网络请求发出 去了,而由10转换出来的stream2-1代表着整个和这个网络请求相关操作,也就是可以认为stream2-1代表着收到网络 请求结果后,对数据处理,更新界面等等。然后这时,20开始投射并进行转换为stream2-2,这可能代表着 用户再次发送了一个网络请求,这个时候,从上面的图中可以看出,20转换出来的stream2-2开始投射的时候, stream2-1已经投射到了12了,这个12可以代表着stream2-1发出的网络请求已经返回结果,要对结果进行 处理。而stream2-2这时候的开始,明显是一个重复的网络请求操作,是应该被取消的,毕竟第一个网络请求 还没被处理完成呢。这个时候使用flatMapFirst,可以取消后续(stream2-2)的操作,除非第一个操作(stream2-1)已经完成了。 然后理解 flatMapLatest 也就不难了,就是要取消stream2-1(未完成),使用stream2-2(最新的)。这时如果用户 不小心又触发一个网络请求(stream2-3),那么flatMapLatest会取消steram2-2,使用stream2-3。 当然,在这样的场景下,我们是应该使用flatMapFirst的!

<!DOCTYPE html>
<html>
<head>
    <title></title>
    <script type="text/javascript" src="http://apps.bdimg.com/libs/jquery/2.1.4/jquery.min.js"></script>

    <script type="text/javascript" src="./rxjs/dist/rx.lite.js"></script>
</head>
<body>
<script>
// flatMap example
var stream1 = Rx.Observable.from([10, 20, 30]);
var stream2 = function(n) {
  return Rx.Observable.from([n+1, n+2, n+3]);
};

stream1.flatMapLatest(function(n) {
  console.log("(" + n + ")");
  return stream2(n);
}).subscribe(function(n) {
  console.log(n);
}, function () {
  console.log('err');
}, function () {
  console.log('finished');
});
</script>
</body>
</html>

flatmaplatest1

关于FlatMap的另外的一个讲解,转自:http://blog.csdn.net/tianjun2012/article/details/51253877

如果你有一个Observable它的结果是许多嵌套的Observable将则怎么做?大部分的时间,你想到的就是统一这些嵌套的Observable的元素到一个单个的序列中。这也是flatMap所要明确干的事。 (看下面的那个图就能明白这个句话的意思:先看这句,“如果你有一个Observable”,在图上就是A这个Observable这个东西,它就是一个Observable,接着看“它的结果是许多嵌套的Observable这句的意思是,Observable A这个可观察对象本身是由三个A1,A2,A3可观察对象组成的。”所以这个时候,A本身可以被订阅,这个时候,订阅A的人,可以获得A1,A2,A3这三个可观察对象。而A1,A2,A3,这三个东西本身也是可观察对象,所以他们也能被订阅) flatMap操作符需要一个Observable参数,这个参数的元素也是Observable,并且返回只有一个孩子Observable的平坦值的Observable。 (就是说flatMap把上面的三个Observable A1,A2,A3发射出的值,合并成了一个Observable) flatmap

flatMap是一个强大的操作符,但是它比起我们目前学到的其他操作符都要难理解些,把它认为是这些Observable的concatAll()函数。

contcatAll是需要一个数组的数组函数,并且返回一个平坦的单一的数组,这个数组包含所有子数组的值,而不是这些子数组本身。我们可以使用reduce去建一个这样的函数:

function concatAll(source) {
      return source.reduce(function(a, b) {
                 return a.concat(b);
      });
}

我们可以这样使用它:

concatAll([[0, 1, 2], [3, 4, 5], [6, 7, 8]]);
// [0, 1, 2, 3, 4, 5, 6, 7, 8]

flatMap做同样的事,但不是平坦的arrays而是observable…… qweqwe 理解flatMap操作符,可以这么想,有一根大水管,向外出水,但是当我们切开这个打水管,看到切面,原来这根大水管里面由三个小水管组成,flatMap相当于一个转换头,将三根小水管的水合并成为了大水管中的水。

那么这个时候,这个问题也就能想明白了:https://stackoverflow.com/questions/36280565/angular2-http-get-location-header-of-201-response/36283142 为什么下面的这段代码是错误的:

 return this._http.post(this._heroesUrl, body, options)
            // Hero is created, but unable to get URL from the Location header!
            .map((res:Response) => this._http.get(res.headers.get('Location')).map((res:Response) => res.json()))
            .catch(this.handleError)

而下面的这段代码是正确的:

return this._http.post(this._heroesUrl, body, options)
        .flatMap((res:Response) => {
          var location = res.headers.get('Location');
          return this._http.get(location);
        }).map((res:Response) => res.json()))
        .catch(this.handleError)

最里面用http.get.map()没有问题,返回的是Observable。但是外面不能用map而必须用flatMap。 因为get.map()返回的一系列都是Observable,而map不具有flat的能力,只有flatMap能将一系列的Observable合并成为一个Observable从而达到扁平的效果。

flatMap适用于Observable里面还是Observable的情况

 return this._http.get(location);
        }).map((res:Response) => res.json())
Christian-Yang commented 7 years ago

switchMap (flatMapLatest)操作符

注意两段代码:一段使用switchMap 另外一段使用flatMap

使用switchMap如下:

<!DOCTYPE html>
<html>
<head>
    <title></title>
    <script type="text/javascript" src="http://apps.bdimg.com/libs/jquery/2.1.4/jquery.min.js"></script>

    <script type="text/javascript" src="./rxjs/dist/rx.lite.js"></script>
</head>
<body>
<script>
// flatMap example
var stream1 = Rx.Observable.from([10, 20, 30]);
var stream2 = function(n) {
  return Rx.Observable.from([n+1, n+2, n+3]);
};

stream1.switchMap(function(n) {
  console.log("(" + n + ")");
  return stream2(n);
}).subscribe(function(n) {
  console.log(n);
}, function () {
  console.log('err');
}, function () {
  console.log('finished');
});
</script>
</body>
</html>

使用flatMapLatest如下:

<!DOCTYPE html>
<html>
<head>
    <title></title>
    <script type="text/javascript" src="http://apps.bdimg.com/libs/jquery/2.1.4/jquery.min.js"></script>

    <script type="text/javascript" src="./rxjs/dist/rx.lite.js"></script>
</head>
<body>
<script>
// flatMap example
var stream1 = Rx.Observable.from([10, 20, 30]);
var stream2 = function(n) {
  return Rx.Observable.from([n+1, n+2, n+3]);
};

stream1.flatMapLatest(function(n) {
  console.log("(" + n + ")");
  return stream2(n);
}).subscribe(function(n) {
  console.log(n);
}, function () {
  console.log('err');
}, function () {
  console.log('finished');
});
</script>
</body>
</html>

这两段代码的运行结果都是如下: switchmap

后来发现 flatMapLatest 被重命名为 switchMap

Github上一个讨论的帖子地址为:https://github.com/ReactiveX/rxjs/issues/1855

RxJs官方给出的名字变更地址和删除地址为:https://github.com/ReactiveX/RxJS/blob/master/MIGRATION.md#operators-renamed-or-removed

switchmap

Christian-Yang commented 7 years ago

From 操作符

convert various other objects and data types into Observables 将各种其他对象和数据类型转换为Observables

When you work with [Observables], it can be more convenient if all of the data you mean to work with can be represented as [Observables], rather than as a mixture of [Observables] and other types. This allows you to use a single set of operators to govern the entire lifespan of the data stream. 当您使用[Observables]时,如果您要使用的所有数据都可以表示为[Observables],而不是[Observables]和其他类型的混合,则可以更方便。 这允许您使用一组运算符来管理数据流的整个生命周期。

Iterables, for example, can be thought of as a sort of synchronous Observable; Futures, as a sort of Observable that always emits only a single item. By explicitly converting such objects to Observables, you allow them to interact as peers with other Observables. 例如,Iterables可以被认为是一种同步的Observable; Futures,作为一种可观察,总是只发出一个项目。 通过明确地将这些对象转换为Observables,您可以让它们与其他Observables进行交互。

For this reason, most ReactiveX implementations have methods that allow you to convert certain language-specific objects and data structures into Observables. 因此,大多数ReactiveX实现具有允许您将某些特定语言的对象和数据结构转换为Observables的方法。 from In RxJS, the from operator converts an array-like or iterable object into an Observable that emits the items in that array or iterable. A String, in this context, is treated as an array of characters.

在RxJS中,from操作符将数组或可迭代对象转换为Observable,该对象可发出该数组中的项或iterable(迭代器)。 在此上下文中,String被视为字符数组。

This operator also takes three additional, optional parameters: 该操作符还需要三个可选参数: 2:a transforming function that takes an item from the array or iterable as input and produces an item to be emitted by the resulting Observable as output 一个转换函数,它从数组中获取一个项,或者一个迭代器作为输入,并产生由Observable作为输出发出的项目 3:a second argument to pass into the transforming function as additional context information 第二个参数作为附加上下文信息传递给转换函数 4:a Scheduler on which this operator should operate 这个操作符应该操作的一个计划程序(Scheduler)

from(参数1,参数2,参数3,参数4)
            ||
            ||
            \/
from(参数1,转换函数,转换函数的另外一个参数,Scheduler)

代码举例:

这里的arguments是JavaScript arguments对象,可以看这里进行了解:http://www.cnblogs.com/lwbqqyumidi/archive/2012/12/03/2799833.html

 Array-like object (arguments) to Observable
类似Array的对象(arguments)到Observable
function f() {
// 这里的arguments是JavaScript arguments对象,可以看这里进行了解:

http://www.cnblogs.com/lwbqqyumidi/archive/2012/12/03/2799833.html

  return Rx.Observable.from(arguments);
}
f(1, 2, 3).subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
// Any iterable object...
// Set
var s = new Set(['foo', window]);
Rx.Observable.from(s).subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
Next: foo
Next: window
Completed
// Map
var m = new Map([[1, 2], [2, 4], [4, 8]]);
Rx.Observable.from(m).subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
Next: [1, 2]
Next: [2, 4]
Next: [4, 8]
Completed
// String
Rx.Observable.from("foo").subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
Next: f
Next: o
Next: o
Completed

使用from函数第二个参数的例子:from([1, 2, 3], function (x) { return x + x; })

// Using an arrow function as the map function to manipulate the elements
Rx.Observable.from([1, 2, 3], function (x) { return x + x; }).subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
Next: 2
Next: 4
Next: 6
Completed

Rx.Observable.from({length: 5}, function(v, k) { return k; })的工作原理:

array 原理就是,javascript使用鸭子类型,当你想调用一个Object中的foo方法的时候,如果这个Object是bar类型。也就是说typeof Object = bar 。它并不检查这个Object到底是否真是一个bar类型,javascript只检测这个Object中是否有这个foo方法。

To sum up: 总结一下:

let fakeArray = {length:5};
fakeArray.length //5
let realArray = [1,2,3,4,5];
realArray.length //5

第一个就像“假的”JavaScript数组(具有属性长度)。 当Array.from检查属性长度时,它返回5,因此它创建具有长度为5的真实数组。您可以想数组一样的调用它,并认为它是一个数组。然后会自动的使用Array.from()函数创建一个数组,Array.from()函数的信息: https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Global_Objects/Array/from 第二部分只是lamba,它使用索引值(第二个参数)填充数组。也就是调用Array.from这个函数来填充数组。 这种技术对于mock某些对象(仿真某些对象,假对象)进行测试非常有用。 例如:

let ourFileReader = {}
ourFileReader.result = "someResult"
//ourFileReader will mock real FileReader
我们的FileReader会模拟真正的FileReader

var arr1 = Array.from({ length: 5 // Create 5 indexes with undefined values }, function(v, k) { // Run a map function on said indexes using v(alue)[undefined] and k(ey)[0 to 4] return k; // Return k(ey) as value for this index } ); console.log(arr1);

https://stackoverflow.com/questions/40528557/how-does-array-fromlength-5-v-k-k-work

// Generate a sequence of numbers
Rx.Observable.from({length: 5}, function(v, k) { return k; }).subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
Next: 0
Next: 1
Next: 2
Next: 3
Next: 4
Completed
Christian-Yang commented 7 years ago

Create 操作符

create an Observable from scratch by means of a function. 通过一个函数从头创建一个Observable。

create

You can create an Observable from scratch by using the Create operator. You pass this operator a function that accepts the observer as its parameter. Write this function so that it behaves as an Observable — by calling the observer’s onNext, onError, and onCompleted methods appropriately.

您可以使用Create操作符从头创建一个Observable。 您通过此操作符接受观察者作为其参数的函数。 编写这个函数,使其表现为一个Observable - 通过适当地调用观察者的onNext,onError和onCompleted方法。 (也就是说这个create函数的参数应该像一个observer,这个observer里面应该有onNext,onError和onCompleted方法)

A well-formed finite Observable must attempt to call either the observer’s onCompleted method exactly once or its onError method exactly once, and must not thereafter attempt to call any of the observer’s other methods. 一个格式良好的Observable必须尝试一次调用观察者的onCompleted方法或者其onError方法一次,而且此后不能尝试调用任何观察者的其他方法。

/* Using a function  使用一个函数*/
var source = Rx.Observable.create(function (observer) {
    observer.onNext(42);
    observer.onCompleted();

    // Note that this is optional, you do not have to return this if you require no cleanup
    //请注意,这是可选的,如果不需要清理,则不需要返回
    return function () { console.log('disposed'); };
});

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });

运行结果

Next: 42
Completed
/* Using a disposable 使用一次性 */
var source = Rx.Observable.create(function (observer) {
    observer.onNext(42);
    observer.onCompleted();

    // Note that this is optional, you do not have to return this if you require no cleanup
   //请注意,这是可选的,如果不需要清理,则不需要返回
    return Rx.Disposable.create(function () {
        console.log('disposed');
    });
});

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });

Next: 42
Completed

Rx.Observable.create

var observable = Rx.Observable
    .create(function(observer) {
        observer.next('Semlinker'); // RxJS 4.x 以前的版本用 onNext
        observer.next('Lolo');
    });

// 订阅这个 Observable    
observable.subscribe(function(value) {
    console.log(value);
});

以上代码运行后,控制台会依次输出 'Semlinker' 和 'Lolo' 两个字符串。

需要注意的是,很多人认为 RxJS 中的所有操作都是异步的,但其实这个观念是错的。RxJS 的核心特性是它的异步处理能力,但它也是可以用来处理同步的行为。具体示例如下:

var observable = Rx.Observable
    .create(function(observer) {
        observer.next('Semlinker'); // RxJS 4.x 以前的版本用 onNext
        observer.next('Lolo');
    });

console.log('start');
observable.subscribe(function(value) {
    console.log(value);
});
console.log('end');

以上代码运行后,控制台的输出结果:

start
Semlinker
Lolo
end

当然我们也可以用它处理异步行为:

var observable = Rx.Observable
    .create(function(observer) {
        observer.next('Semlinker'); // RxJS 4.x 以前的版本用 onNext
        observer.next('Lolo');

        setTimeout(() => {
            observer.next('RxJS Observable');
        }, 300);
    })

console.log('start');
observable.subscribe(function(value) {
    console.log(value);
});
console.log('end');

以上代码运行后,控制台的输出结果:

start
Semlinker
Lolo
end
RxJS Observable

从以上例子中,我们可以得出一个结论 - Observable 可以应用于同步和异步的场合。

Christian-Yang commented 7 years ago

Distinct (distinctUntilChanged)

suppress duplicate items emitted by an Observable 抑制Observable发出的重复项目

dis1

The Distinct operator filters an Observable by only allowing items through that have not already been emitted.

Distinct操作符可以过滤一个Observable,只允许没有已经被发出的item通过。

In some implementations there are variants that allow you to adjust the criteria by which two items are considered “distinct.” 在一些RX的实现中,有一些变体,允许你调整标准,通过考虑两个item的区别。 In some, there is a variant of the operator that only compares an item against its immediate predecessor for distinctness, thereby filtering only consecutive duplicate items from the sequence. 在某些情况下,有一个操作符的变体,它只比较一个项目与其直接上一个元素的不同,从而仅从序列中过滤连续的重复项目。 dis2

In RxJS, the distinct operator has two optional parameters: 在RxJS中,distinct运算符有两个可选参数: a function that accepts an item emitted by the source Observable and returns a key which will be used instead of the item itself when comparing two items for distinctness 一个函数接受源Observable发出的item,并返回一个将用于替代item本身的key,当比较两个item的区别的时候。 a function that accepts two items (or two keys) and compares them for distinctness, returning false if they are distinct (an equality function is the default if you do not supply your own function here) 一个函数可以接受两个item(或两个key),并将它们区分开来,如果它们不同,则返回false(如果在这里不提供自己的函数,则默认为相等函数)

/* Without key selector */
var source = Rx.Observable.fromArray([
        42, 24, 42, 24
    ])
    .distinct();

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x.toString()); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 42
Next: 24
Completed
/* With key selector */
var source = Rx.Observable.fromArray([
        {value: 42}, {value: 24}, {value: 42}, {value: 24}
    ])
    .distinct(function (x) { return x.value; });

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x.toString());
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
Next: { value: 42 }
Next: { value: 24 }
Completed

dis3

RxJS also has a distinctUntilChanged operator. It only compares emitted items from the source Observable against their immediate predecessors in order to determine whether or not they are distinct. It takes the same two optional parameters as the distinct operator. RxJS有一个distinctUntilChanged操作符, 它只比较源Observable与其直接前辈的发射项目,以确定它们是否不同。 (只要与前一个不同就会被保存下来,而不管之前是否发出过)它采用与distinct运算符相同的两个可选参数。

/* Without key selector */
var source = Rx.Observable.fromArray([
        24, 42, 24, 24
    ])
    .distinctUntilChanged();

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 24
Next: 42
Next: 24
Completed
/* With key selector */
var source = Rx.Observable.fromArray([
        {value: 24}, {value: 42}, {value: 42}, {value: 24}
    ])
    .distinctUntilChanged(function (x) { return x.value; });

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x.toString()); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: { value: 24 }
Next: { value: 42 }
Next: { value: 24 }
Completed
Christian-Yang commented 7 years ago

forkJoin (zip)

在http://reactivex.io/documentation/operators.html页面点击forkJoin之后会自动跳转到zip页面。

forkJoin在rxjs中的实现是zip,页面上有一句话:RxJS implements this operator as zip and zipArray.

combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function 通过指定的函数将多个Observables的emit合并在一起,并根据此函数的结果为每个组合emit单个items

The Zip method returns an Observable that applies a function of your choosing to the combination of items emitted, in sequence, by two (or more) other Observables, with the results of this function becoming the items emitted by the returned Observable

Zip方法返回一个Observable,它将您选择的函数应用于由两个(或多个)其他Observables序列发射的项目的组合,该函数的结果将成为返回的Observable发出的项目。

It applies this function in strict sequence, so the first item emitted by the new Observable will be the result of the function applied to the first item emitted by Observable #1 and the first item emitted by Observable #2; the second item emitted by the new zip-Observable will be the result of the function applied to the second item emitted by Observable #1 and the second item emitted by Observable #2; and so forth. It will only emit as many items as the number of items emitted by the source Observable that emits the fewest items. 它以严格的顺序应用此功能,因此新的Observable发出的第一个项目将是应用于由Observable#1发出的第一个项目和Observable#2发出的第一个项目的函数的结果; 新的zip-Observable发出的第二个项目将是应用于Observable#1发出的第二个项目的函数和Observable#2发出的第二个项目的结果; 等等。 它只会发出与发出最少项目的源Observable发出的项目数量相同的项目。

image zip accepts a variable number of Observables or Promises as parameters, followed by a function that accepts one item emitted by each of those Observables or resolved by those Promises as input and produces a single item to be emitted by the resulting Observable. zip接受可变数量的Observables或Promises作为参数,后面接受一个函数,该函数接受这些Observables中的每一个发出的一个项目,或者由Promises作为输入解析,并生成由所得到的Observable发出的单个项目。

/* Using arguments */
var range = Rx.Observable.range(0, 5);

var source = Observable.zip(
    range,                                //0,1,2,3,4
    range.skip(1),                    //1,2,3,4
    range.skip(2),                    //2,3,4
    function (s1, s2, s3) {
        return s1 + ':' + s2 + ':' + s3;
    }
);

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
Next: 0:1:2
Next: 1:2:3
Next: 2:3:4
Completed
/* Using promises and Observables */
var range = Rx.Observable.range(0, 5);

var source = Observable.zip(
    RSVP.Promise.resolve(0), //参数为Observable和Promise的情况
    RSVP.Promise.resolve(1),
    Rx.Observable.return(2)
    function (s1, s2, s3) {
        return s1 + ':' + s2 + ':' + s3;
    }
);

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
Next: 0:1:2
Completed

image

zipArray accepts a variable number of Observables as parameters and returns an Observable that emits arrays, each one containing the nth item from each source Observable. zipArray接受可变数量的Observables作为参数,并返回一个发出数组的Observable,每个数组包含来自每个源Observable的第n个项目。

zipArray在rxjs中没有实现,在这个链接的github软件库中没有这个函数:https://github.com/Reactive-Extensions/RxJS,报出如下错误:index.html:14 Uncaught TypeError: Rx.Observable.zipArray is not a function。比如如下程序:

<!DOCTYPE html>
<html>
<head>
    <title></title>
    <script type="text/javascript" src="http://apps.bdimg.com/libs/jquery/2.1.4/jquery.min.js"></script>

    <script type="text/javascript" src="./RxJS-master/dist/rx.lite.js"></script>
</head>
<body>
<script>
/* Using arguments */
var range = Rx.Observable.range(0, 5);

var source = Rx.Observable.zipArray(
    range,
    range.skip(1), 
    range.skip(2)
);

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);   
    },
    function () {
        console.log('Completed');   
    });
</script>
</body>
</html>
Next: [0,1,2]
Next: [1,2,3]
Next: [2,3,4]
Completed

image

RxJS also implements a similar operator, forkJoin. There are two varieties of this operator. The first collects the last element emitted by each of the source Observables into an array and emits this array as its own sole emitted item. You can either pass a list of Observables to forkJoin as individual parameters or as an array of Observables. RxJS还实现了一个类似的操作符,forkJoin。 这个操作员有两个品种。 第一个将每个源Observables发出的最后一个元素收集到数组中,并将该数组作为其唯一发出的项发出。 您可以将一个Observables列表作为单独的参数传递给forkJoin,也可以作为一个Observables数组。

var source = Rx.Observable.forkJoin(
    Rx.Observable.return(42),
    Rx.Observable.range(0, 10),
    Rx.Observable.fromArray([1,2,3]),
    RSVP.Promise.resolve(56)
);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
取最后一个元素:
Next: [42, 9, 3, 56]
Completed

image

A second variant of forkJoin exists as a prototype function, and you call it on an instance of one source Observable, passing it another source Observable as a parameter. As a second parameter, you pass it a function that combines the final item emitted by the two source Observables into the sole item to be emitted by the resulting Observable. forkJoin的第二个变体作为原型函数存在,并且在一个源Observable的实例上调用它,将另一个源Observable作为参数传递。 作为第二个参数,您传递一个将由两个源观察器发出的最终项目合并到由所得到的Observable发出的唯一项目的函数。

var source1 = Rx.Observable.return(42);
var source2 = Rx.Observable.range(0, 3);

var source = source1.forkJoin(source2, function (s1, s2) {
    return s1 + s2;
});

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 44
Completed