If you want to introduce multithreading into your cascade of Observable operators, you can do so by instructing those operators (or particular Observables) to operate on particular Schedulers.
Some ReactiveX Observable operators have variants that take a Scheduler as a parameter. These instruct the operator to do some or all of its work on a particular Scheduler.
By default, an Observable and the chain of operators that you apply to it will do its work, and will notify its observers, on the same thread on which its Subscribe method is called. The SubscribeOn operator changes this behavior by specifying a different Scheduler on which the Observable should operate. The ObserveOn operator specifies a different Scheduler that the Observable will use to send notifications to its observers.
As shown in this illustration, the SubscribeOn operator designates which thread the Observable will begin operating on, no matter at what point in the chain of operators that operator is called. ObserveOn, on the other hand, affects the thread that the Observable will use below where that operator appears. For this reason, you may call ObserveOn multiple times at various points during the chain of Observable operators in order to change on which threads certain of those operators operate.
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); // 这里调用了抽象方法`subscribeActual` subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } } }
ObservableObserveOn实现了subscribeActual方法
publicfinalclassObservableObserveOn<T> extendsAbstractObservableWithUpstream<T, T> { final Scheduler scheduler; ...省略代码专用...
@Override protectedvoidsubscribeActual(Observer<? super T> observer){ // 在当前线程调度,但不是立即执行,放入到队列中 if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); // 为上游Observable source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } }
publicabstractstaticclassWorkerimplementsDisposable{ /** * Schedules a Runnable for execution without any time delay. * * <p>The default implementation delegates to {@link #schedule(Runnable, long, TimeUnit)}. * * @param run * Runnable to schedule * @return a Disposable to be able to unsubscribe the action (cancel it if not executed) */ @NonNull public Disposable schedule(@NonNull Runnable run){ return schedule(run, 0L, TimeUnit.NANOSECONDS); } /** * Schedules an Runnable for execution at some point in the future specified by a time delay * relative to the current time. * <p> * Note to implementors: non-positive {@code delayTime} should be regarded as non-delayed schedule, i.e., * as if the {@link #schedule(Runnable)} was called. * * @param run * the Runnable to schedule * @param delay * time to "wait" before executing the action; non-positive values indicate an non-delayed * schedule * @param unit * the time unit of {@code delayTime} * @return a Disposable to be able to unsubscribe the action (cancel it if not executed) */ @NonNull publicabstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit); ...省略代码专用... }
@NonNull @Override public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit){ if (tasks.isDisposed()) { // don't schedule, we are unsubscribed return EmptyDisposable.INSTANCE; }
return threadWorker.scheduleActual(action, delayTime, unit, tasks); } }
调用scheduleActual,在看下实现
publicclassNewThreadWorkerextendsScheduler.WorkerimplementsDisposable{ ...省略代码专用... /** * Wraps the given runnable into a ScheduledRunnable and schedules it * on the underlying ScheduledExecutorService. * <p>If the schedule has been rejected, the ScheduledRunnable.wasScheduled will return * false. * @param run the runnable instance * @param delayTime the time to delay the execution * @param unit the time unit * @param parent the optional tracker parent to add the created ScheduledRunnable instance to before it gets scheduled * @return the ScheduledRunnable instance */ @NonNull public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent){ Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) { if (!parent.add(sr)) { return sr; } }
Future<?> f; try { // 判断延迟时间,然后使用线程池运行 Runnable if (delayTime <= 0) { f = executor.submit((Callable<Object>)sr); } else { f = executor.schedule((Callable<Object>)sr, delayTime, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { if (parent != null) { parent.remove(sr); } RxJavaPlugins.onError(ex); }
booleancheckTerminated(boolean d, boolean empty, Observer<? super T> a){ if (cancelled) { // 订阅已经取消,则取消队列 queue.clear(); returntrue; } // 这个d就是传进来的done if (d) { // done == true可能的情况onNext刚被调度完,onError或者onComplete被调用 Throwable e = error; if (delayError) { // delayError == true时等到队列为空才调用 if (empty) { if (e != null) { a.onError(e); } else { a.onComplete(); } worker.dispose(); returntrue; } } else { // 直接调用 if (e != null) { queue.clear(); a.onError(e); worker.dispose(); returntrue; } else if (empty) { a.onComplete(); worker.dispose(); returntrue; } } } // 不终结 returnfalse; }
最后来看下drainNormal做了哪些事情
oid drainNormal(){ int missed = 1;
final SimpleQueue<T> q = queue; final Observer<? super T> a = actual; // 死循环,注意出口操作 for (;;) { if (checkTerminated(done, q.isEmpty(), a)) { return; // 上面方法被终止,不进行下面操作 } // 再次死循环 for (;;) { boolean d = done; T v;
/** A {@link Scheduler} which executes actions on the Android main thread. */ publicstatic Scheduler mainThread(){ return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD); }
/** A {@link Scheduler} which executes actions on {@code looper}. */ publicstatic Scheduler from(Looper looper){ if (looper == null) thrownew NullPointerException("looper == null"); returnnew HandlerScheduler(new Handler(looper)); }