Sidecar application used by Confluent for VS Code, as a local proxy for Confluent Cloud, Confluent Platform and local Kafka clusters, to help users build streaming applications.
Apache License 2.0
3
stars
3
forks
source link
Create new KafkaConsumer instance for every consume request #145
Opening more than one message viewer tabs for topics within the same cluster would be met with a "Something went wrong" in VS Code. From the sidecar logs (see expand below), we see that a ConcurrentModificationException is raised by a KafkaConsumer client that is tried to be used in a different thread than where it was created.
ConcurrentModificationException log
```
[io.qua.ver.htt.run.QuarkusErrorHandler] (executor-thread-10) HTTP Request to /gateway/v1/clusters/4L6g3nShT-eMCtK--X86sw/topics/_schemas/partitions/-/consume failed, error id: 32d33f88-c23c-4e7b-be40-87f460f8a434-1: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: executor-thread-10, id: 80) otherThread(id: 76)
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.acquire(LegacyKafkaConsumer.java:1226)
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.acquireAndEnsureOpen(LegacyKafkaConsumer.java:1207)
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.partitionsFor(LegacyKafkaConsumer.java:914)
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.partitionsFor(LegacyKafkaConsumer.java:909)
at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1434)
at io.confluent.idesidecar.restapi.messageviewer.SimpleConsumer.getPartitionCount(SimpleConsumer.java:122)
at io.confluent.idesidecar.restapi.messageviewer.SimpleConsumer.consume(SimpleConsumer.java:63)
at io.confluent.idesidecar.restapi.messageviewer.strategy.NativeConsumeStrategy.consumeMessages(NativeConsumeStrategy.java:56)
at io.confluent.idesidecar.restapi.messageviewer.strategy.NativeConsumeStrategy.lambda$execute$0(NativeConsumeStrategy.java:35)
at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$0(ContextImpl.java:178)
at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:270)
at io.vertx.core.impl.ContextImpl.lambda$internalExecuteBlocking$2(ContextImpl.java:210)
at io.vertx.core.impl.TaskQueue.run(TaskQueue.java:76)
at io.quarkus.vertx.core.runtime.VertxCoreRecorder$14.runWith(VertxCoreRecorder.java:635)
at org.jboss.threads.EnhancedQueueExecutor$Task.doRunWith(EnhancedQueueExecutor.java:2516)
at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2495)
at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1521)
at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:11)
at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:11)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base@21.0.2/java.lang.Thread.runWith(Thread.java:1596)
at java.base@21.0.2/java.lang.Thread.run(Thread.java:1583)
at org.graalvm.nativeimage.builder/com.oracle.svm.core.thread.PlatformThreads.threadStartRoutine(PlatformThreads.java:833)
at org.graalvm.nativeimage.builder/com.oracle.svm.core.posix.thread.PosixPlatformThreads.pthreadStartRoutine(PosixPlatformThreads.java:211)
```
The KafkaConsumer instances are not thread safe. We should not cache these instances at the cluster/config level, since concurrent consume requests for topic(s) in the same cluster will lead to re-use of the instance across worker threads.
Summary of Changes
Remove the KafkaConsumerClients class that caches client instances by cluster id and configuration overrides.
Instead, we use a KafkaConsumerFactory that encapsulates the logic for creating a new KafkaConsumer instance.
Pull request checklist
Please check if your PR fulfills the following (if applicable):
Tests:
[x] Added new
[ ] Updated existing
[ ] Deleted existing
[x] Have you validated this change locally against a running instance of the Quarkus dev server?
make quarkus-dev
[x] Have you validated this change against a locally running native executable?
make mvn-package-native && ./target/ide-sidecar-*-runner
Background
Opening more than one message viewer tabs for topics within the same cluster would be met with a "Something went wrong" in VS Code. From the sidecar logs (see expand below), we see that a
ConcurrentModificationException
is raised by a KafkaConsumer client that is tried to be used in a different thread than where it was created.ConcurrentModificationException log
``` [io.qua.ver.htt.run.QuarkusErrorHandler] (executor-thread-10) HTTP Request to /gateway/v1/clusters/4L6g3nShT-eMCtK--X86sw/topics/_schemas/partitions/-/consume failed, error id: 32d33f88-c23c-4e7b-be40-87f460f8a434-1: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: executor-thread-10, id: 80) otherThread(id: 76) at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.acquire(LegacyKafkaConsumer.java:1226) at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.acquireAndEnsureOpen(LegacyKafkaConsumer.java:1207) at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.partitionsFor(LegacyKafkaConsumer.java:914) at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.partitionsFor(LegacyKafkaConsumer.java:909) at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1434) at io.confluent.idesidecar.restapi.messageviewer.SimpleConsumer.getPartitionCount(SimpleConsumer.java:122) at io.confluent.idesidecar.restapi.messageviewer.SimpleConsumer.consume(SimpleConsumer.java:63) at io.confluent.idesidecar.restapi.messageviewer.strategy.NativeConsumeStrategy.consumeMessages(NativeConsumeStrategy.java:56) at io.confluent.idesidecar.restapi.messageviewer.strategy.NativeConsumeStrategy.lambda$execute$0(NativeConsumeStrategy.java:35) at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$0(ContextImpl.java:178) at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:270) at io.vertx.core.impl.ContextImpl.lambda$internalExecuteBlocking$2(ContextImpl.java:210) at io.vertx.core.impl.TaskQueue.run(TaskQueue.java:76) at io.quarkus.vertx.core.runtime.VertxCoreRecorder$14.runWith(VertxCoreRecorder.java:635) at org.jboss.threads.EnhancedQueueExecutor$Task.doRunWith(EnhancedQueueExecutor.java:2516) at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2495) at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1521) at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:11) at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:11) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.base@21.0.2/java.lang.Thread.runWith(Thread.java:1596) at java.base@21.0.2/java.lang.Thread.run(Thread.java:1583) at org.graalvm.nativeimage.builder/com.oracle.svm.core.thread.PlatformThreads.threadStartRoutine(PlatformThreads.java:833) at org.graalvm.nativeimage.builder/com.oracle.svm.core.posix.thread.PosixPlatformThreads.pthreadStartRoutine(PosixPlatformThreads.java:211) ```The
KafkaConsumer
instances are not thread safe. We should not cache these instances at the cluster/config level, since concurrent consume requests for topic(s) in the same cluster will lead to re-use of the instance across worker threads.Summary of Changes
KafkaConsumerClients
class that caches client instances by cluster id and configuration overrides.KafkaConsumerFactory
that encapsulates the logic for creating a newKafkaConsumer
instance.Pull request checklist
Please check if your PR fulfills the following (if applicable):