akka / alpakka-kafka

Alpakka Kafka connector - Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/libraries/alpakka-kafka/current/

Consumer fails due to WakeupExceptions when there are no messages for a while #235

Closed kciesielski closed 5 years ago

kciesielski commented 8 years ago

As reported by @breadfan: "(...) if there are not any new messages for some period of time we get a '[ERROR] from akka.kafka.KafkaConsumerActor in search-classified-cats-enhancer-akka.actor.default-dispatcher-5 - WakeupException limit exceeded, stopping.'"

A consumer should not receive WakeupExceptions when there are no messages.

jonbuffington commented 6 years ago

@sbrnunes I mean a separate committable source per topic. All committable sources (now six) belong to the same consumer group.

We have not seen any connection issues across all our Kafka clients. My suspicion is a strange interaction between Akka messaging and the Kafka poll() loop. I do not have any data to back up my suspicion, though.

dvgica commented 6 years ago

There's a hacky little workaround for unlimited wakeups. Set akka.kafka.consumer.max-wakeups = 0, and it will just keep retrying. This is thanks to the code here: https://github.com/akka/reactive-kafka/blob/master/core/src/main/scala/akka/kafka/KafkaConsumerActor.scala#L305.

patriknw commented 6 years ago

Setting it to 2147483647 (Int.MaxValue) will also effectively give infinite retries, if that is what you need.
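
For reference, the two workarounds mentioned in this thread could be written in application.conf roughly as follows. This is only a sketch: it assumes a pre-1.0 version of the connector, where the max-wakeups setting still exists under akka.kafka.consumer, and only one of the two values should be active.

```hocon
# Sketch only: applies to pre-1.0 versions that still have max-wakeups.
akka.kafka.consumer {
  # Workaround from this thread: with 0 the wakeup limit is never reached,
  # so the consumer keeps retrying.
  max-wakeups = 0

  # Alternative from this thread: a limit so large it is effectively infinite.
  # max-wakeups = 2147483647
}
```

Either value hides the symptom rather than the cause, so it is probably worth keeping the wakeup warnings visible in the logs while using it.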

jroper commented 6 years ago

I'm not sure if this is helpful or not, but right now, I'm seeing these errors every time a Kafka consumer starts up:

13:16:54.061 [info] akka.kafka.internal.SingleSourceLogic [sourceThread=example-application-akka.actor.default-dispatcher-4, akkaTimestamp=03:16:54.061UTC, akkaSource=SingleSourceLogic(akka://example-application), sourceActorSystem=example-application] - Revoked partitions: Set(). All partitions: Set()
13:16:57.064 [warn] akka.kafka.KafkaConsumerActor [sourceThread=example-application-akka.actor.default-dispatcher-18, akkaTimestamp=03:16:57.060UTC, akkaSource=akka.tcp://example-application@127.0.0.1:44931/system/kafka-consumer-3, sourceActorSystem=example-application] - KafkaConsumer poll is taking significantly longer (3000ms) to return from poll then the configured poll interval (50ms). Waking up consumer to avoid thread starvation.
13:16:57.086 [info] akka.kafka.internal.SingleSourceLogic [sourceThread=example-application-akka.actor.default-dispatcher-15, akkaSource=SingleSourceLogic(akka://example-application), sourceActorSystem=example-application, akkaTimestamp=03:16:57.085UTC] - Assigned partitions: Set(my.messages.in1-0). All partitions: Set(my.messages.in1-0)
13:16:57.089 [warn] akka.kafka.KafkaConsumerActor [sourceThread=example-application-akka.actor.default-dispatcher-18, akkaTimestamp=03:16:57.088UTC, akkaSource=akka.tcp://example-application@127.0.0.1:44931/system/kafka-consumer-3, sourceActorSystem=example-application] - Wake up has been triggered. Dumping stacks: 

What's really frustrating is the fact that it dumps the stacks, so every time my application starts (which, when working with something like Lagom that does hot reloads on code change, is all the time), I get thousands of lines of stack dumps logged at warn. Can the stack dump be logged at debug level, so that it can be filtered out? The issue doesn't seem to have any actual effect, so at least for the time being, it would be nice to be able to ignore it.

jroper commented 6 years ago

Sorry, I just noticed the wakeup-debug setting. I'll disable that.
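
A minimal sketch of what disabling it might look like in application.conf, assuming a pre-1.0 version where wakeup-debug sits under akka.kafka.consumer:

```hocon
akka.kafka.consumer {
  # Do not dump thread stacks when a consumer wakeup is triggered.
  wakeup-debug = false
}
```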

patriknw commented 6 years ago

Perhaps we can make wakeup-debug less aggressive by not triggering it until we are almost out of retries.

ennru commented 5 years ago

All wakeup exception usage is removed now as of 1.0-RC1.

ASchmidt84 commented 5 years ago

For topics with sporadic messages (especially where you may never know when a message will arrive), set fetch-min-bytes to 0, timeout to some acceptable value (e.g. 1 second) that is less than heartbeat, and interval to some acceptable value (e.g. 1 second) that is less than heartbeat.

This will cause polls to return and be repeated in a timely manner without busy waiting the poll.

Which config do I have to add to my application.conf for that?

ennru commented 5 years ago

What Alpakka Kafka version are you using? Old docs are available, e.g. for 0.22.
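
For a 0.22-era version, the advice quoted above might translate into something like the following application.conf fragment. This is a hedged sketch, not a confirmed answer: it assumes that "timeout" and "interval" in the quote refer to the connector's poll-timeout and poll-interval settings, that "heartbeat" refers to the Kafka client's heartbeat.interval.ms (3 seconds by default), and that fetch-min-bytes means the Kafka client property fetch.min.bytes passed through the kafka-clients section.

```hocon
# Sketch only: the mapping from the quoted advice to these keys is an assumption.
akka.kafka.consumer {
  # How long a single poll may block; kept below the heartbeat interval.
  poll-timeout = 1s

  # How often the consumer actor schedules a poll; also below the heartbeat interval.
  poll-interval = 1s

  # Properties in this section are handed to the underlying KafkaConsumer.
  kafka-clients {
    # Let fetch requests return without waiting for a minimum amount of data.
    fetch.min.bytes = 0
  }
}
```

Since all wakeup exception usage was removed in 1.0-RC1, this kind of tuning is probably only relevant for older versions.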