RxJava中操作符到底做了什么?

zsz083 8年前發布 | 7K 次閱讀 RxJava 安卓開發

RxJava今年徹底火了一把,其中最牛逼之處就是操作符了,以前只知道怎么用,這幾天看了看源碼,大致的弄清楚了操作符的工作過程,今天分享給大家。

今天我們已filter為例,看代碼:

Integer[] datas={1,2,3,4,5,6,7,8,9,10}; 
Observable.from(datas) 
        .filter(new Func1<Integer, Boolean>() { 
            @Override 
            public Boolean call(Integer integer) { 
                return integer>=5; 
            } 
        }) 
        .subscribe(new Action1<Integer>() { 
            @Override 
            public void call(Integer integer) { 
                mText.append(integer.toString()+","); 
            } 
        }); 

一個很簡單的小例子,用過濾操作符 filter 找出大于等于5的數字。我們點進去看看源碼中filter做了什么

public final Observable<T> filter(Func1<? super T, Boolean> predicate) {  
return create(new OnSubscribeFilter<T>(this, predicate));  
} 

調用了create()方法,等等我們什么時候是不是也用過create() 方法,我們在創建Observable時候也用過create()方法,原來創建了一個新的Observable返回出去了,那豈不是說我們的訂閱者其實訂閱的是這個新的Observable,我們繼續往下看create方法,create方法需要的參數是一個OnSubscribe對象,那我們可以確定OnSubscribeFilter是OnSubscribe的一個實現類,我們點進去看看。

public final class OnSubscribeFilter<T> implements OnSubscribe<T> { 
    
       final Observable<T> source; 
    
       final Func1<? super T, Boolean> predicate; 
    
       public OnSubscribeFilter(Observable<T> source, Func1<? super T, Boolean> predicate) { 
           this.source = source; 
           this.predicate = predicate; 
       } 

果然不出我們所料,OnSubscribeFilter是OnSubscribe的實現類,我們看他的構造方法,傳遞了兩個參數,第一個參數Observable對象,一個Func1,其中第一個參數就是我們我們自己創建的那個Observable,第二個參數使我們在外面寫的Func1,然后保存了起來。我們都知道在subscribe()訂閱的時候,OnSubscribe的call()方法。我們看看OnSubscribeFilter的call()方法都干了些什么

@Override 
        public void call(final Subscriber<? super T> child) { 
            FilterSubscriber<T> parent = new FilterSubscriber<T>(child, predicate); 
            child.add(parent); 
            source.unsafeSubscribe(parent); 
        } 

出現了一個FilterSubscriber,什么鬼玩意兒,我們看看他是什么鬼

} 
 
      @Override 
      public void onError(Throwable e) { 
          if (done) { 
              RxJavaHooks.onError(e); 
              return; 
          } 
          done = true; 
 
          actual.onError(e); 
      } 
 
 
      @Override 
      public void onCompleted() { 
          if (done) { 
              return; 
          } 
          actual.onCompleted(); 
      } 
      @Override 
      public void setProducer(Producer p) { 
          super.setProducer(p); 
          actual.setProducer(p); 
      } 
  } 

一個Subscriber的子類,我們看他的構造方法,兩個參數,一個Subscriber一個Func1,我們在創建對象時候Subscriber對象是我們真正的從外界傳過來的觀察者,Func1呢使我們創建OnSubscribeFilter時候傳遞進來的對象,也就是我們在外界定義的Func1。

回過頭來我們繼續看OnSubscribeFilter的call方法。我們看到source.unsafeSubscribe(parent),source是我們原來外界的Observable,他訂閱了FilterSubscriber對象。我們在他的onNext方法中看到他根據func1.call(t)的返回值來判斷是否讓我們外界的真正的觀察者調用onNext方法。

看到這里有沒有恍然大悟,啥?我都不知道你在說啥,額,那我們整體的屢屢。

我們外界的代碼,在subscribe()時候,Subscriber并不是訂閱了我們自己寫的Observable,Subscriber訂閱的是filter方法返回的那個新的Observable對象,所以訂閱時候會調用OnSubscribeFilter的call方法,OnSubscribeFilter才是我們訂閱的被觀察者的onSubscribe對象,在OnSubscribeFilter的call()方法中,我們讓我們包裝的FilterSubscriber訂閱我們原來的被觀察者,也就是我們在外界生成的那個Observable。我們在外界的Observable的onSubscribe對象的call方法中得到的觀察者是FilterSubscriber對象,我們調用的onNext會回調到FilterSubscriber的onNext方法中。在FilterSubscriber的onNext方法中我們根據我們傳遞的Func1來判斷是否要回調真正的Subscriber的onNext方法,在為true的時候我們才回調我們外界的觀察者的onNext方法,也就起到了過濾的作用。這就是Filter的整個的流程。

我們來測試下我們的小結論:

Observable.create(new Observable.OnSubscribe<Integer>() { 
               @Override 
               public void call(Subscriber<? super Integer> subscriber) { 
                   Log.e("call:subscriber", "" + subscriber.getClass().getCanonicalName()); 
                   subscriber.onNext(5); 
               } 
           }).filter(new Func1<Integer, Boolean>() { 
               @Override 
               public Boolean call(Integer integer) { 
                   return integer > 0; 
               } 
           }).subscribe(new Action1<Integer>() { 
               @Override 
               public void call(Integer integer) { 
                    
               } 
           }); 

 

來自:http://developer.51cto.com/art/201611/521677.htm

 

 本文由用戶 zsz083 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!