初識Agera(一)——基本概念和原理的介紹
近日Google開源了一個基于觀察者模式的項目Agera:Reactive Programming for Android。還是菜鳥的我剛看完RxJava就發現Google開源了一個"類似"的項目自然得學習一下。本來結合自己的理解簡單介紹一下Agera的基本概念和原理以便更好的使用Agera。
什么是Agera
Agera is a set of classes and interfaces to help write functional, asynchronous, and reactive applications for Android.Requires Android SDK version 9 or higher.
Agera是用于Android開發者更方便地開發函數,異步,響應式程序的框架,它要求Android SDK版本9以上。
Agera是一個超輕量的Android庫,有助于準備數據用于讓Activity或者其中的事物(例如View)這些具有生命周期的事物消耗。它介紹了一種函數式響應編程,有助于更清晰的分離when,where以及what這三個數據處理流的要素,以及用近乎自然語言這樣簡潔的表達式來描述復雜,異步的流程。
要學會知道怎么使用應該先知道Agera的原理吧,因此接下來詳細地介紹一下Agera。。
一、Reactive Programming
Ageray的響應式編程范型以著名的觀察者模式作為他的驅動機制。一個可觀察物通過接口 Observable 描述,并且有責任去廣播事件給所有注冊的觀察者。一個觀察者通過接口 Updatable 描述,可以注冊以及從 Observable 對象取消注冊,并且通過更新它自己來響應事件。
接下來的文檔也將用observable和updatable來代表實現了這兩個接口的java對象。
push event ,pull data
Agera使用的是push事件,pull數據的模型。這就意味著事件本身不攜帶數據,當updatable響應事件時如果需要的話就必須自己從它的數據源中抓取數據。
用這種方法,就把接口Observable供應數據的責任移除了,允許接口封裝簡單的事件(例如一次按鈕的點擊,一次下拉刷新的觸發)用于廣播。但是,observable通常也能提供數據。如果一個observable能夠提供數據并且將一個事件定義為一次其供應的數據的改變,那么這個observable就被稱做Repository(實現了Suppiler的Observable)。這樣依然沒有改變push事件 ,pull數據的模型:當數據改變時這個Repository通知所有注冊的push event, pull data更新他們自己;當他們分別響應事件時從Repository中拉取數據。這個模型的優點就是分離數據消耗與事件分發,也就允許Repository執行懶計算。
取決于push event, pull data和一般的多線程處理,一個updatable可以不會見到repository提供的數據所有的改變歷史。這是故意如此的:在大多數情況下(特別是當更新App的UI),只需要處理最近的,最新的數據。
Agera風格的響應式客戶端的標準實現需要由下面幾點組成:
- 將updatable注冊到一個合適的observable用于通知相關事件
- 人為隨意調用updatable初始化或者糾正客戶端狀態
- updatable等待被任意observable調用,當updatable被調用時更新客戶端狀態必要時使用從數據源新拉到的數據。
- 當響應不再需要的時候取消updatable對同一組observable的注冊
二、Observables and updatables
正如之前提到的,一個observable代表一個事件源,一個updatable觀察這些事件。一個updatable通過Observable.addUpdatable(Updatable)注冊到observable,通過Observable.removeUpdatable(Updatable)取消注冊。一個事件以Updatable.update()的形式被發送到updatable。

Observables and updatables.png
一個Activity可以觀察來自于observable的事件:
public class MyUpdatableActivity extends Activity implements Updatable {
private Observable observable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
observable = new MyObservable();
}
@Override
public void update() {
// Act on the event
}
@Override
protected void onResume() {
super.onResume();
observable.addUpdatable(this);
update();
}
@Override
protected void onPause() {
super.onPause();
observable.removeUpdatable(this);
}
}
在上面的代碼中,observable在onResume()中被激活,在onPause()中被失效,observable的生命周期與Activity的生命周期相匹配。
Updatable注冊與取消注冊應該成對出現。再次添加一個相同的updatable到observable是非法的。從observable移除一個updatable當該updatable尚未注冊到observable上或者已經取消注冊都是非法的。
Activation lifecycle and event chain
一個observable當它被至少一個updatable觀察時它處于active狀態,當他不被任意一個updatable觀察時處于inactive狀態。從另一個角度說:一個updatable通過注冊到處于inactive的observable上來激活observable;當處于active的observable僅具有一個注冊的updatable時,該updatable通過取消注冊來使其停用。

Activation lifecycle and event chain.png
一個observable可能觀察其它的在其“上游”(就事件傳播路徑而言)的observable并且將他們的event轉化為自身的事件,一個很普通的例子就是一個數據依賴于其它repository的repository。就正確的配線方式而言,如此一個中間的observable通常保持對其上游observable的強引用,但是僅僅當它自身是active時將他內部的updatable注冊到其上游observable上,當他處與inactive時取消內部updatable的注冊。這就意味著他在下游方向的強引用僅當有別的updatable注冊到它上面時存在。這也意味著最下游的updatable最終控制在事件鏈上所有的observable的激活與失效。
UI lifecycle
這個事件鏈特別適合伴隨UI的生命周期構建響應式結構。允許一個U元素是一個Activity,一個Fragment,或者在之內的view,它的活動生命周期可以通過一些Android生命周期事件對進行定義,例如從onStart到onStop,從onResume到onPause,從onAttachedToWindow到onDetachedFromWindow以及其他的。允許這個UI元素是或者持有一個updatable用于更新UI,該updatable使用repository提供的數據。該repository反過來使用其它的事件源和數據源(未必是repositories)來計算數據。
在UI元素的生命周期的開端,上述的updatable注冊到repository并因此激活它。這將連接上事件鏈并激活相關的數據處理流,使數據和UI最新。
在UI的生命周期的結尾事,上述的updatable從相同的repository上取消注冊,假設此時該事件鏈上沒有其它的updatable使得任意observable保持active,這將導致該事件鏈連鎖拆卸。如果UI元素將不再活躍(由于例如activity將被摧毀),因為當系統處于inactive狀態時,在事件鏈中并沒有下游引用,所以該UI元素可以自由的被回收,也就避免了activity泄漏。

UI Lifecycle
Threading
Agera提倡明確的線程處理,使用Loopers(有大量可用的lOOPER,例如app的主Looper和IntentService工作線程的Looper)來幫助定義下列線程處理契約。
就內部的激活生命周期處理,每一個observable與其被創建時所在的線程的Looper(worker Looper)有一生的聯系。該observable被激活和失效均來自該worker Looper。如果這個observable觀察處于激活狀態的其它observables,那個這個observable的updatable將從這個worker Looper注冊到其上游的observables上。
一個updatable必須從一個Looper 線程注冊到一個observable上,這個線程不需要和運行observable‘s worker looper的線程相同。observable會使用同一個Looper Thread來發送Updatable.update()調用updatable。
一個updatable可以從任何線程取消注冊。但是為了避免由于Looper的內部處理updatable取消注冊之后事件被發送到updatable的情況,推薦在updatable注冊所發生的線程里進行updatable的取消注冊操作。
Looper只要有observable或者注冊的updatable依賴于他,開發者就有責任使它保持激活存活狀態。由于死亡的Looper導致的異常和內存泄漏是開發者的責任(Agera是不會背這個鍋的)。實際上,除了使用一直存活的主Looper以外很少使用其他的Looper。
三、Repositories
上文已經提到,一個Repositories是一個可以提供數據的observable,它將一個事件定義為提供的數據的一次改變。提供的數據可以通過Repository.get()獲取。
Simple repositories
一個簡單的repository可以通過Repositories中的方法創建。
有兩個選擇:
- 提供相同數據并且不生成事件的靜態的repository()(repository(object))。
- 無論何時value被更新成了另一個value(通過Object.equals(Object))允許改變value和生成事件的可變repository(mutableRepository(object))。
這里兩種方法均會調用SimpleRepository(@NonNull final T reference, boolean mutable),只是所傳的mutable參數的不同。
天生的,這些簡單的repositories無論是否激活通常都能提供最新的數據。
Complex repositories
一個復雜的repository可以響應其它的repository或者任意通常的observable(該repository的事件源),以同步或者異步的方式,通過內部程序將從其他數據源獲取到的數據轉換生成自身的數據。r該epository提供的數據對事件源的事件做出響應保持最新,但是由于程序處理的復雜性,當repository處于inactive時可能不會選擇將數據保持最新。任何數據消費者必須通過注冊一個updatable來表達他想消費數據的企圖。這樣的操作會激活repository,但是未必迅速使數據最新;在repository發送第一個事件之前數據消費者可能仍然看到過時的數據。
Agera提供repository compiler,用近乎自然語言來聲明和實現一個complex repository.
四、Compiled repositories
一個complex repository可以通過一個單獨的java表達式編成。這個表達式由以下幾部分組成:
- Repositories.repositoryWithInitialValue(...);
- Event sources - .observe(...);
- Frequency of reaction - .onUpdatesPer(...) or .onUpdatesPerLoop()
- Data processing flow - .getFrom(...), .mergeIn(...)
, .transform(...), etc.; - Miscellaneous configurations - .notifyIf(...) , .onDeactivation(...), etc.;
- .compile()
這個我想在后面結合Demo再詳細的理解,這里就僅僅是一些介紹。
當被編成的repository被激活時,它注冊一個內部的updatable到給定的事件源并開始第一次開啟數據處理流來計算暴露的數據。這個數據處理流響應來自事件源的事件再次更新數據。在第一次計算完成前,這個repository暴露初始化數據也就是repositoryWithInitialValue所指定的數據。無論何時數據被更新,repository的客戶端(我覺得可以理解為監聽他的updatable)被通知。當repository被去活化時,內部的updatable從事件源中取消注冊,數據處理流不再運行,所以內部的數據可能變得過時。當再次激活,數據將再一次更新。
表達式中不同的階段用嵌套在RepositoryCompilerStates接口中編譯狀態接口描述。這些接口在每個階段僅僅暴露一些合適的方法來引導開發者去正確的補全表達式(可以使用IDE的自動補全).
這些方法的完整文檔可以在這些接口中看到;特別是接下來的每個部分:
- 響應的事件源和頻率:RFrequency和他的父類REventSource
- 數據處理流:RFlow和他的父類RSyncFlow
- 各種各樣的配置:RConfig
這個repository編寫表達式不應該在中間被打斷。為了捕獲一個變量的中間對象或者將其轉換為另一個接口的用法是不支持的。
編寫一個repository會招致一些開銷,但是之后的操作是相當輕量的。任何repository最好是和一些具有生命周期的例如activity,可復用的view hierarchy等高級組件或者服務于整個應用的全局單例的創建相關聯。特別是complied repository,因為他的編譯的確會造成開銷(發生在運行時)。
When, where, what
編成的repository表達式清晰的記載了whenrepository響應事件,where響應發生的線程,what構成了暴露的數據。
repository按照給定的頻率監聽給定的事件源,這兩部分組成了when要素。
數據處理流指定了數據的起源以及repository中數據的計算,這就是what要素。
由于要使用內部的updatable,他必須從一個Looper Thread注冊到事件源,所以編成的repository就和一個worker Looper相關聯了(接下來Asynchronous programming會講到)。在數據處理流中,可以插入指令使處理流移動到java Executors上。這樣明確的線程處理設計組成了where要素。
Data processing flow
數據處理流由指令組成,每一條指令接收一個輸入變量并為下一條指令生成一個輸出變量。第一條的指令的輸入變量類型是repository的變量類型,同時也是最后一條以then開頭的指令的輸出變量的類型。這些編譯者狀態接口只要可能就使用通用的類型參數來確保類型安全,伴隨著輸入類型的抗變性(下一條指令可以接收當前指令的輸出類型的父類)和輸出類型的共分散(最后一條指令可以生成該repository的變量類型的子類)(這句話我也表示有點懵逼,后面再結合源碼理解一下)。
當數據流運行時,通過Repository.get()取出的當前repository的變量被用來當做第一條指令的輸入變量。如果在這之前數據流尚未更新數據或者repository因為RepositoryConfig.RESET_TO_INITIAL_VALUE配置而被重置,這個數據可能是repository的初始化數據。指令被有序的運行來轉換這個輸入的變量。運行生成最終變量的以then開頭的指令或者運行一條結束流并生成一個變量的終止子句(例如orEnd,通過RTermination狀態接口描述,接下來會在“Attempts and Result”中描述)后,數據處理流通常會結束,在這種情況下,repository數據被更新并通知注冊的updatable。如果使用.thenSkip()指令或者其他跳過接下來操作的終止子句例如orSkip(),該數據流會被突然終止并且跳過更新自身數據并且不會通知更新。
Operators
為了讓數據流可以調用客戶端代碼邏輯,Agera指定一下接口各自提供了一個方法:
- Supplier.get(): a 0-input, 1-output operator;
- Function.apply(TFrom): a 1-input, 1-output operator;
- Merger.merge(TFirst, TSecond): a 2-input, 1-output operator.
下面的指令會使用它們: - .getFrom(Supplier) and variants;
- .transform(Function) and variants;
-
.mergeIn(Supplier, Merger) and variants
.getFrom(Supplier)表示忽略輸入變量,使用從給定Supplier中獲取數據作為輸出變量;.transform(Function)表示將給定的Function將輸入變量轉換為輸出變量;.mergeIn(Supplier, Merger)表示將輸入變量和從Supplier中新拿到的數據通過Merger轉換為輸出變量。
正如下面圖片所示:

Operators.png
為了更高級的功能,數據處理流提供了非線性的操作(數據通過這個方法到流的外面,或者終止流,具體的信息可以去看源碼,文檔中有描述)。這些方法通過下面的接口提供:
- Receiver.accept(T): a 1-input, 0-output operator;
- Binder.bind(TFirst, TSecond): a 2-input, 0-output operator;
- Predicate.apply(T): an operator that checks the input value for a yes-or-no answer.
下面的指令會使用這些操作: - .sendTo(Receiver) and variants;
- .bindWith(Supplier, Binder) and variants;
- .check(Predicate).or,and variants,
.sendTo(Receiver)表示將輸入的變量發送到給定的receive,然后傳遞輸入變量當做指令的輸出變量;.bindWith(Supplier, Binder)表示將輸入變量和Supplier中的數據傳遞給Binder,然后不修改輸入變量作為輸出變量;.check(Predicate)表示如果Predicate適用于輸入變量,則流繼續運行,否則執行之后的終止子句并將該輸入變量作為子句指令的輸入變量。

Operators.png
為了實現模塊化結構,Repository實現了Supplier接口,MutableRepository實現了Supplier和Receiver,所以可以直接在complex repository中作為操作符。
Attempts and Result
功能接口Supplier,Function和Merger被定義成不拋出異常,但是實際上,很多操作可能會失敗。為了捕獲這些失敗,Agera提供了一個包裝類Result,這個類封裝了(不管是成功或者失敗)操作或者嘗試的結果。這些嘗試可能作為一個Supplier, Function或者Merger來實現,并且會返回一個結果。
* An immutable object encapsulating the result of an <i>attempt</i>. An attempt is a call to
* {@link Function#apply}, {@link Merger#merge} or {@link Supplier#get} that may fail. This class
* helps avoid throwing exceptions from those methods, by encapsulating either the output value of
* those calls, or the failure encountered. In this way, an attempt always produces a {@link Result}
* whether it has {@link #succeeded} or {@link #failed}.
*
* <p>This class can also be used to wrap a nullable value for situations where the value is indeed
* null, but null is not accepted. In this case a {@link Result} instance representing a failed
* attempt to obtain a non-null value can be used in place of the nullable value.
數據流提供了一系列能意識到失敗的指令,如果失敗發生,這些執行可以終止數據流并執行后續的終止子句:
- .attemptGetFrom(Supplier).or…;
- .attemptTransform(Function).or…;
- .attemptMergeIn(Supplier, Merger).or…,
.or...代表的是終止子句,由上面的RTermination接口描述。.orSkip()表示一旦失敗就跳過更新。.orEnd(Function)表示一旦失敗就結束當前數據流并用Function生成的結果更新當前編成repository的數據,如果需要的話會通知相關的updatable。
因為這些.attempt*指令確保下一條指令只會接收到正確的結果,內部使用的操作符生成Result<T>,所以這些指令的輸出類型是T而不是Result<T>,例如attemptGetFrom所示:
RTermination<TVal, Throwable, ? extends RSyncFlow<TVal, TCur, ?>> attemptGetFrom(
@NonNull Supplier<Result<TCur>> attemptSupplier);
Supplier會生成Result<TCur>,但是attempGetFrom會得到一個TCur類型的結果。
對稱的,一個操作符也可以是recovery operation(這個單詞我感覺怎么翻譯都不恰當),也就是可以使用Result類的變量作為輸入。一個操作符使用Result類變量作為輸入并且生成Result類輸出,就被稱作attempt recovery operator。例如:
private static final class HttpResponseToBitmap
implements Function<Result<HttpResponse>, Result<Bitmap>> {
@NonNull
@Override
public Result<Bitmap> apply(@NonNull final Result<HttpResponse> input) {
final byte[] body = input.get().getBody();
Bitmap bitmap = decodeByteArray(body, 0, body.length);
Log.i(TAG,"HttpResponseToBitmap apply "+(bitmap!=null));
return absentIfNull(bitmap);
}
}
為了在數據處理流中使用這樣的操作符,之前的指令是不能意識失敗的(即便使用了attempt操作符),所以之前指令的成功和失敗的結果(以Result的類型)recovery operator都能收到。
Asynchronous programming
這個repository必須在一個Looper線程(通常是主線程)上編成。這個Looper就是這個repository的的worker repository,接下來的處理也會在這個Looper線程上執行:
- 客戶端updatable的注冊與取消注冊
- 對事件源的監聽,處理和限制頻率
- 開啟一個新的數據處理流
這個數據處理流不要求在這個Looper線程上同步的完成。特殊的指令 .goTo(Executor) 和 .goLazy() 確保異步編程。這些指令不改變輸入的變量;它們僅僅在運行時控制流的延續:.goTo(Executor)將剩下的執行指令發送到給定的Executor,.goLazy()將暫停執行直到Repository.get()第一次被調用。
在.goTo(Executor)之后,該worker Looper線程被釋放可以用處理其他的事件,這個repository同時可能通過他的updatable被失效或被事件源通知更新。在后一種情況,為了減少競爭條件,這個數據處理流被調度重新開始運行而不是開啟一個和正在運行的流同時運行。這repository可以通過onDeactivation和onConcurrentUpdate配置來取消這個流。這個有助于保護資源(就失效而言)和快速重新運行(就更新而言)。一個被取消的流禁止改變repository的數據和通知updatable更新。取消行為可以通過.onDeactivation(int) 和.onConcurrentUpdate(int)來配置,這兩個方法在RConfig狀態接口中定義。
就.goLazy()指令而言,如果repository數據發生更新,那么他的updatable會被通知更新,但是是否去更新repository要取決于后面的指令。當Repository.get()被調用時,因為該方法需要產出一個數據,該數據流將在這個線程同步重新啟用并且從此時忽略取消信號。另一方面,如果在Repository.get()重新運行這個暫停流之前repository收到一個來自事件源的更新通知,暫停狀態和中間的變量將被丟棄,接下來的指令也不會再次運行,這個流將快速重啟。在流重啟之后并且在再次到達.goLazy()指令之前調用Repository.get()將會返回repository的上一個數據。因為.goLazy()有助于跳過不必要的計算,有策略的使用它有助于提升程序的執行。
五、Compiled functions
compiled repository的數據處理流是一種數據結構不可知論者(除了Result包裝)。實際上,數據流很普遍的用于處理lists數據(例如在RecyclerView中使用)。舉個特例,接下來是一個通過網絡的一系列數據的程序的流程:
- Download the data as a byte array;
- Parse the data into some object representation;
- Extract the items from the object representation;
- Perform any additional transformation for each item into a form (UI model object) ready to render by an Adapter, and/or, perform any filtering task on the list to include or exclude specific items;
- Set the resulting list as the data source of the Adapter.
開發者可能打算將前四步封裝到一個function來讓compiled repository調用,并且用一個updatable用第5步來響應這個提供了很多UI模型對象的repository。如果更多的子程序(例如將數據模型轉換為UI模型)是分離可用的,將這些全部程序封裝成一個function,這樣更容易使用和增加可讀性。
Agera提供了一種類似于complied repository的風格來使用可重用的小操作符來編寫function:
// For type clarity only, the following are smaller, reused operators:
Function<String, DataBlob> urlToBlob = …;
Function<DataBlob, List<ItemBlob>> blobToItemBlobs = …;
Predicate<ItemBlob> activeOnly = …;
Function<ItemBlob, UiModel> itemBlobToUiModel = …;
Function<List<UiModel>, List<UiModel>> sortByDateDesc = …;
Function<String, List<UiModel>> urlToUiModels =
Functions.functionFrom(String.class)
.apply(urlToBlob)
.unpack(blobToItemBlobs)
.filter(activeOnly)
.map(itemBlobToUiModel)
.morph(sortByDateDesc)
.thenLimit(5);
這個function的可讀性棒吧!!!
就可復用性這個術語而言,意味著這段操作背后的邏輯在其它地方也需要。在complied function中只需要很少的工作就將它們封裝成了Function接口,如果讀過代碼,右邊的表達式最終將生成一個ChainFunction類型。為了使用FunctionCompiler 預先準備了很多Function/Predicate定義,這就導致了很多花費(編譯時需要編譯附加的類,運行時需要加載這些類,生成這些對象,并且鏈接它們變成一個Complied function)。這種方法可能會比直接自己寫自定義function更糟糕。所以開發者一定要考慮僅當只減少代碼行數的時候才應該使用function compiler。
function compiler通過定義在FunctionCompilerStates的編譯者狀態接口支持。如同repository compiler,這個表達式同樣不能在中間被打斷。
六、Reservoirs and parallelism
取決于push event,pull data模型和一般的多線程處理,一個updatable可能不會見到repository的數據的所有改變歷史。這是因為在大多數情況下(特別是用于更新App的UI),只有最近,最新的數據需要處理。但是,如果updatable需要知道所有的改變的數據改變歷史呢?Agera提供了一個Repository的子類Reservoir。在這種情況Reservoir很適用。
public interface Reservoir<T> extends Receiver<T>, Repository<Result<T>> {}
Reservoir是Queue的響應式版本。數據可以通過Receiver接口進隊到reservoir,這會通知他的客戶updatable,這個updatable會通過Repository(準確說是Supplier)接口反過來是相同的數據出隊。reservior的訪問是同步的,所以不可能有兩個客戶出隊同一個實例(在這個地方,實例被定義為一個成功入隊的數據;如果同一個變量(同一個java對象引用)被多次入隊,在reservoir里他們是不同的實例)。reservoir返回結果類型是被Result包裝了的,所有如果一個客戶當repository是空的時候試圖出隊數據,他將收到Result.absent()作為一個失敗的提示。
reservoir更適合于作為必須響應每一個數據的響應者的事件源。如果合適的話,可以使用complied repository實現響應者,它使用reservoir作為它的事件源之一,利用.attemptGetFrom(reservoir).orSkip()開啟數據處理流。只要repository處于激活狀態,在reservoir和complied repository之間的observable-updatable關系將會消費所有提交到reservoir的數據。
簡單的并行可以使用一個reservoir和compiled repositories的多個實例,這些repository均按照上述的方式使用reservoir作為它的事件源。數據可以被提交給reservoir,然后每一個repository都試圖出隊該數據用于自身的數據處理流。為了實現真正的并行,這些repository必須將處理移動到多線程的執行器或者在不同的worker Looper運行。
七、Custom observables
Agera可以很簡單的實現自定義Observable。
Proxy observables
一個代理observable傳遞其它observable(其它源observable)的事件,對于這些事件做一些小的或不處理。類Observables提供了下列標準代理observable的創建方法:
- compositeObservable that composites multiple source observables;
- conditionalObservable that suppresses events from a source observable during the times a specified condition does not hold;
- perMillisecondObservable and perLoopObservable that throttle the event frequency of a source observable.
提供了三種代理Observable:compositeObservable組合了多個源observable,conditionalObservable當指定條件未達到時壓制源observable的事件,perMillisecondObservable 和 perLoopObservable則壓制源observable發送事件的頻率。
BaseObservable
BaseObservable完整的實現了updatable的注冊,取消注冊,和通過線程處理通知更新。通過繼承它可以很簡單建立一個自定義observable。無論何時需要發送事件,子類只需要在任意線程簡單的調用dispatchUpdate()。下面的例子是將一個view的點擊事件轉換為一個observable:
public class ViewClickedObservable extends BaseObservable
implements View.OnClickListener {
@Override
public void onClick(View v) {
dispatchUpdate();
}
}
BaseObservable的子類可以通過重寫observableActivated()
和 observableDeactivated()監控這個observable的激活生命周期。observableActivated()將在生命周期的開端被調用, observableDeactivated()將在生命周期結束時被調用(順便提一句:observable的觀察者從0到至少一個表示激活,從至少一個到0個表示失活,中間這段時間就是他的生命周期)。這兩個方法被BaseObservable的worker Looper線程調用,這個線程指的是創建Baseobservable實例的線程。在大多數情況下所有的observable的worker Looper都是主線程的Looper,這也就減輕了同步鎖的需求。
UpdateDispatcher
當不能直接繼承BaseObservable或者不是最優時,例如該類已經繼承了另外一個類,依然很容易去實現Observable接口。一個UpdateDispatcher實例有助于實現一個用和BaseObservable同樣的方式管理updatable的,遵從線程處理約定的自定義Observable。
這個自定義observable需要通過或者重載Observables.updateDispatcher()來私有地持有一個update dispatcher,這個方法會接收一個ActivationHandler實例。ActivationHandler接口定義了observableActivated
和 observableDeactivated來監聽生命周期。和BaseObservable一樣,update dispatcher同樣需要一個worker looper用于工作,所以必須在在一個Looper線程中創建。
該自定義observable可以簡單的調用update dispatcher實現所有updatable的注冊與取消注冊。為了發送事件給所有的客戶updatable,可以通過調用UpdateDispatcher.update()。看名字就知道UpdateDispatcher是一個updatable,所以如果要自定義的observable是一個proxy observable并且需要注冊內部的updatable到其它的事件源,UpdateDispatcher是一個很好地選擇。
額外的提示,UpdateDispatcher同樣是Observable的子類,所以也能當做一個基本的observable使用。正如mutable repository連接了數據的生產者和消費者,UpdateDispatcher連接了事件的生產者和消費者。MutableRepository繼承了Repository,Receiver,數據生產者通過他的Receiver接口端提供數據給它,數據消費者通過Repository接口端從它獲取數據并消費數據。同樣的,UpdateDispatcher繼承了Observable和Updatable接口,事件生產者通過它的Updatable接口端發送事件給他,同時事件消費者通過它的Observable接口端接收事件。
八、Incrementally Agerifying legacy code
Agera介紹的風格可能更適合從頭開始編寫的新的app。接下來將是給希望將代碼轉換為Agera風格的開發者的tips。
Upgrading legacy observer pattern
觀察者模式可以通過很多方式可以實現,但是不是所有的都能直接的遷移成observable-updatable結構。接下來就的通過給遺留的"listenable"類添加一個Observable借口來升級的例子。
類MyListenable通過addListener和removeListener來管理listener(Listener接口的實現)的添加和刪除。作為演示,他繼承SomebaseClass作為額外的負擔。使用UpdateDispatcher來管理注冊在其上的Updatable,并且使用內部類Bridge連接內部的UpdateDispatcher與Listener,這樣在使它成為observable之時還能使用之前的API:
public final class MyListenable extends SomeBaseClass implements Observable {
private final UpdateDispatcher updateDispatcher;
public MyListenable() {
// Original constructor code here...
updateDispatcher = Observables.updateDispatcher(new Bridge());//
}
// Original class body here... including:
public void addListener(Listener listener) { … }
public void removeListener(Listener listener) { … }
@Override
public void addUpdatable(Updatable updatable) {
updateDispatcher.addUpdatable(updatable);
}
@Override
public void removeUpdatable(Updatable updatable) {
updateDispatcher.removeUpdatable(updatable);
}
private final class Bridge implements ActivationHandler, Listener {
@Override
public void observableActivated(UpdateDispatcher caller) {
addListener(this);
}
@Override
public void observableDeactivated(UpdateDispatcher caller) {
removeListener(this);
}
@Override
public void onEvent() { // Listener implementation
updateDispatcher.update();//這樣就將listener和UpdateDispatch聯系起來了
}
}
}
這句代碼 Observables.updateDispatcher(new Bridge())會生成一個AsyncUpdateDispatcher,傳入的Bridge作為其內部的activationHandler,具體的建議去看Observables#AsyncUpdateDispatcher。
Exposing synchronous operations as repositories
java本質上是一種同步性的語言,最低級的操作符都是同步方法實現的。當這個操作會花費一些時間來輸出結果,我們稱這個方法為阻塞的方法,并且開發者被警告不能在主線程調用它。
假設app的UI需要從一個阻塞方法獲得數據,Agera提供了compiled
repository很方便讓這個阻塞方法在后臺的執行器處理,同時由于線程的處理,UI可以在觀察repository的線程上消費數據。首先,方法被封裝成Agera的操作符,如下
public class NetworkCallingSupplier implements Supplier<Result<ResponseBlob>> {
private final RequestBlob request = …;
@Override
public Result<ResponseBlob> get() {
try {
ResponseBlob blob = networkStack.execute(request); // blocking call
return Result.success(blob);
} catch (Throwable e) {
return Result.failure(e);
}
}
}
Supplier<Result<ResponseBlob>> networkCall = new NetworkCallingSupplier();
Repository<Result<ResponseBlob>> responseRepository =
Repositories.repositoryWithInitialValue(Result.<ResponseBlob>absent())
.observe() // no event source; works on activation
.onUpdatesPerLoop() // but this line is still needed to compile
.goTo(networkingExecutor)
.thenGetFrom(networkCall)
.compile();
上面的片段假設在repository被編寫之前就知道了request,并且這個request將不會改變。不能改變的request并沒有什么卵用,所以為了可以改變request就需要進行升級。因此就用一個MutableRepository來存儲提供request。為了使該repository被建成后就能提供第一個request,將request用Result封裝并用absent()初始化repository。
// MutableRepository<RequestBlob> requestVariable =
// mutableRepository(firstRequest);
// OR:
MutableRepository<Result<RequestBlob>> requestVariable =
mutableRepository(Result.<RequestBlob>absent());
然后將阻塞的方法封裝成Function:
public class NetworkCallingFunction
implements Function<RequestBlob, Result<ResponseBlob>> {
@Override
public Result<ResponseBlob> apply(RequestBlob request) {
try {
ResponseBlob blob = networkStack.execute(request);
return Result.success(blob);
} catch (Throwable e) {
return Result.failure(e);
}
}
}
Function<RequestBlob, Result<ResponseBlob>> networkCallingFunction =
new NetworkCallingFunction()
最終版的repository就如下了:
Result<ResponseBlob> noResponse = Result.absent();
Function<Throwable, Result<ResponseBlob>> withNoResponse =
Functions.staticFunction(noResponse);
Repository<Result<ResponseBlob>> responseRepository =
Repositories.repositoryWithInitialValue(noResponse)
.observe(requestVariable)
.onUpdatesPerLoop()
// .getFrom(requestVariable) if it does not supply Result, OR:
.attemptGetFrom(requestVariable).orEnd(withNoResponse)
.goTo(networkingExecutor)
.thenTransform(networkCallingFunction)
.compile();
基本上看名字也能理解吧,所以寫表達式時最好能給操作符特定的命名,這樣極大的增加可讀性。
Wrapping asynchronous calls in repositories
現在很多庫都提供了都提供了異步API和配套的線程調度功能,客戶端代碼無法控制或者取消線程的調度。使用這樣的一個庫可能會使Agerify整個app的難度增大。最顯然的方式就是用上面演示的方式找到一個替代物。一種不推薦的方式就是運行到一個后臺線程,執行異步調用,堵塞線程直到返回結果,然后再"同步"地返回結果。在這部分討論的是當上述顯然的方式不適用時一種恰當的變通方案。
一種循環的異步調用模式就是request-response結構.接下來的例子假設了一種詳細的結構,這個結構可以取消未完成的工作,但是并沒有指定調用callback的線程。首先應該很容易抽取出以下幾個類:
interface AsyncOperator<P, R> {
Cancellable request(P param, Callback<R> callback);
}
interface Callback<R> {
void onResponse(R response); // Can be called from any thread
}
interface Cancellable {
void cancel();
}
首先要執行request(),就需要有AsyncOperator接口實例;
其次request執行需要參數,由Supplier提供;為了能取消request操作那么又必須有Cancellable對象;然后會提供一個result的結果。
需要消費該result的對象就可以監聽它來實現數據的更新
public class AsyncOperatorRepository<P, R> extends BaseObservable
implements Repository<Result<R>>, Callback<R> {
private final AsyncOperator<P, R> asyncOperator;
private final Supplier<P> paramSupplier;
private Result<R> result;
private Cancellable cancellable;
public AsyncOperatorRepository(AsyncOperator<P, R> asyncOperator,
Supplier<P> paramSupplier) {
this.asyncOperator = asyncOperator;
this.paramSupplier = paramSupplier;
this.result = Result.absent();
}
@Override
protected synchronized void observableActivated() {
cancellable = asyncOperator.request(paramSupplier.get(), this);
}
@Override
protected synchronized void observableDeactivated() {
if (cancellable != null) {
cancellable.cancel();
cancellable = null;
}
}
@Override
public synchronized void onResponse(R response) {
cancellable = null;
result = Result.absentIfNull(response);
dispatchUpdate();
}
@Override
public synchronized Result<R> get() {
return result;
}
}
為了可以支持request參數的動態變化,那么提供參數的Supplier就必須同時可以被監聽,因此參數的來源就變成了Repository(實現了Supplier的Observable),AsyncOperatorRepository也必須實現updatable接口。并且只有當有對象需要消費response也就是有updatable監聽AsyncOperatorRepository的時候,AsyncOperatorRepository才會監聽paramRepository。
public class AsyncOperatorRepository<P, R> extends BaseObservable
implements Repository<Result<R>>, Callback<R>, Updatable {
private final AsyncOperator<P, R> asyncOperator;
private final Repository<P> paramRepository;
private Result<R> result;
private Cancellable cancellable;
public AsyncOperatorRepository(AsyncOperator<P, R> asyncOperator,
Repository<P> paramRepository) {
this.asyncOperator = asyncOperator;
this.paramRepository = paramRepository;
this.result = Result.absent();
}
@Override
protected void observableActivated() {
paramRepository.addUpdatable(this);
update();
}
@Override
protected synchronized void observableDeactivated() {
paramRepository.removeUpdatable(this);
cancelOngoingRequestLocked();
}
@Override
public synchronized void update() {
cancelOngoingRequestLocked();
// Adapt accordingly if paramRepository supplies a Result.
cancellable = asyncOperator.request(paramRepository.get(), this);
}
private void cancelOngoingRequestLocked() {
if (cancellable != null) {
cancellable.cancel();
cancellable = null;
}
}
@Override
public synchronized void onResponse(R response) {
cancellable = null;
result = Result.absentIfNull(response);
dispatchUpdate();
}
// Similar process for fallible requests (typically with an
// onError(Throwable) callback): wrap the failure in a Result and
// dispatchUpdate().
@Override
public synchronized Result<R> get() {
return result;
}
}
上面所描述的repository也就完美的封裝了異步調用。
小結
謝謝您的閱讀,由于理解有限,有誤的地方還請多多指出。有時間的同學還是看看官方文檔吧!
本來還想結合demo介紹用法的,后面發現自己太天真了,這篇介紹已經夠長了,用法什么的還是下一篇再寫吧。。。。
文/MaybeStupid(簡書作者)