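When reposting, please credit the source with a link. This article is from 103style's blog.
Based on RxJava 2.x
Let's start with the implementations of the subscribeOn and observeOn methods: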
subscribeOn
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
observeOn
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError) {
return observeOn(scheduler, delayError, bufferSize());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
As we can see, they return an ObservableSubscribeOn object and an ObservableObserveOn object respectively. The two classes are covered in turn below.
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
....
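}
From the earlier article on the source of the create operator, we know that subscribe(observer) actually calls subscribeActual(observer) on the object returned by the previous step.
Here a SubscribeOnObserver is constructed first, and then the observer's onSubscribe method is called.
Next, a task is scheduled on the Scheduler that was passed in, and the Disposable returned by scheduleDirect is handed to SubscribeOnObserver's setDisposable method.
As for scheduler.scheduleDirect(new SubscribeTask(parent)): from the earlier article on the Schedulers source, we know that it ends up executing the run method of SubscribeTask(parent). From source.subscribe(parent) in the source below, we can see that run in effect calls subscribeActual(observer) on the object returned by the operator that precedes subscribeOn.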
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
The SubscribeOnObserver source:
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> downstream;
final AtomicReference<Disposable> upstream;
SubscribeOnObserver(Observer<? super T> downstream) {
this.downstream = downstream;
this.upstream = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this.upstream, d);
}
@Override
public void onNext(T t) {
downstream.onNext(t);
}
@Override
public void onError(Throwable t) {
downstream.onError(t);
}
@Override
public void onComplete() {
downstream.onComplete();
}
...
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
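}
We can see that onNext, onError and onComplete simply delegate to the corresponding methods of the downstream observer.
DisposableHelper.setOnce(this, d) stores the Disposable of the scheduled task (the value returned by scheduleDirect) as the SubscribeOnObserver's own AtomicReference value: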
public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
ObjectHelper.requireNonNull(d, "d is null");
if (!field.compareAndSet(null, d)) {
d.dispose();
if (field.get() != DISPOSED) {
reportDisposableSet();
}
return false;
}
return true;
}
Let's walk through an example:
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
System.out.println("subscribe = " + Thread.currentThread().getName());
for (int i = 0; i < 3; i++) {
emitter.onNext(String.valueOf(i));
}
emitter.onComplete();
}
})
.subscribeOn(Schedulers.single())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe thread name = " + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
System.out.println("onNext s = " + s + " thread name = " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
System.out.println("onError thread name = " + Thread.currentThread().getName());
}
@Override
public void onComplete() {
System.out.println("onComplete thread name = " + Thread.currentThread().getName());
}
});
Output:
onSubscribe thread name = main
subscribe = RxSingleScheduler-1
onNext s = 0 thread name = RxSingleScheduler-1
onNext s = 1 thread name = RxSingleScheduler-1
onNext s = 2 thread name = RxSingleScheduler-1
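onComplete thread name = RxSingleScheduler-1
The output shows that onSubscribe runs on the calling thread (main), while the subscribe callback and every onNext and onComplete run on the thread pool built by Schedulers.single().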
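The thread behaviour seen in this output can be reproduced with a small plain-JDK model. This is only a sketch of the idea, not RxJava's implementation: MiniObserver, MiniSource, the subscribeOn helper and the thread name mini-single-1 are all hypothetical names invented for illustration, and disposal and error handling are left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of subscribeOn; these are NOT RxJava classes.
class MiniSubscribeOnDemo {

    interface MiniObserver<T> {
        void onSubscribe();
        void onNext(T t);
        void onComplete();
    }

    interface MiniSource<T> {
        void subscribe(MiniObserver<T> observer);
    }

    // Wraps a source so that source.subscribe(...) runs on the executor,
    // which is the role SubscribeTask.run() plays in the article.
    static <T> MiniSource<T> subscribeOn(MiniSource<T> source, ExecutorService executor) {
        return observer -> {
            observer.onSubscribe();                              // still on the caller's thread
            executor.execute(() -> source.subscribe(observer)); // upstream runs on the executor
        };
    }

    // Runs the demo and returns log entries of the form "event@threadName".
    static List<String> runDemo() {
        final List<String> log = Collections.synchronizedList(new ArrayList<String>());
        ExecutorService single =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "mini-single-1"));

        MiniSource<String> source = observer -> {
            log.add("subscribe@" + Thread.currentThread().getName());
            for (int i = 0; i < 3; i++) {
                observer.onNext(String.valueOf(i));
            }
            observer.onComplete();
        };

        subscribeOn(source, single).subscribe(new MiniObserver<String>() {
            @Override public void onSubscribe() {
                log.add("onSubscribe@" + Thread.currentThread().getName());
            }
            @Override public void onNext(String s) {
                log.add("onNext " + s + "@" + Thread.currentThread().getName());
            }
            @Override public void onComplete() {
                log.add("onComplete@" + Thread.currentThread().getName());
            }
        });

        // Let the already submitted task finish, then wait for it.
        single.shutdown();
        try {
            single.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<String>(log);
    }

    public static void main(String[] args) {
        for (String line : runDemo()) {
            System.out.println(line);
        }
    }
}
```

As in the RxJava example, only the onSubscribe entry is logged on the calling thread; everything that the source emits is logged on the executor's thread.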
Now let's go through it step by step, as a review.
The flow is roughly as follows:

(1.0) The create operator returns an ObservableCreate object.
(2.0) ObservableCreate.subscribeOn(Schedulers.single()) then returns an ObservableSubscribeOn whose source is the ObservableCreate and whose scheduler is the SingleScheduler.
(3.0) ObservableSubscribeOn.subscribe(new Observer<T>(){}) then calls ObservableSubscribeOn's subscribeActual(observer).
(4.0) observer.onSubscribe(parent) runs, i.e. the observer's onSubscribe(...) method.
(5.0) On the thread pool built by SingleScheduler, the run method of SubscribeTask executes source.subscribe(parent).
That is, ObservableCreate.subscribe(new SubscribeOnObserver<T>(observer)),
which in turn is ObservableCreate.subscribeActual(new SubscribeOnObserver<T>(observer)).
(6.0) SubscribeOnObserver's onSubscribe(...) is called.
(7.0) The subscribe(ObservableEmitter<String> emitter) method of the ObservableOnSubscribe passed to create is called.
(8.0) Inside subscribe(...), onNext is called three times and onComplete once.
These calls go to the new SubscribeOnObserver<T>(observer),
which forwards them to the observer passed to subscribe.
In 2.x, the bufferSize used by observeOn defaults to 128.
// Observable
public static int bufferSize() {
return Flowable.bufferSize();
}
// Flowable
public static int bufferSize() {
return BUFFER_SIZE;
}
static final int BUFFER_SIZE;
static {
BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
}
The observeOn method:
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
The main methods of ObservableObserveOn:
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
...
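}
The constructor stores source (the object returned by the previous step in the chain) together with the Scheduler, delayError and bufferSize values.
On subscribe, subscribeActual is called, which first checks whether scheduler is an instance of TrampolineScheduler:
If it is, observer is passed straight to the subscribeActual method of the object returned by the previous step in the chain.
If not, observer is wrapped in an ObserveOnObserver, which is passed to the subscribeActual method of the object returned by the previous step in the chain.
From the subscribeOn walkthrough, we know that what follows is the call to the observer's onSubscribe method and then onNext, onComplete and onError, all on the ObserveOnObserver object.
Next, let's look at the ObserveOnObserver source: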
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
final Observer<? super T> downstream;
final Scheduler.Worker worker;
final boolean delayError;
final int bufferSize;
...
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.downstream = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
...
queue = new SpscLinkedArrayQueue<T>(bufferSize);
downstream.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;
schedule();
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
schedule();
}
...
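}
The overridden onSubscribe creates the queue and then calls the downstream observer's onSubscribe.
onNext, onError and onComplete all end by calling schedule().
Here is the implementation of schedule(), which runs the current object's run() on the worker created from the Scheduler that was passed in: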
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
The implementation of run():
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
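}
outputFused is false by default, so let's look at the implementation of drainNormal().
When outputFused is true, run() takes the drainFused() branch instead; that path does not poll the queue item by item and is not covered here.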
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue; //1.0
final Observer<? super T> a = downstream;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();//2.0
} catch (Throwable ex) {
...
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {//2.1
return;
}
if (empty) {//2.2
break;
}
a.onNext(v);//2.3
}
missed = addAndGet(-missed);//3.0
if (missed == 0) {//3.1
break;
}
}
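}
(1.0): As seen in onNext() above, each call stores the incoming item in queue.
(2.0): The loop polls the stored items one by one.
(2.1) If the stream has been disposed, or it is done and nothing is left to deliver, checkTerminated ends the drain.
(2.2) If the queue is empty, the inner loop breaks.
(2.3) Otherwise the observer's onNext method is called.
(3.0): addAndGet(-missed) atomically subtracts the schedule() calls already handled from the counter.
(3.1) If the result is 0, no new schedule() call arrived in the meantime and the drain loop ends; otherwise it loops again.
Let's continue with an example that adds observeOn to the subscribeOn example: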
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
System.out.println("subscribe = " + Thread.currentThread().getName());
for (int i = 0; i < 3; i++) {
emitter.onNext(String.valueOf(i));
}
emitter.onComplete();
}
})
.subscribeOn(Schedulers.single())
.observeOn(Schedulers.io())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("d.classname = " + d.getClass().getSimpleName());
System.out.println("onSubscribe thread name = " + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
System.out.println("onNext s = " + s + " thread name = " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
System.out.println("onError thread name = " + Thread.currentThread().getName());
}
@Override
public void onComplete() {
System.out.println("onComplete thread name = " + Thread.currentThread().getName());
}
});
Output:
System.out: d.classname = ObserveOnObserver
System.out: onSubscribe thread name = main
System.out: subscribe = RxSingleScheduler-1
System.out: onNext s = 0 thread name = RxCachedThreadScheduler-1
System.out: onNext s = 1 thread name = RxCachedThreadScheduler-1
System.out: onNext s = 2 thread name = RxCachedThreadScheduler-1
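System.out: onComplete thread name = RxCachedThreadScheduler-1
The output shows that:
The subscribe method of the ObservableOnSubscribe passed to create runs on the thread pool built by Schedulers.single().
onNext and onComplete run on the thread pool built by Schedulers.io().
Let's go back to the subscribeOn flow chart:

Compared with the subscribeOn example, the only change is that the argument of subscribe(observer) is now an ObserveOnObserver object.
(4.0) ObserveOnObserver's onSubscribe method is called, which in turn calls observer.onSubscribe(ObserveOnObserver), so the Disposable argument of the method below is the ObserveOnObserver object.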
new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
...
});
(5.0) On the thread pool built by SingleScheduler, source.subscribe(parent) is executed, i.e. the following code runs:
ObservableCreate.subscribeActual(
new SubscribeOnObserver<T>(
new ObserveOnObserver<T>(
observer,
new EventLoopWorker(new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory)),
delayError,
bufferSize))
);
Let's revisit ObservableCreate.subscribeActual(observer):
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
...
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException(...));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
...
}
(8.0) So calling onNext(T t) and onComplete() ends up calling onNext(T t) and onComplete() of the ObserveOnObserver object, which switches to the thread pool built by Schedulers.io() to deliver onNext(T t) and onComplete().
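The queue plus counter hand-off that ObserveOnObserver relies on can be sketched with plain JDK classes. This is a simplified model under stated assumptions, not RxJava's code: MiniObserveOn, its fields and the thread name mini-io-1 are hypothetical, a ConcurrentLinkedQueue stands in for SpscLinkedArrayQueue, and errors, disposal and fusion are left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical, simplified model of ObserveOnObserver; NOT RxJava's real class.
class MiniObserveOnDemo {

    // The AtomicInteger value counts schedule() calls that still need handling.
    static final class MiniObserveOn<T> extends AtomicInteger implements Runnable {
        private final Queue<T> queue = new ConcurrentLinkedQueue<T>();
        private final ExecutorService worker;
        private final Consumer<T> downstreamNext;
        private final Runnable downstreamComplete;
        private volatile boolean done;

        MiniObserveOn(ExecutorService worker, Consumer<T> next, Runnable complete) {
            this.worker = worker;
            this.downstreamNext = next;
            this.downstreamComplete = complete;
        }

        void onNext(T t) { queue.offer(t); schedule(); }

        void onComplete() { done = true; schedule(); }

        // Only the call that moves the counter from 0 to 1 submits a drain task.
        void schedule() {
            if (getAndIncrement() == 0) {
                worker.execute(this);
            }
        }

        @Override
        public void run() {
            int missed = 1;
            for (;;) {
                for (;;) {
                    boolean d = done;           // read done before polling
                    T v = queue.poll();
                    boolean empty = v == null;
                    if (d && empty) {           // finished and fully drained
                        downstreamComplete.run();
                        return;
                    }
                    if (empty) {
                        break;
                    }
                    downstreamNext.accept(v);   // delivered on the worker thread
                }
                missed = addAndGet(-missed);    // subtract the calls handled so far
                if (missed == 0) {              // nothing new arrived meanwhile
                    break;
                }
            }
        }
    }

    // Emits three items from the calling thread and returns what the downstream saw.
    static List<String> runDemo() {
        final List<String> log = Collections.synchronizedList(new ArrayList<String>());
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "mini-io-1"));
        MiniObserveOn<String> o = new MiniObserveOn<String>(io,
                s -> log.add("onNext " + s + "@" + Thread.currentThread().getName()),
                () -> log.add("onComplete@" + Thread.currentThread().getName()));
        for (int i = 0; i < 3; i++) {
            o.onNext(String.valueOf(i));
        }
        o.onComplete();
        io.shutdown();
        try {
            io.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<String>(log);
    }

    public static void main(String[] args) {
        for (String line : runDemo()) {
            System.out.println(line);
        }
    }
}
```

The counter is what keeps the downstream single-threaded: however many times the producer calls schedule() while a drain is in progress, no second drain task is submitted, and the running drain loops again until the counter returns to 0.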
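subscribeOn returns an ObservableSubscribeOn object.
ObservableSubscribeOn's subscribeActual runs the previous step's subscribeActual method on the Scheduler that was passed in.
observeOn returns an ObservableObserveOn object.
ObservableObserveOn's subscribeActual wraps the Scheduler that was passed in and the observer into a new Observer and hands it to the previous step's subscribeActual method, so that onNext, onError and onComplete all run on the thread pool built by that Scheduler.
So, do you now see how RxJava switches threads?
That's all.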