quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0

How to test a service that integrates with kafka? #6427

Closed FreifeldRoyi closed 4 years ago

FreifeldRoyi commented 4 years ago

I hope this is not too far off topic, but I'm trying to figure out the best way to consume a Kafka test message and compare it to an expected message.

A few thoughts (that might be wrong):

I'm stuck on these tests at the moment. I'd appreciate some help with how to do that.

cescoffier commented 4 years ago

@FreifeldRoyi did you look at https://github.com/quarkusio/quarkus-quickstarts/blob/master/kafka-quickstart/src/test/java/org/acme/quarkus/sample/PriceResourceTest.java?

Also, the upcoming version of SmallRye Reactive Messaging provides an in-memory connector that simplifies these tests (and avoids using a Kafka broker).

zeljkot commented 4 years ago

@cescoffier how can I override the Kafka connector with the in-memory connector? I tried mp.messaging.incoming.some_topic.connector=smallrye-in-memory but I got:

2020-01-28 08:20:16,107 ERROR [io.sma.rea.mes.imp.ConfiguredChannelFactory] (main) Unable to create the publisher or subscriber during initialization: java.lang.IllegalArgumentException: Unknown connector for some_topic.
    at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.lambda$createPublisherBuilder$6(ConfiguredChannelFactory.java:143)
    at java.util.Optional.orElseThrow(Optional.java:290)
    at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.createPublisherBuilder(ConfiguredChannelFactory.java:143)
    at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.lambda$register$4(ConfiguredChannelFactory.java:123)
    at java.util.HashMap.forEach(HashMap.java:1289)
    at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.register(ConfiguredChannelFactory.java:123)
    at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.initialize(ConfiguredChannelFactory.java:118)

It seems there is no io/smallrye/reactive/messaging/connector/InMemoryConnector.java class in quarkus-smallrye-reactive-messaging*.

cescoffier commented 4 years ago

Yes, it's not released yet.

zeljkot commented 4 years ago

Do you have a rough estimate of when it will be released?

Legion2 commented 4 years ago

SmallRye Reactive Messaging 2.0.1 was released and I tried https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/2/testing/testing.html with Quarkus 1.4.1, but I get this exception:

java.lang.RuntimeException: java.lang.RuntimeException: io.quarkus.builder.BuildException: Build failure: Build failed due to errors
    [error]: Build step io.quarkus.arc.deployment.ArcProcessor#validate threw an exception: javax.enterprise.inject.spi.DeploymentException: javax.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type io.smallrye.reactive.messaging.connectors.InMemoryConnector and qualifiers [@Any]
    - java member: de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest#connector
    - declared on CLASS bean [types=[de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest, java.lang.Object], qualifiers=[@Default, @Any], target=de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest]
    at io.quarkus.arc.processor.BeanDeployment.processErrors(BeanDeployment.java:973)
    at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:240)
    at io.quarkus.arc.processor.BeanProcessor.initialize(BeanProcessor.java:135)
    at io.quarkus.arc.deployment.ArcProcessor.validate(ArcProcessor.java:358)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:567)
    at io.quarkus.deployment.ExtensionLoader$2.execute(ExtensionLoader.java:931)
    at io.quarkus.builder.BuildContext.run(BuildContext.java:277)
    at org.jboss.threads.ContextClassLoaderSavingRunnable.run(ContextClassLoaderSavingRunnable.java:35)
    at org.jboss.threads.EnhancedQueueExecutor.safeRun(EnhancedQueueExecutor.java:2046)
    at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.doRunTask(EnhancedQueueExecutor.java:1578)
    at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1452)
    at java.base/java.lang.Thread.run(Thread.java:830)
    at org.jboss.threads.JBossThread.run(JBossThread.java:479)
Caused by: javax.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type io.smallrye.reactive.messaging.connectors.InMemoryConnector and qualifiers [@Any]
    - java member: de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest#connector
    - declared on CLASS bean [types=[de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest, java.lang.Object], qualifiers=[@Default, @Any], target=de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest]
    at io.quarkus.arc.processor.Beans.resolveInjectionPoint(Beans.java:486)
    at io.quarkus.arc.processor.BeanInfo.init(BeanInfo.java:362)
    at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:232)
    ... 14 more

    at io.quarkus.test.junit.QuarkusTestExtension.beforeEach(QuarkusTestExtension.java:255)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeEachCallbacks$1(TestMethodTestDescriptor.java:161)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeMethodsOrCallbacksUntilExceptionOccurs$5(TestMethodTestDescriptor.java:197)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeMethodsOrCallbacksUntilExceptionOccurs(TestMethodTestDescriptor.java:197)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeEachCallbacks(TestMethodTestDescriptor.java:160)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:71)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:135)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1507)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1507)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:248)
    at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$5(DefaultLauncher.java:211)
    at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:226)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:199)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:141)
    at org.eclipse.jdt.internal.junit5.runner.JUnit5TestReference.run(JUnit5TestReference.java:98)
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:41)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:542)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:770)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:464)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:210)
Caused by: java.lang.RuntimeException: io.quarkus.builder.BuildException: Build failure: Build failed due to errors
    [error]: Build step io.quarkus.arc.deployment.ArcProcessor#validate threw an exception: javax.enterprise.inject.spi.DeploymentException: javax.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type io.smallrye.reactive.messaging.connectors.InMemoryConnector and qualifiers [@Any]
    - java member: de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest#connector
    - declared on CLASS bean [types=[de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest, java.lang.Object], qualifiers=[@Default, @Any], target=de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest]
    at io.quarkus.arc.processor.BeanDeployment.processErrors(BeanDeployment.java:973)
    at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:240)
    at io.quarkus.arc.processor.BeanProcessor.initialize(BeanProcessor.java:135)
    at io.quarkus.arc.deployment.ArcProcessor.validate(ArcProcessor.java:358)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:567)
    at io.quarkus.deployment.ExtensionLoader$2.execute(ExtensionLoader.java:931)
    at io.quarkus.builder.BuildContext.run(BuildContext.java:277)
    at org.jboss.threads.ContextClassLoaderSavingRunnable.run(ContextClassLoaderSavingRunnable.java:35)
    at org.jboss.threads.EnhancedQueueExecutor.safeRun(EnhancedQueueExecutor.java:2046)
    at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.doRunTask(EnhancedQueueExecutor.java:1578)
    at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1452)
    at java.base/java.lang.Thread.run(Thread.java:830)
    at org.jboss.threads.JBossThread.run(JBossThread.java:479)
Caused by: javax.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type io.smallrye.reactive.messaging.connectors.InMemoryConnector and qualifiers [@Any]
    - java member: de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest#connector
    - declared on CLASS bean [types=[de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest, java.lang.Object], qualifiers=[@Default, @Any], target=de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest]
    at io.quarkus.arc.processor.Beans.resolveInjectionPoint(Beans.java:486)
    at io.quarkus.arc.processor.BeanInfo.init(BeanInfo.java:362)
    at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:232)
    ... 14 more

    at io.quarkus.runner.bootstrap.AugmentActionImpl.runAugment(AugmentActionImpl.java:247)
    at io.quarkus.runner.bootstrap.AugmentActionImpl.createInitialRuntimeApplication(AugmentActionImpl.java:130)
    at io.quarkus.runner.bootstrap.AugmentActionImpl.createInitialRuntimeApplication(AugmentActionImpl.java:52)
    at io.quarkus.test.junit.QuarkusTestExtension.doJavaStart(QuarkusTestExtension.java:131)
    at io.quarkus.test.junit.QuarkusTestExtension.ensureStarted(QuarkusTestExtension.java:269)
    at io.quarkus.test.junit.QuarkusTestExtension.beforeAll(QuarkusTestExtension.java:292)
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$invokeBeforeAllCallbacks$7(ClassBasedTestDescriptor.java:359)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeBeforeAllCallbacks(ClassBasedTestDescriptor.java:359)
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.before(ClassBasedTestDescriptor.java:189)
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.before(ClassBasedTestDescriptor.java:78)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:132)
    ... 31 more
Caused by: io.quarkus.builder.BuildException: Build failure: Build failed due to errors
    [error]: Build step io.quarkus.arc.deployment.ArcProcessor#validate threw an exception: javax.enterprise.inject.spi.DeploymentException: javax.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type io.smallrye.reactive.messaging.connectors.InMemoryConnector and qualifiers [@Any]
    - java member: de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest#connector
    - declared on CLASS bean [types=[de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest, java.lang.Object], qualifiers=[@Default, @Any], target=de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest]
    at io.quarkus.arc.processor.BeanDeployment.processErrors(BeanDeployment.java:973)
    at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:240)
    at io.quarkus.arc.processor.BeanProcessor.initialize(BeanProcessor.java:135)
    at io.quarkus.arc.deployment.ArcProcessor.validate(ArcProcessor.java:358)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:567)
    at io.quarkus.deployment.ExtensionLoader$2.execute(ExtensionLoader.java:931)
    at io.quarkus.builder.BuildContext.run(BuildContext.java:277)
    at org.jboss.threads.ContextClassLoaderSavingRunnable.run(ContextClassLoaderSavingRunnable.java:35)
    at org.jboss.threads.EnhancedQueueExecutor.safeRun(EnhancedQueueExecutor.java:2046)
    at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.doRunTask(EnhancedQueueExecutor.java:1578)
    at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1452)
    at java.base/java.lang.Thread.run(Thread.java:830)
    at org.jboss.threads.JBossThread.run(JBossThread.java:479)
Caused by: javax.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type io.smallrye.reactive.messaging.connectors.InMemoryConnector and qualifiers [@Any]
    - java member: de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest#connector
    - declared on CLASS bean [types=[de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest, java.lang.Object], qualifiers=[@Default, @Any], target=de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest]
    at io.quarkus.arc.processor.Beans.resolveInjectionPoint(Beans.java:486)
    at io.quarkus.arc.processor.BeanInfo.init(BeanInfo.java:362)
    at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:232)
    ... 14 more

    at io.quarkus.builder.Execution.run(Execution.java:115)
    at io.quarkus.builder.BuildExecutionBuilder.execute(BuildExecutionBuilder.java:79)
    at io.quarkus.deployment.QuarkusAugmentor.run(QuarkusAugmentor.java:156)
    at io.quarkus.runner.bootstrap.AugmentActionImpl.runAugment(AugmentActionImpl.java:245)
    ... 42 more
Caused by: javax.enterprise.inject.spi.DeploymentException: javax.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type io.smallrye.reactive.messaging.connectors.InMemoryConnector and qualifiers [@Any]
    - java member: de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest#connector
    - declared on CLASS bean [types=[de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest, java.lang.Object], qualifiers=[@Default, @Any], target=de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest]
    at io.quarkus.arc.processor.BeanDeployment.processErrors(BeanDeployment.java:973)
    at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:240)
    at io.quarkus.arc.processor.BeanProcessor.initialize(BeanProcessor.java:135)
    at io.quarkus.arc.deployment.ArcProcessor.validate(ArcProcessor.java:358)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:567)
    at io.quarkus.deployment.ExtensionLoader$2.execute(ExtensionLoader.java:931)
    at io.quarkus.builder.BuildContext.run(BuildContext.java:277)
    at org.jboss.threads.ContextClassLoaderSavingRunnable.run(ContextClassLoaderSavingRunnable.java:35)
    at org.jboss.threads.EnhancedQueueExecutor.safeRun(EnhancedQueueExecutor.java:2046)
    at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.doRunTask(EnhancedQueueExecutor.java:1578)
    at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1452)
    at java.base/java.lang.Thread.run(Thread.java:830)
    at org.jboss.threads.JBossThread.run(JBossThread.java:479)
Caused by: javax.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type io.smallrye.reactive.messaging.connectors.InMemoryConnector and qualifiers [@Any]
    - java member: de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest#connector
    - declared on CLASS bean [types=[de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest, java.lang.Object], qualifiers=[@Default, @Any], target=de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest]
    at io.quarkus.arc.processor.Beans.resolveInjectionPoint(Beans.java:486)
    at io.quarkus.arc.processor.BeanInfo.init(BeanInfo.java:362)
    at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:232)
    ... 14 more

cescoffier commented 4 years ago

You are right, a recent change broke it. I will fix it before the next release.

For the record: it requires the in-memory connector to be indexed.

FreifeldRoyi commented 4 years ago

WOOHOO! Thnx @cescoffier

Legion2 commented 4 years ago

Now I get java.lang.IllegalArgumentException: Unknown channel computations when calling connector.sink("computations"); in the test. I think the channels are configured and connected before @BeforeAll is called, so the in-memory connector is not used. Here is the log output before @BeforeAll is called in a @QuarkusTest:

Mai 03, 2020 11:41:52 AM org.jboss.threads.Version <clinit>
INFO: JBoss Threads version 3.1.1.Final
Mai 03, 2020 11:41:54 AM io.undertow.websockets.jsr.ServerWebSocketContainer addEndpointInternal
INFO: UT026003: Adding annotated server endpoint class de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocket for path /computations
Mai 03, 2020 11:41:54 AM org.jboss.threads.Version <clinit>
INFO: JBoss Threads version 3.1.1.Final
Mai 03, 2020 11:41:54 AM io.smallrye.reactive.messaging.extension.MediatorManager initializeAndRun
INFO: Deployment done... start processing
Mai 03, 2020 11:41:54 AM io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory <init>
INFO: Found incoming connectors: [smallrye-amqp, smallrye-in-memory]
Mai 03, 2020 11:41:54 AM io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory <init>
INFO: Found outgoing connectors: [smallrye-amqp, smallrye-in-memory]
Mai 03, 2020 11:41:54 AM io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory initialize
INFO: Channel manager initializing...
Mai 03, 2020 11:41:54 AM io.smallrye.reactive.messaging.amqp.AmqpConnector getClient
INFO: AMQP broker configured to localhost:5672 for channel computations
Mai 03, 2020 11:41:54 AM io.smallrye.reactive.messaging.extension.MediatorManager initializeAndRun
INFO: Initializing mediators
Mai 03, 2020 11:41:54 AM io.smallrye.reactive.messaging.extension.MediatorManager weaving
INFO: Connecting mediators
Mai 03, 2020 11:41:54 AM io.smallrye.reactive.messaging.extension.MediatorManager weaving
INFO: Connecting emitter to sink computations
Mai 03, 2020 11:41:55 AM io.quarkus.runtime.Timing printStartupTime
INFO: Quarkus 1.4.1.Final started in 2.781s. Listening on: http://0.0.0.0:8081
Mai 03, 2020 11:41:55 AM io.quarkus.runtime.Timing printStartupTime
INFO: Profile test activated. 
Mai 03, 2020 11:41:55 AM io.quarkus.runtime.Timing printStartupTime
INFO: Installed features: [cdi, mutiny, resteasy, scheduler, servlet, smallrye-context-propagation, smallrye-health, smallrye-metrics, smallrye-reactive-messaging, smallrye-reactive-messaging-amqp, smallrye-reactive-streams-operators, undertow-websockets, vertx]

The line INFO: AMQP broker configured to localhost:5672 for channel computations shows that the channel "computations" uses the AMQP connector.

Workaround: adding %test.mp.messaging.outgoing.computations.connector=smallrye-in-memory to application.properties solves the problem.

cescoffier commented 4 years ago

It's still not integrated in Quarkus. I'm going to open the PR today.

cescoffier commented 4 years ago

You also need to switch the channel as indicated on https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/2/testing/testing.html.

Legion2 commented 4 years ago

I called InMemoryConnector.switchOutgoingChannelsToInMemory("computations"); here is the source code: https://github.com/VirtualProgrammingLab/viplab-websocket-api/blob/test-parameters/websocket-api-impl/src/test/java/de/uni_stuttgart/tik/viplab/websocket_api/ComputationWebSocketTest.java#L60

cescoffier commented 4 years ago

Did you try from a Quarkus Test Resource, as @BeforeAll might be called after the application start?

Legion2 commented 4 years ago

@cescoffier I don't know what you mean by "Quarkus Test Resource". I have a test class annotated with @QuarkusTest, a method annotated with @BeforeAll which switches to the in-memory connector, and a method annotated with @Test. I stepped through the test with the debugger and the @BeforeAll method is called, but I think too late.
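To make the timing concern in this exchange concrete, here is a dependency-free toy model. It does not use Quarkus or SmallRye at all: ToyApp, the "computations.connector" key, and the method names are hypothetical, and the only assumption is the one voiced in the thread, namely that the connector for a channel is resolved once when the application starts.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: ToyApp is hypothetical and merely mimics
// "the channel's connector is resolved once, at startup".
final class StartupOrderDemo {

    static final class ToyApp {
        private final String connector;

        // The connector is read from the configuration when the app starts.
        ToyApp(Map<String, String> config) {
            this.connector = config.getOrDefault("computations.connector", "smallrye-amqp");
        }

        String connector() {
            return connector;
        }
    }

    // Override applied after startup (the @BeforeAll situation): it is ignored.
    static String overrideAfterStart() {
        Map<String, String> config = new HashMap<>();
        ToyApp app = new ToyApp(config);
        config.put("computations.connector", "smallrye-in-memory");
        return app.connector();
    }

    // Override applied before startup (the test-resource situation): it is used.
    static String overrideBeforeStart() {
        Map<String, String> config = new HashMap<>();
        config.put("computations.connector", "smallrye-in-memory");
        ToyApp app = new ToyApp(config);
        return app.connector();
    }

    public static void main(String[] args) {
        System.out.println("override after start:  " + overrideAfterStart());
        System.out.println("override before start: " + overrideBeforeStart());
    }
}
```

In this model the late override leaves the channel on smallrye-amqp, which matches the symptom described above; whether the real framework behaves exactly this way is something the maintainers' replies in the thread are better placed to confirm.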

cescoffier commented 4 years ago

@Legion2 something like https://github.com/quarkusio/quarkus-quickstarts/blob/master/kafka-quickstart/src/test/java/org/acme/kafka/KafkaResource.java, but instead of starting a container, switch the channels.

cescoffier commented 4 years ago

@Legion2 something like:

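import java.util.HashMap;
import java.util.Map;

import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
// Package as of SmallRye Reactive Messaging 2.x; it differs in later versions.
import io.smallrye.reactive.messaging.connectors.InMemoryConnector;

/**
 * Use the in-memory connector to avoid having to use a broker.
 */
public class FakeKafkaResource implements QuarkusTestResourceLifecycleManager {

    @Override
    public Map<String, String> start() {
        // Collect the configuration overrides that route both channels to the in-memory connector.
        Map<String, String> env = new HashMap<>();
        Map<String, String> props1 = InMemoryConnector.switchIncomingChannelsToInMemory("prices");
        Map<String, String> props2 = InMemoryConnector.switchOutgoingChannelsToInMemory("my-data-stream");
        env.putAll(props1);
        env.putAll(props2);
        return env;
    }

    @Override
    public void stop() {
        // Reset the in-memory connector state once the tests are done.
        InMemoryConnector.clear();
    }
}

Legion2 commented 4 years ago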

@cescoffier Thanks, the test now works with the QuarkusTestResource.
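For readers wondering what the map returned from start() contains, here is a rough, dependency-free sketch. InMemoryOverrides and its overrides method are hypothetical names, not part of Quarkus or SmallRye. The sketch assumes each override is a single per-channel connector property, in the same shape as the %test.mp.messaging.outgoing.computations.connector=smallrye-in-memory workaround mentioned earlier in the thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not a Quarkus or SmallRye class: it only builds the
// kind of per-channel overrides a test resource could return from start().
final class InMemoryOverrides {

    // Connector name as it appears in the thread.
    static final String IN_MEMORY = "smallrye-in-memory";

    // direction is "incoming" or "outgoing"; one property is produced per channel.
    static Map<String, String> overrides(String direction, String... channels) {
        Map<String, String> props = new LinkedHashMap<>();
        for (String channel : channels) {
            props.put("mp.messaging." + direction + "." + channel + ".connector", IN_MEMORY);
        }
        return props;
    }

    public static void main(String[] args) {
        // Same channels as in the FakeKafkaResource example.
        Map<String, String> env = new LinkedHashMap<>();
        env.putAll(overrides("incoming", "prices"));
        env.putAll(overrides("outgoing", "my-data-stream"));
        env.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The point of the sketch is only that these overrides are plain configuration entries, so they have to be in place before the application reads its configuration, which is what a test resource makes possible.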

Kasokaso commented 2 years ago

Hi all, is there any real test using InMemoryConnector? It doesn't seem to work for me. I tried to implement a test using this example: https://quarkus.io/guides/kafka#testing-without-a-broker and I'm getting this error:

Caused by: javax.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type io.smallrye.reactive.messaging.providers.connectors.InMemoryConnector and qualifiers [@Default]

cescoffier commented 2 years ago

You are probably missing the @Any annotation when injecting the in-memory connector.

Kasokaso commented 2 years ago

@cescoffier Thanks, it works now :)

fcauti commented 1 year ago

@cescoffier that helped me too, thanks

hantsy commented 11 months ago

I do not know how Dev Services affect the reactive messaging infrastructure. I used the same code to test the message flow against an in-memory connector, in environments with different message brokers provided by Dev Services.

https://github.com/hantsy/quarkus-sandbox/tree/master/pulsar/src/test/java/com/example

Only in the pulsar example, the in-memory test MessageHandlerTest failed. In the other examples, with amqp, kafka, and rabbitmq, it did not work.