quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0
13.36k stars 2.56k forks source link

Smallrye Reactive message nacks are not propageted to Multi onFailures #41356

Open davidvoit opened 2 weeks ago

davidvoit commented 2 weeks ago

Describe the bug

Giving the following example code:

import io.quarkus.logging.Log;
import io.quarkus.runtime.Quarkus;
import io.smallrye.mutiny.Multi;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class Boom {
    @Outgoing("event")
    public Multi<Integer> loop() {
        return Multi
            .createBy().repeating()
            .supplier(() -> Multi.createFrom().range(1, 3))
            .indefinitely()
            .onFailure().invoke(() -> {
                Log.error("fatal error caught");
                Quarkus.asyncExit(1);
            })
            .onItem().transformToMultiAndConcatenate(integerMulti -> integerMulti);
    }

    @Incoming("event")
    public void boom(Integer value) {
        if (value == 1) {
            Log.info("first event");
        } else {
            Log.warn("second event");
            throw new RuntimeException("boom");
        }
    }
}

The onFailure code is never triggered

With a Message the withNack code is triggered

import io.quarkus.logging.Log;
import io.smallrye.mutiny.Multi;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import java.util.Random;
import java.util.concurrent.CompletableFuture;

@ApplicationScoped
public class Boom {
    private Random r = new Random();

    @Outgoing("event")
    public Multi<Integer> loop() {
        return Multi
            .createBy().repeating()
            .supplier(() -> Multi.createFrom().range(1, 3))
            .indefinitely()
            .onItem().transformToMultiAndConcatenate(integerMulti -> integerMulti);
    }

    @Incoming("event")
    @Outgoing("event2")
    public Message<Integer> checkError(int in) {
        return Message.of(in)
            .withNack(throwable -> {
                Log.error("fatal error caught", throwable);
                System.exit(1);
                return CompletableFuture.completedFuture(null);
            });
    }

    @Incoming("event2")
    public void boom(int value) {
        if (value == 1) {
            Log.info("first event");
        } else {
            Log.warn("second event");
            throw new RuntimeException("boom");
        }
    }
}

Expected behavior

Multi::onFailure is triggered

Actual behavior

Multi::onFailure is just ignored

How to Reproduce?

Example code provided

Output of uname -a or ver

No response

Output of java -version

No response

Quarkus version or git rev

3.11.2 (3.8.5)

Build tool (ie. output of mvnw --version or gradlew --version)

No response

Additional information

No response

quarkus-bot[bot] commented 2 weeks ago

/cc @Ladicek (smallrye), @jmartisk (smallrye), @phillip-kruger (smallrye), @radcortez (smallrye)