概述:RxJava 在切换线程时用到了两个方法 subscribeOn()observeOn() ,那么它纠结做了什么呢?

If you want to introduce multithreading into your cascade of Observable operators, you can do so by instructing those operators (or particular Observables) to operate on particular Schedulers.

Some ReactiveX Observable operators have variants that take a Scheduler as a parameter. These instruct the operator to do some or all of its work on a particular Scheduler.

By default, an Observable and the chain of operators that you apply to it will do its work, and will notify its observers, on the same thread on which its Subscribe method is called. The SubscribeOn operator changes this behavior by specifying a different Scheduler on which the Observable should operate. The ObserveOn operator specifies a different Scheduler that the Observable will use to send notifications to its observers.

As shown in this illustration, the SubscribeOn operator designates which thread the Observable will begin operating on, no matter at what point in the chain of operators that operator is called. ObserveOn, on the other hand, affects the thread that the Observable will use below where that operator appears. For this reason, you may call ObserveOn multiple times at various points during the chain of Observable operators in order to change on which threads certain of those operators operate.

  • subscribeOn():影响的是最开始的被观察者所在的线程。当使用多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用;
  • observeOn():影响的是跟在后面的操作(指定观察者运行的线程)。所以如果想要多次改变线程,可以多次使用 observeOn;

示例

Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.e(TAG, "create: " + Thread.currentThread().getName());
emitter.onNext("1");
}
})
.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
Log.e(TAG, "String -> Integer: " + Thread.currentThread().getName());
return Integer.valueOf(s);
}
})
.flatMap(new Function<Integer, Observable<String>>() {
@Override
public Observable<String> apply(Integer integer) throws Exception {
Log.e(TAG, "Integer->Observable: " + Thread.currentThread().getName());
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.e(TAG, "Observable<String> call: " + Thread.currentThread().getName());
for (int i = 0; i < integer; i++) {
emitter.onNext(i + "");
}
emitter.onComplete();
}
});
}
})
.map(new Function<String, Long>() {
@Override
public Long apply(String s) throws Exception {
Log.e(TAG, "String->Long: " + Thread.currentThread().getName());
return Long.parseLong(s);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Long>() {
@Override
public void onError(Throwable e) {
}

@Override
public void onComplete() {

}

@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(Long aLong) {
Log.e(TAG, "onNext: " + Thread.currentThread().getName());
}
});

observeOn() 的线程切换原理

observeOn切入

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError) {
return observeOn(scheduler, delayError, bufferSize());
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

observeOn() 执行后是得到 ObservableObserveOn 对象,那么当 ObservableObserveOn 绑定监听者的时候要运行 subscribe() 方法

...省略代码专用...

public abstract class Observable<T> implements ObservableSource<T> {

...省略代码专用...

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);

ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
// 这里调用了抽象方法`subscribeActual`
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);

NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
}

ObservableObserveOn实现了subscribeActual方法

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;

...省略代码专用...

@Override
protected void subscribeActual(Observer<? super T> observer) {
// 在当前线程调度,但不是立即执行,放入到队列中
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
// 为上游Observable
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}

...省略代码专用...

}

这里就可以知道ObservableObserveOn 是被 ObserveOnObserver 监听的,所以收到通知也是由 ObserveOnObserver 作出响应,接下来我们假设当 Rxjava 发送 onNext 通知时会调用 ObserveOnObserveronNext() 方法 ( PS:当然如果是 onComplete()、onError() 等也是一样的逻辑 ),然后来看一看 ObserveOnObserveronNext() 方法,

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {

private static final long serialVersionUID = 6576896619930983584L;
// 下游的Observer
final Observer<? super T> actual;
// 调度工作者
final Scheduler.Worker worker;
// 是否延迟错误,默认false
final boolean delayError;
// 队列大小
final int bufferSize;
// 存储上游Observable下发的数据队列
SimpleQueue<T> queue;
// 存储下游的Observer的Disposable
Disposable s;
// 错误信息
Throwable error;
// 校验是否完毕
volatile boolean done;
// 是否被取消
volatile boolean cancelled;
// 执行模式,同步或者异步
int sourceMode;

boolean outputFused;

...省略代码专用...

@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) s;

int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
// 判断执行模式并调用onSubscribe传递给下游Observer
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
// 后面的onXX方法都不会被调用
done = true;
actual.onSubscribe(this);
// 同步模式下,直接调用schedule
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
actual.onSubscribe(this);
// 异步模式下,等待schedule
return;
}
}

queue = new SpscLinkedArrayQueue<T>(bufferSize);
// 判断执行模式并调用onSubscribe传递给下游Observer
actual.onSubscribe(this);
}
}

@Override
public void onNext(T t) {
// 数据源是同步模式或者执行过 error / complete 会是true
if (done) {
return;
}
// 如果数据源不是异步类型
if (sourceMode != QueueDisposable.ASYNC) {
// 上游Observable下发的数据压入queue
queue.offer(t);
}
// 开始调度
schedule();
}

@Override
public void onError(Throwable t) {
if (done) {
// 已经完成再执行会抛异常
RxJavaPlugins.onError(t);
return;
}
// 记录错误信息
error = t;
// 标记已完成
done = true;
// 开始调度
schedule();
}

@Override
public void onComplete() {
if (done) {
return;
}
done = true;
schedule();
}

...省略代码专用...

}

眼光转向schedule()

void schedule() {
// 原子性
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}

调用workerschedule()方法,将自己传递进去。来看声明,schedule方法接收Runnable对象为参数,而ObserveOnObserver实现了Runnable接口

public abstract static class Worker implements Disposable {
/**
* Schedules a Runnable for execution without any time delay.
*
* <p>The default implementation delegates to {@link #schedule(Runnable, long, TimeUnit)}.
*
* @param run
* Runnable to schedule
* @return a Disposable to be able to unsubscribe the action (cancel it if not executed)
*/
@NonNull
public Disposable schedule(@NonNull Runnable run) {
return schedule(run, 0L, TimeUnit.NANOSECONDS);
}

/**
* Schedules an Runnable for execution at some point in the future specified by a time delay
* relative to the current time.
* <p>
* Note to implementors: non-positive {@code delayTime} should be regarded as non-delayed schedule, i.e.,
* as if the {@link #schedule(Runnable)} was called.
*
* @param run
* the Runnable to schedule
* @param delay
* time to "wait" before executing the action; non-positive values indicate an non-delayed
* schedule
* @param unit
* the time unit of {@code delayTime}
* @return a Disposable to be able to unsubscribe the action (cancel it if not executed)
*/
@NonNull
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);

...省略代码专用...

}

这里调用了重载方法,是个抽象方法。示例使用的是Schedulers.io(),追根溯源最终到了IoScheduler,直接找到schedule方法

static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;

final AtomicBoolean once = new AtomicBoolean();

EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}

@Override
public void dispose() {
if (once.compareAndSet(false, true)) {
tasks.dispose();

// releasing the pool should be the last action
pool.release(threadWorker);
}
}

@Override
public boolean isDisposed() {
return once.get();
}

@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}

return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}

调用scheduleActual,在看下实现

public class NewThreadWorker extends Scheduler.Worker implements Disposable {

...省略代码专用...

/**
* Wraps the given runnable into a ScheduledRunnable and schedules it
* on the underlying ScheduledExecutorService.
* <p>If the schedule has been rejected, the ScheduledRunnable.wasScheduled will return
* false.
* @param run the runnable instance
* @param delayTime the time to delay the execution
* @param unit the time unit
* @param parent the optional tracker parent to add the created ScheduledRunnable instance to before it gets scheduled
* @return the ScheduledRunnable instance
*/
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}

Future<?> f;
try {
// 判断延迟时间,然后使用线程池运行 Runnable
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}

return sr;
}

...省略代码专用...

}

兜圈回到ObservableObserveOnrun方法,然后调用onNext方法

@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}

先看下checkTerminated方法,做了什么操作

boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
if (cancelled) {
// 订阅已经取消,则取消队列
queue.clear();
return true;
}
// 这个d就是传进来的done
if (d) {
// done == true可能的情况onNext刚被调度完,onError或者onComplete被调用
Throwable e = error;
if (delayError) {
// delayError == true时等到队列为空才调用
if (empty) {
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
worker.dispose();
return true;
}
} else {
// 直接调用
if (e != null) {
queue.clear();
a.onError(e);
worker.dispose();
return true;
} else
if (empty) {
a.onComplete();
worker.dispose();
return true;
}
}
}
// 不终结
return false;
}

最后来看下drainNormal做了哪些事情

oid drainNormal() {
int missed = 1;

final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
// 死循环,注意出口操作
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return; // 上面方法被终止,不进行下面操作
}
// 再次死循环
for (;;) {
boolean d = done;
T v;

try {
// 分发数据出队列
v = q.poll();
} catch (Throwable ex) {
// 有异常时终止退出
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
// 停止woker(线程)
worker.dispose();
return;
}
boolean empty = v == null;
// 判断队列是否为空
if (checkTerminated(d, empty, a)) {
return;
}
// 没数据跳出循环
if (empty) {
break;
}
// 数据下发给下游Observer,onNext, onComplete和onError主要放在了checkTerminated里面回调
a.onNext(v);
}
// 保证此时有一个worker.schedule(this);正在执行
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}

subscribeOn() 的线程切换原理

同样,这里从subscribeOn作为切入点

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

执行subscribeOn得到装饰者模式对象ObservableSubscribeOn

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;

public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}

@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

s.onSubscribe(parent);

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

...省略代码专用...

}

说一说RxAndroid

/** Android-specific Schedulers. */
public final class AndroidSchedulers {

private static final class MainHolder {

static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}

private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});

/** A {@link Scheduler} which executes actions on the Android main thread. */
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}

/** A {@link Scheduler} which executes actions on {@code looper}. */
public static Scheduler from(Looper looper) {
if (looper == null) throw new NullPointerException("looper == null");
return new HandlerScheduler(new Handler(looper));
}

private AndroidSchedulers() {
throw new AssertionError("No instances.");
}
}

AndroidSchedulers.mainThread()返回的是HandlerScheduler,传递了一个Handler拿到主线程的Looper的对象