smallrye / smallrye-reactive-messaging

SmallRye Reactive Messaging
http://www.smallrye.io/smallrye-reactive-messaging/
Apache License 2.0

KafkaRequestReply fails with `java.lang.IllegalStateException` if the partition assignment takes longer than `replyTimeout` at startup #2714

Open Malandril opened 1 month ago

Malandril commented 1 month ago

Context

With Quarkus 3.13.0, if the KafkaRequestReply's waitForAssignments takes longer than the reply timeout at startup, the KafkaRequestReply will always throw:

   java.lang.IllegalStateException: SRMSG00028: The subscription to question-out has been cancelled
        at io.smallrye.reactive.messaging.providers.extension.AbstractEmitter.verify(AbstractEmitter.java:196)
        at io.smallrye.reactive.messaging.providers.extension.AbstractEmitter.emit(AbstractEmitter.java:161)
        at io.smallrye.reactive.messaging.providers.extension.MutinyEmitterImpl.lambda$sendMessage$4(MutinyEmitterImpl.java:72)
        at io.smallrye.context.impl.wrappers.SlowContextualConsumer.accept(SlowContextualConsumer.java:21)
        at io.smallrye.mutiny.operators.uni.builders.UniCreateWithEmitter.subscribe(UniCreateWithEmitter.java:22)
        at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
        at io.smallrye.mutiny.operators.uni.UniEmitOn.subscribe(UniEmitOn.java:22)
        at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
        at io.smallrye.mutiny.operators.uni.UniOnItemConsume.subscribe(UniOnItemConsume.java:30)
        at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
        at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni.subscribe(UniOnItemTransformToUni.java:25)
        at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
        at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureConsume.subscribe(UniOnItemOrFailureConsume.java:24)
        at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
        at io.smallrye.mutiny.operators.uni.UniOnItemTransform.subscribe(UniOnItemTransform.java:22)
        at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
        at io.smallrye.mutiny.operators.uni.UniOnItemTransform.subscribe(UniOnItemTransform.java:22)
        at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
        at io.smallrye.mutiny.operators.uni.UniFailOnTimeout.subscribe(UniFailOnTimeout.java:36)
        at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
        at io.smallrye.mutiny.operators.uni.UniOnFailureFlatMap.subscribe(UniOnFailureFlatMap.java:31)
        at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
        at io.smallrye.mutiny.groups.UniSubscribe.withSubscriber(UniSubscribe.java:51)
        at io.smallrye.mutiny.groups.UniSubscribe.with(UniSubscribe.java:110)
        at io.smallrye.mutiny.groups.UniSubscribe.with(UniSubscribe.java:88)
        at org.jboss.resteasy.reactive.server.handlers.UniResponseHandler.handle(UniResponseHandler.java:21)
        at io.quarkus.resteasy.reactive.server.runtime.QuarkusResteasyReactiveRequestContext.invokeHandler(QuarkusResteasyReactiveRequestContext.java:150)
        at org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext.run(AbstractResteasyReactiveContext.java:147)
        at org.jboss.resteasy.reactive.server.handlers.RestInitialHandler.beginProcessing(RestInitialHandler.java:48)
        at org.jboss.resteasy.reactive.server.vertx.ResteasyReactiveVertxHandler.handle(ResteasyReactiveVertxHandler.java:23)
        at org.jboss.resteasy.reactive.server.vertx.ResteasyReactiveVertxHandler.handle(ResteasyReactiveVertxHandler.java:10)
        at io.vertx.ext.web.impl.RouteState.handleContext(RouteState.java:1285)
        at io.vertx.ext.web.impl.RoutingContextImplBase.iterateNext(RoutingContextImplBase.java:177)
        at io.vertx.ext.web.impl.RoutingContextImpl.next(RoutingContextImpl.java:140)
        at io.quarkus.vertx.http.runtime.options.HttpServerCommonHandlers$1.handle(HttpServerCommonHandlers.java:62)
        at io.quarkus.vertx.http.runtime.options.HttpServerCommonHandlers$1.handle(HttpServerCommonHandlers.java:40)
        at io.vertx.ext.web.impl.RouteState.handleContext(RouteState.java:1285)
        at io.vertx.ext.web.impl.RoutingContextImplBase.iterateNext(RoutingContextImplBase.java:177)
        at io.vertx.ext.web.impl.RoutingContextImpl.next(RoutingContextImpl.java:140)
        at io.quarkus.resteasy.reactive.server.runtime.ResteasyReactiveRecorder$13.handle(ResteasyReactiveRecorder.java:358)
        at io.quarkus.resteasy.reactive.server.runtime.ResteasyReactiveRecorder$13.handle(ResteasyReactiveRecorder.java:347)
        at io.vertx.ext.web.impl.RouteState.handleContext(RouteState.java:1285)
        at io.vertx.ext.web.impl.RoutingContextImplBase.iterateNext(RoutingContextImplBase.java:177)
        at io.vertx.ext.web.impl.RoutingContextImpl.next(RoutingContextImpl.java:140)
        at io.quarkus.vertx.http.runtime.devmode.VertxHttpHotReplacementSetup$5.handle(VertxHttpHotReplacementSetup.java:204)
        at io.quarkus.vertx.http.runtime.devmode.VertxHttpHotReplacementSetup$5.handle(VertxHttpHotReplacementSetup.java:192)
        at io.vertx.core.impl.future.FutureImpl$4.onSuccess(FutureImpl.java:176)
        at io.vertx.core.impl.future.FutureBase.lambda$emitSuccess$0(FutureBase.java:60)
        at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:1583)

It might be caused at the point where the publisher is obtained, here: https://github.com/smallrye/smallrye-reactive-messaging/blob/8427a52664c023341f24dd2a19ab7f1ca90a6f6d/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java#L155

Reproducing the bug

Clone https://github.com/Malandril/quarkus-kafka-bug

Run the application in dev mode:

quarkus dev

After 5 seconds (the default reply timeout), start the Kafka broker with `docker compose up -d`.

Querying the endpoint /question with `curl -v localhost:8080/question` fails.

Querying the endpoint /question-2 with `curl -v localhost:8080/question-2` works as expected.

Querying the endpoint /answers with `curl -v localhost:8080/answers` works as expected.

Expected behaviour

The KafkaRequestReply should be able to recover from a Kafka connection failure at startup and should work once the connection has been restored. This is the same behaviour as a classic Emitter.

ozangunalp commented 1 month ago

Thanks for reporting this. You've touched an important point on how to use the Kafka request-reply feature. For context, that wait is there to make sure that the reply consumer is actually listening for replies. Currently, it waits on subscription (wiring time), with the goal of preserving the order of sent requests.

We could introduce a separate timeout option, and perhaps also a flag controlling whether to wait for partition assignment at all.

Your feedback would be appreciated.
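
To make the idea concrete, here is a minimal plain-Java sketch of decoupling the two deadlines. It is a toy model built on `CompletableFuture`, not SmallRye code: the class, the method names, and the `waitForAssignment` / `assignmentTimeout` parameters are all hypothetical, and it does not model the permanent cancellation reported above. It only contrasts a single deadline that covers both the assignment wait and the reply with two independent deadlines.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Toy model only: every name here is hypothetical, none is a SmallRye API.
public class AssignmentWaitSketch {

    // One deadline: the reply timeout also covers the wait for partition assignment.
    static CompletableFuture<String> requestSharedTimeout(Duration replyTimeout,
            CompletableFuture<Void> assignment, CompletableFuture<String> reply) {
        return assignment.thenCompose(ignored -> reply)
                .orTimeout(replyTimeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    // Two deadlines: the assignment wait is optional and has its own timeout;
    // the reply timeout only starts counting once that first stage is done.
    static CompletableFuture<String> requestSeparateTimeouts(boolean waitForAssignment,
            Duration assignmentTimeout, Duration replyTimeout,
            CompletableFuture<Void> assignment, CompletableFuture<String> reply) {
        CompletableFuture<Void> ready = waitForAssignment
                ? assignment.copy().orTimeout(assignmentTimeout.toMillis(), TimeUnit.MILLISECONDS)
                : CompletableFuture.<Void>completedFuture(null);
        return ready.thenCompose(ignored ->
                reply.copy().orTimeout(replyTimeout.toMillis(), TimeUnit.MILLISECONDS));
    }

    // Simulated slow startup: the assignment completes after delayMs.
    static CompletableFuture<Void> assignmentAfter(long delayMs) {
        return CompletableFuture.runAsync(() -> { },
                CompletableFuture.delayedExecutor(delayMs, TimeUnit.MILLISECONDS));
    }

    // Simulated reply: arrives 20 ms after the assignment completes.
    static CompletableFuture<String> replyAfter(CompletableFuture<Void> assignment) {
        return assignment.thenApplyAsync(ignored -> "answer",
                CompletableFuture.delayedExecutor(20, TimeUnit.MILLISECONDS));
    }

    // True if the future failed because one of the timeouts fired.
    static boolean timesOut(CompletableFuture<?> future) {
        try {
            future.join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof TimeoutException;
        }
    }

    public static void main(String[] args) {
        Duration replyTimeout = Duration.ofMillis(100);

        CompletableFuture<Void> slow1 = assignmentAfter(300);
        System.out.println("shared timeout fired: "
                + timesOut(requestSharedTimeout(replyTimeout, slow1, replyAfter(slow1))));

        CompletableFuture<Void> slow2 = assignmentAfter(300);
        System.out.println("separate timeouts result: "
                + requestSeparateTimeouts(true, Duration.ofSeconds(2), replyTimeout,
                        slow2, replyAfter(slow2)).join());
    }
}
```

In this model a slow assignment (300 ms) against a 100 ms reply timeout fails the request when the deadline is shared, while the same request succeeds once the assignment wait gets its own, longer deadline. Whether the real emitter would also need to survive an expired assignment wait is a separate question that this sketch does not address.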

Malandril commented 1 month ago

I think a separate timeout property for the initial subscription, plus a way to disable it, would make sense. But would it fix the issue? If the subscription occurs later, wouldn't the KafkaRequestReply still end up in an unrecoverable illegal state if the Kafka broker is unavailable?

ozangunalp commented 3 weeks ago

The current behavior is described here. The wait only makes sense if you have auto.offset.reset=latest.

I think having at least an option to disable the wait makes sense. The subscription occurs at wiring time (runtime startup) because the subscriber for that emitter is the outgoing Kafka channel.