akka / alpakka-kafka

Alpakka Kafka connector - Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/libraries/alpakka-kafka/current/

[Consumer] OutOfMemoryError interrupting the heartbeat thread #306

Open cchantep opened 7 years ago

cchantep commented 7 years ago

Related to KAFKA-3552, network latency on consumer side can cause an OutOfMemoryError in the heartbeat thread, which is fatal.
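A common mitigation for this class of failure (a sketch only: these are standard Kafka consumer properties, but the values are illustrative and should be sized for your workload) is to bound how much the consumer can buffer per read, and to cap direct memory explicitly with the JVM flag `-XX:MaxDirectMemorySize` so exhaustion surfaces predictably:

```properties
# Illustrative values only -- bound per-partition and per-request fetch sizes
max.partition.fetch.bytes=1048576
fetch.max.bytes=10485760
# Use a modest socket receive buffer instead of large custom buffers
receive.buffer.bytes=65536
```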

Uncaught error from thread [Tests-akka.kafka.default-dispatcher-147] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[Tests]
java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:658)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
    at sun.nio.ch.IOUtil.read(IOUtil.java:195)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
    at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
    at akka.kafka.KafkaConsumerActor.tryPoll$1(KafkaConsumerActor.scala:243)
    at akka.kafka.KafkaConsumerActor.poll(KafkaConsumerActor.scala:289)
    at akka.kafka.KafkaConsumerActor.akka$kafka$KafkaConsumerActor$$receivePoll(KafkaConsumerActor.scala:218)
    at akka.kafka.KafkaConsumerActor$$anonfun$receive$1.applyOrElse(KafkaConsumerActor.scala:137)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.kafka.KafkaConsumerActor.aroundReceive(KafkaConsumerActor.scala:72)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
20:28:11.920 ERROR a.a.ActorSystemImpl - Uncaught error from thread [Tests-akka.kafka.default-dispatcher-147] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled [Tests-akka.actor.default-dispatcher-64]
20:28:11.931 ERROR o.a.k.c.c.i.AbstractCoordinator - Unexpected interrupt received in heartbeat thread for group tests1 [kafka-coordinator-heartbeat-thread | tests1]
java.lang.InterruptedException: null
    at java.lang.Object.wait(Native Method)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:926)

Exception: sbt.TrapExitSecurityException thrown from the UncaughtExceptionHandler in thread "Tests-akka.kafka.default-dispatcher-147"
cchantep commented 7 years ago
java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:658)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
    at sun.nio.ch.IOUtil.read(IOUtil.java:195)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
    at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
    at akka.kafka.KafkaConsumerActor.tryPoll$1(KafkaConsumerActor.scala:243)
    at akka.kafka.KafkaConsumerActor.poll(KafkaConsumerActor.scala:289)
    at akka.kafka.KafkaConsumerActor.akka$kafka$KafkaConsumerActor$$receivePoll(KafkaConsumerActor.scala:218)
    at akka.kafka.KafkaConsumerActor$$anonfun$receive$1.applyOrElse(KafkaConsumerActor.scala:137)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.kafka.KafkaConsumerActor.aroundReceive(KafkaConsumerActor.scala:72)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
21:42:17.327 ERROR a.a.ActorSystemImpl - Uncaught error from thread [Tests-akka.kafka.default-dispatcher-143] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled [Tests-akka.actor.default-dispatcher-4]
21:42:17.338 ERROR o.a.k.c.c.i.AbstractCoordinator - Unexpected interrupt received in heartbeat thread for group tests1 [kafka-coordinator-heartbeat-thread | tests1]
java.lang.InterruptedException: null
    at java.lang.Object.wait(Native Method)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:926)
patriknw commented 7 years ago

What do you suggest that we do about that here? If it's a leak in the Kafka driver it must be fixed at that end.

cchantep commented 7 years ago

I'm wondering whether it's triggered by the ConsumerStage, as I cannot reproduce it in the same environment when using the Kafka client directly.

chitrakojha commented 7 years ago

We're seeing the same error at high traffic. Is there a fix or new lib version to use?

chitrakojha commented 7 years ago

We also saw this today, for Java heap memory:

Uncaught error from thread [ClusterSystem-akka.kafka.default-dispatcher-318]: Java heap space, shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[ClusterSystem]
java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:75)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:203)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:167)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:379)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:326)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:432)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:168)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:358)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at akka.kafka.KafkaConsumerActor.tryPoll$1(KafkaConsumerActor.scala:257)
    at akka.kafka.KafkaConsumerActor.poll(KafkaConsumerActor.scala:303)
    at akka.kafka.KafkaConsumerActor.akka$kafka$KafkaConsumerActor$$receivePoll(KafkaConsumerActor.scala:232)
    at akka.kafka.KafkaConsumerActor$$anonfun$receive$1.applyOrElse(KafkaConsumerActor.scala:151)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.kafka.KafkaConsumerActor.aroundReceive(KafkaConsumerActor.scala:73)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
johanandren commented 7 years ago

Did you see that it was reactive-kafka leaking memory (taking a heap dump and analyzing it for example)?

If it was we need more details about what is leaking.

If not, you need to find and fix the memory issues in your application; there isn't really anything the Kafka connector can do about that.

chitrakojha commented 7 years ago

From the heap dump, it's most definitely the dispatcher. We got the issue under control by implementing back-pressure in our streams and tuning thread pools.
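For anyone tuning the same thread pool: the dispatcher named in these stack traces (`akka.kafka.default-dispatcher`) can be overridden in `application.conf`. A sketch mirroring the connector's default layout (the pool size is illustrative, not a recommendation):

```hocon
akka.kafka.default-dispatcher {
  type = "Dispatcher"
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 16
  }
}
```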

abhijith-vr commented 6 years ago

I am using version 0.19 of the reactive-kafka connector and the same error occurs. How can it be resolved?

akka.kafka.KafkaConsumerActor - Reconciliation has found revoked assignments: Map() added assignments: Map(). Current subscriptions: Set(SubscribePattern(asset-.*,ListenerCallbacks(akka.kafka.internal.SingleSourceLogic$$Lambda$202/1753862672@4b982f41,akka.kafka.internal.SingleSourceLogic$$Lambda$203/2095702609@f84c028))) 
Uncaught error from thread [ceon-current-data-akka.kafka.default-dispatcher-10]: Direct buffer memory, shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem 
java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:694)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
    at sun.nio.ch.IOUtil.read(IOUtil.java:195)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:109)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:101)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:75)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:203)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:167)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:381)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:326)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:433)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:184)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:217)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:203)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at akka.kafka.KafkaConsumerActor.tryPoll$1(KafkaConsumerActor.scala:287)
    at akka.kafka.KafkaConsumerActor.poll(KafkaConsumerActor.scala:334)
    at akka.kafka.KafkaConsumerActor.akka$kafka$KafkaConsumerActor$$receivePoll(KafkaConsumerActor.scala:256)
    at akka.kafka.KafkaConsumerActor$$anonfun$receive$1.applyOrElse(KafkaConsumerActor.scala:164)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.kafka.KafkaConsumerActor.aroundReceive(KafkaConsumerActor.scala:79)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527)
    at akka.actor.ActorCell.invoke(ActorCell.scala:496)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2018-02-19 18:18:54.411 [default-dispatcher-4] ERROR akka.actor.ActorSystemImpl - Uncaught error from thread [akka.kafka.default-dispatcher-10]: Direct buffer memory, shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[ceon-current-data] 
rdebokx commented 6 years ago

I encountered similar behavior the other day. In my case it was because I had topic-specific consumers in the same consumer group.

As soon as I created 80+ consumers, each subscribing to its own topic, rebalancing issues occurred that resulted in similar behavior. The Kafka documentation does not explicitly state that this is unsupported. It seems, however, to assume that all consumers in a consumer group have the same contract, meaning they should be able to handle the same set of topics. The rebalancing algorithm appears to make the same assumption when it redistributes partitions within a consumer group.

Conclusion: make sure that all consumers in a consumer group are subscribed to the same set of topics. Alternatively, you can use topic-specific group IDs.
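A sketch of the topic-specific group ID option in alpakka-kafka's HOCON style (the settings block and group name are hypothetical; the group id just needs to be unique per topic, so every member of a group shares the same subscription):

```hocon
orders-consumer: ${akka.kafka.consumer} {
  kafka-clients {
    # hypothetical naming scheme: <service>-<topic>
    group.id = "my-service-orders"
  }
}
```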

ingared8 commented 6 years ago

When I tried to run a client against Kafka with TLS without providing the trust store and key store files, I got the same errors. Instead of an SSL failure, it threw the exception below.
kafka-console-consumer --topic test --from-beginning --bootstrap-server $KF

The following is the error:

java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:140)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:93)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:244)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:205)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:557)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:495)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:424)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:261)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:156)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:228)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:205)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:279)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1149)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
    at kafka.consumer.NewShinyConsumer.<init>(BaseConsumer.scala:63)
    at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78)
    at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
    at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)

In contrast, when I run with the full config properties, it works:
kafka-console-consumer --topic test --from-beginning --bootstrap-server $KF --consumer.config /opt/conf/consumer.properties
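For reference, a minimal sketch of what such a consumer.properties might contain for TLS (the paths and passwords are placeholders, not the poster's actual config):

```properties
security.protocol=SSL
ssl.truststore.location=/opt/conf/client.truststore.jks
ssl.truststore.password=changeit
# Only needed when the broker requires client authentication (ssl.client.auth=required)
ssl.keystore.location=/opt/conf/client.keystore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit
```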

ingared8 commented 6 years ago

My observation is that java.lang.OutOfMemoryError: Java heap space at java.nio.HeapByteBuffer.<init> is not really an OOM issue; SSL authentication errors somehow manifest as it.

johanandren commented 6 years ago

Could be the same as this Kafka issue perhaps: https://issues.apache.org/jira/browse/KAFKA-4090?jql=project%20%3D%20KAFKA%20AND%20text%20~%20java.lang.OutOfMemoryError

(If that's not it, there are also a few other client OOMs in the Kafka tracker if you search)

victuxbb commented 4 years ago

As @ingared8 mentioned, this seems to be a problem with the consumer security settings. I was hitting the same error, and once I configured my security user and password properly, everything worked well!

mosaic-pradeep-mani commented 4 years ago

I faced the same issue and resolved it by setting .withProperty("security.protocol", "SSL") in the consumer/producer settings.

ronit29 commented 3 years ago

@victuxbb, @mosaic-pradeep-mani: facing the same issue. Could you please elaborate on what security settings need to be configured on both the producer and the consumer?

mosaic-pradeep-mani commented 3 years ago

In the conf file:

    kafka-consumer-settings: ${akka.kafka.consumer} {
      kafka-clients {
        bootstrap.servers = "localhost:9094"
        security.protocol = "SSL"
        auto.offset.reset = "earliest"
      }
    }

ronit29 commented 3 years ago

@mosaic-pradeep-mani, I have a Java-based consumer and a Ruby-based producer. Can you brief me on the ssl.keystore.location part: how do I generate a key and use it?

sandeep2244 commented 10 months ago

> As soon as I created 80+ consumers, where each consumer would have its own topic to subscribe to, rebalancing issues occurred that resulted in a similar behavior. Reading the documentation of Kafka, it is not stated explicitly that this is not possible. It seems, however, that it is assumed that all consumers in a consumer group have the same contract, meaning that they should be able to deal with the same set of topics. It seems that the rebalancing algorithm also assumes this when it rebalances the partitions within a consumer group.
>
> Conclusion: make sure that all consumers in a consumer group are subscribed to the same set of topics. Alternatively, you can use topic-specific group IDs.

Hey, did this get solved?