RxJava學習筆記(變換Observables)
轉換Observables
轉換Observables,顧名思義就是變換可觀測序列,來創建一個能夠更好的滿足我們需求的序列。
map家族
RxJava提供了幾個mapping函數: map() , flatMap() , concatMap() , flatMapIterable() 以及 switchMap() .所有這些函數都作用于一個可觀測序列,然后變換它發射的值,最后用一種新的形式返回它們。
Map
Rxjava的map函數接收一個指定的Func1對象,然后將它應用到每一個由Observable發射的值上。Func1是一個接口,兩個泛型,內部只有一個 call 方法,一個泛型是參數類型,一個泛型是返回值類型,在這里就可以理解這個 call 方法就是將可觀測序列元素轉換的轉換方法。so,意思就是我們可以自己定制自己需要的轉換方法。
-
比如下面的代碼,我們讓可觀測序列元素的值乘以5發射。
public static void main(String... args) { Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); } }).map(new Func1<Integer, Integer>() { @Override public Integer call(Integer integer) { return integer * 5; } }).subscribe(new Observer<Integer>() { @Override public void onCompleted() { System.out.println("completed"); }
@Override public void onError(Throwable e) { System.out.println("something is error"); } @Override public void onNext(Integer integer) { System.out.println("i = " + integer); } });
}</code></pre> </li> </ul>
打印結果:
i = 5
i = 10
i = 15
FlatMap
flatMap同樣也是轉換元素的,不過和 map 不同的是,flatMap轉換的返回的是Observable對象。要對其進行理解做一個對比比較容易理解。已知:兩個javabean類,School和Student:
public class School { private List<Student> mStudents = new ArrayList<>();
public List<Student> getStudents() { return mStudents; } public void setStudents(List<Student> students) { mStudents = students; }
}</code></pre>
就School特殊一點,只寫了一個Student的集合,當然,當中還可以包含其他數據,這里只是體現其中有一個數據集合,Student類就是一個普通bean類,包含name,age兩個成員變量及其get/set方法。然后假設我們獲取到一個School變量,需要從中得到所有的Student的具體數據,用 map 我們可以這樣干:
public static void main(String... args) { School school = new School(); school.getStudents().add(new Student("張三" , 23)); school.getStudents().add(new Student("李四" , 24)); school.getStudents().add(new Student("王五" , 25));
Observable.just(school) .map(new Func1<School, List<Student>>() { @Override public List<Student> call(School school) { return school.getStudents(); } }) .subscribe(new Observer<List<Student>>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(List<Student> students) { for (Student student : students) { System.out.println("name = " + student.getName()); } } });
}</code></pre>
接收一個Student數據集合遍歷獲取當中的數據,但是如果我們不想在代碼中使用for循環遍歷,而是希望在 Subscriber 中直接傳入單個的 Student 對象呢,用 map 顯然是行不通的,因為 map 僅僅是一對一的轉化,而現在要求的是一對多的轉換,那么如何實現呢,我們用 flatMap :
public static void main(String... args) { School school = new School(); school.getStudents().add(new Student("張三" , 23)); school.getStudents().add(new Student("李四" , 24)); school.getStudents().add(new Student("王五" , 25));
Observable.just(school) .flatMap(new Func1<School, Observable<Student>>() { @Override public Observable<Student> call(School school) { return Observable.from(school.getStudents()); } }) .subscribe(new Observer<Student>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Student student) { System.out.println("name = " + student.getName()); } });
}</code></pre>
打印的結果都是:
name = 張三
name = 李四
name = 王五
我們有一個數據序列,它發射一個數據序列,這些數據本身自己也可以發射Observable,flatMap就可以合并這些Observable發射的數據,然后將合并后的結果作為最終的Observable。但是它的合并允許交叉,也就意味著 flatMap() 不能夠保證在最終生成的Observable中源Observables確切的發射順序。
ConcatMap
RxJava的 concatMap() 函數解決了 flatMap() 的交叉問題,提供了一種能夠把發射的值連續在一起的鋪平函數,而不是合并它們。
public static void main(String... args) { School school = new School(); school.setAddress("China"); school.getStudents().add(new Student("張三" , 23)); school.getStudents().add(new Student("李四" , 24)); school.getStudents().add(new Student("王五" , 25));
Observable<School> observable = Observable.just(school); observable.subscribe(new Action1<School>() { @Override public void call(School school) { System.out.println("school address = " + school.getAddress()); } }); observable.concatMap(new Func1<School, Observable<Student>>() { @Override public Observable<Student> call(School school) { return Observable.from(school.getStudents()); } }).subscribe(new Observer<Student>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Student student) { System.out.println("student name = " + student.getName()); } });
}</code></pre>
這里給School類添加一個屬性地址,打印結果:
school address = China
student name = 張三
student name = 李四
student name = 王五
FlatMapIterable
flatMapIterable()可以將數據包裝成Iterable,在Iterable中我們可以隨意對數據進行加工。
public static void main(String... args) { Observable.just(1,2,3,4) .flatMapIterable(new Func1<Integer, Iterable<Integer>>() { @Override public Iterable<Integer> call(Integer integer) { List<Integer> list = new ArrayList(); list.add(integer); list.add(5); return list; } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println("i = " + integer); } }); }
這里在每個數據創建的Iterable中都加入了一個元素"5",打印結果如下:
i = 1
i = 5
i = 2
i = 5
i = 3
i = 5
i = 4
i = 5
SwitchMap
每當源Observable發射一個新的數據項( Observable) 時,它將取消訂閱并停止監視之前那個數據項產生的Observable,并開始監視當前發射的這一個。
Scan
RxJava的 scan() 函數可以看做是一個累積函數。 scan() 函數對原始Observable發射的每一項數據都應用一個函數,計算出函數的結果值,并將該值填充回可觀測序列,等待和下一次發射的數據一起使用。
public static void main(String... args) { Observable.just(1,2,3,4,5) .scan(new Func2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) { return integer + integer2; } }) .subscribe(new Observer<Integer>() { @Override public void onCompleted() { System.out.println("rxjava onCompleted"); }
@Override public void onError(Throwable e) { System.out.println("rxjava onError"); } @Override public void onNext(Integer integer) { System.out.println("result = " + integer); } });
}</code></pre>
這里對每一個數據元素都進行了相加的操作,看結果更明了:
result = 1
result = 3
result = 6
result = 10
result = 15
rxjava onCompleted
還有一個 scan() 函數的變體 scan(R initialValue, Func2<R, ? super T, R> accumulator) ,它可以設置一個初始值作為第一個發射的元素值,
public static void main(String... args) { Observable.just(1,2,3,4,5) .scan(10, new Func2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) { return integer + integer2; } }) .subscribe(new Observer<Integer>() { @Override public void onCompleted() { System.out.println("rxjava onCompleted"); }
@Override public void onError(Throwable e) { System.out.println("rxjava onError"); } @Override public void onNext(Integer integer) { System.out.println("result = " + integer); } });
}</code></pre>
這里我們為 scan() 設置了一個初始值10,那么發射的第一個元素就是這個10,之后又與其相加,打印結果如下:
result = 10
result = 11
result = 13
result = 16
result = 20
result = 25
rxjava onCompleted
GroupBy
groupBy()將源Observable變換成一個發射Observables的新的Observable。它們中的每一個新的Observable都發射一組指定的數據。實際使用中,我們需要提供一個生成key的規則(也就是Func1中的call方法),所有key相同的數據會包含在同一個小的Observable中。
public static void main(String... args) { Student lisi = new Student("李四", 24); Student wangwu = new Student("王五", 25); Student zhangsan = new Student("張三", 23); Student jianjian = new Student("jianjian", 24); Student wenwen = new Student("wenwen", 25);
Observable<GroupedObservable<Integer, Student>> GroupedObservable = Observable.just(lisi, wangwu, zhangsan, jianjian, wenwen) .groupBy(new Func1<Student, Integer>() { @Override public Integer call(Student student) { return student.getAge(); } }); Observable.concat(GroupedObservable) .subscribe(new Action1<Student>() { @Override public void call(Student student) { System.out.println("student = " + student.getName() + ", age = " + student.getAge()); } });
}</code></pre>
這里我們首先創建了一個新的Observable: GroupedObservable ,它將會發送一個帶有GroupedObservable的序列(也就是指發送的數據項的類型為GroupedObservable)。GroupedObservable是一個特殊的Observable,它基于一個分組的key,在這個例子中,key就是 student 的年齡age,代表的意思就是年齡相同的數據會包含在一起。打印結果:
student = 李四, age = 24
student = jianjian, age = 24
student = 王五, age = 25
student = wenwen, age = 25
student = 張三, age = 23
Buffer
RxJava中的 buffer() 函數將源Observable變換一個新的Observable,這個新的Observable每次發射一組列表值而不是一個一個發射。
public static void main(String... args) { Observable.just(1,2,3,4,5,6) .buffer(2) .subscribe(new Action1<List<Integer>>() { @Override public void call(List<Integer> integers) { for (Integer i : integers) { System.out.println("i = " + i); } System.out.println("-------------------"); } }); }
這里的 buffer(2) 指定了緩沖容量的大小為2,打印結果:
i = 1
i = 2
-------------------
i = 3
i = 4
-------------------
i = 5
i = 6
-------------------
實際上, buffer() 函數還有幾種重載方法,其中一個允許你指定一個skip值,此后每 skip 項數據,然后又用count項數據填充緩沖區。
public static void main(String... args) { Observable.just(1,2,3,4,5,6) .buffer(2, 3) .subscribe(new Action1<List<Integer>>() { @Override public void call(List<Integer> integers) { for (Integer i : integers) { System.out.println("i = " + i); } System.out.println("-------------------"); } }); }
這里buffer(2, 3)設置了每3個元素中用兩個元素填充緩沖區,打印結果:
i = 1
i = 2
-------------------
i = 4
i = 5
-------------------
buffer() 還可以帶一個 timespan 的參數,會創建一個每隔timespan時間段就會發射一個列表的Observable。
Window
Rxjava的 window() 和 buffer() 很像,但是它發射的是Observable而不是數據列表,發射的這些Observables都是源Observable數據的一個子集,數量由參數count來定,最后發送一個 onCompleted() 結束。
public static void main(String... args) { Observable.just(1,2,3,4,5) .window(3) .subscribe(new Observer<Observable<Integer>>() { @Override public void onCompleted() { System.out.println("window onCompleted"); }
@Override public void onError(Throwable e) { } @Override public void onNext(Observable<Integer> integerObservable) { integerObservable.subscribe(new Observer<Integer>() { @Override public void onCompleted() { System.out.println("onCompleted"); } @Override public void onError(Throwable e) { } @Override public void onNext(Integer integer) { System.out.println("i = " + integer); } }); } });
}</code></pre>
這里的 window(3) 設置了子集的大小為3,打印結果:
i = 1
i = 2
i = 3
onCompleted
i = 4
i = 5
onCompleted
window onCompleted
同樣的, window() 也有帶skip參數的重載方法。
Cast
Rxjava的 cast() 可以將源Observable中的每一項數據都轉換為新的類型,把它變成了不同的Class。第一次看到這個,我還以為可以隨便轉,就用一個Integer轉成String試試咯,好吧原諒我的天真,java.lang.ClassCastException立馬打我臉,接著我創建了一個父類一個子類,里面很簡單。
父類:
public class Father { public void eat(){ System.out.println("father was eat"); } }
子類:
public class Son extends Father { public void eat(){ System.out.println("son was eat"); } }
接著:
public static void main(String... args) { Observable.just(new Son()) .cast(Father.class) .subscribe(new Action1<Father>() { @Override public void call(Father f) { f.eat(); } }); }
注意這里只能向上轉型,也就是說子類轉成父類,構成這樣: Father f = new Son() ,要記住一點,"編譯看左邊,運行看右邊",打印結果:
來自:http://www.jianshu.com/p/eccc241928cf