quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0

graceful shutdown NPE when using kafka consumer with stork #41658

Open vsevel opened 5 months ago

vsevel commented 5 months ago

Describe the bug

I have a Kafka consumer that calls a REST service through a Stork URL. Under reasonable load, the REST call fails with an NPE in the StorkClientRequestFilter: the return value of "io.smallrye.stork.Stork.getInstance()" is null. It sounds like the shutdown of the application does not wait for in-flight messages to finish processing before shutting down parts of the application. The integrity of the data is preserved, but some messages are sent to the dead letter topic, which delays their processing even though another replica could have handled them.
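
The suspected ordering problem can be modelled in plain Java, with no Quarkus, Kafka, or Stork involved. This is only a sketch under assumptions: `ShutdownRaceSketch`, `REGISTRY`, and `run` are hypothetical names, and the `AtomicReference` merely stands in for a process-wide singleton like the one behind `Stork.getInstance()`. It does not reproduce how Quarkus actually orders its shutdown; it only shows why clearing shared state before in-flight work has drained leads to this kind of failure.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class ShutdownRaceSketch {

    // Hypothetical stand-in for a process-wide singleton that shutdown clears.
    static final AtomicReference<String> REGISTRY = new AtomicReference<>();

    // Starts `tasks` slow handlers, then shuts down. Returns how many handlers
    // found the registry already cleared when they needed it.
    static int run(int tasks, boolean drainBeforeClear) {
        REGISTRY.set("greeting-service -> localhost:18080");
        AtomicInteger failures = new AtomicInteger();
        ExecutorService workers = Executors.newFixedThreadPool(tasks);
        for (int i = 0; i < tasks; i++) {
            workers.execute(() -> {
                sleepQuietly(200); // simulated message processing
                if (REGISTRY.get() == null) {
                    failures.incrementAndGet(); // where the reported NPE would occur
                }
            });
        }
        workers.shutdown(); // stop accepting new work; running handlers continue
        if (drainBeforeClear) {
            awaitQuietly(workers); // graceful: wait for in-flight handlers first
            REGISTRY.set(null);
        } else {
            REGISTRY.set(null);    // abrupt: clear shared state while handlers still run
            awaitQuietly(workers);
        }
        return failures.get();
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static void awaitQuietly(ExecutorService pool) {
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("clear first: " + run(4, false) + " failed handler(s)");
        System.out.println("drain first: " + run(4, true) + " failed handler(s)");
    }
}
```

With `run(4, false)` the handlers wake up after the reference has been cleared, so they are counted as failures. With `run(4, true)` the pool is drained first and no handler sees a cleared reference.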

Expected behavior

in an ideal situation, upon shutdown, we should have:

Actual behavior

we see the following exception:

2024-07-03 10:37:15,144 ERROR [org.jbo.res.rea.cli.imp.StorkClientRequestFilter] (quarkus-virtual-thread-10) Error selecting service instance for serviceName: greeting-service [Error Occurred After Shutdown]: java.lang.NullPointerException: Cannot invoke "io.smallrye.stork.Stork.getService(String)" because the return value of "io.smallrye.stork.Stork.getInstance()" is null
        at org.jboss.resteasy.reactive.client.impl.StorkClientRequestFilter.filter(StorkClientRequestFilter.java:38)
        at org.jboss.resteasy.reactive.client.spi.ResteasyReactiveClientRequestFilter.filter(ResteasyReactiveClientRequestFilter.java:21)
        at org.jboss.resteasy.reactive.client.handlers.ClientRequestFilterRestHandler.handle(ClientRequestFilterRestHandler.java:25)
        at org.jboss.resteasy.reactive.client.handlers.ClientRequestFilterRestHandler.handle(ClientRequestFilterRestHandler.java:10)
        at org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext.invokeHandler(AbstractResteasyReactiveContext.java:231)
        at org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext.run(AbstractResteasyReactiveContext.java:147)
        at org.jboss.resteasy.reactive.client.impl.AsyncInvokerImpl.performRequestInternal(AsyncInvokerImpl.java:285)
        at org.jboss.resteasy.reactive.client.impl.AsyncInvokerImpl.performRequestInternal(AsyncInvokerImpl.java:275)
        at org.jboss.resteasy.reactive.client.impl.AsyncInvokerImpl.method(AsyncInvokerImpl.java:215)
        at org.jboss.resteasy.reactive.client.impl.InvocationBuilderImpl.method(InvocationBuilderImpl.java:313)
        at org.acme.IGreetingResource$$QuarkusRestClientInterface.hello(Unknown Source)
        at org.acme.IGreetingResource$$CDIWrapper.hello(Unknown Source)
        at org.acme.IGreetingResource$$CDIWrapper_ClientProxy.hello(Unknown Source)
        at org.acme.PriceConsumer.consume(PriceConsumer.java:29)
        at org.acme.PriceConsumer_ClientProxy.consume(Unknown Source)
        at org.acme.PriceConsumer_SmallRyeMessagingInvoker_consume_ff7ab742c6cdb0956d1e3afc1d8ed7fd43c75ad0.invoke(Unknown Source)
        at io.smallrye.reactive.messaging.providers.AbstractMediator.lambda$invokeBlocking$15(AbstractMediator.java:190)
        at io.smallrye.context.impl.wrappers.SlowContextualSupplier.get(SlowContextualSupplier.java:21)
        at io.smallrye.mutiny.operators.uni.builders.UniCreateFromDeferredSupplier.subscribe(UniCreateFromDeferredSupplier.java:25)
        at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
        at io.smallrye.mutiny.operators.uni.UniRunSubscribeOn.lambda$subscribe$0(UniRunSubscribeOn.java:27)
        at io.quarkus.virtual.threads.ContextPreservingExecutorService$ContextPreservingRunnable.run(ContextPreservingExecutorService.java:45)
        at java.base/java.util.concurrent.ThreadPerTaskExecutor$TaskRunner.run(ThreadPerTaskExecutor.java:314)
        at java.base/java.lang.VirtualThread.run(VirtualThread.java:309)

and messages may be sent to the DLT.

How to Reproduce?

create a new application with additional deps:

    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-rest-client</artifactId>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-messaging-kafka</artifactId>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-smallrye-stork</artifactId>
    </dependency>
    <dependency>
      <groupId>io.smallrye.stork</groupId>
      <artifactId>stork-service-discovery-static-list</artifactId>
    </dependency>

add properties file:

quarkus.http.port=18080

mp.messaging.incoming.prices.connector=smallrye-kafka
mp.messaging.outgoing.prices-out.connector=smallrye-kafka

mp.messaging.incoming.prices.topic=mytopic
mp.messaging.outgoing.prices-out.topic=mytopic

quarkus.rest-client.greeting.url=stork://greeting-service
#quarkus.rest-client.greeting.url=http://localhost:18080

quarkus.stork.greeting-service.service-discovery.type=static
quarkus.stork.greeting-service.service-discovery.address-list=localhost:18080
quarkus.stork.greeting-service.load-balancer.type=round-robin

a producer:

import java.time.Duration;
import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
public class KafkaPriceProducer {

    private final Random random = new Random();

    @Outgoing("prices-out")
    public Multi<Double> generate() {
        // Build an infinite stream of random prices
        // It emits a price every 100 ms
        return Multi.createFrom().ticks().every(Duration.ofMillis(100))
                .map(x -> random.nextDouble());
    }
}

a consumer:

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.rest.client.inject.RestClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.smallrye.common.annotation.RunOnVirtualThread;
import io.smallrye.reactive.messaging.annotations.Blocking;

@ApplicationScoped
public class PriceConsumer {

    Logger log = LoggerFactory.getLogger(PriceConsumer.class);

    @Inject
    @RestClient
    IGreetingResource greetingResource;

    @Incoming("prices")
    @RunOnVirtualThread
    @Blocking(ordered = false)
    public void consume(double price) throws InterruptedException {
        log.info("receive " + price);
        Thread.sleep(1000);
        log.info("greeting => " + greetingResource.hello());
    }

}

start the application with quarkus:dev; after startup, press s and you should see multiple exceptions.

Output of uname -a or ver

No response

Output of java -version

No response

Quarkus version or git rev

3.12

Build tool (ie. output of mvnw --version or gradlew --version)

No response

Additional information

No response

quarkus-bot[bot] commented 5 months ago

/cc @alesj (kafka), @aureamunoz (stork), @cescoffier (kafka,stork), @ozangunalp (kafka)

cescoffier commented 2 months ago

@ozangunalp started looking at a better way to handle graceful shutdowns.

So, we are planning something but it's too early to share details (I don't have the details anyway, they are in Ozan's head :-))

cescoffier commented 1 week ago

FTR, @ozangunalp is still working on it.