ReactiveX / RxJava

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Apache License 2.0
47.91k stars 7.6k forks source link

3.x: recursive concat causes StackOverflowError #6958

Open akarnokd opened 4 years ago

akarnokd commented 4 years ago

Originally posted on StackOverflow.

The following code crashes with StackOverflowError and the stacktrace shows a long chain of request calls.

import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.TimeUnit.SECONDS

fun main() {
    fun incr(n: Int): Single<Int> = Single.just(n + 1)

    fun numbers(n: Int, max: Int): Flowable<Int> = Flowable.just(n).concatWith(
        if (n < max)
            incr(n)
            .observeOn(Schedulers.single())
            .toFlowable()
            .concatMap { next -> numbers(next, max) }
        else
            Flowable.empty()
    )

    numbers(1, 10_000)
    .blockingForEach(::println)
}
Exception in thread "main" java.lang.StackOverflowError
    at io.reactivex.rxjava3.internal.subscriptions.SubscriptionArbiter.request(SubscriptionArbiter.java:135)
    at io.reactivex.rxjava3.internal.subscriptions.SubscriptionArbiter.request(SubscriptionArbiter.java:135)
    at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap$ConcatMapImmediate.request(FlowableConcatMap.java:215)
    at io.reactivex.rxjava3.internal.subscriptions.SubscriptionArbiter.request(SubscriptionArbiter.java:135)
    at io.reactivex.rxjava3.internal.subscriptions.SubscriptionArbiter.request(SubscriptionArbiter.java:135)
    at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap$ConcatMapImmediate.request(FlowableConcatMap.java:215)
    at io.reactivex.rxjava3.internal.subscriptions.SubscriptionArbiter.request(SubscriptionArbiter.java:135)
    at io.reactivex.rxjava3.internal.subscriptions.SubscriptionArbiter.request(SubscriptionArbiter.java:135)
    at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap$ConcatMapImmediate.request(FlowableConcatMap.java:215)

I'm not sure why there is such a chain created and if this is a result of an RxJava bug or not.

tmankita commented 4 years ago

Hi, I would like to pick up this issue if no one is working on it yet.

akarnokd commented 4 years ago

Sure.

tmankita commented 4 years ago

I want to share my theory: "ConcatWith" function needs to get @NonNull Publisher<@NonNull ? extends T> as a parameter but in the example above it gets if-else expression instead. In my opinion, the reason for the StackOverflow Error is that "ConcatWith" take somehow only the following expression : incr(n) .observeOn(Schedulers.single()).toFlowable().concatMap { next -> numbers(next, max)} without the base case of the recursion.

one of the comments In the original post on stackOverFlow suggest alternative working implementation:

fun <T> unfold(seed: T, next: (T) -> T?): Flowable<T> =
        UnicastProcessor.create<T>().toSerialized().let { proc ->
            proc
                .startWithItem(seed)
                .doOnNext { prev ->
                    when (val curr = next(prev)) {
                        null ->
                            proc.onComplete()
                        else ->
                            proc.onNext(curr)
                    }
                }
        }

    fun numbers(first: Int, max: Int): Flowable<Int> =
        unfold(first) { prev -> if (prev < max) prev + 1 else null }

    numbers(1, 1_000_000_000)
        .sample(1, SECONDS)
        .blockingForEach(::println)

As we can see the unfold function takes as an argument a function that contains the if-else expression, and this is the right way to do so because the unfold return a flowable object that contains the base case of the recursion.

@akarnokd , What do you think about my theory?

akarnokd commented 4 years ago

It creates a long chain because of concatMap is kept alive even though it ever emits one item due to incr. I still have to think about what we can do about it or even if we should.

anastr commented 4 years ago

it looks like each time you add new couple of concatWith-concatMap operators, so while the sequence grow up it must pass the emit to the whole concatWith-concatMap couples then to the Consumer. My only proof for that is that the last '1000' emits will take longer time than first one.

I'm sorry to bother, but i have a question: in this case, why the error didn't deliver to the consumer (onError has been implemented for sure)! if the max number was 1000, error will be thrown after complete!