ReactiveX / RxJava

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Apache License 2.0
47.91k stars 7.6k forks source link

Some operators sometimes wait a long time #6118

Closed yvone1991 closed 6 years ago

yvone1991 commented 6 years ago

【version】

    api 'io.reactivex.rxjava2:rxjava:2.x.y'
    api 'io.reactivex.rxjava2:rxandroid:2.0.1'
    api 'com.trello.rxlifecycle2:rxlifecycle-components:2.2.1'
    api 'com.jakewharton.rxbinding2:rxbinding:2.0.0'
    api 'com.squareup.retrofit2:retrofit:2.3.0'
    api 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
    api 'com.squareup.retrofit2:converter-gson:2.3.0'
    api 'com.squareup.okhttp3:logging-interceptor:3.8.1'

【question】 I integrated RxJava 2 operations into the parent class of my activities, but sometimes an operator has to wait a long time before its method is entered. Is this caused by RxJava 2? The problem does not appear when using RxJava 1...

【question-ex】

---------Sometimes the duration is more than 2 min!!!!!!!!!!!!!!!!!!!!!!!!!!!!-------

import android.os.SystemClock;

import com.centerm.selforderfork9fnp.R;
import com.centerm.selforderfork9fnp.base.activity.BaseActivity;
import com.centerm.selforderfork9fnp.tools.Logger;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;

/**
 * Demo activity that logs how long it takes for work handed to RxJava 2 to start running.
 *
 * <p>Two intervals are logged: from {@link #onInitView()} to {@link #onLoadData()}, and from the
 * start of {@code testNewOb()} to the moment its {@code Observable.create} body is entered.
 * Timestamps are taken with {@link SystemClock#elapsedRealtime()} because each interval starts
 * on one thread and ends on another; {@code SystemClock.currentThreadTimeMillis()} only counts
 * time spent running in the calling thread, so subtracting values taken on two different threads
 * does not yield an elapsed duration.
 *
 * author : koge_yvone
 * date : 2018/7/31
 * desc : description
 * version : 1.0
 */

public class DemoActivity extends BaseActivity {
    /** Number of sleep steps used to simulate slow work. */
    private static final int SIMULATED_STEPS = 100;
    /** Length of one simulated step, in milliseconds. */
    private static final long SIMULATED_STEP_MILLIS = 100L;

    // Elapsed-realtime stamps (ms) for the BaseActivity load pipeline.
    private long obStartTime;
    private long obCallbackTime;

    // Elapsed-realtime stamps (ms) for the standalone observable in testNewOb().
    private long newObStartTime;
    private long newObCallbackTime;

    @Override
    protected boolean isNeedLoadDataThread() {
        return true;//start rxjava2
    }

    @Override
    protected boolean isHideStatusBar() {
        return true;
    }

    @Override
    protected void onBindMainLayoutId() {
        setContentView(R.layout.activity_launcher);
    }

    /** Records the start stamp of the load pipeline measurement. */
    @Override
    protected void onInitView() {
        //todo  findViewById
        obStartTime = SystemClock.elapsedRealtime();
        Logger.d("create time on======>" + obStartTime);
    }

    /** Records the end stamp of the load pipeline measurement and logs the elapsed time. */
    @Override
    protected void onLoadData() {
        //todo do something on thread(io(),newThread()... and so on)
        obCallbackTime = SystemClock.elapsedRealtime();
        Logger.d("end time on======>" + obCallbackTime);
        Logger.d("duration======>" + (obCallbackTime - obStartTime));
    }

    @Override
    protected void onYourThings() {
        testNewOb();
    }

    /**
     * Creates an observable on the io scheduler, logs how long it took for its body to be
     * entered, simulates slow work and emits the number of completed steps.
     */
    private void testNewOb() {
        newObStartTime = SystemClock.elapsedRealtime();
        Logger.d("new Ob create time on======>" + newObStartTime);
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                newObCallbackTime = SystemClock.elapsedRealtime();
                Logger.d("new Ob end time on======>" + newObCallbackTime);
                Logger.d("new Ob duration======>" + (newObCallbackTime - newObStartTime));
                int i = 0;
                // Stop early once nobody is listening any more so the io worker is released.
                while (i < SIMULATED_STEPS && !e.isDisposed()) {
                    i++;
                    SystemClock.sleep(SIMULATED_STEP_MILLIS);
                }
                e.onNext(i);
                e.onComplete();
            }
        }).subscribeOn(Schedulers.io());
        send(observable, new Consumer<Integer>() {
            @Override
            public void accept(Integer o) throws Exception {

                //todo refreshView();
            }
        });

    }

}

【code-BaseActivity】

/**
 * Base activity that drives a fixed start-up sequence (bind layout, init views, optional
 * background load, then {@link #onYourThings()}), and offers lifecycle-bound Rx subscriptions,
 * throttled click registration, keyboard hiding on outside taps, a loading dialog and
 * activity-transition helpers.
 *
 * author : koge_yvone
 * date : 2017/12/26
 * desc : base activity class
 * version : 1.0
 */

public abstract class BaseActivity extends RxAppCompatActivity implements BaseView {
    private LoadingDialog dialog;
    /** Window flag used to block the HOME key; it has to be defined by the app itself. */
    public static final int FLAG_HOMEKEY_DISPATCHED = 0x80000000;
    private final CompositeDisposable mCompositeDisposable = new CompositeDisposable();

    /**
     * @return whether the status bar should be hidden
     */
    protected abstract boolean isHideStatusBar();

    /**
     * Binds the layout.
     */
    protected abstract void onBindMainLayoutId();

    /**
     * Initialises the page. [main thread]
     */
    protected abstract void onInitView();

    /**
     * Does the time-consuming initialisation. It does not block the UI; do not refresh the UI
     * from here. [io thread]
     */
    protected abstract void onLoadData();

    /**
     * Called once the time-consuming work is finished (or skipped); start the real work of the
     * page here. [main thread]
     */
    protected abstract void onYourThings();

    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        // Decide about hiding the status bar first.
        if (isHideStatusBar()) {
            hideStatusBar();
        }
        // Key line for blocking the HOME key.
        this.getWindow().setFlags(FLAG_HOMEKEY_DISPATCHED, FLAG_HOMEKEY_DISPATCHED);
        // Set the layout.
        onBindMainLayoutId();
        // Initialise the views.
        onInitView();
        // Create the dialog before the load step: onLoadStart() is invoked while subscribing in
        // initObservable(), and onYourThings() runs right here when loading is disabled, so a
        // showLoading() call from either would otherwise find no dialog.
        dialog = new LoadingDialog(this);
        // Start the background load (the default) ...
        if (isNeedLoadDataThread()) {
            initObservable();
        } else {
            // ... or go straight to the page's own work.
            onYourThings();
        }
        // Push this activity onto the activity stack.
        ActMgr.getAppManager().addActivity(this);
    }

    /**
     * Convenience method that registers every given view (or view id) with the same click
     * listener; keeping the listener as a member of the activity is recommended.
     *
     * @param rxClickListener listener shared by all views
     * @param viewOrViewIds   views or view ids to register
     * @throws SecurityException if {@code viewOrViewIds} is {@code null} or contains an
     *                           element that is neither a view id nor a view (exception type
     *                           kept for compatibility with existing callers)
     */
    public void addClickViews(RxClickListener rxClickListener, Object... viewOrViewIds) {
        if (viewOrViewIds == null) {
            throw new SecurityException("viewOrViewId is not support");
        }
        for (Object viewOrViewId : viewOrViewIds) {
            registerClickView(viewOrViewId, rxClickListener);
        }
    }

    /**
     * Registers a click handler that ignores repeated clicks within one second.
     * Accepts either a view id or a view.
     *
     * @param viewOrViewId an {@code Integer} view id or a {@link View}
     * @param listener     click callback; nothing is registered when {@code null}
     * @throws SecurityException if {@code viewOrViewId} is neither an id nor a view
     */
    public void registerClickView(Object viewOrViewId, RxClickListener listener) {
        View view;
        if (viewOrViewId instanceof Integer) {
            int viewId = (Integer) viewOrViewId;
            view = findViewById(viewId);
        } else if (viewOrViewId instanceof View) {
            view = (View) viewOrViewId;
        } else {
            throw new SecurityException("viewOrViewId is not support");
        }
        if (listener == null) {
            return;
        }
        Observable<Object> clicks = new MyViewClickObservable(view)
                .throttleFirst(1, TimeUnit.SECONDS);
        send(clicks, listener);
    }

    /**
     * Runs {@link #onLoadData()} on the io scheduler and reports progress through
     * {@link #onLoadStart()}, {@link #onYourThings()}, {@link #onLoadFail(String)} and
     * {@link #onLoadEnd()}.
     */
    protected void initObservable() {
        Observable<Boolean> observable = Observable.create(new ObservableOnSubscribe<Boolean>() {
            @Override
            public void subscribe(ObservableEmitter<Boolean> e) throws Exception {
                onLoadData();// time-consuming work; must not refresh the UI
                e.onNext(true);
                e.onComplete();
            }
        }).subscribeOn(Schedulers.io());
        DisposableObserver<Boolean> observer = new DisposableObserver<Boolean>() {
            @Override
            protected void onStart() {
                super.onStart();
                onLoadStart();
            }

            @Override
            public void onNext(Boolean loaded) {
                onYourThings();
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
                onLoadFail(e.getMessage());
                onLoadEnd();
            }

            @Override
            public void onComplete() {
                onLoadEnd();
            }
        };
        mCompositeDisposable.add(observer);
        send(observable, observer);
    }

    /**
     * @return whether the background data-loading step should run; enabled by default
     */
    protected boolean isNeedLoadDataThread() {
        return true;
    }

    /**
     * @return whether the page's text should ignore the system font-size setting; by default
     * it does ignore it
     */
    protected boolean isNeedFontSizeNotChange() {
        return true;
    }

    /**
     * Hides the status bar. Kept private because it has to run before setContentView.
     * Below Android 5.0 this switches the window to full screen instead.
     */
    private void hideStatusBar() {
        Window window = getWindow();
        window.requestFeature(Window.FEATURE_NO_TITLE);
        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.LOLLIPOP) {
            window.clearFlags(WindowManager.LayoutParams.FLAG_TRANSLUCENT_STATUS
                    | WindowManager.LayoutParams.FLAG_TRANSLUCENT_NAVIGATION);
            window.getDecorView().setSystemUiVisibility(View.SYSTEM_UI_FLAG_LAYOUT_FULLSCREEN
                    | View.SYSTEM_UI_FLAG_LAYOUT_HIDE_NAVIGATION
                    | View.SYSTEM_UI_FLAG_LAYOUT_STABLE);
            window.addFlags(WindowManager.LayoutParams.FLAG_DRAWS_SYSTEM_BAR_BACKGROUNDS);
            window.setStatusBarColor(Color.TRANSPARENT);
            window.setNavigationBarColor(Color.TRANSPARENT);
        } else {
            window.setFlags(WindowManager.LayoutParams.FLAG_FULLSCREEN,
                    WindowManager.LayoutParams.FLAG_FULLSCREEN); // full screen
        }
    }

    /**
     * @param v     the currently focused view, may be {@code null}
     * @param event the touch event being dispatched
     * @return {@code true} when {@code v} is an {@link EditText} and the touch landed outside
     * of it, i.e. the input method may be hidden
     */
    private boolean isShouldHideInput(View v, MotionEvent event) {
        if (!(v instanceof EditText)) {
            return false;
        }
        // Current position of the input box inside the window.
        int[] leftTop = {0, 0};
        v.getLocationInWindow(leftTop);
        int left = leftTop[0];
        int top = leftTop[1];
        int bottom = top + v.getHeight();
        int right = left + v.getWidth();
        // A touch inside the input box keeps the EditText's own click handling.
        boolean insideInput = event.getX() > left && event.getX() < right
                && event.getY() > top && event.getY() < bottom;
        return !insideInput;
    }

    /**
     * Binds the observable to this activity's DESTROY event and moves delivery to the Android
     * main thread.
     *
     * @param observable source to bind; must not be {@code null}
     * @return the bound observable, ready to be subscribed
     */
    @SuppressLint("RestrictedApi")
    @SuppressWarnings({"unchecked", "rawtypes"})
    private Observable bindToDestroyOnMainThread(Observable observable) {
        checkNotNull(observable, "observable is null");
        return observable
                .compose(this.<Long>bindUntilEvent(ActivityEvent.DESTROY))// ends the subscription on destroy
                .observeOn(AndroidSchedulers.mainThread());
    }

    /**
     * Subscribes the observer to the observable, delivering on the main thread.
     * The subscription is bound to onDestroy: once the page is destroyed no more callbacks
     * are delivered.
     *
     * @param observable source to subscribe to
     * @param observer   callback
     */
    @Override
    @SuppressWarnings("unchecked")
    public void send(Observable observable, Observer observer) {
        bindToDestroyOnMainThread(observable).subscribe(observer);
    }

    /**
     * Subscribes the consumer to the observable, delivering on the main thread.
     * The subscription is bound to onDestroy: once the page is destroyed no more callbacks
     * are delivered.
     *
     * @param observable source to subscribe to
     * @param consumer   callback
     */
    @Override
    @SuppressWarnings("unchecked")
    public void send(Observable observable, Consumer consumer) {
        bindToDestroyOnMainThread(observable).subscribe(consumer);
    }

    /**
     * Loading has started; can be used to show a spinner. [main thread]
     */
    @Override
    public void onLoadStart() {

    }

    /**
     * Loading failed; can be used to show the failure state. [main thread]
     *
     * @param errorMsg message of the error that ended the load
     */
    @Override
    public void onLoadFail(String errorMsg) {

    }

    /**
     * Loading has ended; can be used to stop the spinner. [main thread]
     */
    @Override
    public void onLoadEnd() {

    }

    @Override
    public boolean dispatchTouchEvent(MotionEvent ev) {
        if (ev.getAction() == MotionEvent.ACTION_DOWN) {
            View v = getCurrentFocus();
            if (isShouldHideInput(v, ev)) {
                KeyboardUtils.hideKeyboard(v);
            }
            return super.dispatchTouchEvent(ev);
        }
        // Essential: without this no component would receive touch events.
        if (getWindow().superDispatchTouchEvent(ev)) {
            return true;
        }
        return onTouchEvent(ev);
    }

    @Override
    public boolean onKeyDown(int keyCode, KeyEvent event) {
        if (keyCode == KeyEvent.KEYCODE_HOME) {
            return true;
        }
        return super.onKeyDown(keyCode, event);
    }

    /**
     * Returns the activity resources, resetting the font scale to the default when
     * {@link #isNeedFontSizeNotChange()} is {@code true}.
     *
     * <p>The configuration is only rewritten when the current font scale is not the default,
     * so repeated calls do not allocate a configuration or update the resources needlessly.
     */
    @Override
    public Resources getResources() {
        Resources res = super.getResources();
        if (!isNeedFontSizeNotChange()) {// follow the system font size
            return res;
        }
        if (res.getConfiguration().fontScale != 1.0f) {
            Configuration config = new Configuration();
            config.setToDefaults();
            res.updateConfiguration(config, res.getDisplayMetrics());
        }
        return res;
    }

    @Override
    protected void onDestroy() {
        // Remove this activity from the activity stack.
        ActMgr.getAppManager().removeActivity(this);
        mCompositeDisposable.clear();
        // Dismiss the loading dialog so its window does not outlive the activity.
        if (dialog != null) {
            dialog.dismiss();
        }
        super.onDestroy();
    }

    // ---- Activity jump helpers: start ----

    public void jumpToActivity(Intent intent) {
        jumpToActivity(intent, false);
    }

    public void jumpToActivity(Intent intent, View view, String sharedName) {
        jumpTo(intent, view, sharedName, false);
    }

    public void jumpToActivityForResult(Intent intent, int code) {
        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.LOLLIPOP) {
            jump_v21ForResult(intent, code);
        } else {
            jumpForResult(intent, code);
        }
    }

    protected void jumpToActivity(Intent intent, boolean isFinishSelf) {
        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.LOLLIPOP) {
            jump_v21(intent);
        } else {
            jump(intent);
        }
        if (isFinishSelf) {
            supportedFinish();
        }
    }

    /** Finishes this activity, using the transition-aware finish on Lollipop and above. */
    protected void supportedFinish() {
        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.LOLLIPOP) {
            finishAfterTransition();
        } else {
            super.finish();
        }
    }

    private void jumpTo(Intent intent, View view, String sharedName, boolean finishSelf) {
        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.LOLLIPOP) {
            jumpWithSharedElement(intent, view, sharedName);
        } else {
            jump(intent);
        }
        if (finishSelf) {
            supportedFinish();
        }
    }

    @TargetApi(value = Build.VERSION_CODES.LOLLIPOP)
    private void jumpWithSharedElement(Intent intent, View view, String sharedName) {
        startActivity(intent, ActivityOptions.makeSceneTransitionAnimation(this, view, sharedName).toBundle());
    }

    private void jump(Intent intent) {
        startActivity(intent);
    }

    @TargetApi(value = Build.VERSION_CODES.LOLLIPOP)
    private void jump_v21(Intent intent) {
        startActivity(intent, ActivityOptions.makeSceneTransitionAnimation(this).toBundle());
    }

    private void jumpForResult(Intent intent, int code) {
        startActivityForResult(intent, code);
    }

    @SuppressLint("RestrictedApi")
    @TargetApi(value = Build.VERSION_CODES.LOLLIPOP)
    private void jump_v21ForResult(Intent intent, int code) {
        startActivityForResult(intent, code, ActivityOptions.makeSceneTransitionAnimation(this).toBundle());
    }

    // ---- Activity jump helpers: end ----

    @Override
    public void onBackPressed() {
        supportedFinish();
    }

    /** Shows the loading dialog, if it has been created. */
    public void showLoading() {
        if (dialog != null) {
            dialog.show();
        }
    }

    /**
     * Shows the loading dialog with a message.
     *
     * @param msgOrResId a {@code String} message or an {@code Integer} string resource id;
     *                   any other value leaves the dialog text unchanged
     */
    public void showLoading(Object msgOrResId) {
        if (dialog != null) {
            if (msgOrResId instanceof String) {
                dialog.getText().setText((String) msgOrResId);
            } else if (msgOrResId instanceof Integer) {
                int resId = (Integer) msgOrResId;
                dialog.getText().setText(resId);
            }
            dialog.getText().setVisibility(View.VISIBLE);
            dialog.show();
        }
    }

    /** Dismisses the loading dialog, if it has been created. */
    public void hideLoading() {
        if (dialog != null) {
            dialog.dismiss();
        }
    }

    public Context getThisActivity() {
        return this;
    }
}

【code-BaseView】

/**
 * Contract for a view that can subscribe to Rx sources and report loading progress.
 *
 * author : koge_yvone
 * date : 2017/12/15
 * desc : BaseView interface
 * version : 1.0
 */

public interface BaseView {

    /**
     * Subscribes to an Rx source, delivering results to an observer.
     *
     * @param observable source to subscribe to
     * @param observer   callback receiving the source's events
     */
    void send(Observable observable, Observer observer);

    /**
     * Subscribes to an Rx source, delivering results to a consumer.
     *
     * @param observable source to subscribe to
     * @param consumer   callback receiving the source's values
     */
    void send(Observable observable, Consumer consumer);

    /**
     * Loading has started; can be used to show a spinner.
     */
    void onLoadStart();

    /**
     * Loading failed; can be used to show the failure state.
     *
     * @param errorMsg description of the failure
     */
    void onLoadFail(String errorMsg);

    /**
     * Loading has ended; can be used to stop the spinner.
     */
    void onLoadEnd();

}

【code-MyViewClickObservable】

/**
 * Observable that emits the clicked {@link View} every time the wrapped view is clicked.
 * Subscribing installs a click listener on the view; disposing clears it again.
 */
public class MyViewClickObservable extends Observable<Object> {
    private final View view;

    public MyViewClickObservable(View view) {
        this.view = view;
    }

    /**
     * Hands the observer a disposable click listener and attaches that listener to the view.
     * Nothing is attached when {@code checkMainThread} rejects the observer.
     */
    @SuppressLint("RestrictedApi")
    @Override
    protected void subscribeActual(Observer<? super Object> observer) {
        if (checkMainThread(observer)) {
            final Listener clickBridge = new Listener(view, observer);
            observer.onSubscribe(clickBridge);
            view.setOnClickListener(clickBridge);
        }
    }

    /** Forwards clicks to the observer until disposed, then detaches itself from the view. */
    static final class Listener extends MainThreadDisposable implements View.OnClickListener {
        private final View source;
        private final Observer<? super Object> downstream;

        Listener(View view, Observer<? super Object> observer) {
            this.source = view;
            this.downstream = observer;
        }

        @Override
        public void onClick(View v) {
            if (isDisposed()) {
                return;
            }
            downstream.onNext(v);
        }

        @Override
        protected void onDispose() {
            source.setOnClickListener(null);
        }
    }
}

【code-RxClickListener】

/**
 * Adapter that lets a click callback be used as an Rx {@code Consumer} of views:
 * every accepted view is passed on to {@link #onClick(View)}.
 */
public abstract class RxClickListener implements Consumer<View> {

    /** Delegates the accepted view to {@link #onClick(View)}. */
    @Override
    public void accept(View clicked) throws Exception {
        this.onClick(clicked);
    }

    /**
     * Handles a click.
     *
     * @param view the view passed to {@link #accept(View)}
     */
    public abstract void onClick(View view);
}
akarnokd commented 6 years ago

I spent a lot of time trying to recreate your application but you omitted a lot of things. Please provide a standalone application, preferably a GitHub project, that compiles and demonstrates your problem.

Also clearly indicate between which points in your application you experience that 2-minute delay. Please run your application in debug mode and pause it to see which thread is doing what; that could hint at the source of the delay.

I also recommend starting over with your application, step-by-step, and see what change introduces that huge delay, if any.

yvone1991 commented 6 years ago

Logger.d("new Ob duration======>" + (newObCallbackTime - newObStartTime));

sometimes the duration more than 2 min~~~~sometimes..sometimes...sometimes...

【code】

/**
 * Creates an observable on the io scheduler, logs how long it took for its body to be entered,
 * simulates slow work and emits the number of completed steps.
 *
 * <p>Timestamps use {@code SystemClock.elapsedRealtime()}: the start stamp is taken on the
 * calling thread and the end stamp on an io thread, and
 * {@code SystemClock.currentThreadTimeMillis()} only counts time spent running in the calling
 * thread, so values from two threads cannot be subtracted to get a duration.
 */
private void testNewOb() {
    newObStartTime = SystemClock.elapsedRealtime();
    Logger.d("new Ob create time on======>" + newObStartTime);
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            newObCallbackTime = SystemClock.elapsedRealtime();
            Logger.d("new Ob end time on======>" + newObCallbackTime);
            Logger.d("new Ob duration======>" + (newObCallbackTime - newObStartTime));
            // Simulated slow work: 100 steps of 100 ms each.
            int i = 0;
            while (i < 100) {
                i++;
                SystemClock.sleep(100);
            }
            e.onNext(i);
            e.onComplete();
        }
    }).subscribeOn(Schedulers.io());
    send(observable, new Consumer<Integer>() {
        @Override
        public void accept(Integer o) throws Exception {

            //todo refreshView();
        }
    });

}
akarnokd commented 6 years ago

Please use the triple backtick to denote code. Check out your question's sources which I fixed for you.

I can't see any reason for that large delay. Maybe your device has a particular runtime where class initialization takes really long. Try with a very basic project and that code part only that measures the time to create body.

yvone1991 commented 6 years ago

At first, when I wrote the UI interface, there was no such problem. (maybe I didn't test too many times.)

When I write request logic on each interface, the create side will start waiting for a long time to start the launch(sometimes...sometimes...sometimes).

I try to write a base project, but I can't guarantee that it will happen. And my whole project itself is just like my example code, and there's no extra thing to add, but I don't know why there's such a big delay when it's created.

yvone1991 commented 6 years ago

whole project logic:

【Every activity goes like this】 Activity→bindLayout→findViewById→createObservable(delay more than 2 min, sometimes)→loadData( two minutes later, start this method )→onYourThings→createNewObservable(sometimes,delay more than 2 min)....

akarnokd commented 6 years ago

Without the entire project, I can't help you much further. Does it happen inside an emulator or on a device? Could you post your entire project on your GitHub repository?

yvone1991 commented 6 years ago

Can I send your e-mail?

-----my project(^_^)

akarnokd commented 6 years ago

Yes. The address is my github handle + gmail.com

yvone1991 commented 6 years ago

Your email is classified as spam by the service provider (akarnokd@gmail.com), and the other party will not accept it.

akarnokd commented 6 years ago

I can't give you any other email address. Are you on Twitter? Upload your code somewhere and PM me the URL.

yvone1991 commented 6 years ago

I sent it to you from my company mailbox. Did you receive it?

akarnokd commented 6 years ago

There is no attachment or link.

akarnokd commented 6 years ago

In one of your emails, you mentioned this delay happens on POS devices. It could be device/Android specific anomaly. Are you using ProGuard? If not, try it; if yes, disable it and see if that helps.

yvone1991 commented 6 years ago

I tried it again. Did you receive it?

akarnokd commented 6 years ago

I received a letter which asked me to ask for Google Drive permission of two files.

yvone1991 commented 6 years ago

yes

yvone1991 commented 6 years ago

pen***555+gmail.com

akarnokd commented 6 years ago

I've already asked for permissions.

yvone1991 commented 6 years ago

I have shared files

akarnokd commented 6 years ago

I've downloaded the project and built it. Where is the code that takes too long?

yvone1991 commented 6 years ago

OrderMenuActivity.class AddOrderActivity.class OrderStatusActivity.class OrderSureActivity.class TableAvtivity.class

1、sometimes,onYourThings() delay 2 min reach。。。 2、I have annotated a method: HttpManager.java line106 to line 110 log... ====>

   /**
     * 请求前的准备
     */
    private Observable readyRequest(final String method, final Object body) {
        Logger.d("这里开始开始发送数据源===========>readyRequest()" + DateUtils.getNowDate(DateUtils.FXGALL));
        Observable observable = Observable.just(body).map(new Function<Object, HttpRequest>() {
            @Override
            public HttpRequest apply(Object o) throws Exception {
                Logger.d("这里开始组包===========>" + DateUtils.getNowDate(DateUtils.FXGALL));
                HttpRequest request = new HttpRequest();
                request.setMethod(method);
                request.setTimestamp(DateUtils.getNowDate(DateUtils.FXGALL));
                request.setBiz_content(body);
                request.setMac("222");
                request.setSession_id("222");
                Logger.d("这里组包完毕,发出去给retrofit2了===========>");
                return request;
            }
        }).subscribeOn(Schedulers.io());
yvone1991 commented 6 years ago

All Observable that needs to be created may be delayed. The previous use of rxjava1 did not have this problem

akarnokd commented 6 years ago

That readyRequest seems to be not in use. Are you sure you match the time difference properly in your log?

yvone1991 commented 6 years ago

It was originally used, and later removed。。。

line159 to line 327,I have annotated them……

Finally, I found that the problem of rxjava2 creation is not a create operator problem.

akarnokd commented 6 years ago

It was originally used, and later removed。。。

How would you expect me to diagnose your problem when the code isn't the one exhibiting the problem?

Finally, I found that the problem of rxjava2 creation is not a create operator problem.

It is likely you have a flaky network so it takes sometimes longer to get all response.

yvone1991 commented 6 years ago

Thank you very much for your patience.

At first I also suspected that it was a network problem, or something else, but until I used that way to verify it, I found that the time for creating OB was delay for a very long time.

But the problem I'm describing is that all the annotations I've done are to prove that something went wrong when I created the OB. Personally, I think the key to the problem is to delay when creating OB objects. Because after 2min is delayed, all log will be brushed out in a flash, rather than a normal brush.