RxJS 核心概念之Subject
什么是Subject?在RxJS中,Subject是一類特殊的Observable,它可以向多個Observer多路推送數值。普通的Observable并不具備多路推送的能力(每一個Observer都有自己獨立的執行環境),而Subject可以共享一個執行環境。
Subject是一種可以多路推送的可觀察對象。與EventEmitter類似,Subject維護著自己的Observer。
每一個Subject都是一個Observable(可觀察對象)對于一個Subject,你可以訂閱( subscribe )它,Observer會和往常一樣接收到數據。從Observer的視角看,它并不能區分自己的執行環境是普通Observable的單路推送還是基于Subject的多路推送。
Subject的內部實現中,并不會在被訂閱( subscribe )后創建新的執行環境。它僅僅會把新的Observer注冊在由它本身維護的Observer列表中,這和其他語言、庫中的 addListener 機制類似。
每一個Subject也可以作為Observer(觀察者)Subject同樣也是一個由 next(v) , error(e) ,和 complete() 這些方法組成的對象。調用 next(theValue) 方法后,Subject會向所有已經在其上注冊的Observer多路推送 theValue 。
下面的例子中,我們在Subject上注冊了兩個Observer,并且多路推送了一些數值:
var subject = new Rx.Subject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(1);
subject.next(2);
控制臺輸出結果如下:
observerA: 1
observerB: 1
observerA: 2
observerB: 2
既然Subject是一個Observer,你可以把它作為 subscribe (訂閱)普通Observable時的參數,如下面例子所示:
var subject = new Rx.Subject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
var observable = Rx.Observable.from([1, 2, 3]);
observable.subscribe(subject); // 你可以傳遞Subject來訂閱observable
執行后結果如下:
observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
通過上面的實現:我們發現可以通過Subject將普通的Observable單路推送轉換為多路推送。這說明了Subject的作用——作為單路Observable轉變為多路Observable的橋梁。
還有幾種特殊的 Subject 類型,分別是 BehaviorSubject , ReplaySubject ,和 AsyncSubject 。
多路推送的Observable
在以后的語境中,每當提到“多路推送的Observable”,我們特指通過Subject構建的Observable執行環境。否則“普通的Observable”只是一個不會共享執行環境并且被訂閱后才生效的一系列值。
通過使用Subject可以創建擁有相同執行環境的多路的Observable。
下面展示了 多路 的運作方式:Subject從普通的Observable訂閱了數據,然后其他Observer又訂閱了這個Subject,示例如下:
var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
// 通過`subject.subscribe({...})`訂閱Subject的Observer:
multicasted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
// 讓Subject從數據源訂閱開始生效:
multicasted.connect();
multicast 方法返回一個類似于Observable的可觀察對象,但是在其被訂閱后,它會表現Subject的特性。 multicast 返回的對象同時是 ConnectableObservable 類型的,擁有 connect() 方法。
connect() 方法非常的重要,它決定Observable何時開始執行。由于調用 connect() 后,Observable開始執行,因此, connect() 會返回一個 Subscription 供調用者來終止執行。
引用計數
通過手動調用 connect() 返回的Subscription控制執行十分繁雜。通常,我們希望在有第一個Observer訂閱Subject后 自動 connnect ,當所有Observer都取消訂閱后終止這個Subject。
我們來分析一下下面例子中subscription的過程:
-
第一個Observer 訂閱了多路推送的 Observable
-
多路Observable被連接
-
向第一個Observer發送 值為 0 的 next 通知
-
第二個Observer訂閱了多路推送的 Observable
-
向第一個Observer發送 值為 1 的 next 通知
-
向第二個Observer發送 值為 1 的 next 通知
-
第一個Observer取消了對多路推送的Observable的訂閱
-
向第二個Observer發送 值為 2 的 next 通知
-
第二個Observer取消了對多路推送的Observable的訂閱
-
取消對多路推送的Observable的連接
通過顯式地調用 connect() ,代碼如下:
var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
var subscription1, subscription2, subscriptionConnect;
subscription1 = multicasted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subscriptionConnect = multicasted.connect();
setTimeout(() => {
subscription2 = multicasted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
}, 600);
setTimeout(() => {
subscription1.unsubscribe();
}, 1200);
setTimeout(() => {
subscription2.unsubscribe();
subscriptionConnect.unsubscribe();
}, 2000);
如果你不想顯式地調用 connect() 方法,可以在ConnectableObservable類型的Observable上調用 refCount() 方法。方法會進行引用計數:記錄Observable被訂閱的行為。當訂閱數從 0 到 1 時 refCount() 會調用 connect() 方法。到訂閱數從 1 到 0 ,他會終止整個執行過程。
refCount 使得多路推送的Observable在被訂閱后自動執行,在所有觀察者取消訂閱后,停止執行。
下面是示例:
var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();
var subscription1, subscription2, subscriptionConnect;
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
setTimeout(() => {
console.log('observerB subscribed');
subscription2 = refCounted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
}, 600);
setTimeout(() => {
console.log('observerA unsubscribed');
subscription1.unsubscribe();
}, 1200);
setTimeout(() => {
console.log('observerB unsubscribed');
subscription2.unsubscribe();
}, 2000);
執行輸出結果如下:
observerA subscribed
observerA: 0
observerB subscribed
observerA: 1
observerB: 1
observerA unsubscribed
observerB: 2
observerB unsubscribed
只有ConnectableObservables擁有 refCount() 方法,調用后會返回一個 Observable 而不是新的ConnectableObservable。
BehaviorSubject
BehaviorSubject 是Subject的一個衍生類,具有“最新的值”的概念。它總是保存最近向數據消費者發送的值,當一個Observer訂閱后,它會即刻從 BehaviorSubject 收到“最新的值”。
BehaviorSubjects非常適于表示“隨時間推移的值”。舉一個形象的例子,Subject表示一個人的生日,而Behavior則表示一個人的歲數。(生日只是一天,一個人的歲數會保持到下一次生日之前。)
下面例子中,展示了如何用 0 初始化BehaviorSubject,當Observer訂閱它時, 0 是第一個被推送的值。緊接著,在第二個Observer訂閱BehaviorSubject之前,它推送了 2 ,雖然訂閱在推送 2 之后,但是第二個Observer仍然能接受到 2 :
var subject = new Rx.BehaviorSubject(0 /* 初始值 */);
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(3);
輸出結果如下:
observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
ReplaySubject
ReplaySubject 如同于 BehaviorSubject 是 Subject 的子類。通過 ReplaySubject 可以向新的訂閱者推送舊數值,就像一個錄像機 ReplaySubject 可以記錄Observable的一部分狀態(過去時間內推送的值)。
.一個 ReplaySubject 可以記錄Observable執行過程中推送的多個值,并向新的訂閱者回放它們。
你可以指定回放值的數量:
var subject = new Rx.ReplaySubject(3 /* 回放數量 */);
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(5);
輸出如下:
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5
除了回放數量,你也可以以毫秒為單位去指定“窗口時間”,決定ReplaySubject記錄多久以前Observable推送的數值。下面的例子中,我們把回放數量設置為 100 ,把窗口時間設置為 500 毫秒:
var subject = new Rx.ReplaySubject(100, 500 /* windowTime */);
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
var i = 1;
setInterval(() => subject.next(i++), 200);
setTimeout(() => {
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
}, 1000);
第二個Observer接受到 3 (600ms), 4 (800ms) 和 5 (1000ms),這些值均在訂閱之前的 500 毫秒內推送(窗口長度 1000ms - 600ms = 400ms < 500ms):
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerA: 5
observerB: 3
observerB: 4
observerB: 5
observerA: 6
observerB: 6
...
AsyncSubject
AsyncSubject是Subject的另外一個衍生類,Observable僅會在執行完成后,推送執行環境中的最后一個值。
var subject = new Rx.AsyncSubject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(5);
subject.complete();
輸出結果如下:
observerA: 5
observerB: 5
AsyncSubject 與 last() 操作符相似,等待完成通知后推送執行過程的最后一個值。
來自: https://segmentfault.com/a/1190000005069851