quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0
13.84k stars 2.7k forks source link

Kafka serializer cannot be autodetected when injecting an Instance #44500

Closed rokkolesa closed 1 week ago

rokkolesa commented 1 week ago

Describe the bug

When injecting an Emitter or MutinyEmitter wrapped in Instance, the serializer is not autodetected and instead fails the application startup.

Expected behavior

I expect that it should not matter if the emitter is injected directly or via Instance regarding the serializer autodetection.

Actual behavior

The application startup fails with the error:

 Failed to start quarkus: io.quarkus.dev.appstate.ApplicationStartException: java.lang.RuntimeException: Failed to start quarkus
        at io.quarkus.dev.appstate.ApplicationStateNotification.waitForApplicationStart(ApplicationStateNotification.java:63)
        at io.quarkus.runner.bootstrap.StartupActionImpl.runMainClass(StartupActionImpl.java:142)
        at io.quarkus.deployment.dev.IsolatedDevModeMain.restartApp(IsolatedDevModeMain.java:202)
        at io.quarkus.deployment.dev.IsolatedDevModeMain.restartCallback(IsolatedDevModeMain.java:183)
        at io.quarkus.deployment.dev.RuntimeUpdatesProcessor.doScan(RuntimeUpdatesProcessor.java:555)
        at io.quarkus.deployment.console.ConsoleStateManager.forceRestart(ConsoleStateManager.java:175)
        at io.quarkus.deployment.console.ConsoleStateManager.lambda$installBuiltins$0(ConsoleStateManager.java:112)
        at io.quarkus.deployment.console.ConsoleStateManager$1.accept(ConsoleStateManager.java:77)
        at io.quarkus.deployment.console.ConsoleStateManager$1.accept(ConsoleStateManager.java:49)
        at io.quarkus.deployment.console.AeshConsole.lambda$setup$1(AeshConsole.java:278)
        at org.aesh.terminal.EventDecoder.accept(EventDecoder.java:118)
        at org.aesh.terminal.EventDecoder.accept(EventDecoder.java:31)
        at org.aesh.terminal.io.Decoder.write(Decoder.java:133)
        at org.aesh.readline.tty.terminal.TerminalConnection.openBlocking(TerminalConnection.java:216)
        at org.aesh.readline.tty.terminal.TerminalConnection.openBlocking(TerminalConnection.java:203)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.RuntimeException: Failed to start quarkus
        at io.quarkus.runner.ApplicationImpl.doStart(Unknown Source)
        at io.quarkus.runtime.Application.start(Application.java:101)
        at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:121)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:71)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:44)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:124)
        at io.quarkus.runner.GeneratedMain.main(Unknown Source)
        at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
        at java.base/java.lang.reflect.Method.invoke(Method.java:580)
        at io.quarkus.runner.bootstrap.StartupActionImpl$1.run(StartupActionImpl.java:116)
        ... 1 more
Caused by: jakarta.enterprise.inject.spi.DeploymentException: java.lang.IllegalArgumentException: The attribute `value.serializer` on connector 'smallrye-kafka' (channel: foo_out) must be set
        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle.onApplicationStart(SmallRyeReactiveMessagingLifecycle.java:58)
        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle_Observer_onApplicationStart_qTrMuLFyQ1IvGfeSxRVitl6CCBQ.notify(Unknown Source)
        at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:351)
        at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:333)
        at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:80)
        at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:163)
        at io.quarkus.arc.runtime.ArcRecorder.handleLifecycleEvents(ArcRecorder.java:114)
        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy_0(Unknown Source)
        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy(Unknown Source)
        ... 11 more
Caused by: java.lang.IllegalArgumentException: The attribute `value.serializer` on connector 'smallrye-kafka' (channel: foo_out) must be set
        at io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration.lambda$getValueSerializer$0(KafkaConnectorOutgoingConfiguration.java:40)
        at java.base/java.util.Optional.orElseThrow(Optional.java:403)
        at io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration.getValueSerializer(KafkaConnectorOutgoingConfiguration.java:40)
        at io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration.validate(KafkaConnectorOutgoingConfiguration.java:295)
        at io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration.<init>(KafkaConnectorOutgoingConfiguration.java:16)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector.getSubscriber(KafkaConnector.java:272)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass.getSubscriber$$superforward(Unknown Source)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass$$function$$10.apply(Unknown Source)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:73)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext$NextAroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:97)
        at io.quarkus.smallrye.reactivemessaging.runtime.devmode.DevModeSupportConnectorFactoryInterceptor.intercept(DevModeSupportConnectorFactoryInterceptor.java:97)
        at io.quarkus.smallrye.reactivemessaging.runtime.devmode.DevModeSupportConnectorFactoryInterceptor_Bean.intercept(Unknown Source)
        at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:42)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:70)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:62)
        at io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor.intercept(DuplicatedContextConnectorFactoryInterceptor.java:37)
        at io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor_Bean.intercept(Unknown Source)
        at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:42)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:30)
        at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:27)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass.getSubscriber(Unknown Source)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_ClientProxy.getSubscriber(Unknown Source)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.createSubscriber(ConfiguredChannelFactory.java:226)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.register(ConfiguredChannelFactory.java:167)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.initialize(ConfiguredChannelFactory.java:115)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory_ClientProxy.initialize(Unknown Source)
        at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
        at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1939)
        at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
        at io.smallrye.reactive.messaging.providers.extension.MediatorManager.start(MediatorManager.java:250)
        at io.smallrye.reactive.messaging.providers.extension.MediatorManager_ClientProxy.start(Unknown Source)
        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle.onApplicationStart(SmallRyeReactiveMessagingLifecycle.java:53)
        ... 19 more

If i instead inject the emitter directly, the error is not thrown and the emitter works correctly.

How to Reproduce?

Using the below snippet causes an exception at startup, even if the bean is not used.

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.smallrye.reactive.messaging.MutinyEmitter;

@ApplicationScoped
public class Foo
{
    @Channel("foo_out")
    Instance<MutinyEmitter<String>> emitter;

    public void emit()
    {
        emitter.get().sendAndAwait("bar");
    }
}

Output of uname -a or ver

No response

Output of java -version

openjdk version "21.0.5" 2024-10-15 OpenJDK Runtime Environment Homebrew (build 21.0.5) OpenJDK 64-Bit Server VM Homebrew (build 21.0.5, mixed mode, sharing)

Quarkus version or git rev

3.16.3

Build tool (ie. output of mvnw --version or gradlew --version)

Apache Maven 3.9.9

Additional information

I can work around this issue by specifying mp.messaging.outgoing.foo_out.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer in application.properties.

quarkus-bot[bot] commented 1 week ago

/cc @alesj (kafka), @cescoffier (kafka), @ozangunalp (kafka)

cescoffier commented 1 week ago

@ozangunalp @Ladicek Do you think we can extend the detection to handle this case?

Ladicek commented 1 week ago

Yes, that should be possible.

ozangunalp commented 1 week ago

For the Instance injection we do support the recognition of the Emitter in Quarkus (as opposed to upstream), so that's not a problem. For serde detection, it should not be a problem.