RxJava之subscribeOn解惑

RxJava之subscribeOn解惑


有一天,我在使用RxJava和Retrofit实现Android上面的网络请求。忽然,遇到了一个坑,在过了这些坑以后获得一些经验,以为须要和你们分享一二。java

引子

用Retrofit搭配RxJava的朋友应该都知道,通常实现代码最终大都长得一幅下面的样子。api

public interface BeanApi {
    @GET("bean/{id}")
    Observable<Bean> getBean(@Path("id") int beanId);
}

BeanApi api = ···经过Retrofit的create实例化···

api.getBean(1)
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(
    bean -> {
      // do something on main thread.
    }, 
    throwable -> {
      // do something on main thread. 
    }
  );

上面的代码形式相信各位都写得很熟了,可是我实在烦透了每一个api调用的地方都写一次subscribOn,observeOn。而后,我找到一篇不错的文章——Don't break the chain: use RxJava's compose() operator,里面提到了一个方法避免这种重复代码(其实做者本意是要体现“不要打破链式操做”,而非避免重复代码)。最后改进到的代码就长下面的样子了。网络

// This is a common method.
<T> Transformer<T, T> applySchedulers() {  
    return observable -> observable.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
}

api.getBean(1)
  .compose(applySchedulers())
  .subscribe(
    bean -> {
      // do something on main thread.
    }, 
    throwable -> {
      // do something on main thread. 
    }
  );

改进后的代码比原来的代码少了一行。可是写多几回以后,我仍是烦透了这个applySchedulers()。因而我疯了,就本身实现一个Retrofit的CallAdapter.Factory,让Retrotfit在每次调用api的时候自动就给我封装好subscribeOn和observeOn这些重复的代码,具体实现能够参考个人另一篇文章——经过委派模式包装一个RxJavaCallAdapterFactory。最后,个人代码就是长下面这个样子了。app

api.getBean(1)
  .subscribe(
    bean -> {
      // do something on main thread.
    }, 
    throwable -> {
      // do something on main thread. 
    }
  );

全部的subscribeOn和observeOn不用再写了。由于每次调用api.getBean(1),Retrotfit就调用我自定义的CallAdapter.Factory把结果封装成Observable对象的时候就已经把subscribeOn和observeOn添加上去了。async

问题

好,用得很爽。可是问题问题比办法多,因此问题来了。有几个特殊的地方我须要网络加载和结果监听都在当前线程。相信理解了上面代码的朋友都已经看出来了,如今我经过api.getBean(1)获取到的Observable<Bean>最后被订阅都会是在io线程获取网络数据,而后在mainThread线程进行结果处理。因此,我要想个办法出来,覆盖原来的Schedulers,包括subscribeOn的Scheduler和observeOn的Scheduler。因而,我写了下面的代码。ide

// isAsync is a boolean variable indicate whether the request is a asynchronous or not.
api.getBean(1)
  .subscribeOn(isAsync ? Schedulers.io() : Schedulers.immediate())
  .observeOn(isAsync ? AndroidSchedulers.mainThread() : Schedulers.immediate())
  .subscribe(
    bean -> {
      // do something on main thread.
    }, 
    throwable -> {
      // do something on main thread. 
    }
  );

上面的代码再结合我以前写的CallAdapter.Factory,其实就是至关于没有自定义CallAdapter.Factory以前显式调用两次subscribeOn和两次observeOn,就像下面的样子。this

api.getBean(1)
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribeOn(isAsync ? Schedulers.io() : Schedulers.immediate())
  .observeOn(isAsync ? AndroidSchedulers.mainThread() : Schedulers.immediate())
  .subscribe(
    bean -> {
      // do something on main thread.
    }, 
    throwable -> {
      // do something on main thread. 
    }
  );

前凑

做为一名RxJava的标准菜鸟,我被验证了本身的确很菜,我天真的认为后面的subscribeOn和observeOn会覆盖以前的Scheduler,我理所固然的认为,当isAsync为true的时候,此次api的调用就会在当前线程执行网络访问和结果处理。因而,我被搞疯了。我就看了subscribeOn的源码,以下。.net

public final Observable<T> subscribeOn(Scheduler scheduler) {
  ··· 省略一些于逻辑阅读不重要的代码
  return nest().lift(new OperatorSubscribeOn<T>(scheduler));
}

nest的代码以下,线程

public final Observable<Observable<T>> nest() {
  return just(this);
}

意思就是新建一个Observable,而且只会向订阅者发送一个元素——原来api.getBean(1)得到的Observale<Bean>对象。因此nest操做得到的结果是Observable<Observale<Bean>>对象。好,这里记着这个东东。下面我先分析一下lift操做,而后咱们再回头把他们串在一块儿。code

Observable结构

在看lift操做以前,咱们稍微回顾一下Observable的建立方法,

final OnSubscribe<T> onSubscribe;

public final static <T> Observable<T> create(OnSubscribe<T> f) {
    return new Observable<T>(hook.onCreate(f));
}

protected Observable(OnSubscribe<T> f) {
    this.onSubscribe = f;
}

其余的什么fromjust等建立方法最后都是把数据转化为一个OnSubscribe对象再经过上面的create方法建立。因此咱们只关注这个create方法。上面代码的意思很简单,就是new一个Observable对象,而且设置onSubscribe。因此这里的关键是onSubscribe这个对象。这里我管它作数据源,即Observable对象会用它来产生数据,而且发布给订阅者。

看到这里,咱们能够发现,Observable其实没有什么,只有两个关键点:一、装载着一个onSubscribe对象,二、有订阅者注册时,就调用用这个onSubscribecall(Subscriber)方法。

这里咱们要看一下这个call(Subscriber)方法。该方法接受一个参数Subscriber,即订阅者。当有订阅者注册到Observable对象时,Observable对象就调用onSubscribe的这个call方法,而且把当前当前注册的订阅者做为参数传递过去。因此在call方法的实现中就能够调用订阅者的onNext方法来发布数据或者作其余事(不必定是发布数据)。

lift操做说明

先把lift操做的代码贴出来。

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return new Observable<R>(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber<? super R> o) {
            try {
                Subscriber<? super T> st = hook.onLift(operator).call(o);
                try {
                    // new Subscriber created and being subscribed with so 'onStart' it
                    st.onStart();
                    onSubscribe.call(st);
                } catch (Throwable e) {
                    ··· 省略一些于逻辑阅读不重要的代码
                    st.onError(e);
                }
            } catch (Throwable e) {
                ··· 省略一些于逻辑阅读不重要的代码
                // as we don't have the operator available to us
                o.onError(e);
            }
        }
    });
}

从上面代码第2行的new Observable<R>能够看出,lift操做实际上是新建一个Observable对象而后返回。这里加点高级的标识方便下面的阅读,被new出来的Observable对象咱们叫它作派生Observable,而当前Observable就叫父级Observable。

在上面Observable结构一节中,咱们知道每一个Observable都持有一个onSubscribe对象做为数据源。经过lift方法派生所得的Observable也不例外,也有一个,就是上面代码第二行new OnSubscribe<R>实例化的匿名对象。这个OnSubscribe虽然也是数据源,可是本身却历来不产生数据,也不直接向订阅者直接发布数据。它作的事就只是把本身的订阅者包装成为一个父级Observable承认的订阅者,而后委派给父级的数据源(上面代码第十行)。父级Observable的数据源向本身的订阅者发布数据,就是发送到被包装出来的这个订阅者中来。

小结:到这里为止,要记住很重要的一点,经过lift操做产生的派生Observable对象的数据源(onSubscribe)是不实际产生数据的,它作的事就只是把本身的订阅者包装成为一个父级Observable承认的订阅者,而后委派给父级的数据源

lift操做实例分析

那么被包装出来的这个订阅者是怎么处理父级数据源发布的数据呢?这里就要回到上面代码的第6行。那里经过一个咱们调用lift操做时传进去的Operator把派生Observable承认的Subscriber包装成一个父级Observable承认的Subscriber。

下面我看一个lift操做的例子,用lift模拟了两次map操做。

代码视图1:

class Bean {
    int value;

    Bean(int v) {
        this.value = v;
    }
}

Observable.just(new Bean(1), new Bean(2), new Bean(3), new Bean(4))
    .lift(new Observable.Operator<Integer, Bean>() {
        @Override
        public Subscriber<? super Bean> call(Subscriber<? super Integer> subscriber) {
            return new Subscriber<Bean>(subscriber) {
                @Override
                public void onCompleted() {
                    subscriber.onCompleted();
                }

                @Override
                public void onError(Throwable e) {
                    subscriber.onError(e);
                }

                @Override
                public void onNext(Bean bean) {
                    subscriber.onNext(bean.value);
                }
            };
        }
    })
    .lift(new Observable.Operator<String, Integer>() {
        @Override
        public Subscriber<? super Integer> call(Subscriber<? super String> subscriber) {
            return new Subscriber<Integer>(subscriber) {
                @Override
                public void onCompleted() {
                    subscriber.onCompleted();
                }

                @Override
                public void onError(Throwable e) {
                    subscriber.onError(e);
                }

                @Override
                public void onNext(Integer i) {
                    subscriber.onNext(String.valueOf(i));
                }
            };
        }
    })
    .subscribe(System.out::println);

上面的代码中Observable的链式操做实际上是等价于下面代码形式的,

代码视图2:

Observable<Bean> o1 = Observable.just(new Bean(1), new Bean(2), new Bean(3), new Bean(4));
Observable<Integer> o2 = o1.lift(···);
Observable<String> o3 = o2.lift(···);

Subscriber<String> subscriber3 = new Subscriber<String>() {
        @Override
        public void onCompleted() {}

        @Override
        public void onError(Throwable e) {}

        @Override
        public void onNext(String s) {
            System.out.println(s);
        }
    };

o3.subscribe(subscriber3);

代码视图2中,咱们能够发现这一连串的操做下来,一共产生了3个Observable对象:o一、o二、o3。以前咱们说过每一个Observable对象都会持有一个onSubscribe对象做为数据源,用来向订阅者发布数据。咱们以不一样的标识来区分一下上面三个Observable对象对应的onSubscribe对象:o1 => onSubscribe1, o2 => onSubscribe2, o3 => onSubscribe3。

从上面lift操做说明一节,咱们知道lift操做产生的Observable对象的数据源是不产生数据的,它作的事就只是把本身的订阅者包装成为一个父级Observable承认的订阅者,而后委派给父级的数据源。o3是一个经过lift操做产生的派生Observable,当订阅者subscriber3注册到o3时,o3的数据源onSubscribe3就会把subscriber3包装成一个父级(o2)能够的订阅者(这里命名为subscriber2),而后委派给父级数据源(onSubscribe2)。 如今请回头看代码视图1的第34-49行。这一段代码就显示了把subscriber3包装成为subscriber2的过程。能够发现,被包装出来的subscriber2只作一件事,就是把onSubscribe2发布给本身的数据转化为subscriber3能够消化的数据,而后就交给subscriber3,至关于充当了一个subscriber3和onSubscribe2之间的桥梁。

接着分析,onSubscribe2虽说,经过subscriber2间接把数据发布到了subscriber3。可是实际上,做为数据源,它的持有者o2,也是经过lift操做产生的派生Observable,因此这个onSubscribe2也是不直接产生数据的。它也是把本身的订阅者(subscriber2)包装一个父级(o1)承认的订阅者(这里命名为subscriber1),而后委派给父级数据源(onSubscribe1)。 如今请再回头看代码视图1的第13-28行。这段代码显示了如何把subscriber2包装成为subscriber1的过程。一样,subscriber1也只作一件事,就是把onSubscribe1发布给本身的数据转化为subscriber2能够消化的数据,而后就交给subscriber2,至关于充当了一个subscriber2和onSubscribe1之间的桥梁。

最后,整个过程能够描述为下面的一个示意图,

输入图片说明

subscribeOn与lift

RxJava中,lift操做几乎贯穿了整个Observable,由于差很少全部全部的操做符都是经过lift操做来实现的。好比map操做符,其实就是经过lift操做产生的派生Observable而已。因此这个派生Observable的数据源也就如上面我所述,本身不产生数据,而是把本身的订阅者包装成一个父级承认的订阅者。怎么包装呢?上面的讲述中,这个包装过程实际上是经过咱们调用lift操做时传递的参数Operator来完成的。

咱们再回顾subscribeOn操做符的源码。首先,经过nest操做产生一个Observable<Observable<Bean>>(咱们原来操做的是Observable<Bean>),而后它上面调用lift操做,那么Observable<Observable<Bean>>就是父级了,lift操做最终产生的派生Observable就是整个subscribeOn操做产生的结果。看subscribeOn操做的源码咱们能够发现,它是经过OperatorSubscribeOn这么一个Operator来实现Subscriber的包装。那么咱们来看一下OperatorSubscribeOn的源码,分析一下具体的包装过程。

public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {

    private final Scheduler scheduler;

    public OperatorSubscribeOn(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    @Override
    public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);
        return new Subscriber<Observable<T>>(subscriber) {
        
            ···这里省略了一些于逻辑阅读无关的代码。

            @Override
            public void onNext(final Observable<T> o) {
                inner.schedule(new Action0() {

                    @Override
                    public void call() {
                        final Thread t = Thread.currentThread();
                        o.unsafeSubscribe(new Subscriber<T>(subscriber) {

                            @Override
                            public void onCompleted() {
                                subscriber.onCompleted();
                            }

                            @Override
                            public void onError(Throwable e) {
                                subscriber.onError(e);
                            }

                            @Override
                            public void onNext(T t) {
                                subscriber.onNext(t);
                            }
                            
                            ···这里省略了一些于逻辑阅读无关的代码。

                        });
                    }
                });
            }

        };
    }
}

从上面的分析中,咱们知道subscribeOn操做实际上是先经过nest操做产生一个Observable<Observable<Bean>>对象,再经过lift操做产生派生Observable(即(Observable<Bean>)对象的,因此Observable<Observable<Bean>>对象就是父级。因此OperatorSubscribeOn的职责就是为包装一个Observable<Observable<Bean>>承认的订阅者。被包装出来的订立者接受到父级发布的数据(即一个Observable<Bean>对象)时,它这里没有把数据转换成下级subscriber(即上面代码第10行传递给call方法的参数)可消化的数据,而是经过inner对象来安装一次订阅。

小结,通过subscribeOn操做产生了一个派生Observable<Bean>对象,这个对象的数据源(onSubscribe)的工做是为本身的订阅者在某个线程上安排订阅。

样例分析

代码视图3

Observable.just(new Bean(1), new Bean(2), new Bean(3), new Bean(4))
  .subscribeOn(Schedulers.io())
  .subscribeOn(Schedulers.immediate())
  .subscribe(bean -> System.out.println(bean.value));

看过subscribeOn的源码以后,咱们应该知道上面的代码几乎等价于下面的写法,

代码视图4

Observable<Bean> o1 = Observable.just(new Bean(1), new Bean(2), new Bean(3), new Bean(4));
Observable<Observable<Bean>> o2 = o1.nest();
Observable<Bean> o3 = o2.lift(new OperatorSubscribeOn<Bean>(Schedulers.io()));
Observable<Observable<Bean>> o4 = o3.nest();
Observable<Bean> o5 = o4.lift(new OperatorSubscribeOn<Bean>(Schedulers.immediate()));

Subscriber<Bean> subscriber4 = new Subscriber<Bean>() {
        @Override
        public void onCompleted() {}

        @Override
        public void onError(Throwable e) {}

        @Override
        public void onNext(Bean s) {
            System.out.println(s.value);
        }
    };

o5.subscribe(subscriber5);

上面代码的执行过程,能够表示成以下示意图,

输入图片说明

经过上面示意图能够看出,最后整个整个订阅过程的运行线程是 currentThread -> immediateThread -> ioThread。

声名

By 啪嗒科技 AtanL(atanl@padakeji.com)

©啪嗒科技版本全部,没有经做者容许,只能发表于padakeji.com相关域名下。