Open bingoogolapple opened 6 years ago
private PublishSubject<String> mKeywordPs;
private void optimizeSearch() {
mKeywordPs = PublishSubject.create();
mKeywordSv.setOnQueryTextListener(new SearchView.OnQueryTextListener() {
@Override
public boolean onQueryTextSubmit(String query) {
return false;
}
@Override
public boolean onQueryTextChange(String keyword) {
mKeywordPs.onNext(keyword);
return true;
}
});
mKeywordPs.debounce(400, TimeUnit.MILLISECONDS) // debounce 默认是在 computation 线程的。发送一个延时消息给下游,如果在这段延时时间内没有收到新的请求,那么下游就会收到该消息;而如果在这段延时时间内收到来新的请求,那么就会取消之前的消息,并重新发送一个新的延时消息
.observeOn(AndroidSchedulers.mainThread()) // 这里手动将后续操作符切换到主线程,否则 filter 也是在 computation 线程的
.filter(keyword -> { // 只有返回 true 时,才会将事件发送给下游,否则就丢弃该事件
if (StringUtil.isNotEmpty(keyword)) {
return true;
} else {
mResultTv.setText("清空了关键字");
return false;
}
})
.switchMap(keyword -> getSearchObservable(keyword)) // 将上游 Observable 发送的数据集合变换为 Observable 集合,然后只发射这些 Observable 最近发射的数据「在该节点收到一个新的事件之后,如果之前收到的事件所产生的 Observable A 还没有发送事件给下游,那么下游就再也不会收到 Observable A」
.observeOn(AndroidSchedulers.mainThread())
.compose(bindToLifecycle())
.doOnError(throwable -> {
mResultTv.setText("错误:" + throwable.getMessage()); // 将 Observer 的 onError 中的错误处理放到 doOnError 中处理
})
.retryWhen(throwableObservable -> throwableObservable) // 处理 onError 时重订阅,避免发生一次错误后就再也搜索不到结果。Observer 的 onError 将不会再被回调。如果不做延时操作,可以直接用 retry 操作符
.filter(result -> StringUtil.isNotEmpty(mKeywordSv.getQuery())) // 避免返回结果时,如果当前搜索框关键字为空则忽略此次搜索结果
.subscribe(result -> {
Logger.d("onNext:" + result);
mResultTv.setText(result);
});
}
private Observable<String> getSearchObservable(final String keyword) {
return Observable.create((ObservableOnSubscribe<String>) emitter -> {
try {
if (StringUtil.isEqual("q", keyword)) { // 搜索 q 时延时 3 秒返回结果
Thread.sleep(3000);
} else if (StringUtil.isEqual("e", keyword)) { // 搜索 q 时延时 3 秒返回网络异常
Thread.sleep(3000);
emitter.onError(new Exception("网络异常"));
} else { // 搜索其他关键字时延时 1 秒返回结果
Thread.sleep(1000);
}
if (!emitter.isDisposed()) {
emitter.onNext("关键词为:" + keyword);
emitter.onComplete();
}
} catch (Exception e) {
if (!emitter.isDisposed()) {
emitter.onError(e);
}
}
}).subscribeOn(Schedulers.io());
}
public class Engine {
public static final String BASE_URL = "http://192.168.31.152:8080/";
private RxJavaApi mRxJavaApi;
private Engine() {
boolean isBuildDebug = AppManager.getInstance().isBuildDebug();
HttpLoggingInterceptor.Level logLevel = isBuildDebug ? HttpLoggingInterceptor.Level.BODY : HttpLoggingInterceptor.Level.NONE;
OkHttpClient client = new OkHttpClient().newBuilder()
.addInterceptor(new HttpLoggingInterceptor().setLevel(logLevel))
.addInterceptor(new HeaderInterceptor())
.connectTimeout(10000, TimeUnit.MILLISECONDS)
.readTimeout(10000, TimeUnit.MILLISECONDS)
.writeTimeout(10000, TimeUnit.MILLISECONDS)
.build();
Retrofit retrofit = new Retrofit.Builder()
.baseUrl(BASE_URL)
.addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io())) // 指定在 io 线程进行网络请求
.addConverterFactory(GsonConverterFactory.create())
.client(client)
.build();
mRxJavaApi = retrofit.create(RxJavaApi.class);
}
private static class SingletonHolder {
private static final Engine INSTANCE = new Engine();
}
public static Engine getInstance() {
return Engine.SingletonHolder.INSTANCE;
}
public static RxJavaApi getRxJavaApi() {
return getInstance().mRxJavaApi;
}
}
public interface RxJavaApi {
// 查询博客列表
@GET("api/blogs")
Observable<NetResult<List<Blog>>> findBlogList(@Query("keyword") String keyword);
// 查询分类列表
@GET("api/categorys")
Observable<NetResult<List<Category>>> getCategoryList();
}
mKeywordSv.setOnQueryTextListener(new SearchView.OnQueryTextListener() {
@Override
public boolean onQueryTextSubmit(String query) {
return false;
}
@Override
public boolean onQueryTextChange(String keyword) {
mKeywordPs.onNext(keyword);
return true;
}
});
private PublishSubject<String> mKeywordPs;
private void initSearch() {
mKeywordPs = PublishSubject.create();
mKeywordPs.debounce(400, TimeUnit.MILLISECONDS) // debounce 默认是在 computation 线程的。发送一个延时消息给下游,如果在这段延时时间内没有收到新的请求,那么下游就会收到该消息;而如果在这段延时时间内收到来新的请求,那么就会取消之前的消息,并重新发送一个新的延时消息
.observeOn(AndroidSchedulers.mainThread()) // 这里手动将后续操作符切换到主线程,否则 filter 也是在 computation 线程的
.filter(keyword -> { // 只有返回 true 时,才会将事件发送给下游,否则就丢弃该事件
if (StringUtil.isNotEmpty(keyword)) {
return true;
} else {
mBlogAdapter.clear();
return false;
}
})
.switchMap(keyword -> { // 将上游 Observable 发送的数据集合变换为 Observable 集合,然后只发射这些 Observable 最近发射的数据「在该节点收到一个新的事件之后,如果之前收到的事件所产生的 Observable A 还没有发送事件给下游,那么下游就再也不会收到 Observable A」
/**
* combineLatest 和 zip 类似,都是组合两个 Observable 的数据为新的 Observable
* zip 当原始 Observable 中每一个都发射了一条数据时才发射数据
* combineLatest 当原始 Observable 中任何一个发射了一条数据时发射数据
*/
return Observable.combineLatest(
getCategoryListObservable(),
mBlogApi.findBlogList(keyword).flatMap(netResult -> Observable.just(netResult.data)),
(blogCategoryList, blogList) -> {
convertToCategoryArray(blogCategoryList);
return blogList;
}
);
})
.observeOn(AndroidSchedulers.mainThread())
.compose(bindToLifecycle())
.doOnError(throwable -> {
throwable.printStackTrace();
Logger.d("错误:" + throwable.getMessage()); // 将 Observer 的 onError 中的错误处理放到 doOnError 中处理
})
.retryWhen(throwableObservable -> throwableObservable) // 处理 onError 时重订阅,避免发生一次错误后就再也搜索不到结果。Observer 的 onError 将不会再被回调
.filter(result -> StringUtil.isNotEmpty(mKeywordSv.getQuery())) // 避免返回结果时,如果当前搜索框关键字为空则忽略此次搜索结果
.subscribe(result -> {
Logger.d("查询成功");
mBlogAdapter.setData(result);
});
}
// 获取发送分类列表的被观察者
private Observable<List<Category>> getCategoryListObservable() {
// switchIfEmpty 如果原始 Observable 正常终止后仍然没有发射任何数据,就使用备用的 Observable
return getCategoryListFromCache()
.switchIfEmpty(mBlogApi.getCategoryList().flatMap(netResult -> Observable.just(netResult.data)))
.doOnNext(categoryList -> convertToCategoryArray(categoryList));
// concat 操作符是接收若干个 Observables,发射数据是有序的,不会交叉。concat + takeUntil 实现二级缓存
// return Observable.concat(
// getCategoryListFromCache(),
// mBlogApi.getCategoryList().flatMap(netResult -> Observable.just(netResult.data))
// ).takeUntil(categoryList -> categoryList != null)
// .doOnNext(categoryList -> convertToCategoryArray(categoryList));
}
// 从缓存中获取分类列表
private Observable<List<Category>> getCategoryListFromCache() {
// defer 只有当订阅者订阅时才创建 Observable,为每个订阅者创建一个新的 Observable。内部通过 ObservableDefer 在订阅时调用 Callable 的 call 方法创建 Observable
return Observable.defer(() -> {
Logger.d("defer");
return CollectionUtil.isEmpty(mCategoryList) ? Observable.empty() : Observable.just(mCategoryList);
});
}
public interface RxJavaApi {
// 添加博客
@POST("api/blogs")
Observable<NetResult<Blog>> addBlog(@Body Blog blog);
// 获取文件上传 token
@GET("api/file/token")
Observable<NetResult<UploadToken>> getUploadToken();
// 上传文件
@POST("api/file/upload")
Observable<NetResult<String>> upload(@Body RequestBody requestBody);
}
private String mCoverFilePath = "/sdcard/avatar.png";
private void addBlog() {
final Blog blog = new Blog();
blog.setCategoryId(1L);
blog.setTitle("Token + 图片上传 + 错误重试");
blog.setContent("我是内容");
Observable.defer(() -> UploadManager.getInstance().getUploadObservable(mCoverFilePath))
.switchMap(filePath -> {
mCoverFilePath = filePath;
blog.setCover(mCoverFilePath);
return Engine.getRxJavaApi().addBlog(blog);
}).compose(RxUtil.handleResultThreadLifecycleRetry(this))
.subscribe(result -> Logger.d("添加博客成功"), throwable -> Logger.d("添加博客失败"));
}
public class UploadManager {
private AtomicBoolean mRefreshing = new AtomicBoolean(false);
private static final String SP_KEY_TOKEN = "SP_KEY_TOKEN";
private static final String SP_KEY_EXPIRE_TIME = "SP_KEY_EXPIRE_TIME";
private UploadManager() {
}
private static class SingletonHolder {
private static final UploadManager INSTANCE = new UploadManager();
}
public static UploadManager getInstance() {
return UploadManager.SingletonHolder.INSTANCE;
}
public static String getToken() {
return SPUtil.getString(SP_KEY_TOKEN);
}
public static long getExpireTime() {
return SPUtil.getLong(SP_KEY_EXPIRE_TIME);
}
// 获取文件上传 Token 的 Observable
public Observable getImageTokenObservable() {
if (mRefreshing.compareAndSet(false, true)) {
Logger.d("没有请求,发起一次新的 Token 请求");
return Engine.getRxJavaApi()
.getUploadToken()
.compose(RxUtil.handleResult())
.doOnNext(uploadToken -> {
SPUtil.putString(SP_KEY_TOKEN, uploadToken.getToken());
SPUtil.putLong(SP_KEY_EXPIRE_TIME, uploadToken.getExpireTime());
mRefreshing.set(false);
})
.doOnError(throwable -> mRefreshing.set(false));
} else {
Logger.d("已经有 Token 请求,延迟 5 秒重试");
return Observable.timer(5000, TimeUnit.MILLISECONDS);
}
}
// 获取文件上传的 Observable
public Observable<String> getUploadObservable(String filePath) {
if (filePath != null && !filePath.startsWith("http://") && !filePath.startsWith("https://")) {
File file = new File(filePath);
RequestBody body = new MultipartBody.Builder().setType(MultipartBody.FORM)
.addFormDataPart("file", file.getName(), RequestBody.create(MediaType.parse("image/*"), file))
.build();
return Engine.getRxJavaApi().upload(body)
.compose(RxUtil.handleResult())
.map(fileName -> Engine.BASE_URL + "api/file/browse/" + fileName);
} else {
return Observable.just(filePath == null ? "" : filePath);
}
}
}
public class RxUtil {
private RxUtil() {
}
// 处理结果
public static <T> ObservableTransformer<NetResult<T>, T> handleResult() {
return observable -> observable.flatMap(result -> {
if (result.code == 0) {
return Observable.just(result.data);
} else {
return Observable.error(new ApiException(result.msg, result.code));
}
});
}
// 处理结果、主线程、生命周期绑定
public static <T> ObservableTransformer<NetResult<T>, T> handleResultMainThreadLifecycle(LifecycleProvider lifecycleProvider) {
return observable -> observable.compose(handleResult())
.observeOn(AndroidSchedulers.mainThread())
.compose(lifecycleProvider.bindToLifecycle());
}
// 处理结果、主线程、生命周期绑定、错误重试
public static <T> ObservableTransformer<NetResult<T>, T> handleResultThreadLifecycleRetry(LifecycleProvider lifecycleProvider) {
return observable -> observable.compose(handleResult())
.observeOn(AndroidSchedulers.mainThread())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
private int mRetryCount;
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(throwable -> {
if (throwable instanceof IOException || throwable instanceof SocketException) { // 网络异常重试3次
mRetryCount++;
if (mRetryCount > 3) {
Logger.d("错误超过3次");
return Observable.error(throwable);
} else {
Logger.d("错误" + mRetryCount + "次");
return Observable.timer(mRetryCount * 1000, TimeUnit.MILLISECONDS);
}
} else if (throwable instanceof ApiException) {
if (((ApiException) throwable).getCode() == 401) {
return UploadManager.getInstance().getImageTokenObservable();
}
return Observable.error(throwable);
} else { // 未知异常直接返回发送 error 的 Observable
Logger.d("未知异常");
throwable.printStackTrace();
return Observable.error(throwable);
}
});
}
})
.compose(lifecycleProvider.bindToLifecycle());
}
}
public static <T> ObservableTransformer<NetResult<T>, T> handleResultOrigin() {
return new ObservableTransformer<NetResult<T>, T>() {
@Override
public Observable<T> apply(Observable<NetResult<T>> observable) {
return observable.flatMap(new Function<NetResult<T>, Observable<T>>() {
@Override
public Observable<T> apply(NetResult<T> result) throws Exception {
if (result.code == 0) {
return Observable.just(result.data);
} else {
return Observable.error(new ApiException(result.msg, result.code));
}
}
});
}
};
}
private void customOperator() {
Engine.getRxJavaApi().getUploadToken()
.delay(2000, TimeUnit.MILLISECONDS)
.lift(new NetResultOperator())
.doOnSubscribe(disposable -> showLoadingDialog("正在获取Token..."))
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(uploadToken -> {
dismissLoadingDialog();
Logger.d("获取成功:" + GsonUtil.toJson(uploadToken));
}, throwable -> {
dismissLoadingDialog();
Logger.d("获取文件上传 Token 失败" + throwable.getMessage());
});
}
private class NetResultOperator implements ObservableOperator<UploadToken, NetResult<UploadToken>> {
@Override
public Observer<? super NetResult<UploadToken>> apply(Observer<? super UploadToken> observer) throws Exception {
return new Observer<NetResult<UploadToken>>() {
private Disposable mDisposable;
@Override
public void onSubscribe(Disposable disposable) {
mDisposable = disposable;
observer.onSubscribe(mDisposable);
}
@Override
public void onNext(NetResult<UploadToken> netResult) {
if (!mDisposable.isDisposed()) {
observer.onNext(netResult.data);
}
}
@Override
public void onError(Throwable e) {
if (!mDisposable.isDisposed()) {
observer.onError(e);
}
}
@Override
public void onComplete() {
if (!mDisposable.isDisposed()) {
observer.onComplete();
}
}
};
}
}
// 初始化 RxJava 错误处理器
private void initRxJavaErrorHandler() {
RxJavaPlugins.setErrorHandler(e -> {
if (e instanceof UndeliverableException) {
e = e.getCause();
}
if ((e instanceof IOException) || (e instanceof SocketException)) { // 没事,无关紧要的网络问题或 API 在取消时抛出的异常
return;
}
if (e instanceof InterruptedException) { // 没事,一些阻塞代码被 dispose 调用中断
return;
}
if ((e instanceof NullPointerException) || (e instanceof IllegalArgumentException)) { // 这可能是程序的一个bug
Thread.currentThread().getUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), e);
return;
}
if (e instanceof IllegalStateException) { // 这是 RxJava 或自定义操作符的一个 bug
Thread.currentThread().getUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), e);
return;
}
Logger.w("Undeliverable exception received, not sure what to do");
e.printStackTrace();
});
}
RxJava2
RxJava2 学习资料推荐
Rxjava2.0新特性
HelloWorld