yigit / android-priority-jobqueue

A Job Queue specifically written for Android to easily schedule jobs (tasks) that run in the background, improving UX and application stability.

JobQueue with RxJava #348

Closed pishguy closed 7 years ago

pishguy commented 7 years ago

Is there any way to integrate RxJava with JobQueue?

yigit commented 7 years ago

Nothing is built in, but you can easily write a worker using the job manager.

pishguy commented 7 years ago

Is this on the library's roadmap?

toidv commented 6 years ago

How can I trigger shouldReRunOnThrowable when the Subscriber's onError is called? Currently the Throwable is not rethrown from onError, so shouldReRunOnThrowable is never triggered and the job is treated as if it had executed successfully. This is my current code in onRun(). Thank you!

registerSegmentUseCase.execute(new DisposableSubscriber<SegmentPieces>() {
    @Override
    public void onNext(SegmentPieces segmentPieces) {
        updateSyncResult(segmentPieces);
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
        RetryConstraint retryConstraint = shouldReRunOnThrowable(t, getCurrentRunCount(),
                getRetryLimit());
        if (retryConstraint == null) {
            retryConstraint = RetryConstraint.RETRY;
        }
        retryConstraint.shouldRetry();
    }

    @Override
    public void onComplete() {
    }
}, RegisterSegmentUseCase.ParamRegisterSegment.forParams(paramRegisterSegment));

kalpeshp0310 commented 6 years ago

A Job's onRun() method is not well suited to Rx calls. Either convert your code to non-Rx code (which I would suggest in this case), or make the following changes to your current code:

1) Do not apply any scheduler to your Rx stream; the stream should run on the thread on which onRun() is called.
2) Do not implement onError(). RxJava will then throw OnErrorNotImplementedException, which will be caught by JobManager, which in turn calls shouldReRunOnThrowable() with a throwable of type OnErrorNotImplementedException.
3) In shouldReRunOnThrowable(), call getCause() on the throwable to get the actual error and decide accordingly whether to retry or cancel.
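
Step 3 above can be sketched in plain Java. This is only an illustration under stated assumptions, not the library's API: `Decision` is a hypothetical stand-in for `RetryConstraint`, `WrappedError` is a hypothetical stand-in for the wrapper exception, and treating `IOException` as the only retryable cause is just an example policy.

```java
import java.io.IOException;

public class CauseBasedRetry {
    // Hypothetical stand-in for the library's RetryConstraint.
    public enum Decision { RETRY, CANCEL }

    // Hypothetical stand-in for a wrapper exception that carries the real error as its cause.
    public static class WrappedError extends RuntimeException {
        public WrappedError(Throwable cause) {
            super(cause);
        }
    }

    // Plays the role of shouldReRunOnThrowable(): unwrap the cause, then decide.
    public static Decision decide(Throwable throwable, int runCount, int maxRunCount) {
        // Fall back to the throwable itself when it has no cause.
        Throwable actual = throwable.getCause() != null ? throwable.getCause() : throwable;
        if (runCount >= maxRunCount) {
            return Decision.CANCEL; // retry budget used up
        }
        // Example policy (an assumption): only I/O failures are worth retrying.
        return (actual instanceof IOException) ? Decision.RETRY : Decision.CANCEL;
    }

    public static void main(String[] args) {
        System.out.println(decide(new WrappedError(new IOException("offline")), 1, 3));
        System.out.println(decide(new WrappedError(new IllegalStateException("bad input")), 1, 3));
    }
}
```

In a real job, the same unwrapping would live inside shouldReRunOnThrowable() and return a RetryConstraint instead of the local enum.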

yigit commented 6 years ago

You can also call blockingGet in the job's onRun() method if you are using Rx.
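
The idea behind blocking inside onRun() is that an asynchronous failure becomes an exception thrown on the calling thread, where a runner can see it and decide whether to rerun. The sketch below shows that idea with the JDK only: `CompletableFuture.join()` stands in for `blockingGet()`, and `registerSegment`, `onRun` and `runWithRetries` are hypothetical names, not JobManager or RxJava code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingJobSketch {
    // Hypothetical async call that fails on the first two attempts, then succeeds.
    static CompletableFuture<String> registerSegment(AtomicInteger attempts) {
        return CompletableFuture.supplyAsync(() -> {
            if (attempts.incrementAndGet() < 3) {
                throw new IllegalStateException("network unavailable");
            }
            return "segment-registered";
        });
    }

    // Plays the role of onRun(): blocking turns the async failure into a thrown exception.
    static String onRun(AtomicInteger attempts) {
        return registerSegment(attempts).join(); // stand-in for blockingGet()
    }

    // Hypothetical runner loop: rerun until success or until retryLimit runs are used.
    public static String runWithRetries(int retryLimit) {
        AtomicInteger attempts = new AtomicInteger();
        for (int run = 1; run <= retryLimit; run++) {
            try {
                return onRun(attempts) + " after " + attempts.get() + " attempts";
            } catch (CompletionException e) {
                // The failure reached the caller, so a retry decision can be made here.
                System.out.println("run " + run + " failed: " + e.getCause().getMessage());
            }
        }
        return "cancelled after " + retryLimit + " attempts";
    }

    public static void main(String[] args) {
        System.out.println(runWithRetries(5));
    }
}
```

With a limit of 5 the third run succeeds; with a limit of 2 the loop gives up, which mirrors a job being cancelled once its retry limit is reached.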

toidv commented 6 years ago

Thank you for the suggestions; both approaches above work well. However, I currently use the same use case, registerSegmentUseCase, to create a segment both from a Job (when the app is offline) and directly by the user (when the app is online), so either approach would require splitting the use case to handle the two paths separately. I want to keep the current logic, so I used Rx's retryWhen to retry when onError occurs. Does this cause any side effects when used inside a job? This is my example code. Thank you!

@Override
Flowable<SegmentPieces> buildUseCaseObservable(ParamRegisterSegment paramRegisterSegment) {
    return unitRepository.registerSegmentPieces(paramRegisterSegment)
            .retryWhen(throwableFlowable -> throwableFlowable.compose(zipWithFlatMap()))
            .doOnNext(segmentPieces -> {
                segmentPieces.setOffset(0);
                segmentPieces.setTime(System.currentTimeMillis());
                saveSegmentPieces(segmentPieces);
            });
}

<T> FlowableTransformer<T, Long> zipWithFlatMap() {
    return upstream -> upstream.zipWith(Flowable.range(COUNTER_START, ATTEMPTS),
            (t, repeatAttempt) -> repeatAttempt
    ).flatMap((Function<Integer, Publisher<Long>>) repeatAttempt -> {
        return Flowable.timer(repeatAttempt * 5, TimeUnit.SECONDS);
    });
}
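
The retry delays produced by zipWithFlatMap() above can be worked out by hand: each error is paired with the next value from Flowable.range(COUNTER_START, ATTEMPTS), and that value times 5 is the wait in seconds. The plain-Java sketch below reproduces only this arithmetic. The values COUNTER_START = 1 and ATTEMPTS = 3 are assumptions, since the post does not give them, and `delaysSeconds` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;

public class RetryDelaySchedule {
    // Mirrors the arithmetic of zipping errors with range(counterStart, attempts):
    // one delay per failed attempt, growing linearly in steps of 5 seconds.
    public static List<Long> delaysSeconds(int counterStart, int attempts) {
        List<Long> delays = new ArrayList<>();
        for (int i = 0; i < attempts; i++) {
            // range(start, count) emits `count` consecutive integers beginning at `start`.
            int repeatAttempt = counterStart + i;
            delays.add(repeatAttempt * 5L);
        }
        return delays;
    }

    public static void main(String[] args) {
        // Assumed values: COUNTER_START = 1, ATTEMPTS = 3.
        System.out.println(delaysSeconds(1, 3)); // prints [5, 10, 15]
    }
}
```

Under those assumed values the stream waits 5, 10 and then 15 seconds between attempts, and all of that waiting happens inside a single run of the job.
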
toidv commented 6 years ago

One downside of Rx's retryWhen is that there is no chance to update the params when a job fails, so I switched to blockingGet. Thanks for your suggestion.

if (params.getTaskId() < 0) {
    Task runningTask = getTaskRepository.getRunningTask(params.getTaskId(), params.getTaskTemplateId());
    if (runningTask != null) {
        params.setTaskId(runningTask.getId());
    }
}
Task task = finishTaskRepository.finishTask(params.getTaskId(), params.getTime())
        .blockingGet();

addTaskRepository.saveTask(task);