Open vvLavida opened 8 years ago
核心概念
observeOn()
return observableFactory.<T>toObservable(this) .compose(this.<T>transform(observableRequest)) .observeOn(Schedulers.io()) .map(new ResponseMetadataOperator<>(this)) .flatMap(this::mapResponse) .observeOn(AndroidSchedulers.mainThread()) .<AirResponse<T>>compose(group.transform(tag)) .doOnError(new ErrorLoggingAction(request)) .doOnError(NetworkUtil::checkForExpiredToken) .subscribe(request.observer());
这是我们的应用中创建 RxJava observable 流的时候的一段代码。我们调用 observeOn 两次,看起来好像无意义。实际上,你每次调用 observeOn,后面的代码都会运行在那个 scheduler 上,然后你之后又调用一遍,它就又切换一次。
当我们使用 RxJava 的时候,你创建了一个流。一个关于 RxJava 的误解就是它是异步的,但是事实上每件事情都是默认同步的。当你创建一个流的时候,你仅仅是创建了一个点,这里我们会向它订阅。当你订阅的时候,你把所有的东西都才创建在一起了,然后才能执行它。在你调用 subscribe 之前,你仅仅是创建了一个流。这比较类似声明的流程。当你说 observeOn,你切换到另外一个线程。如果你不调用 observeOn,每件事都还是在原来那个需要订阅给 observable 的线程里。这里我们有 subscribe,所以如果这是从主线程中调用的话,主线程里的所有事情都会发生而不论你做了些什么。所以 observeOn 是一个有效的调用其他线程工作的方法,而且它会使过程异步化。
我们第一次调用 observeOn 的时候,我们传入了一个 scheduler。RxJava 有一些内嵌的 scheduler,其中一个就是 I/O scheduler,这当然是和 I/O 线程工作在一起的,I/O 线程是一个和你的 I/O 绑定的线程池。map 和 flatMap 操作符在那个线程里面执行,然后当它结束的时候,我们把它发送回到主线程。所以,你正在主线程里面工作,假设这是从主线程里面调用的,然后加载到后台线程, 最后把它移回到主线程。
如果你不使用 RxJava,这会是个非常复杂的事情。然而,现在我们有这么简单的描述性的方法来实现你想做的事情。这也是为什么 RxJava 会很复杂的原因:这么少的代码,但是却要花很长的时间来真正理解里面发生了些什么。
subscribeOn()
return observableFactory.<T>toObservable(this) .compose(this.<T>transform(observableRequest)) .observeOn(Schedulers.io()) .map(new ResponseMetadataOperator<>(this)) .flatMap(this::mapResponse) .observeOn(AndroidSchedulers.mainThread()) .<AirResponse<T>>compose(group.transform(tag)) .doOnError(new ErrorLoggingAction(request)) .doOnError(NetworkUtil::checkForExpiredToken) .subscribeOn(Schedulers.io()) .subscribe(request.observer());
第一个调用,observableFactory.
错误处理
return observableFactory.<T>toObservable(this) .compose(this.<T>transform(observableRequest)) .observeOn(Schedulers.io()) .map(new ResponseMetadataOperator<>(this)) .flatMap(this::mapResponse) .observeOn(AndroidSchedulers.mainThread()) .<AirResponse<T>>compose(group.transform(tag)) .doOnError(new ErrorLoggingAction(request)) .doOnError(NetworkUtil::checkForExpiredToken) .subscribeOn(Schedulers.io()) .subscribe(request.observer());
我们使用 doOnError 作为错误日志的一个方法。你的网络出现异常了,然后你想给你的分析服务注入日志,你想知道这种情况发生了多少次。doOnError 是个每次你在流上出现错误都会被执行的动作,然后你会有多次调用,所以你有对于一个流的多次错误处理。当它看见一个错误事件的时候,它就会调用它的方法,但是这是个副作用。
return observableRequest .rawRequest() .<Observable<Response<T>>>newCall() .observeOn(Schedulers.io()) .unsubscribeOn(Schedulers.io()) .flatMap(responseMapper(airRequest)) .onErrorResumeNext(errorMapper(airRequest));
另一个可以被使用的结构是 onErrorResumeNext,这个工作起来像是个 catch 块,这在 reactive 世界里就是个没有意义的事情。这就好像你再说,“Hey,当我看到一个错误的时候,我想运行这个动作来扑捉这个错误,然后继续执行,然后打包那个异常,写个日志,然后返回个空的数据集合或者其他什么东西。”如果你还是 imperative 思维,这就像个 catch 块。
单元测试
给同步的流数据做单元测试听起来很复杂,所以 RxJava 提供了这个漂亮的类叫做 TestSubscriber。
@Test public void testErrorResponseNonJSON() { server.enqueue(new MockResponse() .setBody("something bad happened") .setResponseCode(500)); TestRequest request = new TestRequest.Builder<String>().build(); TestSubscriber<AirResponse<String>> subscriber = new TestSubscriber<>(); observableFactory.<String>toObservable(request).subscribe(subscriber); subscriber.awaitTerminalEvent(3L, TimeUnit.SECONDS); NetworkException exception = (NetworkException) subscriber.getOnErrorEvents().get(0); assertThat(exception.errorResponse(), equalTo(null)); assertThat(exception.bodyString(), equalTo("something bad happened")); }
你可以使用 TestSubscriber 来订阅你的流,然后你可以阻塞它,直到它获得了一个事件。有一些简便的方法,例如 .awaitTerminalEvent,这也会阻塞你的线程直到一个终端事件(例如:onCompleted 或者 onError)。对于你的流中的每个事件,你可以得到 0 次到 n 次的 onNext 事件,然后当它结束的时候,你获得 onCompleted 或者它失败了,你获得 onError,之后你再也收不到任何时间了,流也结束了。
@Test public void testUnicodeHeader() { server.enqueue(new MockResponse().setBody("\"Hello World\"")); TestRequest request = new TestRequest.Builder<String>() .header("Bogus", "中華電信") .build(); observableFactory.toObservable(request) .toBlocking() .first(); RecordedRequest recordedRequest = server.takeRequest(); assertThat(recordedRequest.getHeader("Bogus"), equalTo("????")); }
另一件事是你可以使用 toBlocking。这会立即阻塞线程,这在单元测试里面十分有用。当然,作为产品代码用处不大。如果你使用 RxJava,你不太可能会阻塞你自己线程,但是在测试的时候就非常方便了。这会比使用测试 subscriber 代码量少点。如果你知道不会失败,你可以直接阻塞然后获得第一个事件。
内存泄漏
当你向一个流订阅的时候,你得到了一个 subscription。当你得到这个 subscription 之后,你可以注销它,所以你需要显示地释放资源。你不在需要引用那个流了。我们都知道发起请求的重要性,例如,从安卓的 activity 或者 fragment 中发起请求。你不要忘记了,你想在 acitivity 销毁的时候释放这些资源,这是一个常见的模式。
`private final CompositeSubscription pendingSubscriptions = new CompositeSubscription();
@Override public void onCreate() { pendingSubscriptions.add( observable.subscribe(observer)); }
@Override public void onDestroy() { pendingSubscriptions.clear(); }`
你可以使用 CompositeSubscription,这是个能够集合多个订阅的类,而且你给它增加一个订阅。然后,一旦你销毁 activity,你就能清除它了。
The introduction to Reactive Programming you've been missing
RxJava 是 ReactiveX 的一部分,一组开源库。它们有许多不同的库,包括 JavaScript, Groovy, Ruby, Java, C#,以及其他。然而,它们都有着同样的概念,这就是 functional 编程。
ReactiveX是Reactive Extensions的缩写,一般简写为Rx。 ReactiveX 是一个使用可观察数据流进行异步编程的编程接口,ReactiveX结合了观察者模式、迭代器模式和函数式编程的精华
为什么用RxJava
我们都知道移动开发是困难的。移动用户期望即时响应,而且还有在不同的线程间来回切换的需求。除了主线程,你还要做网络连接,同时你还需要在后台处理其他的各种不同的事情。最重要的是,你不能阻塞 UI 线程。
RxJava 是解决这类问题的好方法,因为他能够使得线程间的切换比较容易。这已经集成在框架里面了。异步操作非常笨重而且容易出错,RxJava 使得你不用再这样做了,这也是你能把不同的线程组合在一起的原因。
我觉得 imperative 编程是我们不应该采用的方法。当然,面向对象编程已经流行很多年了。它已经深入到了现代程序员的骨髓里了。每个人都盲目的使用它,但是它不是我们开发软件的必需品。
Functional 编程是 RxJava 里面的概念,而且我觉得用这种方法,代码更加健壮,而且永远不需要维护状态了。代码更加可靠而且你知道它一定工作。