RxJava學習筆記(變換Observables)

xjc1027 8年前發布 | 9K 次閱讀 RxJava

轉換Observables

轉換Observables,顧名思義就是變換可觀測序列,來創建一個能夠更好的滿足我們需求的序列。

map家族

RxJava提供了幾個mapping函數: map() , flatMap() , concatMap() , flatMapIterable() 以及 switchMap() .所有這些函數都作用于一個可觀測序列,然后變換它發射的值,最后用一種新的形式返回它們。

Map

Rxjava的map函數接收一個指定的Func1對象,然后將它應用到每一個由Observable發射的值上。Func1是一個接口,兩個泛型,內部只有一個 call 方法,一個泛型是參數類型,一個泛型是返回值類型,在這里就可以理解這個 call 方法就是將可觀測序列元素轉換的轉換方法。so,意思就是我們可以自己定制自己需要的轉換方法。

  • 比如下面的代碼,我們讓可觀測序列元素的值乘以5發射。

    public static void main(String... args) {
          Observable.create(new Observable.OnSubscribe<Integer>() {
              @Override
              public void call(Subscriber<? super Integer> subscriber) {
                  subscriber.onNext(1);
                  subscriber.onNext(2);
                  subscriber.onNext(3);
              }
          }).map(new Func1<Integer, Integer>() {
              @Override
              public Integer call(Integer integer) {
                  return integer * 5;
              }
          }).subscribe(new Observer<Integer>() {
              @Override
              public void onCompleted() {
                  System.out.println("completed");
              }

          @Override
          public void onError(Throwable e) {
              System.out.println("something is error");
          }
    
          @Override
          public void onNext(Integer integer) {
              System.out.println("i = " + integer);
          }
      });
    

    }</code></pre> </li> </ul>

    打印結果:

    i = 5

    i = 10

    i = 15

    FlatMap

    flatMap同樣也是轉換元素的,不過和 map 不同的是,flatMap轉換的返回的是Observable對象。要對其進行理解做一個對比比較容易理解。已知:兩個javabean類,School和Student:

    public class School {
        private List<Student> mStudents = new ArrayList<>();

    public List<Student> getStudents() {
        return mStudents;
    }
    
    public void setStudents(List<Student> students) {
        mStudents = students;
    }
    

    }</code></pre>

    就School特殊一點,只寫了一個Student的集合,當然,當中還可以包含其他數據,這里只是體現其中有一個數據集合,Student類就是一個普通bean類,包含name,age兩個成員變量及其get/set方法。然后假設我們獲取到一個School變量,需要從中得到所有的Student的具體數據,用 map 我們可以這樣干:

    public static void main(String... args) {
        School school = new School();
        school.getStudents().add(new Student("張三" , 23));
        school.getStudents().add(new Student("李四" , 24));
        school.getStudents().add(new Student("王五" , 25));

    Observable.just(school)
            .map(new Func1<School, List<Student>>() {
                @Override
                public List<Student> call(School school) {
                    return school.getStudents();
                }
            })
            .subscribe(new Observer<List<Student>>() {
                @Override
                public void onCompleted() {
    
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onNext(List<Student> students) {
                    for (Student student : students) {
                        System.out.println("name = " + student.getName());
                    }
                }
            });
    

    }</code></pre>

    接收一個Student數據集合遍歷獲取當中的數據,但是如果我們不想在代碼中使用for循環遍歷,而是希望在 Subscriber 中直接傳入單個的 Student 對象呢,用 map 顯然是行不通的,因為 map 僅僅是一對一的轉化,而現在要求的是一對多的轉換,那么如何實現呢,我們用 flatMap :

    public static void main(String... args) {
        School school = new School();
        school.getStudents().add(new Student("張三" , 23));
        school.getStudents().add(new Student("李四" , 24));
        school.getStudents().add(new Student("王五" , 25));

    Observable.just(school)
            .flatMap(new Func1<School, Observable<Student>>() {
                @Override
                public Observable<Student> call(School school) {
                    return Observable.from(school.getStudents());
                }
            })
            .subscribe(new Observer<Student>() {
                @Override
                public void onCompleted() {
    
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onNext(Student student) {
                    System.out.println("name = " + student.getName());
                }
            });
    

    }</code></pre>

    打印的結果都是:

    name = 張三

    name = 李四

    name = 王五

    我們有一個數據序列,它發射一個數據序列,這些數據本身自己也可以發射Observable,flatMap就可以合并這些Observable發射的數據,然后將合并后的結果作為最終的Observable。但是它的合并允許交叉,也就意味著 flatMap() 不能夠保證在最終生成的Observable中源Observables確切的發射順序。

    ConcatMap

    RxJava的 concatMap() 函數解決了 flatMap() 的交叉問題,提供了一種能夠把發射的值連續在一起的鋪平函數,而不是合并它們。

    public static void main(String... args) {
        School school = new School();
        school.setAddress("China");
        school.getStudents().add(new Student("張三" , 23));
        school.getStudents().add(new Student("李四" , 24));
        school.getStudents().add(new Student("王五" , 25));

    Observable<School> observable = Observable.just(school);
    observable.subscribe(new Action1<School>() {
        @Override
        public void call(School school) {
            System.out.println("school address = " + school.getAddress());
        }
    });
    observable.concatMap(new Func1<School, Observable<Student>>() {
        @Override
        public Observable<Student> call(School school) {
            return Observable.from(school.getStudents());
        }
    }).subscribe(new Observer<Student>() {
        @Override
        public void onCompleted() {
    
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onNext(Student student) {
            System.out.println("student name = " + student.getName());
        }
    });
    

    }</code></pre>

    這里給School類添加一個屬性地址,打印結果:

    school address = China

    student name = 張三

    student name = 李四

    student name = 王五

    FlatMapIterable

    flatMapIterable()可以將數據包裝成Iterable,在Iterable中我們可以隨意對數據進行加工。

    public static void main(String... args) {
        Observable.just(1,2,3,4)
                .flatMapIterable(new Func1<Integer, Iterable<Integer>>() {
                    @Override
                    public Iterable<Integer> call(Integer integer) {
                        List<Integer> list = new ArrayList();
                        list.add(integer);
                        list.add(5);
                        return list;
                    }
                }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                System.out.println("i = " + integer);
            }
        });
    }

    這里在每個數據創建的Iterable中都加入了一個元素"5",打印結果如下:

    i = 1

    i = 5

    i = 2

    i = 5

    i = 3

    i = 5

    i = 4

    i = 5

    SwitchMap

    每當源Observable發射一個新的數據項( Observable) 時,它將取消訂閱并停止監視之前那個數據項產生的Observable,并開始監視當前發射的這一個。

    Scan

    RxJava的 scan() 函數可以看做是一個累積函數。 scan() 函數對原始Observable發射的每一項數據都應用一個函數,計算出函數的結果值,并將該值填充回可觀測序列,等待和下一次發射的數據一起使用。

    public static void main(String... args) {
        Observable.just(1,2,3,4,5)
                .scan(new Func2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer integer, Integer integer2) {
                        return integer + integer2;
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("rxjava onCompleted");
                    }

                @Override
                public void onError(Throwable e) {
                    System.out.println("rxjava onError");
                }
    
                @Override
                public void onNext(Integer integer) {
                    System.out.println("result = " + integer);
                }
            });
    

    }</code></pre>

    這里對每一個數據元素都進行了相加的操作,看結果更明了:

    result = 1

    result = 3

    result = 6

    result = 10

    result = 15

    rxjava onCompleted

    還有一個 scan() 函數的變體 scan(R initialValue, Func2<R, ? super T, R> accumulator) ,它可以設置一個初始值作為第一個發射的元素值,

    public static void main(String... args) {
        Observable.just(1,2,3,4,5)
                .scan(10, new Func2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer integer, Integer integer2) {
                        return integer + integer2;
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("rxjava onCompleted");
                    }

                @Override
                public void onError(Throwable e) {
                    System.out.println("rxjava onError");
                }
    
                @Override
                public void onNext(Integer integer) {
                    System.out.println("result = " + integer);
                }
            });
    

    }</code></pre>

    這里我們為 scan() 設置了一個初始值10,那么發射的第一個元素就是這個10,之后又與其相加,打印結果如下:

    result = 10

    result = 11

    result = 13

    result = 16

    result = 20

    result = 25

    rxjava onCompleted

    GroupBy

    groupBy()將源Observable變換成一個發射Observables的新的Observable。它們中的每一個新的Observable都發射一組指定的數據。實際使用中,我們需要提供一個生成key的規則(也就是Func1中的call方法),所有key相同的數據會包含在同一個小的Observable中。

    public static void main(String... args) {
        Student lisi = new Student("李四", 24);
        Student wangwu = new Student("王五", 25);
        Student zhangsan = new Student("張三", 23);
        Student jianjian = new Student("jianjian", 24);
        Student wenwen = new Student("wenwen", 25);

    Observable<GroupedObservable<Integer, Student>> GroupedObservable = Observable.just(lisi, wangwu, zhangsan, jianjian, wenwen)
            .groupBy(new Func1<Student, Integer>() {
                @Override
                public Integer call(Student student) {
                    return student.getAge();
                }
            });
    Observable.concat(GroupedObservable)
            .subscribe(new Action1<Student>() {
                @Override
                public void call(Student student) {
                    System.out.println("student = " + student.getName() + ", age = " + student.getAge());
                }
            });
    

    }</code></pre>

    這里我們首先創建了一個新的Observable: GroupedObservable ,它將會發送一個帶有GroupedObservable的序列(也就是指發送的數據項的類型為GroupedObservable)。GroupedObservable是一個特殊的Observable,它基于一個分組的key,在這個例子中,key就是 student 的年齡age,代表的意思就是年齡相同的數據會包含在一起。打印結果:

    student = 李四, age = 24

    student = jianjian, age = 24

    student = 王五, age = 25

    student = wenwen, age = 25

    student = 張三, age = 23

    Buffer

    RxJava中的 buffer() 函數將源Observable變換一個新的Observable,這個新的Observable每次發射一組列表值而不是一個一個發射。

    public static void main(String... args) {
        Observable.just(1,2,3,4,5,6)
                .buffer(2)
                .subscribe(new Action1<List<Integer>>() {
                    @Override
                    public void call(List<Integer> integers) {
                        for (Integer i : integers) {
                            System.out.println("i = " + i);
                        }
                        System.out.println("-------------------");
                    }
                });
    }

    這里的 buffer(2) 指定了緩沖容量的大小為2,打印結果:

    i = 1

    i = 2

    -------------------

    i = 3

    i = 4

    -------------------

    i = 5

    i = 6

    -------------------

    實際上, buffer() 函數還有幾種重載方法,其中一個允許你指定一個skip值,此后每 skip 項數據,然后又用count項數據填充緩沖區。

    public static void main(String... args) {
        Observable.just(1,2,3,4,5,6)
                .buffer(2, 3)
                .subscribe(new Action1<List<Integer>>() {
                    @Override
                    public void call(List<Integer> integers) {
                        for (Integer i : integers) {
                            System.out.println("i = " + i);
                        }
                        System.out.println("-------------------");
                    }
                });
    }

    這里buffer(2, 3)設置了每3個元素中用兩個元素填充緩沖區,打印結果:

    i = 1

    i = 2

    -------------------

    i = 4

    i = 5

    -------------------

    buffer() 還可以帶一個 timespan 的參數,會創建一個每隔timespan時間段就會發射一個列表的Observable。

    Window

    Rxjava的 window() 和 buffer() 很像,但是它發射的是Observable而不是數據列表,發射的這些Observables都是源Observable數據的一個子集,數量由參數count來定,最后發送一個 onCompleted() 結束。

    public static void main(String... args) {
        Observable.just(1,2,3,4,5)
                .window(3)
                .subscribe(new Observer<Observable<Integer>>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("window onCompleted");
                    }

                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onNext(Observable<Integer> integerObservable) {
                    integerObservable.subscribe(new Observer<Integer>() {
                        @Override
                        public void onCompleted() {
                            System.out.println("onCompleted");
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            System.out.println("i = " + integer);
                        }
                    });
                }
            });
    

    }</code></pre>

    這里的 window(3) 設置了子集的大小為3,打印結果:

    i = 1

    i = 2

    i = 3

    onCompleted

    i = 4

    i = 5

    onCompleted

    window onCompleted

    同樣的, window() 也有帶skip參數的重載方法。

    Cast

    Rxjava的 cast() 可以將源Observable中的每一項數據都轉換為新的類型,把它變成了不同的Class。第一次看到這個,我還以為可以隨便轉,就用一個Integer轉成String試試咯,好吧原諒我的天真,java.lang.ClassCastException立馬打我臉,接著我創建了一個父類一個子類,里面很簡單。

    父類:

    public class Father {
        public void eat(){
            System.out.println("father was eat");
        }
    }

    子類:

    public class Son extends Father {
        public void eat(){
            System.out.println("son was eat");
        }
    }

    接著:

    public static void main(String... args) {
        Observable.just(new Son())
                .cast(Father.class)
                .subscribe(new Action1<Father>() {
                    @Override
                    public void call(Father f) {
                        f.eat();
                    }
                });
    }

    注意這里只能向上轉型,也就是說子類轉成父類,構成這樣: Father f = new Son() ,要記住一點,"編譯看左邊,運行看右邊",打印結果:

     

     

    來自:http://www.jianshu.com/p/eccc241928cf

     

 本文由用戶 xjc1027 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!