smallrye / smallrye-mutiny

An Intuitive Event-Driven Reactive Programming Library for Java
https://smallrye.io/smallrye-mutiny
Apache License 2.0
779 stars 124 forks source link

Mutiny retry with backoff not resetting counter #1619

Closed sschellh closed 3 weeks ago

sschellh commented 4 weeks ago

I am using a smallrye mutiny Uni with subsequent retries. If the first retry has a backoff configured, then the retry counter is not reset properly. This results in a wrong (unexpected) number of retries.

Please see the below unit test as demo.

Tested with Quarkus 3.11.1 => Mutiny 2.5.8

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.jupiter.api.Assertions.assertEquals;

...

    @Test
    public void uniWithMultipleRetryAndBackoff() {
        Duration d = Duration.ofMillis(10);
        AtomicInteger invokeCounter = new AtomicInteger(0);
        final int retryInner = 2;
        final int retryOuter = 1;

        // Example 1: No backoff configured
        invokeCounter.set(0);
        Uni.createFrom().voidItem()
                .onItem().invoke(invokeCounter::incrementAndGet)
                .onItem().invoke(() -> {
                    throw new RuntimeException("Test Exception");
                })
                .onFailure().retry().atMost(retryInner)
                .onItem().transform(s -> "Just do some transformation")
                .onFailure().retry().atMost(retryOuter)
                .onFailure().recoverWithNull()
                .await().indefinitely();

        // works as expected
        assertEquals((retryInner+1) * (retryOuter + 1), invokeCounter.get(), "Example 1: Number of invokes wrong");

        // Example 2: Backoff configured for inner retry
        // (Only difference to example 1 is the backoff)
        invokeCounter.set(0);
        Uni.createFrom().voidItem()
                .onItem().invoke(invokeCounter::incrementAndGet)
                .onItem().invoke(() -> {
                    throw new RuntimeException("Test Exception");
                })
                .onFailure().retry().withBackOff(d).atMost(retryInner)    // Note the "withBackoff(...) here!
                .onItem().transform(s -> "Just do some transformation")
                .onFailure().retry().atMost(retryOuter)
                .onFailure().recoverWithNull()
                .await().indefinitely();

        // Fails currently (works not as expected)
        assertEquals((retryInner+1) * (retryOuter + 1), invokeCounter.get(), "Example 2: Number of invokes wrong");
    }
jponge commented 3 weeks ago

I've reduced the test case to:

AtomicInteger counter = new AtomicInteger();
assertThatThrownBy(() -> Uni.createFrom().item(123)
        .onItem().invoke(counter::incrementAndGet)
        .onItem().failWith(i -> new RuntimeException("boom"))
        .onFailure().retry().atMost(3)
        .onFailure().retry().withBackOff(Duration.ofMillis(10)).atMost(3)
        .await().atMost(Duration.ofSeconds(1))).isInstanceOf(RuntimeException.class).hasMessage("boom");
assertThat(counter.get()).isEqualTo(16);

This is the correct behaviour:

Note that you get the very same behaviour with:

 AtomicInteger counter = new AtomicInteger();
 assertThatThrownBy(() -> Uni.createFrom().item(123)
         .onItem().invoke(counter::incrementAndGet)
         .onItem().failWith(i -> new RuntimeException("boom"))
         .onFailure().retry().atMost(3)
         .onFailure().retry().atMost(3)
         .await().atMost(Duration.ofSeconds(1))).isInstanceOf(RuntimeException.class).hasMessage("boom");
 assertThat(counter.get()).isEqualTo(16);
sschellh commented 3 weeks ago

Hello @jponge

if you change your test case such that the withBackOff is added to the first retry, your testcase also fails. (Exchange line 5 with line 6) Therefore, I think this is a bug and like to request to re-open this issue.

AtomicInteger counter = new AtomicInteger();
assertThatThrownBy(() -> Uni.createFrom().item(123)
        .onItem().invoke(counter::incrementAndGet)
        .onItem().failWith(i -> new RuntimeException("boom"))
        .onFailure().retry().withBackOff(Duration.ofMillis(10)).atMost(3)
        .onFailure().retry().atMost(3)
        .await().atMost(Duration.ofSeconds(1))).isInstanceOf(RuntimeException.class).hasMessage("boom");
assertThat(counter.get()).isEqualTo(16);

Best regards Stefan

jponge commented 3 weeks ago

This is because the withBackOff retry operator won't allow re-subscription once it has reached its threshold, hence it always forwards a failure instead of resubscribing the upstream.

sschellh commented 3 weeks ago

Hi @jponge Many thanks for your reply and investigations. Is this behavior documented somewhere? The challenge I see here is the following: Assume you receive a Uni from some other method. You do not know the internals how that Uni is created. For instance whether there are any retries or backoffs configured. And vice versa, who developed the method that created the uni does not know what others will add at some later stages. You process this uni and at some point you add error handling and configure a retry. As a consequence the number of retries configured in the "internal" method will not work properly anymore.

The behavior (i.e. the number of executions) is different depending on whether there is a backoff configured or not. This does not feel right in my opinion.

jponge commented 3 weeks ago

@sschellh do you want to investigate and open a PR?

The interesting bits are in io.smallrye.mutiny.groups.UniRetry#atMost