MicroKibaco / CrazyDailyQuestion

每日一问: 水滴石穿,聚沙成塔,坚持数月, 必有收获~
37 stars 1 forks source link

2019-8-26: 说说RxJava工作原理和优势(源码角度)? #22

Open MicroKibaco opened 5 years ago

MicroKibaco commented 5 years ago

引言

  RxJava已经成为了Android开发必备的技能,鉴于大部分敏捷开发团队,掌握RxJavaRetroft快速开发网络框架,能大幅度减少网络框架重构时间。

  有大量文章教程在写RxJava如何使用,如扔物线的RxJava详解,还有南尘的RxJava2.x 教程,那我为什么写这多余的文章呢?

  本文不是一个指导如何使用RxJava特性和基本的探索,而是从更高的角度上挖掘它,去了解代码库是怎么做的,内部工作原理又是怎样的,相比其他网络请求框架有什么优势?

概述

  RxJava 在 GitHub 主页上的简介是 "Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM."

  翻译过来就是:活性扩展JVM库编写异步和基于事件的程序使用Java VM可观察序列。
  很费解的一句话,你可以这么理解,压缩版能优简化代码的异步请求库。

使用:

1. 添加RxJava ,Retrofit ,Rxandroid 相关依赖

    implementation 'com.squareup.retrofit2:retrofit:2.5.0'
    implementation 'com.squareup.retrofit2:converter-gson:2.5.0'
    implementation 'com.squareup.retrofit2:adapter-rxjava2:2.5.0'
    implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
    implementation 'io.reactivex.rxjava2:rxjava:2.2.8'

2. 添加网络权限

<uses-permission android:name="android.permission.INTERNET"/>

3. 创建实体类

  访问 小木箱 github 仓库,通过get请求得到了以下报文:

  然后,通过 Gsonformat 得到相关实体类对象:

 class MicroKibacoRepo {

    private int id;
    private String node_id;
    private String name;
    private String full_name;
   // ---为了避免浪费篇幅,省略无用代码--- 
}

4. 创建⼀个 Single interface 作为 Web Service 的请求集合,在⾥⾯⽤注解(Annotation)写⼊需要配置的请求方法

public interface Api {
    @GET("users/{username}/repos")
    Single<List<MicroKibacoRepo>> listRepos(@Path("username") String user);
}

5. 在正式代码里⽤Retrofit 创建出interface 的实例

 Retrofit retrofit = new Retrofit.Builder()
                .baseUrl("https://api.github.com/")
                .addConverterFactory(GsonConverterFactory.create(gson))
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .build();

Api api = retrofit.create(Api.class);

6. 通过 observeOn,observeOnsubscribe 等订阅事件切换线程达到网络请求效果。

api.listRepos("MicroKibaco")
                .subscribeOn(Schedulers.io()) // 切换网络请求
                .observeOn(AndroidSchedulers.mainThread()) // 切换到UI线程
                .subscribe(new SingleObserver<List<MicroKibacoRepo>>() {
                    @Override
                    public void onSubscribe(Disposable d) {
// 订阅者模式,我向你订阅事件
                        mDisposable = d;
                    }

                    @Override
                    public void onSuccess(List<MicroKibacoRepo> microKibacoRepos) {
                        text_view.setText(microKibacoRepos.get(0).getName());
                    }

                    @Override
                    public void onError(Throwable e) {
                        String msg = e.getMessage();

                        if (msg == null) {

                            msg = e.getClass().getName();

                        }

                        text_view.setText(msg);

                    }
                });

框架架构

   一个简单的RxJava网络处理库就做好了,下面我们来分析一下,具体Api的底层实现:    RxJava的整体结构是一条链,其中:

  1. 链的最上游: 生产者Observable
  2. 链的最下游: 观察者Observer
  3. 链的中间: 各个中介点,既是下游 的Observable,又是上游的Observer

Single

  我们访问 Singlejust 方法发现: 里面有一个关键的钩子方法: onAssembly

public static <T> Single<T> just(final T item) {
        ObjectHelper.requireNonNull(item, "item is null");
        // 钩子方法
        return RxJavaPlugins.onAssembly(new SingleJust<T>(item));
    }

        public static <T> Single<T> onAssembly(@NonNull Single<T> source) {

         // 查询静态对象,是否存在
         Function<? super Single, ? extends Single> f = onSingleAssembly;
            // 如果存在,额外处理
            if (f != null) {
                return apply(f, source);
            }

             // 如果不存在,直接返回
            return source;
        }

  整个Single.just内部其实创建了一个SingleJust,SingleJust里面有一个关键方法: subscribeActualsubscribeActual 有一个比较重要的抽象方法onSubscribeonSuccess


public final class SingleJust<T> extends Single<T> {

    final T value;

    public SingleJust(T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(SingleObserver<? super T> observer) {
        observer.onSubscribe(Disposables.disposed());
        observer.onSuccess(value);
    }

}

   整个方法做的事情就是优先执行onSubscribe,后执行onSuccess


// just 方法创建了一个上层被观察的对象
 Single<String> single = Single.just("1");
        single = single.subscribeOn(Schedulers.io());
        single = single.subscribeOn(AndroidSchedulers.mainThread());

 // subscribe 把观察者传进来
        single.subscribe(new SingleObserver<String>() {

 // subscribe 内部会调用 subscribeActual       
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onSuccess(String s) {
                text_view.setText(s);
            }

            @Override
            public void onError(Throwable e) {
                String msg = e.getMessage();

                if (msg == null) {

                    msg = e.getClass().getName();

                }

                text_view.setText(msg);
            }
        });

   而 Disposabledisposed 方法其实是:在我不需要订阅关系的时候,我们切断这种关系。

操作符 Operator( map() 等等)

  1. 基于原 Observable 创建一个新的 Observable
  2. Observable 内部创建一个 Observer
  3. 通过定制 Observable 的 subscribeActual() 方法和 Observer 的 onXxx()方法,来实现自己的中介角色(例如:数据切换,线程切换)

Disposable

可以通过 dispose() 方法来让上游停止工作,达到 [丢弃] 的效果

subscribeOn()

原理: 在 Scheduler 指定的线程里启动 subscribe()

效果

observeOn()

原理: 在内部创建的 Observer 的 onNext() onError() onSuccess() 等回调方法里面,通过 Scheduler 指定的线程 来调用下级 Observer 的 对应回调方法

效果

Scheduler 的原理

  1. Schedulers.newThread() 和 Schedulers.io():

    • 当 scheduleDirect() 被调用的时候,会创建一个 Worker,这个 Worker 的内部会有一个 Executor,由 一个 Executor 来完成实际线程切换
    • scheduleDirect() 还会创建出一个 Disposable 对象,交给外侧 Observer,让它 能执行 dispose() 操作,取消订阅链
    • newThread() 和 io() 的区别在于,io() 可能会对 Executor 进行重写
  2. AndroidSchedulers.mainThread(): 通过内部的 Handle 把任务放到主线程去做