通俗的方式理解RxJS
通俗的方式理解Rx.js
序言
今早看 民工叔 的文章的時候, 發現對Rxjs所知甚少, 于是去官方看了下教程, 整理出一些東西, 寫成此文。
Rxjs據說會在2017年流行起來, 因為其處理異步邏輯,數據流, 事件非常擅長。 但是其學習曲線相比Promise, EventEmitter陡峭了不少。 而且民工叔也說:"由于RxJS的抽象程度很高,所以,可以用很簡短代碼表達很復雜的含義,這對開發人員的要求也會比較高,需要有比較強的歸納能力。" 本文就Rx.js的幾個核心概念做出闡述。 盡可能以通俗易懂的方式解釋這些概念。要是本文有誤或不完善的地方,歡迎指出。
Observable到底是什么
先上代碼:
let foo = Rx.Observable.create(observer => {
console.log('Hello');
observer.next(42);
});
foo.subscribe(x => console.log(x));
foo.subscribe(y => console.log(y));</code></pre>
輸出
"Hello"
42
"Hello"
42
這里可以把foo想象成一個函數, 這意味著你每次調用foo都會導致傳入Rx.Observable.create里的回調函數重新執行一次 , 調用的方式為foo.subscribe(callback), 相當于foo()。 接收函數返回值的方式也從var a = foo()改為通過傳入回調函數的方式獲取。第三行的observer.next表示返回一個值, 你可以調用多次, 每次調用observer.next后, 會先將next里的值返回給foo.subcribe里的回調函數, 執行完后再返回 。observer.complete, observer.error來控制流程。 具體看代碼:
var observable = Rx.Observable.create(observer => {
try {
observer.next(1);
console.log('hello');
observer.next(2);
observer.next(3);
observer.complete();
observer.next(4);
} catch (err) {
observer.error(err);
}
});
let = subcription = observable.subscribe(value => {
console.log(value)
})</code></pre>
運行結果:
1
hello
2
3
如上的第一個回調函數里的結構是推薦的結構。 當observable的執行出現異常的時候,通過observer.error將錯誤返回, 然而observable.subscribe的回調函數無法接收到.因為observer.complete已經調用, 因此observer.next(4)的返回是無效的. Observable不是可以返回多個值的Promise 雖然獲得Promise的值的方式也是通過then函數這種類似的方式, 但是new Promise(callback)里的callback回調永遠只會執行一次!因為 Promise的狀態是不可逆的 。
可以使用其他方式創建Observable, 看代碼:
var clicks = Rx.Observable.fromEvent(document, 'click');
clicks.subscribe(x => console.log(x));
當用戶對document產生一個click行為的時候, 就會打印事件對象到控制臺上。
Observer是什么
先看代碼:
let foo = Rx.Observable.create(observer => {
console.log('Hello');
observer.next(42);
});
let observer = x => console.log(x);
foo.subscribe(observer);</code></pre>
代碼中的第二個變量就是observer. 沒錯, observer就是 當Observable"返回"值的時候接受那個值的函數! 第一行中的observer其實就是通過foo.subscribe傳入的callback. 只不過稍加封裝了。 怎么封裝的? 看代碼:
let foo = Rx.Observable.create(observer => {
try {
console.log('Hello');
observer.next(42);
observer.complete();
observer.next(10);
} catch(e) { observer.error(e) }
});
let observer = {
next(value) { console.log(value) },
complete() { console.log('completed'),
error(err) { console.error(err) }
}
foo.subscribe(observer);</code></pre>
你看到observer被定義成了一個對象, 其實這才是完整的observer. 傳入一個callback到observable.subcribe相當于傳入了 { next: callback } 。
Subcription里的陷阱
Subscription是什么, 先上代碼:
var observable = Rx.Observable.interval(1000);
var subscription = observable.subscribe(x => console.log(x));
setTimeout(() => {
subscription.unsubscribe();
}, 3100)</code></pre>
運行結果:
Rx.Observable.interval可以返回 一個能夠發射(返回)0, 1, 2, 3..., n數字的Observable , 返回的時間間隔這里是1000ms。 第二行中的變量就是subscription。 subscription有一個unsubscribe方法, 這個方法可以讓 subscription訂閱的observable發射的數據被observer忽略掉 .通俗點說就是取消訂閱。
unsubscribe存在一個陷阱。 先看代碼:
var foo = Rx.Observable.create((observer) => {
var i = 0
setInterval(() => {
observer.next(i++)
console.log('hello')
}, 1000)
})
const subcription = foo.subscribe((i) => console.log(i))
subcription.unsubscribe()</code></pre>
運行結果:
hello
hello
hello
......
hello
unsubscribe只會讓observer忽略掉observable發射的數據,但是setInterval依然會繼續執行。 這看起來似乎是一個愚蠢的設計。 所以不建議這樣寫。
Subject
Subject是一種能夠發射數據給多個observer的Observable, 這讓Subject看起來就好像是EventEmitter。 先上代碼:
var subject = new Rx.Subject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(1);
subject.next(2);</code></pre>
運行結果:
observerA: 1
observerB: 1
observerA: 2
observerB: 2
與Observable不同的是, Subject發射數據給多個observer。 其次, 定義subject的時候并沒有傳入callback, 這是因為subject自帶next, complete, error等方法。從而可以發射數據給observer。 這和EventEmitter很類似。observer并不知道他subscribe的是Obervable還是Subject。 對observer來說是透明的。 而且Subject還有各種派生, 比如說:
BehaviorSubject 能夠保留最近的數據,使得當有subscribe的時候,立馬發射出去。看代碼:
var subject = new Rx.BehaviorSubject(0); // 0 is the initial value
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(3);</code></pre>
運行結果:
observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
ReplaySubject 能夠保留最近的一些數據, 使得當有subscribe的時候,將這些數據發射出去。看代碼:
var subject = new Rx.ReplaySubject(3);
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(5);</code></pre>
輸出結果:
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5
第一行的聲明表示ReplaySubject最大能夠記錄的數據的數量是3。
AsyncSubject 只會發射結束前的一個數據。 看代碼:
var subject = new Rx.AsyncSubject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(5);
subject.complete();</code></pre>
輸出結果:
observerA: 5
observerB: 5
既然subject有next, error, complete三種方法, 那subject就可以作為observer! 看代碼:
var subject = new Rx.Subject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
var observable = Rx.Observable.from([1, 2, 3]);
observable.subscribe(subject);</code></pre>
輸出結果:
observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
也就是說, observable.subscribe可以傳入一個subject來訂閱其消息。 這就好像是Rxjs中的一顆語法糖, Rxjs有專門的實現。
Multicasted Observables 是一種借助Subject來將數據發射給多個observer的Observable。 看代碼:
var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
multicasted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
multicasted.connect();</code></pre>
Rx.Observable.from能夠逐一發射數組中的元素, 在multicasted.connect()調用之前的任何subscribe都不會導致source發射數據。multicasted.connect()相當于之前的observable.subscribe(subject)。因此不能將multicasted.connect()寫在subscribe的前面。因為這會導致在執行multicasted.connect()的時候source發射數據, 但是subject又沒保存數據, 導致兩個subscribe無法接收到任何數據。
最好是第一個subscribe的時候能夠得到當前已有的數據, 最后一個unsubscribe的時候就 停止Observable的執行 , 相當于Observable發射的數據都被忽略。
refCount就是能夠返回這樣的Observable的方法
var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();
var subscription1, subscription2, subscriptionConnect;
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
setTimeout(() => {
console.log('observerB subscribed');
subscription2 = refCounted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
}, 600);
setTimeout(() => {
console.log('observerA unsubscribed');
subscription1.unsubscribe();
}, 1200);
setTimeout(() => {
console.log('observerB unsubscribed');
subscription2.unsubscribe();
}, 2000);</code></pre>
輸出結果:
observerA subscribed
observerA: 0
observerB subscribed
observerA: 1
observerB: 1
observerA unsubscribed
observerB: 2
observerB unsubscribed
What's Operators?
Observable上有很多方法, 比如說map, filter, merge等等。 他們基于調用它們的observable,返回一個 全新的observable 。 而且他們都是純方法。 operators分為兩種, instance operators 和 static operators。 instance operators是存在于observable實例上的方法, 也就是實例方法; static operators是存在于Observable這個類型上的方法, 也就是靜態方法。Rxjs擁有很多強大的 operators 。
自己實現一個operators:
function multiplyByTen(input) {
var output = Rx.Observable.create(function subscribe(observer) {
input.subscribe({
next: (v) => observer.next(10 * v),
error: (err) => observer.error(err),
complete: () => observer.complete()
});
});
return output;
}
var input = Rx.Observable.from([1, 2, 3, 4]);
var output = multiplyByTen(input);
output.subscribe(x => console.log(x));</code></pre>
輸出結果:
Rx.js實踐import React from 'react';
import ReactDOM from 'react-dom';
import Rx from 'rx';
class Main extends React.Component {
constructor (props) {
super(props);
this.state = {count: 0};
}
// Click events are now observables! No more proactive approach.
componentDidMount () {
const plusBtn = document.getElementById('plus');
const minusBtn = document.getElementById('minus');
const plus$ = Rx.Observable.fromEvent(plusBtn, 'click').map(e => 1);
const minus$ = Rx.Observable.fromEvent(minusBtn, 'click').map(e => -1);
Rx.Observable.merge(plus$, minus$).scan((acc, n) => acc + n)
.subscribe(value => this.setState({count: value}));
}
render () {
return (
<div>
<button id="plus">+</button>
<button id="minus">-</button>
<div>count: {this.state.count}</div>
</div>
);
}
}
ReactDOM.render(<Main/>, document.getElementById('app'));</code></pre>
merge用于合并兩個observable產生一個新的observable。 scan類似于Array中的reduce。 這個例子 實現了點擊plus的時候+1, 點擊minus的時候-1。
Rx.js適用的場景
-
多個復雜的異步或事件組合在一起。
-
處理多個數據序列
假如沒有被復雜的異步,事件, 數據序列困擾, 如果promise已經足夠的話, 就沒必要適用Rx.js。
Summary
-
Observable, Observer, Subscription, Subscrib, Subject概念。
-
RxJS適用于解決復雜的異步,事件問題。
文章參考
來自:https://segmentfault.com/a/1190000008464065