細說ReactiveCocoa的冷信號與熱信號(三):怎么處理冷信號與熱信號
第一篇文章中我們介紹了冷信號與熱信號的概念,前一篇文章我們也討論了為什么要區分冷信號與熱信號,下面我會先為大家揭曉熱信號的本質,再給出冷信號轉換成熱信號的方法。
揭示熱信號的本質
在 ReactiveCocoa 中,究竟什么才是熱信號呢?冷信號是比較常見的, map 一下就會得到一個冷信號。但在RAC中,好像并沒有“hot signal”這個單獨的說法。原來在RAC的世界中,所有的熱信號都屬于一個類—— RACSubject 。接下來我們來看看究竟它為什么這么“神奇”。
在RAC2.5文檔的 框架概述 中,有著這樣一段描述:
A subject, represented by the RACSubject class, is a signal that can be manually controlled.
Subjects can be thought of as the "mutable" variant of a signal, much like NSMutableArray is for NSArray. They are extremely useful for bridging non-RAC code into the world of signals.
For example, instead of handling application logic in block callbacks, the blocks can simply send events to a shared subject instead. The subject can then be returned as a RACSignal, hiding the implementation detail of the callbacks.
Some subjects offer additional behaviors as well. In particular, RACReplaySubject can be used to buffer events for future subscribers, like when a network request finishes before anything is ready to handle the result.
從這段描述中,我們可以發現Subject具備如下三個特點:
- Subject是“可變”的。
- Subject是非RAC到RAC的一個橋梁。
- Subject可以附加行為,例如 RACReplaySubject 具備為未來訂閱者緩沖事件的能力。
從第三個特點來看,Subject具備為未來訂閱者緩沖事件的能力,那也就說明它是自身是有狀態的。根據上文的介紹,Subject是符合熱信號的特點的。為了驗證它,我們再來做個簡單實驗:
RACSubject *subject = [RACSubject subject];
RACSubject *replaySubject = [RACReplaySubject subject];
[[RACScheduler mainThreadScheduler] afterDelay:0.1 schedule:^{
// Subscriber 1
[subject subscribeNext:^(id x) {
NSLog(@"Subscriber 1 get a next value: %@ from subject", x);
}];
[replaySubject subscribeNext:^(id x) {
NSLog(@"Subscriber 1 get a next value: %@ from replay subject", x);
}];
// Subscriber 2
[subject subscribeNext:^(id x) {
NSLog(@"Subscriber 2 get a next value: %@ from subject", x);
}];
[replaySubject subscribeNext:^(id x) {
NSLog(@"Subscriber 2 get a next value: %@ from replay subject", x);
}];
}];
[[RACScheduler mainThreadScheduler] afterDelay:1 schedule:^{
[subject sendNext:@"send package 1"];
[replaySubject sendNext:@"send package 1"];
}];
[[RACScheduler mainThreadScheduler] afterDelay:1.1 schedule:^{
// Subscriber 3
[subject subscribeNext:^(id x) {
NSLog(@"Subscriber 3 get a next value: %@ from subject", x);
}];
[replaySubject subscribeNext:^(id x) {
NSLog(@"Subscriber 3 get a next value: %@ from replay subject", x);
}];
// Subscriber 4
[subject subscribeNext:^(id x) {
NSLog(@"Subscriber 4 get a next value: %@ from subject", x);
}];
[replaySubject subscribeNext:^(id x) {
NSLog(@"Subscriber 4 get a next value: %@ from replay subject", x);
}];
}];
[[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{
[subject sendNext:@"send package 2"];
[replaySubject sendNext:@"send package 2"];
}];
按照時間線來解讀一下上述代碼:
- 0s時創建 subject 與 replaySubject 這兩個subject。
- 0.1s時 Subscriber 1 分別訂閱了 subject 與 replaySubject 。
- 0.1s時 Subscriber 2 也分別訂閱了 subject 與 replaySubject 。
- 1s時分別向 subject 與 replaySubject 發送了 "send package 1" 這個字符串作為 值 。
- 1.1s時 Subscriber 3 分別訂閱了 subject 與 replaySubject 。
- 1.1s時 Subscriber 4 也分別訂閱了 subject 與 replaySubject 。
- 2s時再分別向 subject 與 replaySubject 發送了 "send package 2" 這個字符串作為 值 。
接下來看一下輸出的結果:
2015-09-28 13:35:22.855 RACDemos[13646:1269269] Start
2015-09-28 13:35:23.856 RACDemos[13646:1269269] Subscriber 1 get a next value: send package 1 from subject
2015-09-28 13:35:23.856 RACDemos[13646:1269269] Subscriber 2 get a next value: send package 1 from subject
2015-09-28 13:35:23.857 RACDemos[13646:1269269] Subscriber 1 get a next value: send package 1 from replay subject
2015-09-28 13:35:23.857 RACDemos[13646:1269269] Subscriber 2 get a next value: send package 1 from replay subject
2015-09-28 13:35:24.059 RACDemos[13646:1269269] Subscriber 3 get a next value: send package 1 from replay subject
2015-09-28 13:35:24.059 RACDemos[13646:1269269] Subscriber 4 get a next value: send package 1 from replay subject
2015-09-28 13:35:25.039 RACDemos[13646:1269269] Subscriber 1 get a next value: send package 2 from subject
2015-09-28 13:35:25.039 RACDemos[13646:1269269] Subscriber 2 get a next value: send package 2 from subject
2015-09-28 13:35:25.039 RACDemos[13646:1269269] Subscriber 3 get a next value: send package 2 from subject
2015-09-28 13:35:25.040 RACDemos[13646:1269269] Subscriber 4 get a next value: send package 2 from subject
2015-09-28 13:35:25.040 RACDemos[13646:1269269] Subscriber 1 get a next value: send package 2 from replay subject
2015-09-28 13:35:25.040 RACDemos[13646:1269269] Subscriber 2 get a next value: send package 2 from replay subject
2015-09-28 13:35:25.040 RACDemos[13646:1269269] Subscriber 3 get a next value: send package 2 from replay subject
2015-09-28 13:35:25.040 RACDemos[13646:1269269] Subscriber 4 get a next value: send package 2 from replay subject
結合結果可以分析出如下內容:
- 22.855s時,測試啟動, subject 與 replaySubject 創建完畢。
- 23.856s時,距離啟動大約1s后, Subscriber 1 和 Subscriber 2 同時 從 subject 接收到了 "send package 1" 這個值。
- 23.857s時,也是距離啟動大約1s后, Subscriber 1 和 Subscriber 2 同時 從 replaySubject 接收到了 "send package 1" 這個值。
- 24.059s時,距離啟動大約1.2s后, Subscriber 3 和 Subscriber 4 同時 從 replaySubject 接收到了 "send package 1" 這個值。 注意 Subscriber 3 和 Subscriber 4 并沒有從 subject 接收 "send package 1" 這個值。
- 25.039s時,距離啟動大約2.1s后, Subscriber 1 、 Subscriber 2 、 Subscriber 3 、 Subscriber 4 同時 從 subject 接收到了 "send package 2" 這個值。
- 25.040s時,距離啟動大約2.1s后, Subscriber 1 、 Subscriber 2 、 Subscriber 3 、 Subscriber 4 同時 從 replaySubject 接收到了 "send package 2" 這個值。
只關注 subject ,根據時間線,我們可以得到下圖:
經過觀察不難發現,4個訂閱者實際上是共享 subject 的,一旦這個 subject 發送了值,當前的訂閱者就會同時接收到。由于 Subscriber 3 與 Subscriber 4 的訂閱時間稍晚,所以錯過了第一次值的發送。這與冷信號是截然不同的反應。冷信號的圖類似下圖:
對比上面兩張圖,是不是可以發現, subject 類似“直播”,錯過了就不再處理。而 signal 類似“點播”,每次訂閱都會從頭開始。所以我們有理由認定 subject 天然就是熱信號。
下面再來看看 replaySubject ,根據時間線,我們能得到另一張圖:
將圖3與圖1對比會發現, Subscriber 3 與 Subscriber 4 在訂閱后馬上接收到了“歷史值”。對于 Subscriber 3 和 Subscriber 4 來說,它們只關心“歷史的值”而不關心“歷史的時間線”,因為實際上 1 與 2 是間隔1s發送的,但是它們接收到的顯然不是。舉個生動的例子,就好像科幻電影里面主人公穿越時間線后會先把所有的回憶快速閃過再來到現實一樣。(見《X戰警:逆轉未來》、《蝴蝶效應》)所以我們也有理由認定 replaySubject 天然也是熱信號。
看到這里,我們終于揭開了熱信號的面紗,結論就是:
- RACSubject 及其子類是 熱信號 。
- RACSignal 排除 RACSubject 類以外的是 冷信號 。
如何將一個冷信號轉化成熱信號——廣播
冷信號與熱信號的本質區別在于是否保持狀態,冷信號的多次訂閱是不保持狀態的,而熱信號的多次訂閱可以保持狀態。所以一種將冷信號轉換為熱信號的方法就是,將冷信號訂閱,訂閱到的每一個時間通過 RACSbuject 發送出去,其他訂閱者只訂閱這個 RACSubject 。
觀察下面的代碼:
RACSignal *coldSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
NSLog(@"Cold signal be subscribed.");
[[RACScheduler mainThreadScheduler] afterDelay:1.5 schedule:^{
[subscriber sendNext:@"A"];
}];
[[RACScheduler mainThreadScheduler] afterDelay:3 schedule:^{
[subscriber sendNext:@"B"];
}];
[[RACScheduler mainThreadScheduler] afterDelay:5 schedule:^{
[subscriber sendCompleted];
}];
return nil;
}];
RACSubject *subject = [RACSubject subject];
NSLog(@"Subject created.");
[[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{
[coldSignal subscribe:subject];
}];
[subject subscribeNext:^(id x) {
NSLog(@"Subscriber 1 recieve value:%@.", x);
}];
[[RACScheduler mainThreadScheduler] afterDelay:4 schedule:^{
[subject subscribeNext:^(id x) {
NSLog(@"Subscriber 2 recieve value:%@.", x);
}];
執行順序是這樣的:
- 創建一個冷信號: coldSignal 。該信號聲明了“訂閱后1.5秒發送‘A’,3秒發送'B',5秒發送完成事件”。
- 創建一個RACSubject: subject 。
- 在2秒后使用這個 subject 訂閱 coldSignal 。
- 立即訂閱這個 subject 。
- 4秒后訂閱這個 subject 。
如果所料不錯的話,通過訂閱這個 subject 并不會引起 coldSignal 重復執行block的內容。我們來看下結果:
2015-09-28 19:36:45.703 RACDemos[14110:1556061] Subject created.
2015-09-28 19:36:47.705 RACDemos[14110:1556061] Cold signal be subscribed.
2015-09-28 19:36:49.331 RACDemos[14110:1556061] Subscriber 1 recieve value:A.
2015-09-28 19:36:50.999 RACDemos[14110:1556061] Subscriber 1 recieve value:B.
2015-09-28 19:36:50.999 RACDemos[14110:1556061] Subscriber 2 recieve value:B.
參考時間線,會得到下圖:
不難發現其中的幾個重點:
- subject 是從一開始就創建好的,等到2s后便開始訂閱 coldSignal 。
- Subscriber 1 是 subject 創建后就開始訂閱的,但是第一個接收時間與 subject 接收 coldSignal 第一個值的時間是一樣的。
- Subscriber 2 是 subject 創建4s后開始訂閱的,所以只能接收到第二個值。
通過觀察可以確定, subject 就是 coldSignal 轉化的熱信號。所以使用 RACSubject 來將冷信號轉化為熱信號是可行的。
當然,使用這種 RACSubject 來訂閱冷信號得到熱信號的方式仍有一些小的瑕疵。例如 subject 的訂閱者提前終止了訂閱,而 subject 并不能終止對 coldSignal 的訂閱。( RACDisposable 是一個比較大的話題,我計劃在其他的文章中詳細闡述它,也希望感興趣的同學自己來理解。)所以在RAC庫中對于冷信號轉化成熱信號有如下標準的封裝:
- (RACMulticastConnection *)publish;
- (RACMulticastConnection *)multicast:(RACSubject *)subject;
- (RACSignal *)replay;
- (RACSignal *)replayLast;
- (RACSignal *)replayLazily;
這5個方法中,最為重要的就是 - (RACMulticastConnection *)multicast:(RACSubject *)subject; 這個方法了,其他幾個方法也是間接調用它的。我們來看看它的實現:
/// implementation RACSignal (Operations)
- (RACMulticastConnection *)multicast:(RACSubject *)subject {
[subject setNameWithFormat:@"[%@] -multicast: %@", self.name, subject.name];
RACMulticastConnection *connection = [[RACMulticastConnection alloc] initWithSourceSignal:self subject:subject];
return connection;
}
/// implementation RACMulticastConnection
- (id)initWithSourceSignal:(RACSignal *)source subject:(RACSubject *)subject {
NSCParameterAssert(source != nil);
NSCParameterAssert(subject != nil);
self = [super init];
if (self == nil) return nil;
_sourceSignal = source;
_serialDisposable = [[RACSerialDisposable alloc] init];
_signal = subject;
return self;
}
#pragma mark Connecting
- (RACDisposable *)connect {
BOOL shouldConnect = OSAtomicCompareAndSwap32Barrier(0, 1, &_hasConnected);
if (shouldConnect) {
self.serialDisposable.disposable = [self.sourceSignal subscribe:_signal];
}
return self.serialDisposable;
}
- (RACSignal *)autoconnect {
__block volatile int32_t subscriberCount = 0;
return [[RACSignal
createSignal:^(id<RACSubscriber> subscriber) {
OSAtomicIncrement32Barrier(&subscriberCount);
RACDisposable *subscriptionDisposable = [self.signal subscribe:subscriber];
RACDisposable *connectionDisposable = [self connect];
return [RACDisposable disposableWithBlock:^{
[subscriptionDisposable dispose];
if (OSAtomicDecrement32Barrier(&subscriberCount) == 0) {
[connectionDisposable dispose];
}
}];
}]
setNameWithFormat:@"[%@] -autoconnect", self.signal.name];
}
雖然代碼比較短但不是很好懂,大概來說明一下:
- 當 RACSignal 類的實例調用 - (RACMulticastConnection *)multicast:(RACSubject *)subject 時,以 self 和 subject 作為構造參數創建一個 RACMulticastConnection 實例。
- RACMulticastConnection 構造的時候,保存 source 和 subject 作為成員變量,創建一個 RACSerialDisposable 對象,用于取消訂閱。
- 當 RACMulticastConnection 類的實例調用 - (RACDisposable *)connect 這個方法的時候,判斷是否是第一次。如果是的話 用 _signal 這個成員變量來訂閱 sourceSignal 之后返回 self.serialDisposable ;否則直接返回 self.serialDisposable 。這里面訂閱 sourceSignal 是重點。
- RACMulticastConnection 的 signal 只讀屬性,就是一個熱信號,訂閱這個熱信號就避免了各種副作用的問題。它會在 - (RACDisposable *)connect 第一次調用后,根據 sourceSignal 的訂閱結果來傳遞事件。
- 想要確保第一次訂閱就能成功訂閱 sourceSignal ,可以使用 - (RACSignal *)autoconnect 這個方法,它保證了第一個訂閱者觸發 sourceSignal 的訂閱,也保證了當返回的信號所有訂閱者都關閉連接后 sourceSignal 被正確關閉連接。
由于RAC是一個線程安全的框架,所以好奇的同學可以了解下“OSAtomic*”這一系列的原子操作。拋開這些應該不難理解上述代碼。
了解源碼之后,這個方法的正確使用就清楚了,應該像這樣:
RACSignal *coldSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
NSLog(@"Cold signal be subscribed.");
[[RACScheduler mainThreadScheduler] afterDelay:1.5 schedule:^{
[subscriber sendNext:@"A"];
}];
[[RACScheduler mainThreadScheduler] afterDelay:3 schedule:^{
[subscriber sendNext:@"B"];
}];
[[RACScheduler mainThreadScheduler] afterDelay:5 schedule:^{
[subscriber sendCompleted];
}];
return nil;
}];
RACSubject *subject = [RACSubject subject];
NSLog(@"Subject created.");
RACMulticastConnection *multicastConnection = [coldSignal multicast:subject];
RACSignal *hotSignal = multicastConnection.signal;
[[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{
[multicastConnection connect];
}];
[hotSignal subscribeNext:^(id x) {
NSLog(@"Subscribe 1 recieve value:%@.", x);
}];
[[RACScheduler mainThreadScheduler] afterDelay:4 schedule:^{
[hotSignal subscribeNext:^(id x) {
NSLog(@"Subscribe 2 recieve value:%@.", x);
}];
}];
或者這樣:
RACSignal *coldSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
NSLog(@"Cold signal be subscribed.");
[[RACScheduler mainThreadScheduler] afterDelay:1.5 schedule:^{
[subscriber sendNext:@"A"];
}];
[[RACScheduler mainThreadScheduler] afterDelay:3 schedule:^{
[subscriber sendNext:@"B"];
}];
[[RACScheduler mainThreadScheduler] afterDelay:5 schedule:^{
[subscriber sendCompleted];
}];
return nil;
}];
RACSubject *subject = [RACSubject subject];
NSLog(@"Subject created.");
RACMulticastConnection *multicastConnection = [coldSignal multicast:subject];
RACSignal *hotSignal = multicastConnection.autoconnect;
[[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{
[hotSignal subscribeNext:^(id x) {
NSLog(@"Subscribe 1 recieve value:%@.", x);
}];
}];
[[RACScheduler mainThreadScheduler] afterDelay:4 schedule:^{
[hotSignal subscribeNext:^(id x) {
NSLog(@"Subscribe 2 recieve value:%@.", x);
}];
}];
以上的兩種寫法和之前用Subject來傳遞的例子都可以得到相同的結果。
下面再來看看其他幾個方法的實現:
/// implementation RACSignal (Operations)
- (RACMulticastConnection *)publish {
RACSubject *subject = [[RACSubject subject] setNameWithFormat:@"[%@] -publish", self.name];
RACMulticastConnection *connection = [self multicast:subject];
return connection;
}
- (RACSignal *)replay {
RACReplaySubject *subject = [[RACReplaySubject subject] setNameWithFormat:@"[%@] -replay", self.name];
RACMulticastConnection *connection = [self multicast:subject];
[connection connect];
return connection.signal;
}
- (RACSignal *)replayLast {
RACReplaySubject *subject = [[RACReplaySubject replaySubjectWithCapacity:1] setNameWithFormat:@"[%@] -replayLast", self.name];
RACMulticastConnection *connection = [self multicast:subject];
[connection connect];
return connection.signal;
}
- (RACSignal *)replayLazily {
RACMulticastConnection *connection = [self multicast:[RACReplaySubject subject]];
return [[RACSignal
defer:^{
[connection connect];
return connection.signal;
}]
setNameWithFormat:@"[%@] -replayLazily", self.name];
}
這幾個方法的實現都相當簡單,只是為了簡化而封裝,具體說明一下:
- - (RACMulticastConnection *)publish 就是幫忙創建了 RACSubject 。
- - (RACSignal *)replay 就是用 RACReplaySubject 來作為 subject ,并立即執行 connect 操作,返回 connection.signal 。其作用是上面提到的 replay 功能,即后來的訂閱者可以收到歷史值。
- - (RACSignal *)replayLast 就是用 Capacity 為1的 RACReplaySubject 來替換 - (RACSignal *)replay 的`subject。其作用是使后來訂閱者只收到最后的歷史值。
- - (RACSignal *)replayLazily 和 - (RACSignal *)replay 的區別就是 replayLazily 會在第一次訂閱的時候才訂閱 sourceSignal 。
所以,其實本質仍然是
使用一個Subject來訂閱原始信號,并讓其他訂閱者訂閱這個Subject,這個Subject就是熱信號。
現在再回過來看下之前系列文章第二篇中那個業務場景的例子,其實修改的方法很簡單,就是在網絡獲取的 fetchData 這個信號后面,增加一個 replayLazily 變換,就不會出現網絡請求重發6次的問題了。
修改后的代碼如下,大家可以試試:
self.sessionManager = [[AFHTTPSessionManager alloc] initWithBaseURL:[NSURL URLWithString:@"http://api.xxxx.com"]];
self.sessionManager.requestSerializer = [AFJSONRequestSerializer serializer];
self.sessionManager.responseSerializer = [AFJSONResponseSerializer serializer];
@weakify(self)
RACSignal *fetchData = [[RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
@strongify(self)
NSURLSessionDataTask *task = [self.sessionManager GET:@"fetchData" parameters:@{@"someParameter": @"someValue"} success:^(NSURLSessionDataTask *task, id responseObject) {
[subscriber sendNext:responseObject];
[subscriber sendCompleted];
} failure:^(NSURLSessionDataTask *task, NSError *error) {
[subscriber sendError:error];
}];
return [RACDisposable disposableWithBlock:^{
if (task.state != NSURLSessionTaskStateCompleted) {
[task cancel];
}
}];
}] replayLazily]; // modify here!!
RACSignal *title = [fetchData flattenMap:^RACSignal *(NSDictionary *value) {
if ([value[@"title"] isKindOfClass:[NSString class]]) {
return [RACSignal return:value[@"title"]];
} else {
return [RACSignal error:[NSError errorWithDomain:@"some error" code:400 userInfo:@{@"originData": value}]];
}
}];
RACSignal *desc = [fetchData flattenMap:^RACSignal *(NSDictionary *value) {
if ([value[@"desc"] isKindOfClass:[NSString class]]) {
return [RACSignal return:value[@"desc"]];
} else {
return [RACSignal error:[NSError errorWithDomain:@"some error" code:400 userInfo:@{@"originData": value}]];
}
}];
RACSignal *renderedDesc = [desc flattenMap:^RACStream *(NSString *value) {
NSError *error = nil;
RenderManager *renderManager = [[RenderManager alloc] init];
NSAttributedString *rendered = [renderManager renderText:value error:&error];
if (error) {
return [RACSignal error:error];
} else {
return [RACSignal return:rendered];
}
}];
RAC(self.someLablel, text) = [[title catchTo:[RACSignal return:@"Error"]] startWith:@"Loading..."];
RAC(self.originTextView, text) = [[desc catchTo:[RACSignal return:@"Error"]] startWith:@"Loading..."];
RAC(self.renderedTextView, attributedText) = [[renderedDesc catchTo:[RACSignal return:[[NSAttributedString alloc] initWithString:@"Error"]]] startWith:[[NSAttributedString alloc] initWithString:@"Loading..."]];
[[RACSignal merge:@[title, desc, renderedDesc]] subscribeError:^(NSError *error) {
UIAlertView *alertView = [[UIAlertView alloc] initWithTitle:@"Error" message:error.domain delegate:nil cancelButtonTitle:@"OK" otherButtonTitles:nil];
[alertView show];
}];
當然,細心的同學會發現這樣修改,仍然有許多計算上的浪費,例如將 fetchData 轉換為 title 的block會執行多次,將 fetchData 轉換為 desc 的block也會執行多次。但是由于這些block都是無副作用的,計算量并不大,可以忽略不計。如果計算量大的,也需要對中間的信號進行熱信號的轉換。不過請不要忽略冷熱信號的轉換本身也是有計算代價的。
好的,寫到這里,我們終于揭開RAC中冷信號與熱信號的全部面紗,也知道如何使用了。希望這個系列文章可以讓大家更好地了解RAC,避免使用RAC遇到的誤區。謝謝大家。
美團iOS組有很多志同道合的小伙伴,對于各種技術都有著深入的了解,我們熱忱地歡迎一切牛掰的小伙伴加入,共同學習,共同進步。(簡歷請發送到郵箱 liangsi02@meituan.com)
來自:http://tech.meituan.com/talk-about-reactivecocoas-cold-signal-and-hot-signal-part-3.html