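Overview: this post walks through the RxJava operators and how to use them, with code examples. The examples use the RxJava 1.x API (Subscriber, Func1, Action1).
Operator categories
Creating operators
Operators that create Observables.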
Create - builds an Observable from scratch by calling the observer's methods directly.

    Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            // Emit only if the subscriber has not already unsubscribed.
            if (!subscriber.isUnsubscribed()) {
                for (int i = 0; i < 5; i++) {
                    subscriber.onNext(i);
                }
                subscriber.onCompleted();
            }
        }
    }).subscribe(new Subscriber<Integer>() {
        @Override public void onNext(Integer item) { System.out.println("Next: " + item); }
        @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); }
        @Override public void onCompleted() { System.out.println("Sequence complete."); }
    });

Output

    Next: 0
    Next: 1
    Next: 2
    Next: 3
    Next: 4
    Sequence complete.
From - converts other objects or data structures into an Observable.

    Integer[] items = {0, 1, 2, 3, 4};
    Observable<Integer> observable = Observable.from(items);
    observable.subscribe(new Action1<Integer>() {
        @Override public void call(Integer integer) { System.out.println(integer); }
    }, new Action1<Throwable>() {
        @Override public void call(Throwable throwable) { System.out.println("Error encountered: " + throwable.getMessage()); }
    }, new Action0() {
        @Override public void call() { System.out.println("Sequence complete"); }
    });

Output

    0
    1
    2
    3
    4
    Sequence complete
Just - converts an object, or several objects, into an Observable that emits those objects.

    Observable.just(1, 2, 3).subscribe(new Subscriber<Integer>() {
        @Override public void onCompleted() { System.out.println("Sequence complete."); }
        @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); }
        @Override public void onNext(Integer integer) { System.out.println("Next: " + integer); }
    });

Output

    Next: 1
    Next: 2
    Next: 3
    Sequence complete.
Defer - does not create the Observable until an observer subscribes, and creates a fresh Observable for each observer.

    // Declared as a class field so it can be reassigned before subscription.
    static String i = "old data";

    Observable<String> observable = Observable.defer(new Func0<Observable<String>>() {
        @Override public Observable<String> call() { return Observable.just(i); }
    });
    Subscriber<String> subscriber = new Subscriber<String>() {
        @Override public void onCompleted() { System.out.println("Sequence complete."); }
        @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); }
        @Override public void onNext(String s) { System.out.println("Next: " + s); }
    };
    i = "new data"; // changed after defer() but before subscribe()
    observable.subscribe(subscriber);

Output

    Next: new data
    Sequence complete.
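The effect of deferring creation can be modelled without RxJava. The sketch below is a plain-Java analogy, not RxJava code: a `Supplier` stands in for the factory passed to `defer`, and `DeferSketch` and its members are hypothetical names chosen for illustration. The eager variant captures the value when it is built; the deferred variant reads it only when asked.

```java
import java.util.function.Supplier;

// Plain-Java analogy for defer(); all names here are hypothetical.
class DeferSketch {
    static String value = "old data";

    // Eager: reads the field now and keeps that snapshot (like Observable.just(i)).
    static Supplier<String> eager() {
        final String snapshot = value;
        return () -> snapshot;
    }

    // Deferred: reads the field each time get() is called (like the factory inside defer()).
    static Supplier<String> deferred() {
        return () -> value;
    }

    public static void main(String[] args) {
        Supplier<String> eager = eager();
        Supplier<String> deferred = deferred();
        value = "new data"; // changed after construction, before the value is requested
        System.out.println("eager: " + eager.get());       // eager: old data
        System.out.println("deferred: " + deferred.get()); // deferred: new data
    }
}
```

This mirrors why the Defer example prints the new value: the factory runs at subscription time, after the field was reassigned.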
Empty/Never/Throw - create special Observables with restricted behavior.
Empty creates an Observable that emits no items and terminates normally.
Never creates an Observable that emits no items and never terminates.
Throw creates an Observable that emits no items and terminates with an error (in RxJava this is Observable.error()).

    Observable.empty().subscribe(new Subscriber<Object>() {
        @Override public void onCompleted() { System.out.println("Sequence complete."); }
        @Override public void onError(Throwable e) { System.out.println("error:" + e.getMessage()); }
        @Override public void onNext(Object i) { System.out.println("Next:" + i.toString()); }
    });

Output

    Sequence complete.
Timer - creates an Observable that emits a single item after a given delay.

    Observable.timer(3, TimeUnit.SECONDS).subscribe(new Subscriber<Long>() {
        @Override public void onCompleted() { System.out.println("Sequence complete."); }
        @Override public void onError(Throwable e) { System.out.println("error:" + e.getMessage()); }
        @Override public void onNext(Long aLong) { System.out.println("Next:" + aLong.toString()); }
    });

Output

    I/System.out: Next:0
    I/System.out: Sequence complete.
Interval - creates an Observable that emits an increasing sequence of integers at a fixed time interval.

    Observable.interval(3, TimeUnit.SECONDS).subscribe(new Subscriber<Long>() {
        @Override public void onCompleted() { System.out.println("Sequence complete."); }
        @Override public void onError(Throwable e) { System.out.println("error:" + e.getMessage()); }
        @Override public void onNext(Long aLong) { System.out.println("Next:" + aLong.toString()); }
    });

Output

    I/System.out: Next:0
    I/System.out: Next:1
    I/System.out: Next:2
    I/System.out: Next:3
    I/System.out: Next:4
    I/System.out: Next:5
    I/System.out: Next:6
    I/System.out: Next:7
    ...
Range - creates an Observable that emits a range of sequential integers. The two arguments are the start value and the number of items, so range(3, 8) emits 3 through 10.

    Observable.range(3, 8).subscribe(new Subscriber<Integer>() {
        @Override public void onCompleted() { System.out.println("Sequence complete."); }
        @Override public void onError(Throwable e) { System.out.println("error:" + e.getMessage()); }
        @Override public void onNext(Integer integer) { System.out.println("Next:" + integer.toString()); }
    });

Output

    Next:3
    Next:4
    Next:5
    Next:6
    Next:7
    Next:8
    Next:9
    Next:10
    Sequence complete.
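Because the second argument of range is a count rather than an end value, it is easy to misread. As a rough plain-Java model (not RxJava code; `RangeSketch` is a hypothetical name), the same sequence can be produced with `IntStream`:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Plain-Java model of range(start, count); the class name is hypothetical.
class RangeSketch {
    // Returns count consecutive integers beginning at start.
    static List<Integer> range(int start, int count) {
        return IntStream.range(start, start + count)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(range(3, 8)); // [3, 4, 5, 6, 7, 8, 9, 10]
    }
}
```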
Repeat - creates an Observable that emits a particular item or sequence of items repeatedly.

    Observable.range(3, 3).repeat(2).subscribe(new Subscriber<Integer>() {
        @Override public void onCompleted() { System.out.println("Sequence complete."); }
        @Override public void onError(Throwable e) { System.out.println("error:" + e.getMessage()); }
        @Override public void onNext(Integer i) { System.out.println("Next:" + i.toString()); }
    });

Output

    Next:3
    Next:4
    Next:5
    Next:3
    Next:4
    Next:5
    Sequence complete.
Transforming operators
Map - transforms the items emitted by an Observable by applying a function to each of them; the function receives the item as its argument.

    Integer[] integers = {0, 9, 6, 4, 8};
    Observable.from(integers).map(new Func1<Integer, Boolean>() {
        @Override
        public Boolean call(Integer integer) {
            System.out.println("call: " + integer);
            return (integer > 5);
        }
    }).subscribe(new Subscriber<Boolean>() {
        @Override public void onCompleted() { System.out.println("Sequence complete."); }
        @Override public void onError(Throwable e) { System.out.println("error:" + e.getMessage()); }
        @Override public void onNext(Boolean b) { System.out.println("Next:" + b.toString()); }
    });

Output

    call: 0
    Next:false
    call: 9
    Next:true
    call: 6
    Next:true
    call: 4
    Next:false
    call: 8
    Next:true
    Sequence complete.
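Cast - casts every item emitted by the Observable to the given type before emitting it. It does not convert values: an item that is not an instance of the target type makes the sequence fail with a ClassCastException. In the example below, just(integers) emits the whole Integer[] array as a single item, which cannot be cast to String.

    Integer[] integers = {0, 9, 6, 4, 8};
    Observable.just(integers).cast(String.class).subscribe(new Subscriber<String>() {
        @Override public void onCompleted() { System.out.println("Sequence complete."); }
        @Override public void onError(Throwable e) { System.out.println("error:" + e.getMessage()); }
        @Override public void onNext(String s) { System.out.println("Next:" + s); }
    });

Output

    error:Cannot cast [Ljava.lang.Integer; to java.lang.String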
FlatMap - maps each item emitted by the source Observable to an Observable, then flattens the emissions of those Observables into a single Observable. It can be thought of as unrolling a nested data structure. Because the inner Observables are merged, their emissions may interleave, so the ordering shown in the output below is not guaranteed.

    Integer[] integers = {1, 2, 3};
    Observable.from(integers).flatMap(new Func1<Integer, Observable<String>>() {
        @Override
        public Observable<String> call(final Integer integer) {
            return Observable.create(new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    System.out.println("call: FlatMap " + Thread.currentThread().getName());
                    try {
                        Thread.sleep(200);
                        subscriber.onNext(integer + 100 + " FlatMap");
                        subscriber.onCompleted();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        subscriber.onError(e);
                    }
                }
            }).subscribeOn(Schedulers.newThread()); // each inner Observable runs on its own thread
        }
    }).observeOn(AndroidSchedulers.mainThread())
      .subscribe(new Subscriber<String>() {
        @Override public void onCompleted() { System.out.println("Sequence complete."); }
        @Override public void onError(Throwable e) { System.out.println("error:" + e.getMessage()); }
        @Override public void onNext(String s) { System.out.println("Next:" + s); }
    });

Output

    call: FlatMap RxNewThreadScheduler-1
    call: FlatMap RxNewThreadScheduler-2
    call: FlatMap RxNewThreadScheduler-3
    Next:101 FlatMap
    Next:102 FlatMap
    Next:103 FlatMap
    Sequence complete.
ConcatMap - similar to the simplest form of flatMap, but it concatenates the generated Observables in order instead of merging them, and emits the resulting sequence.

    Integer[] integers = {1, 2, 3};
    Observable.from(integers).concatMap(new Func1<Integer, Observable<String>>() {
        @Override
        public Observable<String> call(final Integer integer) {
            return Observable.create(new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    // The "FlatMap" label is carried over from the previous example.
                    System.out.println("call: FlatMap " + Thread.currentThread().getName());
                    try {
                        Thread.sleep(200);
                        subscriber.onNext(integer + 100 + " FlatMap");
                        subscriber.onCompleted();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        subscriber.onError(e);
                    }
                }
            }).subscribeOn(Schedulers.newThread());
        }
    }).observeOn(AndroidSchedulers.mainThread())
      .subscribe(new Subscriber<String>() {
        @Override public void onCompleted() { System.out.println("Sequence complete."); }
        @Override public void onError(Throwable e) { System.out.println("error:" + e.getMessage()); }
        @Override public void onNext(String s) { System.out.println("Next:" + s); }
    });

Output

    call: FlatMap RxNewThreadScheduler-1
    call: FlatMap RxNewThreadScheduler-2
    Next:101 FlatMap
    Next:102 FlatMap
    call: FlatMap RxNewThreadScheduler-3
    Next:103 FlatMap
    Sequence complete.
SwitchMap - when the source Observable emits a new item, switchMap unsubscribes from and stops watching the Observable generated from the previous item, and watches only the current one.

    Integer[] integers = {1, 2, 3};
    Observable.from(integers).switchMap(new Func1<Integer, Observable<String>>() {
        @Override
        public Observable<String> call(Integer integer) {
            System.out.println("call: SwitchMap " + Thread.currentThread().getName());
            return Observable.just((integer + 100) + "SwitchMap")
                    .subscribeOn(Schedulers.newThread());
        }
    }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<String>() {
        @Override public void onCompleted() { System.out.println("Sequence complete."); }
        @Override public void onError(Throwable e) { System.out.println("error:" + e.getMessage()); }
        @Override public void onNext(String s) { System.out.println("Next:" + s); }
    });

Output

    call: SwitchMap main
    call: SwitchMap main
    call: SwitchMap main
    Next:103SwitchMap
    Sequence complete.
GroupBy - splits the source Observable into a set of Observables, each of which emits a subsequence of the source's items. A function decides which Observable emits which item: it assigns a key to every item, and items with the same key are emitted by the same Observable.

    Observable.range(1, 10).groupBy(new Func1<Integer, Boolean>() {
        @Override public Boolean call(Integer integer) { return integer % 2 == 0; }
    }).subscribe(new Subscriber<GroupedObservable<Boolean, Integer>>() {
        @Override public void onCompleted() { System.out.println("Sequence complete.1"); }
        @Override public void onError(Throwable e) { System.out.println("error:1" + e.getMessage()); }
        @Override
        public void onNext(GroupedObservable<Boolean, Integer> group) {
            // Collect each group into a list and print it once the group completes.
            group.toList().subscribe(new Subscriber<List<Integer>>() {
                @Override public void onCompleted() { System.out.println("Sequence complete.2"); }
                @Override public void onError(Throwable e) { System.out.println("error:2" + e.getMessage()); }
                @Override public void onNext(List<Integer> integers) { System.out.println("onNext:2" + integers); }
            });
        }
    });

Output

    onNext:2[1, 3, 5, 7, 9]
    Sequence complete.2
    onNext:2[2, 4, 6, 8, 10]
    Sequence complete.2
    Sequence complete.1
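The key-based routing described for GroupBy can be sketched in plain Java. This is only a model of the grouping rule, not RxJava code (it works on a finished list rather than a stream of events), and `GroupBySketch` is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of grouping by key; the class name is hypothetical.
class GroupBySketch {
    // Items that map to the same key end up in the same group, in arrival order.
    static <K, T> Map<K, List<T>> groupBy(List<T> items, Function<T, K> keySelector) {
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T item : items) {
            groups.computeIfAbsent(keySelector.apply(item), k -> new ArrayList<>()).add(item);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        Map<Boolean, List<Integer>> groups = groupBy(items, n -> n % 2 == 0);
        System.out.println(groups); // {false=[1, 3, 5, 7, 9], true=[2, 4, 6, 8, 10]}
    }
}
```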
Scan - applies a function to the items emitted by the source Observable in sequence and emits each intermediate result. Without a seed value, the first item is emitted unchanged as the first result; the function is then applied to that result and the second item to produce the second result, and so on for the rest of the sequence.

    Observable.range(1, 4).scan(new Func2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer integer, Integer integer2) {
            System.out.println("call: integer:" + integer + " integer2 " + integer2);
            return integer + integer2;
        }
    }).subscribe(new Subscriber<Integer>() {
        @Override public void onCompleted() { System.out.println("Sequence complete."); }
        @Override public void onError(Throwable e) { System.out.println("error:" + e.getMessage()); }
        @Override public void onNext(Integer integer) { System.out.println("onNext:" + integer); }
    });

Output

    onNext:1
    call: integer:1 integer2 2
    onNext:3
    call: integer:3 integer2 3
    onNext:6
    call: integer:6 integer2 4
    onNext:10
    Sequence complete.
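The accumulation that Scan performs can be written out as a plain loop. The sketch below is a plain-Java model over a list, not RxJava code, and `ScanSketch` is a hypothetical name; it shows that the first item passes through unchanged and every later result feeds the next step.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java model of scan without a seed; the class name is hypothetical.
class ScanSketch {
    static <T> List<T> scan(List<T> items, BinaryOperator<T> accumulator) {
        List<T> results = new ArrayList<>();
        T acc = null;
        boolean first = true;
        for (T item : items) {
            // The first item is emitted as-is; afterwards combine the previous result with the item.
            acc = first ? item : accumulator.apply(acc, item);
            first = false;
            results.add(acc);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(scan(Arrays.asList(1, 2, 3, 4), Integer::sum)); // [1, 3, 6, 10]
    }
}
```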
Buffer - transforms an Observable into another one: the source emits items as usual, and the resulting Observable emits those items collected into buffers. If the source emits an onError notification, Buffer passes it on immediately without first emitting the buffered items, even if the buffer already holds items from the source.

    Observable.range(10, 6).buffer(2).subscribe(new Subscriber<List<Integer>>() {
        @Override public void onCompleted() { System.out.println("Sequence complete."); }
        @Override public void onError(Throwable e) { System.out.println("error:" + e.getMessage()); }
        @Override public void onNext(List<Integer> integers) { System.out.println("onNext:" + integers); }
    });

Output

    onNext:[10, 11]
    onNext:[12, 13]
    onNext:[14, 15]
    Sequence complete.
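For the count-based form used above, buffering amounts to cutting the sequence into consecutive chunks. The sketch below is a plain-Java model over a list, not RxJava code; `BufferSketch` is a hypothetical name, and the model ignores error handling entirely.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of buffer(count); the class name is hypothetical.
class BufferSketch {
    // Splits items into consecutive chunks of at most count elements.
    static <T> List<List<T>> buffer(List<T> items, int count) {
        List<List<T>> buffers = new ArrayList<>();
        for (int i = 0; i < items.size(); i += count) {
            int end = Math.min(i + count, items.size());
            buffers.add(new ArrayList<>(items.subList(i, end)));
        }
        return buffers;
    }

    public static void main(String[] args) {
        System.out.println(buffer(Arrays.asList(10, 11, 12, 13, 14, 15), 2));
        // [[10, 11], [12, 13], [14, 15]]
    }
}
```

When the number of items is not a multiple of the count, the last chunk in this model is simply shorter.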
Window - similar to Buffer, but instead of emitting packets of items from the source, it emits Observables. Each of these Observables emits a subset of the source's items and then terminates with an onCompleted notification.

    Observable.range(10, 6).window(2).subscribe(new Subscriber<Observable<Integer>>() {
        @Override public void onCompleted() { System.out.println("Sequence complete.1 "); }
        @Override public void onError(Throwable e) { System.out.println("error:1 " + e.getMessage()); }
        @Override
        public void onNext(Observable<Integer> integerObservable) {
            System.out.println("onNext:1 ");
            // Subscribe to each window to receive the items it contains.
            integerObservable.subscribe(new Subscriber<Integer>() {
                @Override public void onCompleted() { System.out.println("Sequence complete.2 "); }
                @Override public void onError(Throwable e) { System.out.println("error:2 " + e.getMessage()); }
                @Override public void onNext(Integer integer) { System.out.println("onNext:2 " + integer); }
            });
        }
    });

Output

    onNext:1
    onNext:2 10
    onNext:2 11
    Sequence complete.2
    onNext:1
    onNext:2 12
    onNext:2 13
    Sequence complete.2
    onNext:1
    onNext:2 14
    onNext:2 15
    Sequence complete.2
    Sequence complete.1
Filtering operators
Filter - takes a Func1 predicate in which you test each item against your own condition: return true to emit the item and false to drop it, so that only the items you want get through.

    Integer[] ints = {1, 2, 3, 4, 5, 6, 7, 8, 9};
    Observable<Integer> observable = Observable.from(ints).filter(new Func1<Integer, Boolean>() {
        @Override public Boolean call(Integer integer) { return integer % 2 != 0; }
    });
    observable.subscribe(new Subscriber<Integer>() {
        @Override public void onCompleted() { System.out.println("Sequence complete."); }
        @Override public void onError(Throwable e) { System.out.println("error:" + e.getMessage()); }
        @Override public void onNext(Integer integer) { System.out.println("onNext:" + integer); }
    });

Output

    onNext:1
    onNext:3
    onNext:5
    onNext:7
    onNext:9
    Sequence complete.
ofType - filters an Observable so that it emits only items of the specified type.

    Observable.just(0x68, "Jinlin", 6, 33, "5upport", 8, "asdfasf", 1, "four", 0)
            .ofType(String.class)
            .subscribe(new Subscriber<String>() {
                @Override public void onCompleted() { System.out.println("Sequence complete."); }
                @Override public void onError(Throwable e) { System.out.println("error:" + e.getMessage()); }
                @Override public void onNext(String s) { System.out.println("Next:" + s); }
            });

Output

    Next:Jinlin
    Next:5upport
    Next:asdfasf
    Next:four
    Sequence complete.
First - use the First operator when you only care about the first item an Observable emits, or the first item that satisfies some condition.

    Observable.just(0, 1, 2, 3).first(new Func1<Integer, Boolean>() {
        @Override public Boolean call(Integer integer) { return integer > 2; }
    }).subscribe(new Subscriber<Integer>() {
        @Override public void onCompleted() { System.out.println("Sequence complete."); }
        @Override public void onError(Throwable e) { System.out.println("error:" + e.getMessage()); }
        @Override public void onNext(Integer integer) { System.out.println("onNext:" + integer); }
    });

Variants

    // firstOrDefault(default): the source is not empty, so its first item (11) is emitted.
    Observable.just(11, 12, 13).firstOrDefault(10).subscribe(new Action1<Integer>() {
        @Override public void call(Integer integer) { System.out.println("call:" + integer); }
    });

    // firstOrDefault(default, predicate): no item is greater than 20, so the default (15) is emitted.
    Observable.just(10, 13, 16).firstOrDefault(15, new Func1<Integer, Boolean>() {
        @Override public Boolean call(Integer integer) { return integer > 20; }
    }).subscribe(new Action1<Integer>() {
        @Override public void call(Integer integer) { System.out.println("call:" + integer); }
    });

Output (of the first example)

    onNext:3
    Sequence complete.
takeFirst - differs from first in what happens when the source Observable emits no item that satisfies the condition: first raises a NoSuchElementException and goes straight to onError(), whereas takeFirst returns an empty Observable (onNext() is never called, but onCompleted() is).

#### first

    Observable.just(10, 11).filter(new Func1<Integer, Boolean>() {
        @Override public Boolean call(Integer integer) { return integer > 20; }
    }).first().subscribe(new Subscriber<Integer>() {
        @Override public void onCompleted() { System.out.println("Sequence complete."); }
        @Override public void onError(Throwable e) { System.out.println("error:" + e.getMessage()); }
        @Override public void onNext(Integer integer) { System.out.println("onNext:" + integer); }
    });

#### takeFirst

    Observable.just(10, 11).takeFirst(new Func1<Integer, Boolean>() {
        @Override public Boolean call(Integer integer) { return integer > 20; }
    }).subscribe(new Subscriber<Integer>() {
        @Override public void onCompleted() { System.out.println("Sequence complete."); }
        @Override public void onError(Throwable e) { System.out.println("error:" + e.getMessage()); }
        @Override public void onNext(Integer integer) { System.out.println("onNext:" + integer); }
    });

Output

#### first

    error:Sequence contains no elements

#### takeFirst

    Sequence complete.
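The difference between the two operators resembles the difference between a lookup that throws and one that returns an empty `Optional`. The following plain-Java sketch models that contrast over a list; it is not RxJava code, `FirstSketch` is a hypothetical name, and the exception message is copied from the output above only for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.function.Predicate;

// Plain-Java model of first(predicate) vs takeFirst(predicate); names are hypothetical.
class FirstSketch {
    // Like takeFirst: no match is not an error, the result is just empty.
    static <T> Optional<T> takeFirst(List<T> items, Predicate<T> predicate) {
        return items.stream().filter(predicate).findFirst();
    }

    // Like first: no match is reported as a NoSuchElementException.
    static <T> T first(List<T> items, Predicate<T> predicate) {
        return takeFirst(items, predicate)
                .orElseThrow(() -> new NoSuchElementException("Sequence contains no elements"));
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(10, 11);
        System.out.println("takeFirst present: " + takeFirst(items, n -> n > 20).isPresent());
        try {
            first(items, n -> n > 20);
        } catch (NoSuchElementException e) {
            System.out.println("error:" + e.getMessage());
        }
    }
}
```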