RxJS 緩存高級教程

1159927186 6年前發布 | 30K 次閱讀 RxJS

在開發 Web 應用程序時,性能一般都是出于最高優先級的。對于 Angular 項目,我們有很多途徑去提升程序性能,例如搖樹優化(tree-shaking)、AoT(ahead-of-time 編譯)、模塊懶加載(lazy loading)以及緩存。為了能夠更好地概覽全局,以便提高 Angular 應用程序的性能,我們強烈推薦使用 Minko GechevAngular 性能檢查表 。本文主要聚焦于 緩存

事實上,緩存是提升網站性能的最有效的方法之一,尤其是當用戶出于受限網絡帶寬或慢速網絡的情況。

有很多種緩存數據或資源的方法。靜態資源通常使用標準的瀏覽器緩存或 Service Worker 進行緩存。當然,Service Worker 也可以緩存 API 請求,但它們更適合緩存圖片、HTML、JS 或 CSS 等文件。緩存系統數據,我們則會選用另外的機制。

不管我們選擇怎樣的機制,緩存都會 改善系統的響應性降低網絡消耗在網絡中斷的情況下依然能夠使用內容 。換句話說,當內容在更接近用戶的地方被緩存,例如就在客戶端,請求就不會引起另外的網絡活動;緩存的數據可以被更快返回,因為我們不需要進行完整的網絡周期。

這篇文章我們將使用 RxJS 以及 Angular 提供的各種工具實現一種高級的緩存機制。

動機

一直以來,都有一個疑問:如何在一個頻繁使用 Observable 對象的 Angular 程序中緩存數據。很多人都知道如何使用 Promise 緩存數據,但是對如何在函數式的響應式編程中緩存數據束手無策。因為后者的復雜性(龐大的 API)、完全不同的使用方式(從命令式編程到指令式編程)以及許多概念。因此,將基于 Promise 的緩存系統移植到 Observable 是非常困難的,尤其是還需要實現一些高級功能的時候。

Angular 應用程序通常使用由 HttpClientModule 提供的 HTTPClient 實現 HTTP 請求。它的所有 API 都是基于 Observable 的。這意味著, HTTPClient 的函數,例如 get 、 post 、 put 和 delete 都返回一個 Observable 。 Observable 天生是懶的,只有當我們調用了 subscribe 函數之后才會發送請求。但是,對同一個 Observable 多次調用 subscribe 函數,會一遍一遍地創建源 Observable 對象,也就是為每一次定于執行一次請求。我們將這種模式成為冷模式(cold)。

如果你對此完全不了解,可以閱讀我們的另外一篇文章 《Observable 的冷模式和熱模式》

這種行為使得實現 Observable 緩存機制變得有些棘手。簡單的實現通常需要大量固定模式的代碼,而且最終可能需要繞過 RxJS。這是一種解決思路,但如果我們依然希望使用 Observable 的強大功能,這種實現就不值得推薦。簡單來說,我們并不想給法拉利安裝一個摩托車引擎,對吧?

需求

在我們深入代碼之前,首先定義好我們的高級緩存機制的需求。

我們要開發一個名為 笑話世界 的應用。這是一個簡單的 app,隨機顯示給定分類里面的笑話。為了盡可能簡單,我們只給出一個分類。

這個 app 有三個組件: AppComponent 、 DashboardComponent 和 JokeListComponent 。

AppComponent 是程序入口,顯示一個工具欄和一個安裝當前路由狀態填充的 <router-outlet> 。

DashboardComponent 只用來顯示分類列表。我們可以從這里導航到 JokeListComponent 。 JokeListComponent 負責將笑話列表顯示到屏幕。

笑話由 Angular 的 HttpClient 服務從服務器獲取。為保持組件的響應、解耦,我們要創建一個 JokeService ,來幫助我們獲取數據。組件只需要注入這個服務,通過其公開的 API 訪問數據即可。

以上所有都是我們的系統架構,并沒有引入緩存。

當我們從主頁到列表視圖時,我們可以從緩存請求數據,而不是每次都從服務器獲取。緩存中的數據每 10 秒自動更新。

當然,每 10 秒獲取數據并不是每個產品都需要遵守的固定準則,我們可能需要更復雜的實現來更新緩存(例如使用 web socket 推送更新)。但是,現在我們可以盡可能保持簡單,集中精力解決緩存的問題。

無論如何,我們都希望收到某種更新的通知。就我們的程序而言,我們不希望自動更新 UI( JokeListComponent )的數據,而是用戶要求 UI 更新時才去更新緩存。為什么?想象下這樣的場景:用戶正在閱讀一個笑話,因為數據的自動更新,所有笑話突然都消失了。這無疑非常令人反感,是一種非常差的用戶體驗。因此,我們的用戶在有新數據的時候會收到通知。

為了更有趣一點,我們還希望用戶能夠強制刷新緩存。這與僅僅更新 UI 不同,因為強制刷新意味著要從服務器請求數據、更新緩存、然后更新 UI。

現在我們總結一下我們想要干什么:

  • 程序有兩個組件,當從組件 A 導航到組件 B 時,組件 B 的數據最好從緩存獲取,而不是每次都從服務器獲取
  • 每 10 秒更新緩存
  • UI 中的數據并不會自動更新,而是由用戶強制更新
  • 用戶可以強制刷新,從服務器重新獲取數據、更新緩存和 UI

下面是我們即將構建的 app 預覽圖:

實現基本的緩存

我們從一個簡單的實現開始,逐漸過渡到最終的全功能版本。

第一步是創建一個新的服務。

然后,我們添加兩個接口,一個用來描述 Joke 的屬性,另一個用來描述 HTTP 請求的返回。這樣的接口會讓我們的程序更符合 TypeScript 的要求,同時也更方便開發。

export interface Joke {
  id: number;
  joke: string;
  categories: Array<string>;
}
 
export interface JokeResponse {
  type: string;
  value: Array<Joke>;
}

下面我們實現 JokeService 。我們不想透露數據究竟是從緩存獲取的,還是從服務器獲取的,因此,我們只提供一個返回值類型為 Observable 的 jokes 屬性,用于獲取笑話列表。

為了執行 HTTP 請求,我們需要為我們的服務注入 HttpClient 。

下面是 JokeService 的代碼框架:

import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
 
@Injectable()
export class JokeService {
 
  constructor(private http: HttpClient) { }
 
  get jokes() {
    ...
  }
}

下面,我們實現一個私有的 requestJokes() 函數,通過 HttpClient 的 GET 請求獲取笑話列表。

import { map } from 'rxjs/operators';
 
@Injectable()
export class JokeService {
 
  constructor(private http: HttpClient) { }
 
  get jokes() {
    ...
  }
 
  private requestJokes() {
    return this.http.get<JokeResponse>(API_ENDPOINT).pipe(
      map(response => response.value)
    );
  }
}

現在,我們有了實現獲取笑話的函數的一切準備。

一個顯而易見的實現是,直接返回 this.requestJokes() ,但這無法滿足我們的需要。我們知道,所有 HttpClient 暴露的函數,例如 get() ,都是返回一個冷 Observable 。這意味著每個訂閱者都會重新出發完整的數據流,從而帶來額外的 HTTP 請求。畢竟,緩存的意義就在于提高系統的加載時間,將網絡請求限制到最低的水平。

所以,我們想讓我們的流變成熱的。不僅僅如此,每一個新的訂閱者應該獲取到最近的緩存值。事實上,有一個很方便的操作符可以實現這一點: shareReplay 。這個操作符返回一個 Observable 對象。該對象會在底層共享一個訂閱,也就是 this.requestJokes() 返回的那個 Observable 。

另外, shareReplay 接受一個可選參數 bufferSize ,對于我們的用例非常有用。 bufferSize 決定了重現緩存(replay buffer)的最大元素數,也就是被緩存、能夠重現給每一個訂閱者的元素數。在我們的場景中,我們只需要最近的一個值,因此將 bufferSize 設置為 1。

我們看一下實際代碼,看看我們剛剛學到了什么:

import { Observable } from 'rxjs/Observable';
import { shareReplay, map } from 'rxjs/operators';
 
const API_ENDPOINT = 'https://api.icndb.com/jokes/random/5?limitTo=[nerdy]';
const CACHE_SIZE = 1;
 
@Injectable()
export class JokeService {
  private cache$: Observable<Array<Joke>>;
 
  constructor(private http: HttpClient) { }
 
  get jokes() {
    if (!this.cache$) {
      this.cache$ = this.requestJokes().pipe(
        shareReplay(CACHE_SIZE)
      );
    }
 
    return this.cache$;
  }
 
  private requestJokes() {
    return this.http.get<JokeResponse>(API_ENDPOINT).pipe(
      map(response => response.value)
    );
  }
}

好了,我們已經討論過上面的大部分代碼。但等等,私有的 cache$ 屬性以及訪問函數里面的 if 語句是什么意思?答案很簡單。如果我們直接返回 this.requestJokes().pipe(shareReplay(CACHE_SIZE)) ,那么,每一個訂閱者都會創建一個新的緩存實例。但是,我們想要所有訂閱者共享一個實例。因此,我們將這個實例保持在私有的 cache$ 屬性中,在第一次調用的時候初始化這個屬性。這樣,所有訂閱者都會訪問到這一個共享實例,而不是每次創建一個新的對象。

我們看一下上面代碼實現的更直觀的表示:

上圖是一個 時序圖 ,它描述了場景中涉及的對象,請求一個笑話列表,以及對象之間交換信息的時序。現在我們暫停一下,了解下發生了什么。

我們從導航到列表組件的儀表盤開始。

組件初始化之后,Angular 會調用 ngOnInit 生命周期鉤子。在這里,我們調用 JokeService 暴露的訪問器 jokes 請求笑話列表。由于這是我們第一次請求數據,所以緩存是空的,并且沒有初始化,這意味著 JokeService.cache$ 是 undefined 。我們在訪問器內部調用了 requestJokes() 。這會返回我們一個 Observable 對象,其數據來自服務器。同時,我們使用 shareReplay 運算符來獲得所期望的行為。

shareReplay 操作符會在原始源于未來所有的訂閱者之間自動創建一個 ReplaySubject 。只要訂閱者數量從零變為一,它就會將這個 Subject 關聯到底層數據源,然后廣播其所有值。未來所有訂閱者都會關聯到這個 Subject ,所以實際上只有一個訂閱關聯到底層的那個冷 Observable 。這被稱為 多播 (multicasting),定義了我們的簡單緩存的基礎。

一旦數據從服務器獲取到,就會被緩存。

注意,在時序圖中, Cache 是一個獨立的對象,目的是用來說明那個從消費者(訂閱者)到底層源(HTTP 請求)之間創建的 ReplaySubject 。

下一次我們為列表中組件請求數據的時候,我們的緩存就會發送最近的值給消費者。這時候并不會有額外的 HTTP 調用。

很簡單,對吧?

為了真正的區別開來,我們更進一步,從 Observable 的層次看看緩存是如何工作的。現在我們使用 彈子圖 (marble diagram)看看流:

彈子圖非常清晰地顯示出,底層 Observable 只有一個訂閱,所有消費者都訂閱到這個共享的 Observable ,也就是這個 ReplaySubject 對象。我們也能夠看出,只有第一個訂閱者觸發了 HTTP 調用,其余的都是直接獲取重現的值。

最后,我們看一下 JokeListComponent 是如何顯示數據的。首先,注入 JokeService 對象。之后,在 ngOnInit 中,使用服務對象暴露的訪問器初始化一個 jokes$ 屬性。這個訪問器會返回一個 Array<Joke> 類型的 Observable 對象,這正是我們所需要的。

@Component({
  ...
})
export class JokeListComponent implements OnInit {
  jokes$: Observable<Array<Joke>>;
 
  constructor(private jokeService: JokeService) { }
 
  ngOnInit() {
    this.jokes$ = this.jokeService.jokes;
  }
 
  ...
}

注意,我們并沒有馬上訂閱 jokes$ ,而是在模板中使用了 async 管道,因為這個管道充滿了奇跡。好奇嗎?請閱讀文章 《了解關于 AsyncPipe 你不知道的三件事情》

<mat-card *ngFor="let joke of jokes$ | async">...</mat-card>

太棒了!這就是我們實際使用的簡單緩存。為了驗證是不是只有一次請求,打開 Chrome 的 DevTools,點擊 Network 選項卡,選擇 XHR。開啟儀表盤,導航到列表視圖,然后再導航回來。

自動更新

現在我們用幾行代碼構建了一個簡單的緩存機制。事實上,很多工作都是由 shareReplay 操作符完成的。這個操作符會實現緩存和重現大多數數據值。

當數據不會在后臺更新時,這就已經很好地工作了。那么,如果數據每隔幾分鐘就會改變了呢?我們當然不應該強制用戶為了從服務器獲取最新數據必須要刷新這個頁面。

如果我們的緩存每隔 10 秒鐘就會在后臺更新呢?是不是很酷?肯定的!作為用戶,我們不需要重新加載頁面;數據改變后,UI 會隨之更新。再說一遍,在真實的應用中,我們一般不會主動拉取數據,而是要服務器 推送 通知。對于我們的小示例程序而言,能夠 每隔 10 秒刷新 一次就很好了。

這種實現很簡單。簡而言之,我們需要創建一個 Observable ,根據給定的時間間隔發出一系列值。或者簡單來說,就是我們需要每 X 毫秒產生一個值。對達到這一目的,我們有很多種實現。

第一個選擇是使用 interval 操作符。這個操作符要求一個可選參數,定義了每次發送值得時間。考慮下面的代碼:

import { interval } from 'rxjs/observable/interval';
 
interval(10000).subscribe(console.log);

這里我們創建了一個能夠發送無限整數序列的 Observable 對象。該對象每隔 10 秒會發出一個整數值。這意味著第一個值也會在延遲給定時間之后才會發出。為了更好理解這一行為,我們可以看一下 interval 的彈子圖。

對,就像我們想的那樣。這一個值會“延遲”,這不是我們想要的。為什么?因為如果我們從儀表盤導航到列表組件,希望閱讀一些有趣的笑話,我們得等待 10 秒鐘,才會從服務器請求數據,然后顯示到屏幕。

我們通過引入另外一個操作符來解決這一問題。這個操作符是 startWith(value) ,可以先發出一個給定值作為初始值。但我們可以做得更好!

我會告訴你,其實有一個操作符可以在給定的一段是時間之后(初識延遲)按照特定時間(正常間隔)發出一個值的序列。這就是 timer 。

可視化時間!

酷!但這真的解決我們的問題了嗎?是的。如果我們將初始值設置為 0,將間隔時間設置為 10 秒,我們就會有類似 interval(10000).pipe(startWith(0)) 的行為,但只用了一個操作符。

讓我們把這種實現帶到我們的緩存機制中去吧。

我們需要建立一個 定時器 ,每次觸發都發送一個 HTTP 請求,去服務器獲取新的數據。也就是說,每一次定時器觸發,我們需要使用 switchMap 轉換到一個 Observable 對象,在訂閱時獲取新的笑話列表。使用 switchMap 還有一個額外的好處是,我們可以避免競爭條件。這是這個操作符天生就有的特點,它會取消 Observable 之前的訂閱,僅僅為最新的對象發出值。

我們的緩存的剩余部分不需要改變,意味著我們的流還是多播的,所有的訂閱者共享一個底層源。

再說一遍, shareReplay 天生就會將新的值廣播給所有訂閱者,并且將最近的值發送給新的訂閱者。

正如我們在彈子圖看到的那樣, timer 每 10 秒發出一個值。每一個值都會轉換成一個內部 Observable 對象,去獲取我們所需要的數據。因為我們使用了 switchMap ,我們避免了競爭條件,因此消費者只會接收到值 1 和 3。內部 Observable 對象發出的第二個值會被“跳過”,因為新值到了的時候已經取消了。

讓我們利用學到的知識,更新下 JokeService 。

import { timer } from 'rxjs/observable/timer';
import { switchMap, shareReplay } from 'rxjs/operators';
 
const REFRESH_INTERVAL = 10000;
 
@Injectable()
export class JokeService {
  private cache$: Observable<Array<Joke>>;
 
  constructor(private http: HttpClient) { }
 
  get jokes() {
    if (!this.cache$) {
      // Set up timer that ticks every X milliseconds
      const timer$ = timer(0, REFRESH_INTERVAL);
 
      // For each tick make an http request to fetch new data
      this.cache$ = timer$.pipe(
        switchMap(_ => this.requestJokes()),
        shareReplay(CACHE_SIZE)
      );
    }
 
    return this.cache$;
  }
 
  ...
}

厲害!想自己試試嗎?下面是一個現實的示例。從儀表盤開始,到列表組件,然后看看有什么魔法出現。等幾秒鐘,就可以看到更新的動作。記住,緩存每 10 秒刷新一次,但通過修改 REFRESH_INTERVAL 的值就可以改變這一間隔。

發送更新通知

讓我們回顧一下目前所構建的內容。

當我們通過 JokeService 請求數據時,我們希望數據從緩存獲得,而不是每次都去請求服務器。緩存的底層數據每 10 秒刷新一次。刷新之后,數據會推送給組件,組件自動更新。

這有點問題。想象一下,如果我們是一個用戶,正在閱讀某個笑話,突然間這個笑話消失了,因為 UI 自動更新了。這無疑非常討厭,是很壞的用戶體驗。

因此,我們的用戶應該在有新數據時獲得 通知 。換句話說,我們希望用戶自己去更新 UI。

事實證明,我們不需要修改服務來實現這個功能。這個邏輯很簡單。畢竟,我們的服務不應該關心發送通知的問題,視圖應該負責何時怎樣更新屏幕的數據。

首先,我們需要給用戶展示一個 初始值 ,否則的話在第一次緩存更新之前,屏幕就是空白的。我們馬上就會看到原因。設置一個初始化的流與調用訪問器函數一樣簡單。另外,既然我們只關心第一次的值,我們可以使用 take 操作符。

為了邏輯上的可讀性,我們創建一個副主函數 getDataOnce() 。

import { take } from 'rxjs/operators';
 
@Component({
  ...
})
export class JokeListComponent implements OnInit {
  ...
  ngOnInit() {
    const initialJokes$ = this.getDataOnce();
    ...
  }
 
  getDataOnce() {
    return this.jokeService.jokes.pipe(take(1));
  }
  ...
}

從我們的需求可以知道,我們只想在用戶真正需要更新 UI 的時候出去更新,而不是自動更新。你會問,用戶如何要求更新界面?當用戶點擊了“更新”按鈕時,我們就知道用戶想要更新 UI。這個按鈕和通知一起顯示。現在,我們不去關心通知,把注意力集中在點擊按鈕之后的更新邏輯上面。

為了實現這一目的,我們需要從 DOM 事件(也就是按鈕點擊的事件)創建一個 Observable 對象。有很多種方法可以實現,但最常用的是使用 Subject 對象作為模板與組件視圖邏輯之間的 橋梁 。簡單來說, Subject 即是 Observer 又是 Observable 。 Observable 定義了數據流,能夠發出數據; Observer 則能夠訂閱 Observable 并且接收數據。

好消息是,我們可以在模板的事件綁定中直接使用 Subject ,然后在事件發出時調用其 next 函數。這會產生一個特定值,廣播給所有監聽這個值的 Observer 對象。注意,如果 Subject 是 void 類型的,我們可以簡單地忽略這個值。事實上,我們的用例就是這樣子的。

讓我們繼續,實例化一個 Subject 對象吧。

import { Subject } from 'rxjs/Subject';
 
@Component({
  ...
})
export class JokeListComponent implements OnInit {
  update$ = new Subject<void>();
  ...
}

下面繼續,將這個值用到模板中。

<div class="notification">
  <span>There's new data available. Click to reload the data.</span>
  <button mat-raised-button color="accent" (click)="update$.next()">
    <div class="flex-row">
      <mat-icon>cached</mat-icon>
      UPDATE
    </div>
  </button>
</div>

注意我們在 <button> 標簽的點擊事件使用了 事件綁定 語法。當點擊按鈕時,我們發出一個 幽靈 值,可以被所有活動的 Observer 注意到。我們將其稱為“幽靈”,是因為我們不會傳遞任何值,或者說只是一種 void 類型的值。

另一種實現是使用 @ViewChild() 裝飾器結合 RxJS 的 fromEvent 操作符。但是,這會要求我們“混合” DOM 以及從視圖查詢 HTML 元素。使用 Subject ,我們僅僅將兩邊橋接起來,除了向按鈕添加事件綁定之外,并不再觸及 DOM。

好了,視圖設置完畢,我們可以切換回更新 UI 的邏輯部分。

那么,更新 UI 意味著什么?既然緩存已經在后臺自動刷新了,我們想要在點擊按鈕之后從緩存獲取到最新值來渲染界面,對吧?這意味著我們的 源數據流 也是一個 Subject 。每當 update$ 發出一個值,我們都希望將其 映射 到一個能夠給我們最新值的 Observable 對象。換句話說,我們正在處理的是所謂“ 高階可觀察對象 (Higher Order Observable)”,一個能發射 Observable 的 Observable 對象

之前我們知道, switchMap 正是為了解決這個問題。這一次我們選擇使用 mergeMap 。這個操作符非常像 switchMap ,區別在于它不會取消之前訂閱的內部 Observable 對象,而是將內部發出值合并到外部的 Observable 。

事實上,當從緩存請求最新值時,HTTP 請求已經完成,緩存已經更新。因此,在這里我們并不會遇到競爭條件。雖然看起來像是異步的,但實際上是 同步 的,因為值在同一時間發出。

import { Subject } from 'rxjs/Subject';
import { mergeMap } from 'rxjs/operators';
 
@Component({
  ...
})
export class JokeListComponent implements OnInit {
  update$ = new Subject<void>();
  ...
 
  ngOnInit() {
    ...
    const updates$ = this.update$.pipe(
      mergeMap(() => this.getDataOnce())
    );
    ...
  }
  ...
}

太好了!每一次“更新”,我們都會使用我們之前實現的輔助函數從緩存中獲取到最新值。

現在,為屏幕上呈現的笑話提供數據流只是一小步。我們還需要將初始笑話列表與 update$ 流整合起來。

import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import { merge } from 'rxjs/observable/merge';
import { mergeMap } from 'rxjs/operators';
 
@Component({
  ...
})
export class JokeListComponent implements OnInit {
  jokes$: Observable<Array<Joke>>;
  update$ = new Subject<void>();
  ...
 
  ngOnInit() {
    const initialJokes$ = this.getDataOnce();
 
    const updates$ = this.update$.pipe(
      mergeMap(() => this.getDataOnce())
    );
 
    this.jokes$ = merge(initialJokes$, updates$);
    ...
  }
  ...
}

注意,我們使用輔助函數 getDataOnce() ,將每一個事件映射為一個最新的緩存值。回憶一下,這個函數內部會使用 take(1) 來獲取第一個值,然后結束整個流。這是至關重要的,否則的話,我們會得到一個正在運行的流,或者是直接連接到緩存。在這種情況下,我們只需點擊“更新”按鈕,就會終止強制更新 UI 的邏輯。

同時,因為底層緩存是多播的,所以重復訂閱緩存以便獲得最新值也是沒有任何問題的。

在我們繼續通知流之前,我們先暫停一下,用彈子圖看看現在我們實現了什么。

正如上面的圖中顯示的那樣, initialJokes$ 是至關重要的,因為沒有它,我們只能在點擊了“更新”之后才能在屏幕上看到內容。雖然數據在后臺每 10 秒更新一次,但我們沒有辦法點擊按鈕。這是因為按鈕時通知的一部分,我們并沒有將其顯示給用戶。

讓我們把這個坑填完,實現這個謎題的缺失的部分。

為了達到這一目的,我們需要創建一個 Observable 對象,負責顯示或隱藏通知。本質上,我們需要一個能發出 true 或 false 的流。當有更新的時候,這個值應該是 true ;當用戶點擊“更新”按鈕時,這個值應該是 false 。

另外,我們還得 跳過 由我們的緩存發出的第一個(初始)值,因為這不是一個刷新操作。

我們從流的角度思考這個問題,我們可以把它分解成多個流,然后再 合并 到一起,成為一個單一的可觀察的流。最終的流就是我們所需要的行為,顯示或隱藏通知。

理論已經足夠了!現在是編碼:

import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import { skip, mapTo } from 'rxjs/operators';
 
@Component({
  ...
})
export class JokeListComponent implements OnInit {
  showNotification$: Observable<boolean>;
  update$ = new Subject<void>();
  ...
 
  ngOnInit() {
    ...
    const initialNotifications$ = this.jokeService.jokes.pipe(skip(1));
    const show$ = initialNotifications$.pipe(mapTo(true));
    const hide$ = this.update$.pipe(mapTo(false));
    this.showNotification$ = merge(show$, hide$);
  }
  ...
}

這里,我們監聽從緩存發出的所有值,但是跳過第一個,因為它不是一個 刷新 操作。對 initialNotifications$ 上的每一個新值,我們將其映射為 true ,以便顯示通知。一旦點擊了通知中的“更新”按鈕, update$ 會產生一個值,我們將其簡單地映射為 false ,來使通知消失。

我們在 JokeListComponent 的模板中使用 showNotification$ ,通過切換 class 顯示或隱藏通知。

<div class="notification" [class.visible]="showNotification$ | async">
  ...
</div>

很好!我們已經非常接近最終的解決方案了。但在我們繼續之前,先來嘗試下實例。花點時間一步步瀏覽下代碼。

按需獲取新的數據

太棒了!經過了這么長的道路,我們已經為我們的緩存實現了很多非常酷的特性。在結束本文之前,我們要將緩存提升到一個新的層次。現在還有一件事留給我們。作為一個用戶,我們想要在任意時間點強制更新。

這并不是非常復雜,但我們得同時修改組件和服務。

讓我們從服務開始。我們需要一個公開的 API,能夠強制重新加載緩存中的數據。技術上來說,我們可以 完成 當前緩存,將其設置為 null 。這意味著下一次我們從服務器請求數據時,我們的服務會設置一個新的緩存,獲取數據并向未來的訂閱者存儲數據。當我們強制要求更新時創建新的緩存并不是什么大問題,因為原來的對象會終止,然后被垃圾回收。事實上,這么做還有一個好處是,我們可以重置定時器,這也是我們所需要的。例如,我們已經等待了 9 秒,然后點擊“獲取新笑話”。我們希望數據刷新,但是不想在 1 秒鐘之后就看到通知跳了出來。相反,我們想重新開始定時器,這樣當我們強制更新時,就會有另外的 10 秒之后才會觸發自動更新。

銷毀緩存的另一個原因是,比起讓緩存一直存在的其它機制,這種實現簡單得多。如果是這樣,緩存就需要知道是否需要強制重新加載。

我們創建一個 Subject 對象,用它來通知緩存結束。我們將利用 takeUntil 將其提取到我們的 cache$ 流。另外,我們需要實現一個公共 API,其作用是將緩存設置為 null ,然后向 Subject 對象廣播一個事件。

import { Subject } from 'rxjs/Subject';
import { timer } from 'rxjs/observable/timer';
import { switchMap, shareReplay, map, takeUntil } from 'rxjs/operators';
 
const REFRESH_INTERVAL = 10000;
 
@Injectable()
export class JokeService {
  private reload$ = new Subject<void>();
  ...
 
  get jokes() {
    if (!this.cache$) {
      const timer$ = timer(0, REFRESH_INTERVAL);
 
      this.cache$ = timer$.pipe(
        switchMap(() => this.requestJokes()),
        takeUntil(this.reload$),
        shareReplay(CACHE_SIZE)
      );
    }
 
    return this.cache$;
  }
 
  forceReload() {
    // Calling next will complete the current cache instance
    this.reload$.next();
 
    // Setting the cache to null will create a new cache the
    // next time 'jokes' is called
    this.cache$ = null;
  }
 
  ...
}

這部分沒做太多工作,所以我們繼續,以便將其用到 JokeListComponent 中。我們實現了一個函數 forceReload() ,當我們點擊了“獲取新笑話”按鈕之后會被調用。另外,我們還需要創建一個 Subject 對象,作為更新 UI 并且顯示通知的事件總線。我們馬上就會看到它的作用。

import { Subject } from 'rxjs/Subject';
 
@Component({
  ...
})
export class JokeListComponent implements OnInit {
  forceReload$ = new Subject<void>();
  ...
 
  forceReload() {
    this.jokeService.forceReload();
    this.forceReload$.next();
  }
  ...
}

在適當的位置上,我們將 JokeListComponent 模板中的按鈕連接起來,以便強制重新加載緩存數據。我們需要做的就是使用 Angular 的事件綁定語法監聽點擊事件,然后調用 forceReload() 。

<button class="reload-button" (click)="forceReload()" mat-raised-button color="accent">
  <div class="flex-row">
    <mat-icon>cached</mat-icon>
    FETCH NEW JOKES
  </div>
</button>

這已經可以正常工作了,但只有當我們回到儀表盤,再重新進入列表視圖時才是正常的。這當然不是我們所需要的。我們想要強制更新緩存數據的時候,UI 能夠立即更新。

還記得我們實現了 updates$ 流,當我們點擊“更新”時,會從緩存獲取最新的數據?我們就需要類似這種的行為,所以我們需要擴展一下這個流。這意味著,我們需要將 update$ 和 forceReload$ 合并 起來,因為這兩個流都需要更新 UI。

import { Subject } from 'rxjs/Subject';
import { merge } from 'rxjs/observable/merge';
import { mergeMap } from 'rxjs/operators';
 
@Component({
  ...
})
export class JokeListComponent implements OnInit {
  update$ = new Subject<void>();
  forceReload$ = new Subject<void>();
  ...
 
  ngOnInit() {
    ...
    const updates$ = merge(this.update$, this.forceReload$).pipe(
      mergeMap(() => this.getDataOnce())
    );
    ...
  }
  ...
}

這是不是很簡單?是的,但我們還沒完成。事實上,我們剛剛“打斷”了我們的通知。這本可以正常工作,直到我們點擊了“獲取新笑話”。屏幕上的數據會更新,緩存中的也是,但我們等待 10 秒之后,并沒有彈出通知。問題出在強制更新緩存會完成緩存實例,這意味著組件再也不會收到值。簡單來說,通知流( initialNotifications$ )死了。這很不想,怎么解決這個問題呢?

很簡單!我們可以監聽 forceReload$ 的事件,對其每一個值都切換到一個新的通知流。重要的是,我們需要取消之前的流的訂閱。聽起來耳熟嗎?好像我們需要 switchMap ,是不是?

讓我們動手實踐吧!

import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import { merge } from 'rxjs/observable/merge';
import { take, switchMap, mergeMap, skip, mapTo } from 'rxjs/operators';
 
@Component({
  ...
})
export class JokeListComponent implements OnInit {
  showNotification$: Observable<boolean>;
  update$ = new Subject<void>();
  forceReload$ = new Subject<void>();
  ...
 
  ngOnInit() {
    ...
    const reload$ = this.forceReload$.pipe(switchMap(() => this.getNotifications()));
    const initialNotifications$ = this.getNotifications();
    const show$ = merge(initialNotifications$, reload$).pipe(mapTo(true));
    const hide$ = this.update$.pipe(mapTo(false));
    this.showNotification$ = merge(show$, hide$);
  }
 
  getNotifications() {
    return this.jokeService.jokes.pipe(skip(1));
  }
  ...
}

好了。只要 forceReload$ 一發出值,我們就取消訂閱之前的 Observable ,切換到一個新的通知流。注意我們有一段代碼需要兩遍,也就是 this.jokeService.jokes.pipe(skip(1)) 。為了避免重復代碼,我們創建一個函數 getNotifications() ,返回跳過第一個值的笑話的流。最后,我們將 initialNotifications$ 和 reload$ 合并到名為 show$ 的流。這個流負責在屏幕上顯示通知。這里并不需要取消訂閱 initialNotifications$ ,因為這個流在下一次訂閱重新創建緩存之前就已經結束了。其余部分保持不變。

呼,我們完成了。現在花掉事件看看我們實現了什么。

在彈子圖中可以看到, initialNotifications$ 對顯示通知非常重要。如果我們缺失了這個流,就只能在強制更新緩存之后才會看到通知。也就是說,我們按需請求新的數據時,必須不斷切換到一個新的通知流,因為之前的(舊的) Observable 對象已經完成,不會再發出值。

大功告成!我們使用 RxJS 和 Angular 提供的工具創建并實現了一個復雜的緩存機制。回顧一下,我們的服務暴露了一個笑話列表的流。底層的 HTTP 請求每 10 秒更新緩存。為了改進用戶體驗,我們顯示一個通知,以便用戶強制更新 UI。在這之上,我們還實現了一個允許用戶按需請求新的數據的方法。

太棒了!這就是最終的解決方案。花幾分鐘檢查一下代碼,嘗試下不同的場景,看看一切是否正常。

前景

如果你想做點作業,或者再多思考下,下面有幾個可以改進的地方:

  • 添加錯誤處理
  • 重構組建中的邏輯到一個服務以便重用

特別感謝

特別感謝 Kwinten Pisman 幫助完成代碼。同樣,感謝 Ben LeshBrian Troncone 提供的寶貴反饋以及之處一些改進點。另外,感謝 Christoph Burgdorf 幫助復查文章和代碼。

 

來自:https://www.devbean.net/2018/06/advanced-caching-with-rxjs/

 

 本文由用戶 1159927186 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!