spring-projects / spring-boot

Spring Boot helps you to create Spring-powered, production-grade applications and services with absolute minimum fuss.
https://spring.io/projects/spring-boot
Apache License 2.0

@ServiceConnection not working as expected #42312

Closed corneil closed 1 month ago

corneil commented 1 month ago

While creating a stream application to test as a native container, I discovered that @ServiceConnection is not working as expected.

The repo is at https://github.com/corneil/test-processor and contains both a Java and a Kotlin version of the processor.

The Java project contains com.example.testprocessor.testcontainers.TestProcessorServiceConnectionTests, which fails with @ServiceConnection. There are also versions using @EmbeddedKafka and traditional @Testcontainers with @DynamicPropertySource.
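
For readers without the repo at hand, here is a rough sketch of the two Testcontainers styles being compared. It is not the code from the repo: the class names are hypothetical, the image name is taken from the log output in this thread, and the property name `spring.kafka.bootstrap-servers` is an assumption (the real tests may register a different property). It needs Spring Boot's testcontainers support, Testcontainers Kafka, JUnit 5, and a running Docker daemon.

```java
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

// Style 1: Spring Boot derives the Kafka connection details from the container.
// Hypothetical class name, not the test from the repo.
@SpringBootTest
@Testcontainers
class ServiceConnectionSketchTests {

    @Container
    @ServiceConnection
    static KafkaContainer kafka =
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));

    @Test
    void contextLoads() {
        // Passes only if the application context starts against the container.
    }
}

// Style 2: the test copies the broker address into the environment itself.
// Hypothetical class name; the property name below is an assumption.
@SpringBootTest
@Testcontainers
class DynamicPropertySketchTests {

    @Container
    static KafkaContainer kafka =
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));

    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        // Evaluated lazily, after the container has started.
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
    }

    @Test
    void contextLoads() {
    }
}
```

The practical difference is who supplies the broker address: with @ServiceConnection it comes from connection details that Spring Boot builds from the container, while with @DynamicPropertySource the test chooses exactly which property receives it.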

I also discovered that some tests fail unless forkEvery is set to 1.

Reproduce

./build-test.sh
./build-fail.sh

philwebb commented 1 month ago

@corneil The error I'm getting is as follows. Is that the same one you have?

Sep 14, 2024 12:48:20 PM org.junit.platform.launcher.core.LauncherConfigurationParameters loadClasspathResource
WARNING: Discovered 2 'junit-platform.properties' configuration files in the classpath; only the first will be used.
Sep 14, 2024 12:48:20 PM org.junit.platform.launcher.core.LauncherConfigurationParameters loadClasspathResource
WARNING: Discovered 2 'junit-platform.properties' configuration files in the classpath; only the first will be used.

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/

 :: Spring Boot ::                (v3.3.3)

2024-09-14T12:48:20.878-07:00  INFO 73173 --- [test-processor] [           main] .t.t.TestProcessorServiceConnectionTests : Starting TestProcessorServiceConnectionTests using Java 17.0.12 with PID 73173 (started by pwebb in /Users/pwebb/projects/spring-boot/samples/gh-42312/test-processor/java)
2024-09-14T12:48:20.879-07:00 DEBUG 73173 --- [test-processor] [           main] .t.t.TestProcessorServiceConnectionTests : Running with Spring Boot v3.3.3, Spring v6.1.12
2024-09-14T12:48:20.879-07:00  INFO 73173 --- [test-processor] [           main] .t.t.TestProcessorServiceConnectionTests : No active profile set, falling back to 1 default profile: "default"
2024-09-14T12:48:49.317-07:00  INFO 73173 --- [test-processor] [           main] c.e.t.t.TestProcessorTestBase            : kafkaContainer:created:confluentinc/cp-kafka:latest
2024-09-14T12:50:36.683-07:00 ERROR 73173 --- [test-processor] [           main] o.s.cloud.stream.binding.BindingService  : Failed to create producer binding; retrying in 30 seconds

org.springframework.cloud.stream.provisioning.ProvisioningException: Provisioning exception encountered for output-queue
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:377) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:197) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:96) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:298) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:103) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:153) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binding.BindingService.doBindProducer(BindingService.java:353) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:294) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:311) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:315) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindOutputs(AbstractBindableProxyFactory.java:115) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:58) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
    at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:647) ~[na:na]
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:59) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:285) ~[spring-context-6.1.12.jar:6.1.12]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:469) ~[spring-context-6.1.12.jar:6.1.12]
    at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:257) ~[spring-context-6.1.12.jar:6.1.12]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:202) ~[spring-context-6.1.12.jar:6.1.12]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:990) ~[spring-context-6.1.12.jar:6.1.12]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:628) ~[spring-context-6.1.12.jar:6.1.12]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:754) ~[spring-boot-3.3.3.jar:3.3.3]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:456) ~[spring-boot-3.3.3.jar:3.3.3]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:335) ~[spring-boot-3.3.3.jar:3.3.3]
    at org.springframework.boot.test.context.SpringBootContextLoader.lambda$loadContext$3(SpringBootContextLoader.java:137) ~[spring-boot-test-3.3.3.jar:3.3.3]
    at org.springframework.util.function.ThrowingSupplier.get(ThrowingSupplier.java:58) ~[spring-core-6.1.12.jar:6.1.12]
    at org.springframework.util.function.ThrowingSupplier.get(ThrowingSupplier.java:46) ~[spring-core-6.1.12.jar:6.1.12]
    at org.springframework.boot.SpringApplication.withHook(SpringApplication.java:1463) ~[spring-boot-3.3.3.jar:3.3.3]
    at org.springframework.boot.test.context.SpringBootContextLoader$ContextLoaderHook.run(SpringBootContextLoader.java:553) ~[spring-boot-test-3.3.3.jar:3.3.3]
    at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:137) ~[spring-boot-test-3.3.3.jar:3.3.3]
    at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:108) ~[spring-boot-test-3.3.3.jar:3.3.3]
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:225) ~[spring-test-6.1.12.jar:6.1.12]
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:152) ~[spring-test-6.1.12.jar:6.1.12]
    at org.springframework.test.context.support.DefaultTestContext.getApplicationContext(DefaultTestContext.java:130) ~[spring-test-6.1.12.jar:6.1.12]
    at org.springframework.test.context.support.DependencyInjectionTestExecutionListener.injectDependencies(DependencyInjectionTestExecutionListener.java:142) ~[spring-test-6.1.12.jar:6.1.12]
    at org.springframework.test.context.support.DependencyInjectionTestExecutionListener.prepareTestInstance(DependencyInjectionTestExecutionListener.java:98) ~[spring-test-6.1.12.jar:6.1.12]
    at org.springframework.test.context.TestContextManager.prepareTestInstance(TestContextManager.java:260) ~[spring-test-6.1.12.jar:6.1.12]
    at org.springframework.test.context.junit.jupiter.SpringExtension.postProcessTestInstance(SpringExtension.java:163) ~[spring-test-6.1.12.jar:6.1.12]
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$invokeTestInstancePostProcessors$10(ClassBasedTestDescriptor.java:378) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.executeAndMaskThrowable(ClassBasedTestDescriptor.java:383) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$invokeTestInstancePostProcessors$11(ClassBasedTestDescriptor.java:378) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) ~[na:na]
    at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179) ~[na:na]
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625) ~[na:na]
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) ~[na:na]
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) ~[na:na]
    at java.base/java.util.stream.StreamSpliterators$WrappingSpliterator.forEachRemaining(StreamSpliterators.java:310) ~[na:na]
    at java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:735) ~[na:na]
    at java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:734) ~[na:na]
    at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762) ~[na:na]
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeTestInstancePostProcessors(ClassBasedTestDescriptor.java:377) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$instantiateAndPostProcessTestInstance$6(ClassBasedTestDescriptor.java:290) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.instantiateAndPostProcessTestInstance(ClassBasedTestDescriptor.java:289) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$testInstancesProvider$4(ClassBasedTestDescriptor.java:279) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
    at java.base/java.util.Optional.orElseGet(Optional.java:364) ~[na:na]
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$testInstancesProvider$5(ClassBasedTestDescriptor.java:278) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
    at org.junit.jupiter.engine.execution.TestInstancesProvider.getTestInstances(TestInstancesProvider.java:31) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$prepare$0(TestMethodTestDescriptor.java:106) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.prepare(TestMethodTestDescriptor.java:105) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.prepare(TestMethodTestDescriptor.java:69) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$prepare$2(NodeTestTask.java:123) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.prepare(NodeTestTask.java:123) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:90) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) ~[na:na]
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) ~[na:na]
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:198) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:169) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:93) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:58) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:141) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:57) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:103) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:94) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
    at org.junit.platform.launcher.core.DelegatingLauncher.execute(DelegatingLauncher.java:52) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
    at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:70) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
    at org.eclipse.jdt.internal.junit5.runner.JUnit5TestReference.run(JUnit5TestReference.java:100) ~[.cp/:na]
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:40) ~[.cp/:na]
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:529) ~[.cp/:na]
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:757) ~[.cp/:na]
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:452) ~[.cp/:na]
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:210) ~[.cp/:na]
Caused by: java.util.concurrent.TimeoutException: null
    at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095) ~[na:na]
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180) ~[kafka-clients-3.7.1.jar:na]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:413) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:387) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:364) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
    ... 104 common frames omitted

2024-09-14T12:51:36.822-07:00 ERROR 73173 --- [test-processor] [           main] o.s.cloud.stream.binding.BindingService  : Failed to create consumer binding; retrying in 30 seconds

org.springframework.cloud.stream.provisioning.ProvisioningException: Provisioning exception encountered for input-queue
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:377) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.doProvisionConsumerDestination(KafkaTopicProvisioner.java:246) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:211) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:96) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:524) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:103) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:144) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binding.BindingService.doBindConsumer(BindingService.java:186) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:139) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindInputs(AbstractBindableProxyFactory.java:98) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
    at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:647) ~[na:na]
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:59) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:285) ~[spring-context-6.1.12.jar:6.1.12]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:469) ~[spring-context-6.1.12.jar:6.1.12]
    at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:257) ~[spring-context-6.1.12.jar:6.1.12]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:202) ~[spring-context-6.1.12.jar:6.1.12]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:990) ~[spring-context-6.1.12.jar:6.1.12]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:628) ~[spring-context-6.1.12.jar:6.1.12]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:754) ~[spring-boot-3.3.3.jar:3.3.3]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:456) ~[spring-boot-3.3.3.jar:3.3.3]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:335) ~[spring-boot-3.3.3.jar:3.3.3]
    at org.springframework.boot.test.context.SpringBootContextLoader.lambda$loadContext$3(SpringBootContextLoader.java:137) ~[spring-boot-test-3.3.3.jar:3.3.3]
    at org.springframework.util.function.ThrowingSupplier.get(ThrowingSupplier.java:58) ~[spring-core-6.1.12.jar:6.1.12]
    at org.springframework.util.function.ThrowingSupplier.get(ThrowingSupplier.java:46) ~[spring-core-6.1.12.jar:6.1.12]
    at org.springframework.boot.SpringApplication.withHook(SpringApplication.java:1463) ~[spring-boot-3.3.3.jar:3.3.3]
    at org.springframework.boot.test.context.SpringBootContextLoader$ContextLoaderHook.run(SpringBootContextLoader.java:553) ~[spring-boot-test-3.3.3.jar:3.3.3]
    at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:137) ~[spring-boot-test-3.3.3.jar:3.3.3]
    at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:108) ~[spring-boot-test-3.3.3.jar:3.3.3]
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:225) ~[spring-test-6.1.12.jar:6.1.12]
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:152) ~[spring-test-6.1.12.jar:6.1.12]
    at org.springframework.test.context.support.DefaultTestContext.getApplicationContext(DefaultTestContext.java:130) ~[spring-test-6.1.12.jar:6.1.12]
    at org.springframework.test.context.support.DependencyInjectionTestExecutionListener.injectDependencies(DependencyInjectionTestExecutionListener.java:142) ~[spring-test-6.1.12.jar:6.1.12]
    at org.springframework.test.context.support.DependencyInjectionTestExecutionListener.prepareTestInstance(DependencyInjectionTestExecutionListener.java:98) ~[spring-test-6.1.12.jar:6.1.12]
    at org.springframework.test.context.TestContextManager.prepareTestInstance(TestContextManager.java:260) ~[spring-test-6.1.12.jar:6.1.12]
    at org.springframework.test.context.junit.jupiter.SpringExtension.postProcessTestInstance(SpringExtension.java:163) ~[spring-test-6.1.12.jar:6.1.12]
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$invokeTestInstancePostProcessors$10(ClassBasedTestDescriptor.java:378) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.executeAndMaskThrowable(ClassBasedTestDescriptor.java:383) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$invokeTestInstancePostProcessors$11(ClassBasedTestDescriptor.java:378) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) ~[na:na]
    at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179) ~[na:na]
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625) ~[na:na]
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) ~[na:na]
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) ~[na:na]
    at java.base/java.util.stream.StreamSpliterators$WrappingSpliterator.forEachRemaining(StreamSpliterators.java:310) ~[na:na]
    at java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:735) ~[na:na]
    at java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:734) ~[na:na]
    at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762) ~[na:na]
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeTestInstancePostProcessors(ClassBasedTestDescriptor.java:377) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$instantiateAndPostProcessTestInstance$6(ClassBasedTestDescriptor.java:290) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.instantiateAndPostProcessTestInstance(ClassBasedTestDescriptor.java:289) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$testInstancesProvider$4(ClassBasedTestDescriptor.java:279) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
    at java.base/java.util.Optional.orElseGet(Optional.java:364) ~[na:na]
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$testInstancesProvider$5(ClassBasedTestDescriptor.java:278) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
    at org.junit.jupiter.engine.execution.TestInstancesProvider.getTestInstances(TestInstancesProvider.java:31) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$prepare$0(TestMethodTestDescriptor.java:106) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.prepare(TestMethodTestDescriptor.java:105) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.prepare(TestMethodTestDescriptor.java:69) ~[junit-jupiter-engine-5.10.3.jar:5.10.3]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$prepare$2(NodeTestTask.java:123) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.prepare(NodeTestTask.java:123) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:90) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) ~[na:na]
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) ~[na:na]
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54) ~[junit-platform-engine-1.10.3.jar:1.10.3]
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:198) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:169) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:93) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:58) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:141) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:57) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:103) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:94) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
    at org.junit.platform.launcher.core.DelegatingLauncher.execute(DelegatingLauncher.java:52) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
    at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:70) ~[junit-platform-launcher-1.10.3.jar:1.10.3]
    at org.eclipse.jdt.internal.junit5.runner.JUnit5TestReference.run(JUnit5TestReference.java:100) ~[.cp/:na]
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:40) ~[.cp/:na]
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:529) ~[.cp/:na]
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:757) ~[.cp/:na]
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:452) ~[.cp/:na]
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:210) ~[.cp/:na]
Caused by: java.util.concurrent.TimeoutException: null
    at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095) ~[na:na]
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180) ~[kafka-clients-3.7.1.jar:na]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:413) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:387) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:364) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
    ... 103 common frames omitted

2024-09-14T12:51:36.942-07:00  INFO 73173 --- [test-processor] [           main] .t.t.TestProcessorServiceConnectionTests : Started TestProcessorServiceConnectionTests in 196.22 seconds (process running for 196.983)
OpenJDK 64-Bit Server VM warning: Sharing is only supported for boot loader classes because bootstrap classpath has been appended
2024-09-14T12:51:37.468-07:00  INFO 73173 --- [test-processor] [           main] c.e.t.t.TestProcessorTestBase            : sending TestInput{name='Joe', value=1.0} as {"name":"Joe","value":1.0}
2024-09-14T12:51:37.719-07:00  INFO 73173 --- [test-processor] [to input-queue]] c.e.t.t.TestProcessorTestBase            : sent:SendResult [producerRecord=ProducerRecord(topic=input-queue, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=[B@620ee6d, timestamp=null), recordMetadata=input-queue-0@0]
2024-09-14T12:52:06.719-07:00 ERROR 73173 --- [test-processor] [   scheduling-1] o.s.cloud.stream.binding.BindingService  : Failed to create producer binding; retrying in 30 seconds

org.springframework.cloud.stream.provisioning.ProvisioningException: Provisioning exception encountered for output-queue
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:377) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:197) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:96) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:298) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:103) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:153) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleProducerBinding$4(BindingService.java:376) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-6.1.12.jar:6.1.12]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:840) ~[na:na]
Caused by: java.util.concurrent.TimeoutException: null
    at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095) ~[na:na]
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180) ~[kafka-clients-3.7.1.jar:na]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:413) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:387) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:364) ~[spring-cloud-stream-binder-kafka-core-4.1.3.jar:4.1.3]
    ... 13 common frames omitted

2024-09-14T12:52:06.722-07:00 ERROR 73173 --- [test-processor] [   scheduling-1] o.s.s.s.TaskUtils$LoggingErrorHandler    : Unexpected error occurred in scheduled task

org.springframework.core.task.TaskRejectedException: ExecutorService in shutdown state did not accept task: org.springframework.cloud.stream.binding.BindingService$$Lambda$1574/0x00000070016cae70@194127a5
    at org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler.schedule(ThreadPoolTaskScheduler.java:405) ~[spring-context-6.1.12.jar:6.1.12]
    at org.springframework.cloud.stream.binding.BindingService.scheduleTask(BindingService.java:432) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binding.BindingService.rescheduleProducerBinding(BindingService.java:373) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleProducerBinding$4(BindingService.java:379) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-6.1.12.jar:6.1.12]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:840) ~[na:na]
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@52e6fa77[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@30c79434[Wrapped task = DelegatingErrorHandlingRunnable for org.springframework.cloud.stream.binding.BindingService$$Lambda$1574/0x00000070016cae70@194127a5]] rejected from org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler$1@29232092[Shutting down, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]
    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833) ~[na:na]
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:340) ~[na:na]
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:562) ~[na:na]
    at org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler.schedule(ThreadPoolTaskScheduler.java:402) ~[spring-context-6.1.12.jar:6.1.12]
    ... 10 common frames omitted

philwebb commented 1 month ago

I think the service provider stuff is working as designed. I think the problem is that KafkaTopicProvisioner isn't using the KafkaConnectionDetails abstraction. Looking at the constructor, it calls kafkaProperties.buildAdminProperties() to build adminClientProperties. If you look at Spring Boot's KafkaAutoConfiguration, there's an applyKafkaConnectionDetailsForAdmin method to apply the connection details.

I think you'll need similar code in KafkaTopicProvisioner.
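
A rough sketch of what "similar code" could look like, in plain Java. The types here are simplified stand-ins, not the real Spring Boot or Spring Cloud Stream classes: `BrokerConnectionDetails` plays the role of KafkaConnectionDetails, `buildAdminProperties` imitates a properties-only lookup, and the `localhost:9092` default and the `bootstrap-servers` key are assumptions made for illustration. The sketch shows only the ordering: build the map from configuration properties first, then let the connection details overwrite the bootstrap servers.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a connection details abstraction; not a Spring Boot type.
interface BrokerConnectionDetails {
    List<String> getBootstrapServers();
}

class AdminPropertiesSketch {

    // Imitates a properties-only lookup: it knows nothing about any container.
    static Map<String, Object> buildAdminProperties(Map<String, String> configProps) {
        Map<String, Object> admin = new HashMap<>();
        admin.put("bootstrap.servers",
                configProps.getOrDefault("bootstrap-servers", "localhost:9092"));
        return admin;
    }

    // The step that appears to be missing: the connection details get the final say.
    static Map<String, Object> applyConnectionDetails(Map<String, Object> admin,
            BrokerConnectionDetails details) {
        Map<String, Object> result = new HashMap<>(admin);
        result.put("bootstrap.servers", String.join(",", details.getBootstrapServers()));
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> admin = buildAdminProperties(Map.of());
        // Hypothetical address of a broker started by a test container.
        BrokerConnectionDetails container = () -> List.of("localhost:32768");
        System.out.println(applyConnectionDetails(admin, container));
    }
}
```

Without the second step, the admin client would keep whatever address the properties produced, which would be consistent with the topic provisioning timeouts in the log above.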

corneil commented 1 month ago

> I think the service provider stuff is working as designed. I think the problem is that KafkaTopicProvisioner isn't using the KafkaConnectionDetails abstraction. Looking at the constructor, it calls kafkaProperties.buildAdminProperties() to build adminClientProperties. If you look at Spring Boot's KafkaAutoConfiguration, there's an applyKafkaConnectionDetailsForAdmin method to apply the connection details.
>
> I think you'll need similar code in KafkaTopicProvisioner.

Isn't it the job of @ServiceConnection to take care of the auto configuration like @DynamicPropertySource does?

wilkinsona commented 1 month ago

It has. It's auto-configured a KafkaConnectionDetails bean. KafkaTopicProvisioner is ignoring that bean so it has had no effect. As Phil suggested, KafkaTopicProvisioner will have to be updated to consider the connection details.

corneil commented 1 month ago

In this version using @DynamicPropertySource we don't need to do anything else.

wilkinsona commented 1 month ago

Configuration properties are one way of altering the KafkaConnectionDetails bean. A service connection is another. Either way, you'll have to consider the connection details for that alteration to be noticed.

corneil commented 1 month ago

Am I misunderstanding the purpose of @ServiceConnection? I thought it is supposed to take care of connection properties for a range of supported Testcontainers.

wilkinsona commented 1 month ago

Slightly, yes.

The purpose of @ServiceConnection is to cause the auto-configuration for the service to connect to the container-managed service. That isn't done through configuration properties, though, it's done by configuring a …ConnectionDetails bean.

The …ConnectionDetails bean is used by the auto-configuration when it's configuring the bean(s) that connect to the service. In the absence of a …ConnectionDetails bean that's contributed by a @ServiceConnection, we auto-configure one that's backed by the configuration properties. In other words, a @ServiceConnection or some configuration properties are two different ways of influencing the connection settings that will be provided by a …ConnectionDetails bean but it's the …ConnectionDetails bean that's the canonical source for information on how to connect to the service.

If you're only looking at configuration properties, you're ignoring other possible sources of connection information (Testcontainers and Docker Compose being the main two).
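
The fallback described above can be sketched in plain Java. This is an illustration only: `ServiceDetails` is a hypothetical stand-in for a …ConnectionDetails type, `resolve` is not a Spring Boot method, and the `localhost:9092` default is an assumption. The point is that consumers ask one object for the connection settings, and that object is either contributed (for example by a container) or derived from configuration properties.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a ...ConnectionDetails type; not a Spring Boot interface.
interface ServiceDetails {
    List<String> getBootstrapServers();
}

class DetailsResolutionSketch {

    // Fallback: details backed by configuration properties.
    static ServiceDetails fromProperties(Map<String, String> properties) {
        String servers = properties.getOrDefault("spring.kafka.bootstrap-servers", "localhost:9092");
        return () -> List.of(servers.split(","));
    }

    // A contributed details object wins; otherwise fall back to the properties.
    static ServiceDetails resolve(Optional<ServiceDetails> contributed, Map<String, String> properties) {
        return contributed.orElseGet(() -> fromProperties(properties));
    }

    public static void main(String[] args) {
        Map<String, String> properties = Map.of("spring.kafka.bootstrap-servers", "kafka.example:9092");
        // Hypothetical address supplied by a test container.
        ServiceDetails fromContainer = () -> List.of("localhost:32768");

        System.out.println(resolve(Optional.empty(), properties).getBootstrapServers());
        System.out.println(resolve(Optional.of(fromContainer), properties).getBootstrapServers());
    }
}
```

Code that reads `properties` directly sees only the first case, whichever source actually supplied the details.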

philwebb commented 1 month ago

It works at a higher level than setting properties. Setting properties turns out to be problematic for a number of reasons, but the main one is that you can end up with a mix of overrides (e.g. the connection URL from one source but the username/password from another).

To get around that problem we introduced the ConnectionDetails abstraction. This is a way for us to define as a Spring bean how connections to various technologies should be established. We retro-fitted all our auto-configuration classes to use ConnectionDetails beans. If there is no ConnectionDetails bean, then we create one that reads properties from the Environment. I think most of the info in this blog is still relevant: https://spring.io/blog/2023/06/19/spring-boot-31-connectiondetails-abstraction

What @ServiceConnection does is create an appropriate ConnectionDetails bean that is backed by Testcontainers.
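
To make the "mix of overrides" problem concrete, here is a small plain-Java sketch. Everything in it is hypothetical (the keys, URLs, and credentials are made up, and the two helper methods are not Spring APIs). It contrasts key-by-key layering, which is how property sources combine, with picking one complete source, which is the idea behind a details bean.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

class MixedOverridesSketch {

    // Property-style layering: higher-precedence keys win one at a time.
    static Map<String, String> mergeByKey(Map<String, String> lower, Map<String, String> higher) {
        Map<String, String> merged = new LinkedHashMap<>(lower);
        merged.putAll(higher);
        return merged;
    }

    // Bean-style selection: one source supplies every value, or none of them.
    static Map<String, String> selectWhole(Optional<Map<String, String>> contributed,
            Map<String, String> fallback) {
        return contributed.orElse(fallback);
    }

    public static void main(String[] args) {
        // Hypothetical application settings.
        Map<String, String> application = Map.of(
                "url", "jdbc:postgresql://db.example/app",
                "username", "app",
                "password", "secret");
        // A test that overrides only the URL.
        Map<String, String> urlOnly = Map.of("url", "jdbc:postgresql://localhost:32770/test");
        // A complete description of a hypothetical test container.
        Map<String, String> container = Map.of(
                "url", "jdbc:postgresql://localhost:32770/test",
                "username", "test",
                "password", "test");

        // Container URL combined with the application's credentials.
        System.out.println(mergeByKey(application, urlOnly));
        // All three values come from the same source.
        System.out.println(selectWhole(Optional.of(container), application));
    }
}
```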

corneil commented 1 month ago

Doesn't this mean that @ServiceConnection will result in the creation of KafkaConnectionDetails, which should provide all that is needed to auto-configure a Kafka producer or consumer, the same as with DynamicPropertySource?

philwebb commented 1 month ago

> Doesn't this mean that @ServiceConnection will result in the creation of KafkaConnectionDetails, which should provide all that is needed to auto-configure a Kafka producer or consumer, the same as with DynamicPropertySource?

It's not the same as DynamicPropertySource. A DynamicPropertySource will add properties to the Environment; a @ServiceConnection will create a KafkaConnectionDetails bean. As I mentioned in the previous comment, that will work for us because of this code, but not for you because your constructor calls kafkaProperties.buildAdminProperties() to build adminClientProperties, which is completely unaware of the KafkaConnectionDetails bean.
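
The difference can be simulated in a few lines of plain Java. This is only a model, not Spring code: the map stands in for the Environment, the list stands in for a KafkaConnectionDetails bean, the container address is made up, and the `localhost:9092` default is an assumption. It shows what a component that only reads properties would end up connecting to under each test setup.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PropertySourceVersusBeanSketch {

    static final String KEY = "spring.kafka.bootstrap-servers";

    // A component that only reads properties and never looks at a details object.
    static String serversSeenByPropertiesOnlyReader(Map<String, String> environment) {
        return environment.getOrDefault(KEY, "localhost:9092");
    }

    public static void main(String[] args) {
        String containerAddress = "localhost:32768"; // hypothetical mapped port

        // Dynamic property style: the address is written into the environment.
        Map<String, String> environmentWithProperty = new HashMap<>();
        environmentWithProperty.put(KEY, containerAddress);

        // Service connection style: the environment is untouched and the
        // address lives in a separate object (modelled here as a list).
        Map<String, String> untouchedEnvironment = new HashMap<>();
        List<String> detailsBean = List.of(containerAddress);

        System.out.println(serversSeenByPropertiesOnlyReader(environmentWithProperty));
        System.out.println(serversSeenByPropertiesOnlyReader(untouchedEnvironment));
        System.out.println("details bean holds " + detailsBean + " but was never consulted");
    }
}
```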

corneil commented 1 month ago

@philwebb I have created separate tests for Kafka and RabbitMQ to illustrate that @ServiceConnection works for RabbitMQ/AMQP but not for Kafka.

wilkinsona commented 1 month ago

@corneil Have you changed the code in the Kafka binder to take the connection details into account as Phil suggested above? Until you've done that, it's to be expected that it doesn't work. If you've made those changes and it's still not working, please share the Kafka test with us and we can take a look.

corneil commented 1 month ago

@wilkinsona @philwebb You will see Kafka and Rabbit tests under test-processor, with both combinations of @ServiceConnection and @DynamicPropertySource. The only failure is the Kafka ServiceConnection test.

philwebb commented 1 month ago

It looks to me like RabbitExchangeQueueProvisioner doesn't have the same kind of logic as KafkaTopicProvisioner. Specifically, it's not injecting a configuration properties class and attempting to establish the connection itself. If you put a breakpoint on RabbitMqContainerConnectionDetails.getAddresses() you can see it's our auto-configuration that's calling this one.

wilkinsona commented 1 month ago

We seem to be going round in circles here. In the hope of breaking out of that loop, I've opened a PR that should fix the problem in the Kafka stream binder. The changes in the PR update the binder so that it considers any KafkaConnectionDetails bean when making an admin, consumer, or producer connection to Kafka. If any further discussion is needed, let's have it on the PR, please, as this problem is specific to Spring Cloud Stream. As such, it's off-topic for most people watching this repository.