RxAndroid 2.0 學習筆記

pmdykoif 7年前發布 | 39K 次閱讀 RxAndroid RxJava Android開發 移動開發

Rxjava 2.x正式版出來已經快兩個月了。在之前的項目中也在使用Rx。但卻一直沒有時間對整個的知識進行梳理,恰好今天抽出時間,也系統的再學習一遍RxJava/RxAndroid

RxJava的使用

一、觀察者/被觀察者

1、前奏:

在觀察者之前就要前提下 backpressure 這個概念。簡單來說, backpressure 是在異步場景中,被觀察者發送事件速度遠快于觀察者的處理速度時,告訴被觀察者降低發送速度的策略。

2、在2.0中有以下幾種觀察者

  • Observable/Observer
  • Flowable/Subscriber
  • Single/SingleObserver
  • Completable/CompletableObserver
  • Maybe/MaybeObserver

依次的來看一下:

Observable

Observable
.just(1, 2, 3)
.subscribe(new Observer < Integer > () {
@Override public void onSubscribe(Disposable d) {}
@Override public void onNext(Integer value) {}
@Override public void onError(Throwable e) {}
@Override public void onComplete() {}
});

這里要提的就是onSubscribe(Disposable d),disposable用于取消訂閱。

就用簡單的just這個操作符來分析一下。

@SuppressWarnings("unchecked")
@SchedulerSupport(SchedulerSupport.NONE) 
public static < T > Observable < T > just(T item1, T item2, T item3, T item4) {
    ObjectHelper.requireNonNull(item1, "The first item is null");
    ObjectHelper.requireNonNull(item2, "The second item is null");
    ObjectHelper.requireNonNull(item3, "The third item is null");
    ObjectHelper.requireNonNull(item4, "The fourth item is null");

    return fromArray(item1, item2, item3, item4);
}
@SchedulerSupport(SchedulerSupport.NONE) 
public static < T > Observable < T > fromArray(T...items) {
    ObjectHelper.requireNonNull(items, "items is null");
    if (items.length == 0) {
        return empty();
    } else if (items.length == 1) {
        return just(items[0]);
    }
    return RxJavaPlugins.onAssembly(new ObservableFromArray < T > (items));
}
@Override 
public void subscribeActual(Observer < ?super T > s) {
    FromArrayDisposable < T > d = new FromArrayDisposable < T > (s, array);
    s.onSubscribe(d);
    if (d.fusionMode) {
        return;
    }
    d.run();
}

@Override 
public void dispose() {
    disposed = true;
}

@Override 
public boolean isDisposed() {
    return disposed;
}

void run() {
    T[] a = array;
    int n = a.length;
    for (int i = 0; i < n && !isDisposed(); i++) {
        T value = a[i];
        if (value == null) {
            actual.onError(new NullPointerException("The " + i + "th element is null"));
            return;
        }
        actual.onNext(value);
    }
    if (!isDisposed()) {
        actual.onComplete();
    }
}

just實際調用了 fromArray 方法,中創建了 ObservableFromArray 的實例,在這個實例中實現了 Observable 這個接口,在調用 subscribe 方法進行綁定之后,首先調用了 subscribeActual 方法, onSubscribe 就會回調。

在取消綁定是我們可以將Disposable添加到CompositeDisposable中或者直接調用Disposable的dispose() 方法在流的任意位置取消。

此外, 為了簡化代碼,我使用了Consumer作為觀察者(可以當成1.0時候的Action1 、ActionX) subscribe 的返回值就是一個Disposable ( subscribe 的返回值根據傳入的參數不同,也有不同)我把這個對象添加到CompositeDisposable,并在中途取消,但發射器仍然會把所有的數據全都發射完。因為LambdaSubscriber(也就是傳入Consumer 所構造的觀察者)的 dispose 和 isDispose 略有不同,并不是簡簡單單的true/false, 說實話,我沒看懂Consumer的這兩個方法干了什么...........尷尬

LambdaSubscriber 瞅瞅

@Override
public void dispose() { 
cancel();
}

@Override
public boolean isDisposed() {  
  return get() == SubscriptionHelper.CANCELLED;
}

Flowable

是2.0之后用的最多的觀察者了,他與上一個的區別在于支持背壓,也就是說,下游會知道上游有多少數據,所以他Subscriber會是這樣

Flowable
.just(1, 2, 3, 4)
.subscribe(new Subscriber < Integer > () {
@Override public void onSubscribe(Subscription s) {
  s.request(Long.MAX_VALUE);
}
@Override public void onNext(Integer integer) {}
@Override public void onError(Throwable t) {}
@Override public void onComplete() {}
});

onSubscribe 這個回調傳出了一個Subscription, 我們要指定他傳出數據的大小, 調用他的 request() 方法。如沒有要求可以傳入一個Long的最大數值 Long.MAX_VALUE 。

要說明一下,request這個方法若不調用,下游的onNext與OnComplete都不會調用;若你寫的數量小于,只會傳你的個數,但是不會調用onComplete方法,可以看下 FlowableFromArray 的 slowPath 方法

@Override void slowPath(long r) {
    long e = 0;
    T[] arr = array;
    int f = arr.length;
    int i = index;
    Subscriber < ?super T > a = actual;
    for (;;) {
        while (e != r && i != f) {
            if (cancelled) {
                return;
            }
            T t = arr[i];
            if (t == null) {
                a.onError(new NullPointerException("array element is null"));
                return;
            } else {
                a.onNext(t);
            }
            e++;
            i++;
        }
        if (i == f) {
            if (!cancelled) {
                a.onComplete();
            }
            return;
        }
        r = get();
        if (e == r) {
            index = i;
            r = addAndGet( - e);
            if (r == 0L) {
                return;
            }
            e = 0L;
        }
    }
}
}

需要if (i == f) f 是這個數據的大小,i是當前發送數據的個數,所以不會調用onComplete

休息一下

這是幾種被觀察者實現的接口

  • Observable 接口 ObservableSource
  • Flowable 接口 Publisher
  • Single 接口 SingleSource
  • Completable 接口 CompletableSource
  • Maybe 接口 MaybeSource

梳理完了兩個被觀察者,發現Flowable支持背壓,父類是Publisher;Observable不支持背壓,父類是ObservableSource,他們的實現方式,與其的操作符,到最后的觀察者,都有些不同,他們是完全獨立開的。各自之間也互不影響。

Single

單值相應的模式: 就是只有一個值唄?

Completable

表示沒有任何值但僅指示完成或異常的延遲計算。

Maybe

maybe 可以當成上面兩個的合體吧!

后面的三種,就不細掰了,我就是這么不求甚解。

二、操作符

操作符基本就沒有改變,但還是會發現,我擦,from沒了,可以使用fromIterable

之前的actionx 也替換了Action \ Consumer \ BiConsumer

Func也跟action一樣, 名字改變了Function

感覺這樣是正經Rx了。

三、線程切換

當然現場切換沒有發生改變,用法還是一樣,但是之前沒有看過源碼,不知道怎樣神奇的把線程切換了,難道是來自東方的神秘力量。趁著還有時間,擼一下代碼。

在調用 subscribeOn(Schedulers.io()) 之后,會創建ObservableSubscribeOn

parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
        source.subscribe(parent);
    }
}
));

在這個過程中,會把source也就是ObservableSource在線程中訂閱,同時也把把傳入的Observer變成SubscribeOnObserver。若指定的是io線程,可以在 IoScheduler 中看見對線程的管理

在調用 observeOn(AndroidSchedulers.mainThread()) 時,會產生一個ObservableObserveOn,同時還會把Observer變成ObserveOnObserver,可以發現在 HandlerScheduler ,在ui線程調用了ObserveOnObserver的 run 方法

四、Rxjava的數據傳遞

Rxjava是我在工作這幾個月最喜歡的框架,沒有之一。完全解決了我這個有潔癖的人在打代碼時的玻璃心。

雖然重復造輪輪子是沒有必要的(我也造不出來),但是為了全面的了解Rxjava,我也準備簡單的實現一下,數據在每個操作符之中傳輸的整個過程。

在實現之前先猜想一下大概的過程吧:

我的需求是在一個static方法中產生一個數值,并且通過一層層的接口傳遞下去,下面的操作符的人參是上一個的返回值,最后輸出,我就模仿著Rx的 Maybe 的名字實現吧。

  • 首先我要一直‘點’下去的話Maybe 一定要返回自己
  • 接口要一層層的傳進去,這樣的話就可以在發射數據時,發原始數據傳入這個一堆的接口,然后每個接口計算自己的實現。
  • 最后返回結果

之后就是仿造源碼完成這段需求了,當然這些方法也都簡單寫了,就是為了弄清楚思路:

1、創建一個MaybeSource,我們的Maybe 和 各個操作符都會實現它。

public interface MaybeSource {
     void subscribe(MaybeObserver observer);
}

2、創建一個MaybeObserver, 這就是最后綁定的時候的接口

public interface MaybeObserver {
    void onSuccess(int value);
}

3、創建Function, 這個在操作符中用于實現

public interface Function {
    int apply(int t);
}

4、當然少不了Maybe, 這里就實現just和map兩個方法吧

public abstract class Maybe implements MaybeSource {
    public static Maybe just(int item) {
        return new MaybeJust(item);
    }

    public final Maybe map(Function mapper) {
        return new MaybeMap(this, mapper);
    }
}

5、just實際返回的對象是MaybeJust,他的父類是Maybe

public class MaybeJust extends Maybe {
    final int value;

    public MaybeJust(int value) {
        this.value = value;
    }

    @Override
    public void subscribe(MaybeObserver observer) {
        observer.onSuccess(value);
    }
}

6、map實際返回的對象是MaybeMap,他的父類是Maybe

public class MaybeMap extends Maybe {
    final Function mapper;
    final MaybeSource source;

    public MaybeMap(MaybeSource source, Function mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    public void subscribe(MaybeObserver observer) {
        source.subscribe(new MapMaybeObserver(observer, mapper));
    }

    static final class MapMaybeObserver implements MaybeObserver {
        final MaybeObserver actual;

        final Function mapper;

        MapMaybeObserver(MaybeObserver actual, Function mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        @Override
        public void onSuccess(int value) {
            this.actual.onSuccess(this.mapper.apply(value));
        }
    }
}

7、在main中可以這么運行

Maybe
.just(1)
.map(new Function() {

    @Override
    public int apply(int t) {
        return t + 1;
    }
}).map(new Function() {

    @Override
    public int apply(int t) {
        return t * 4;
    }
}).subscribe(new MaybeObserver() {

    @Override
    public void onSuccess(int value) {
        System.out.println(value);
    }
});

8、運行結果,傳入1,先+1, 在 * 4,最后結果應該是8

得到了期望的結果

RxJava 2.0 + Retrofit 2 .0

之前做過一個項目,沒用什么架構,也沒什么封裝。但對我幫助最大的是,之前是不能接受這樣的代碼的,感覺看上去腦袋都大了。但看習慣了, 也就習慣了。

但平時自己弄個小項目還是使用mvp,自己的潔癖可能更加強烈一點

在Retrofit 中選擇了Flowable作為返回值,支持背壓,在2.0之后應該最為常用

@GET("/")  
Flowable<ResponseBody> getText();

在RxJava 2.0 中使用CompositeDisposable做解除綁定的操作, Consumer 回調中使用了三個Consumer,作為成功、失敗、完成的回調

public <T> void addSubscription(Flowable flowable,
        final RxSubscriber<T> subscriber) {
        if (mCompositeDisposable == null) {
            mCompositeDisposable = new CompositeDisposable();
        }

        if (subscriber == null) {
            Log.e(TAG, "rx callback is null");

            return;
        }

        Disposable disposable = flowable.subscribeOn(Schedulers.io())
                                        .observeOn(AndroidSchedulers.mainThread())
                                        .subscribe(new Consumer<T>() {
                    @Override
                    public void accept(T o) throws Exception {
                        subscriber.onNext(o);
                    }
                },
                new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable)
                        throws Exception {
                        subscriber.onError(throwable);
                    }
                },
                new Action() {
                    @Override
                    public void run() throws Exception {
                        subscriber.onComplete();
                    }
                });

此外,之前的項目后臺接口也是奇葩,同一個人寫的接口,接口的返回格式更是多種多樣,還不改,沒辦法,客戶端只能將就著服務端,誰叫我們是新來的呢。遇到這種問題,就不直接轉成對象格式了,先轉成ResponseBody得到Body,再拿出string來。

okhttp中response的body對象就是這個ResponseBody,他的string() 方法就可以獲得整個body,然后再做json解析吧

 

 

來自:http://www.jianshu.com/p/a608e71fc5e1

 

 本文由用戶 pmdykoif 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!