概述:通过代码介绍RxJava中的操作符,以及操作符的使用。

操作符分类

创建操作

用于创建Observable的操作符

Create — 通过调用观察者的方法从头创建一个Observable

Observable.create(new OnSubscribe<Integer>() {
public void call(Subscriber<? super Integer> subscriber) {
if (!subscriber.isUnsubscribed()) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i);
}
subscriber.onCompleted();
}
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}

@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}

@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});

输出

Next: 0
Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.

From — 将其它的对象或数据结构转换为Observable

Integer[] items = {0, 1, 2, 3, 4};
Observable<Integer> observable = Observable.from(items);
observable.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println("Error encountered: " + throwable.getMessage());
}
}, new Action0() {
@Override
public void call() {
System.out.println("Sequence complete");
}
});

输出

0
1
2
3
4
Sequence complete

Just — 将对象或者对象集合转换为一个会发射这些对象的Observable

Observable.just(1, 2, 3).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}

@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}

@Override
public void onNext(Integer integer) {
System.out.println("Next: " + integer);
}
});

输出

Next: 1
Next: 2
Next: 3
Sequence complete.

Defer — 在观察者订阅之前不创建这个Observable,为每一个观察者创建一个新的Observable

static String i = "旧数据";
Observable<String> observable = Observable.defer(new Func0<Observable<String>>() {
@Override
public Observable<String> call() {
return Observable.just(i);
}
});
Subscriber subscriber=new Subscriber() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}

@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}

@Override
public void onNext(Object o) {
System.out.println("Next: " + o.toString());
}
};
i = "新数据";
observable.subscribe(subscriber);

输出

Next: 新数据
Sequence complete.

Empty/Never/Throw — 创建行为受限的特殊Observable

Empty
创建一个不发射任何数据但是正常终止的Observable

Never
创建一个不发射数据也不终止的Observable

Throw
创建一个不发射数据以一个错误终止的Observable

Observable.empty().subscribe(new Subscriber() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}

@Override
public void onError(Throwable e) {
System.out.println("error:" + e.getMessage());
}

@Override
public void onNext(Object i) {
System.out.println("Next:" + i.toString());
}
});

输出

Sequence complete.

Timer — 创建在一个指定的延迟之后发射单个数据的Observable

Observable.timer(3, TimeUnit.SECONDS).subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}

@Override
public void onError(Throwable e) {
System.out.println("error:" + e.getMessage());
}

@Override
public void onNext(Long aLong) {
System.out.println("Next:" + aLong.toString());
}
});

输出

I/System.out: Next:0
I/System.out: Sequence complete.

Interval — 创建一个定时发射整数序列的Observable

Observable.interval(3, TimeUnit.SECONDS).subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}

@Override
public void onError(Throwable e) {
System.out.println("error:" + e.getMessage());
}

@Override
public void onNext(Long aLong) {
System.out.println("Next:" + aLong.toString());
}
});

输出

I/System.out: Next:0
I/System.out: Next:1
I/System.out: Next:2
I/System.out: Next:3
I/System.out: Next:4
I/System.out: Next:5
I/System.out: Next:6
I/System.out: Next:7
...

Range — 创建发射指定范围的整数序列的Observable

Observable.range(3, 8).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}

@Override
public void onError(Throwable e) {
System.out.println("error:" + e.getMessage());
}

@Override
public void onNext(Integer integer) {
System.out.println("Next:" + integer.toString());
}
});

输出

Next:3
Next:4
Next:5
Next:6
Next:7
Next:8
Next:9
Next:10
Sequence complete.

Repeat — 创建重复发射特定的数据或数据序列的Observable

Observable.range(3, 3).repeat(2).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}

@Override
public void onError(Throwable e) {
System.out.println("error:" + e.getMessage());
}

@Override
public void onNext(Integer i) {
System.out.println("Next:" + i.toString());
}
});

输出

Next:3
Next:4
Next:5
Next:3
Next:4
Next:5
Sequence complete.

变换操作

Map — 映射,通过对序列的每一项都应用一个函数变换Observable发射的数据,实质是对序列中的每一项执行一个函数,函数的参数就是这个数据项
Integer[] integers = {0, 9, 6, 4, 8};
Observable.from(integers).map(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
System.out.println("call: "+integer);
return (integer > 5);
}
}).subscribe(new Subscriber<Boolean>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}

@Override
public void onError(Throwable e) {
System.out.println("error:" + e.getMessage());
}

@Override
public void onNext(Boolean b) {
System.out.println("Next:" + b.toString());
}
});

输出

call: 0
Next:false
call: 9
Next:true
call: 6
Next:true
call: 4
Next:false
call: 8
Next:true
Sequence complete.

Cast — 强制类型转换,通过对序列的每一项都强制类型变换Observable发射的数据
Integer[] integers = {0, 9, 6, 4, 8};
Observable.just(integers).cast(String.class).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}

@Override
public void onError(Throwable e) {
System.out.println("error:" + e.getMessage());
}

@Override
public void onNext(String s) {
System.out.println("Next:" + s);
}

});

输出

error:Cannot cast [Ljava.lang.Integer; to java.lang.String

FlatMap — 扁平映射,将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,可以认为是一个将嵌套的数据结构展开的过程。

Integer[] integers = {1, 2, 3};
Observable.from(integers).flatMap(new Func1<Integer, Observable<String>>() {
@Override
public Observable<String> call(final Integer integer) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
System.out.println("call: FlatMap " + Thread.currentThread().getName());
try {
Thread.sleep(200);
subscriber.onNext(integer + 100 + " FlatMap");
subscriber.onCompleted();
} catch (InterruptedException e) {
e.printStackTrace();
subscriber.onError(e);
}
}
}).subscribeOn(Schedulers.newThread());
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}

@Override
public void onError(Throwable e) {
System.out.println("error:" + e.getMessage());
}

@Override
public void onNext(String s) {
System.out.println("Next:" + s);
}
});

输出

call: FlatMap RxNewThreadScheduler-1
call: FlatMap RxNewThreadScheduler-2
call: FlatMap RxNewThreadScheduler-3
Next:101 FlatMap
Next:102 FlatMap
Next:103 FlatMap
Sequence complete.

ConcatMap - 该操作符是类似于最简单版本的flatMap,但是它按次序连接而不是合并那些生成的Observables,然后产生自己的数据序列。
Integer[] integers = {1, 2, 3};
Observable.from(integers).concatMap(new Func1<Integer, Observable<String>>() {
@Override
public Observable<String> call(final Integer integer) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
System.out.println("call: FlatMap " + Thread.currentThread().getName());
try {
Thread.sleep(200);
subscriber.onNext(integer + 100 + " FlatMap");
subscriber.onCompleted();
} catch (InterruptedException e) {
e.printStackTrace();
subscriber.onError(e);
}
}
}).subscribeOn(Schedulers.newThread());
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}

@Override
public void onError(Throwable e) {
System.out.println("error:" + e.getMessage());
}

@Override
public void onNext(String s) {
System.out.println("Next:" + s);
}
});

输出

call: FlatMap RxNewThreadScheduler-1
call: FlatMap RxNewThreadScheduler-2
Next:101 FlatMap
Next:102 FlatMap
call: FlatMap RxNewThreadScheduler-3
Next:103 FlatMap
Sequence complete.

SwitchMap - 当原始Observable发射一个新的数据(Observable)时,它将取消订阅并停止监视产生执之前那个数据的Observable,只监视当前这一个。
Integer[] integers = {1, 2, 3};
Observable.from(integers).switchMap(new Func1<Integer, Observable<String>>() {
@Override
public Observable<String> call(Integer integer) {
System.out.println("call: SwitchMap " + Thread.currentThread().getName());
// 如果不通过subscribeOn(Schedulers.newThread())在在子线程模拟并发操作,所有数据源依然会全部输出,也就是并发操作此操作符才有作用
// 若在此通过Thread。sleep()设置等待时间,则输出信息会不一样。相当于模拟并发程度
return Observable.just((integer + 100) + "SwitchMap")
.subscribeOn(Schedulers.newThread());
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}

@Override
public void onError(Throwable e) {
System.out.println("error:" + e.getMessage());
}

@Override
public void onNext(String s) {
System.out.println("Next:" + s);
}
});

输出

call: SwitchMap main
call: SwitchMap main
call: SwitchMap main
Next:103SwitchMap
Sequence complete.

GroupBy - 将原始Observable分拆为一些Observables集合,它们中的每一个发射原始Observable数据序列的一个子序列。哪个数据项由哪一个Observable发射是由一个函数判定的,这个函数给每一项指定一个Key,Key相同的数据会被同一个Observable发射。

Observable.range(1, 10).groupBy(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer % 2 == 0;
}
}).subscribe(new Subscriber<GroupedObservable<Boolean, Integer>>() {

@Override
public void onCompleted() {
System.out.println("Sequence complete.1");
}

@Override
public void onError(Throwable e) {
System.out.println("error:1" + e.getMessage());
}

@Override
public void onNext(GroupedObservable<Boolean, Integer> booleanIntegerGroupedObservable) {
booleanIntegerGroupedObservable.toList().subscribe(new Subscriber<List<Integer>>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.2");
}

@Override
public void onError(Throwable e) {
System.out.println("error:2" + e.getMessage());
}

@Override
public void onNext(List<Integer> integers) {
System.out.println("onNext:2" + integers);
}
});
}
});

输出

onNext:2[1, 3, 5, 7, 9]
Sequence complete.2
onNext:2[2, 4, 6, 8, 10]
Sequence complete.2
Sequence complete.1

Scan - 操作符对原始Observable发射的第一项数据应用一个函数,然后将那个函数的结果作为自己的第一项数据发射。它将函数的结果同第二项数据一起填充给这个函数来产生它自己的第二项数据。它持续进行这个过程来产生剩余的数据序列。
Observable.range(1, 4).scan(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
System.out.println("call: integer:"+integer+" integer2 "+integer2);
return integer+integer2;
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}

@Override
public void onError(Throwable e) {
System.out.println("error:" + e.getMessage());
}

@Override
public void onNext(Integer integer) {
System.out.println("onNext:" + integer);
}
});

输出

onNext:1
call: integer:1 integer2 2
onNext:3
call: integer:3 integer2 3
onNext:6
call: integer:6 integer2 4
onNext:10
Sequence complete.

Buffer - 将一个Observable变换为另一个,原来的Observable正常发射数据,变换产生的Observable发射这些数据的缓存集合,如果原来的Observable发射了一个onError通知,Buffer会立即传递这个通知,而不是首先发射缓存的数据,即使在这之前缓存中包含了原始Observable发射的数据。

Observable.range(10, 6).buffer(2).subscribe(new Subscriber<List<Integer>>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.2");
}

@Override
public void onError(Throwable e) {
System.out.println("error:2" + e.getMessage());
}

@Override
public void onNext(List<Integer> integers) {
System.out.println("onNext:2" + integers);
}
});

输出

onNext:[10, 11]
onNext:[12, 13]
onNext:[14, 15]
Sequence complete.

Window - Window和Buffer类似,但不是发射来自原始Observable的数据包,它发射的是Observables,这些Observables中的每一个都发射原始Observable数据的一个子集,最后发射一个onCompleted通知。

Observable.range(10, 6).window(2).subscribe(new Subscriber<Observable<Integer>>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.1 ");
}

@Override
public void onError(Throwable e) {
System.out.println("error:1 " + e.getMessage());
}

@Override
public void onNext(Observable<Integer> integerObservable) {
System.out.println("onNext:1 ");
integerObservable.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.2 ");
}

@Override
public void onError(Throwable e) {
System.out.println("error:2 " + e.getMessage());
}

@Override
public void onNext(Integer integer) {
System.out.println("onNext:2 " + integer);
}
});
}
});

输出

onNext:1 
onNext:2 10
onNext:2 11
Sequence complete.2
onNext:1
onNext:2 12
onNext:2 13
Sequence complete.2
onNext:1
onNext:2 14
onNext:2 15
Sequence complete.2
Sequence complete.1

过滤操作

Filter - 接收一个Func1参数,我们可以在其中通过运用你自己的判断条件去判断我们要过滤的数据,当数据通过判断条件后返回true表示发射该项数据,否则就不发射,这样就过滤出了我们想要的数据。
Integer[] ints = {1, 2, 3, 4, 5, 6, 7, 8, 9};
Observable<Integer> observable = Observable.from(ints).filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer % 2 != 0;// 返回true,就不会过滤掉,会发射数据,过滤掉返回false的值
}
});
observable.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}

@Override
public void onError(Throwable e) {
System.out.println("error:" + e.getMessage());
}

@Override
public void onNext(Integer integer) {
System.out.println("onNext:" + integer);
}
});

输出

onNext:1
onNext:3
onNext:5
onNext:7
onNext:9
Sequence complete.

ofType - 过滤一个Observable只返回指定类型的数据
Observable.just(0x68, "Jinlin", 6, 33, "5upport", 8, "asdfasf", 1, "four", 0)
.ofType(String.class)
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}

@Override
public void onError(Throwable e) {
System.out.println("error:" + e.getMessage());
}

@Override
public void onNext(String s) {
System.out.println("Next:" + s);
}
});

输出

Next:Jinlin
Next:5upport
Next:asdfasf
Next:four
Sequence complete.

First - 如果我们只对Observable发射的第一项数据,或者满足某个条件的第一项数据感兴趣,则可以使用First操作符。
Observable.just(0, 1, 2, 3).first(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer > 2;
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}

@Override
public void onError(Throwable e) {
System.out.println("error:" + e.getMessage());
}

@Override
public void onNext(Integer integer) {
System.out.println("onNext:" + integer);
}
});

变形
Observable.just(11,12,13).firstOrDefault(10).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("call:" + integer);
}
});

Observable.just(1013, 16).firstOrDefault(15, new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer > 20;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("call:" + integer);
}
});

输出

onNext:3
Sequence complete.

takeFirst - 该操作符与first操作符的区别就是如果原始Observable没有发射任何满足条件的数据,first会抛出一个NoSuchElementException直接执行onError(),而takeFist会返回一个空的Observable(不调用onNext()但是会调用onCompleted)
#### first
Observable.just(10, 11).filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer > 20;
}
}).first().subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}

@Override
public void onError(Throwable e) {
System.out.println("error:" + e.getMessage());
}

@Override
public void onNext(Integer integer) {
System.out.println("onNext:" + integer);
}
});

#### takeFirst
Observable.just(10, 11).takeFirst(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer > 20;
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}

@Override
public void onError(Throwable e) {
System.out.println("error:" + e.getMessage());
}

@Override
public void onNext(Integer integer) {
System.out.println("onNext:" + integer);
}
});

输出

#### first
error:Sequence contains no elements

#### takeFirst
Sequence complete.