Open varHarrie opened 6 years ago
Subject是Observable(可观察对象),每一个Subject都有可以被subscribe(订阅),但不会创建新的执行环境,只会把新的Observer注册到内部维护着的Observer清单。
Subject
Observable(可观察对象)
subscribe(订阅)
Observer
Observer清单
Subject是Observer(观察者),是一个由next(),error(),complete()方法组成的对象,可以subscribe(订阅)一个Observable,并从其中接受到推送的值。
Observer(观察者)
next()
error()
complete()
Observable
每次Subject接收到值时,都遍历Observer清单,并推送该值。
Subject可以被订阅:
const subject = new Rx.Subject() subject.subscribe((value) => console.log('A value: ', value)) subject.subscribe((value) => console.log('B value: ', value)) subject.next(1) // A value: 1 // B value: 1 subject.next(2) // A value: 2 // B value: 2
Subject可以订阅Observable,接受并转发值:
const observable = Rx.Observable.from([1, 2]) const subject = new Rx.Subject() subject.subscribe((value) => console.log('A value: ', value)) subject.subscribe((value) => console.log('B value: ', value)) observable.subscribe(subject) // A value: 1 // B value: 1 // A value: 2 // B value: 2
BehaviorSubject是Subject的子类,拥有初始值,并且总是保存着一个最新值,一旦被订阅立即向订阅者发送最新值。
BehaviorSubject
初始值
最新值
const subject = new Rx.BehaviorSubject(0) // 初始值为0 subject.subscribe((value) => console.log('A value: ', value)) subject.next(1) // A value: 0 // A value: 1 subject.next(2) // A value: 2 // B value: 2 subject.subscribe((value) => console.log('B value: ', value)) subject.next(3) // A value: 3 // B value: 3
ReplaySubject也是Subject的子类,被订阅时立即向订阅者推送最新指定数量的值。
ReplaySubject
指定数量的值
const subject = new Rx.ReplaySubject(2) // 回放2个 subject.subscribe((value) => console.log('A value: ', value)) subject.next(1) // A value: 1 subject.next(2) // A value: 2 subject.next(3) // A value: 3 subject.subscribe((value) => console.log('B value: ', value)) // B value: 2 // B value: 3 subject.next(4) // A value: 4 // B value: 4
AsyncSubject也是Subject的子类,仅会在complete()之后,向订阅者推送最后一个值。
AsyncSubject
最后一个值
const subject = new Rx.AsyncSubject() subject.subscribe((value) => console.log('A value: ', value)) subject.next(1) subject.next(2) subject.subscribe((value) => console.log('B value: ', value)) subject.next(3) subject.complete() // A value: 3 // B value: 3
在开头我们提到Subject就是一个Obserable,但新的订阅者会共用同一个执行环境:
Obserable
const source = Rx.Observable.interval(1000) .take(3) const subject = new Rx.Subject() subject.subscribe((value) => console.log('A value: ', value)) source.subscribe(subject) setTimeot(() => { subject.subscribe((value) => console.log('B value: ', value)) }, 1000) // A value: 0 // A value: 1 // B value: 1 // A value: 2 // B value: 2
B在1秒钟之后订阅,所以不会接受到第一个值0。
B
0
multicast
仔细看上面的例子,虽然定义的变量不多,但是这段代码的订阅关系还是略显复杂,这时候我们可以用multicast操作符来简化:
const source = Rx.Observable.interval(1000) .take(3) .multicast(new Rx.Subject()) source.subscribe((value) => console.log('A value: ', value)) source.connect() setTimeout(() => { source.subscribe((value) => console.log('B value: ', value)) }, 1000)
这样一来,所有的订阅者都可以直接订阅source,其中的multicast用于挂载一个Subject,并返回一个ConnectableObservable,拥有connect()方法。注意的是,必须等到connect()调用之后,Subject才真正订阅source并开始推送。
source
ConnectableObservable
connect()
connect()方法会返回一个subscription,可调用其unsubscribe进行退订。
subscription
unsubscribe
refCount
refCount必须搭配multicast使用,用于创建一个只要有第一个订阅就会自动connect()的Observable,并且当订阅数变为0后会自动终止推送。
const source = Rx.Observable.interval(1000) // 没有加take(3) .multicast(new Rx.Subject()) .refCount() let subscriptionA let subscriptionB subscriptionA = source.subscribe((value) => console.log('A value: ', value)) // 订阅数 0 => 1,自动connect,开始推送 setTimeout(() => { subscriptionB = source.subscribe((value) => console.log('B value: ', value)) // 订阅数 1 => 2 }, 1000) setTimeout(() => { subscriptionA.unsubscribe() // 订阅数 2 => 1 subscriptionB.unsubscribe() // 订阅数 1 => 0,终止推送 }, 5000)
publish
publish是multicast的简化写法:
// Subject => publish const source = Rx.Observable.interval(1000).multicast(new Rx.Subject()).refCount() const source = Rx.Observable.interval(1000).publish().refCount() // BehaviorSubject => publishBehavior const source = Rx.Observable.interval(1000).multicast(new Rx.BehaviorSubject(0)).refCount() const source = Rx.Observable.interval(1000).publishBehavior(0).refCount() // ReplaySubject => publishReplay const source = Rx.Observable.interval(1000).multicast(new Rx.ReplaySubject(3)).refCount() const source = Rx.Observable.interval(1000).publishReplay(3).refCount() // AsyncSubject => publishLast const source = Rx.Observable.interval(1000).multicast(new Rx.AsyncSubject()).refCount() const source = Rx.Observable.interval(1000).publishLast(3).refCount()
share
share是publish+refCount的简化写法:
// publish + refCount => share const source = Rx.Observable.interval(1000).publish().refCount() const source = Rx.Observable.interval(1000).share() // publishReplay + refCount => shareReplay const source = Rx.Observable.interval(1000).publishReplay(3).refCount() const source = Rx.Observable.interval(1000).shareReplay(3) // 没有shareBehavior和shareAsync、shareLast
30 天精通 RxJS
Subject、BehaviorSubject、ReplaySubject、AsyncSubject
Subject
Subject
是Observable(可观察对象)
,每一个Subject
都有可以被subscribe(订阅)
,但不会创建新的执行环境,只会把新的Observer
注册到内部维护着的Observer清单
。Subject
是Observer(观察者)
,是一个由next()
,error()
,complete()
方法组成的对象,可以subscribe(订阅)
一个Observable
,并从其中接受到推送的值。每次
Subject
接收到值时,都遍历Observer清单
,并推送该值。Subject可以被订阅:
Subject可以订阅Observable,接受并转发值:
BehaviorSubject
BehaviorSubject
是Subject
的子类,拥有初始值
,并且总是保存着一个最新值
,一旦被订阅立即向订阅者发送最新值
。ReplaySubject
ReplaySubject
也是Subject
的子类,被订阅时立即向订阅者推送最新指定数量的值
。AsyncSubject
AsyncSubject
也是Subject
的子类,仅会在complete()
之后,向订阅者推送最后一个值
。multicast、refCount、publish、share
在开头我们提到
Subject
就是一个Obserable
,但新的订阅者会共用同一个执行环境:B
在1秒钟之后订阅,所以不会接受到第一个值0
。multicast
仔细看上面的例子,虽然定义的变量不多,但是这段代码的订阅关系还是略显复杂,这时候我们可以用
multicast
操作符来简化:这样一来,所有的订阅者都可以直接订阅
source
,其中的multicast
用于挂载一个Subject
,并返回一个ConnectableObservable
,拥有connect()
方法。注意的是,必须等到connect()
调用之后,Subject
才真正订阅source
并开始推送。connect()
方法会返回一个subscription
,可调用其unsubscribe
进行退订。refCount
refCount
必须搭配multicast
使用,用于创建一个只要有第一个订阅就会自动connect()
的Observable
,并且当订阅数变为0后会自动终止推送。publish
publish
是multicast
的简化写法:share
share
是publish
+refCount
的简化写法:参考资料
30 天精通 RxJS