akka / alpakka-kafka

Alpakka Kafka connector - Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/libraries/alpakka-kafka/current/

Allow to use polling on demand by KafkaConsumerActor #969

Closed: szymonm closed this 4 years ago

szymonm commented 5 years ago

Short description

Currently, the KafkaConsumerActor calls consumer.poll periodically and does not allow skipping a poll when there is no demand from the ...SourceLogic. This behaviour ensures that Kafka receives polls (which act like heartbeats) in a timely manner and does not consider the consumer dead. While this is efficient in many scenarios, it prevents Kafka's automatic subscription mechanism from rebalancing when the consumer cannot keep up. That mechanism could help when the partitions assigned by Kafka are not evenly loaded.

I suggest adding a new option to the consumer that indicates whether the client wants to poll periodically or only when the source signals demand.

Details

Consumer behaviour is described in the "Detecting Consumer Failures" section of the KafkaConsumer Javadoc (https://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html).

szymonm commented 5 years ago

@seglo @ennru do you have any thoughts around this?

ennru commented 5 years ago

Sorry, we were in meetings last week.

I had similar ideas in https://github.com/akka/alpakka-kafka/issues/559

Polling is no longer required for heartbeats; the Kafka client sends those from a separate thread. Polling is still required to get notified about rebalances and to receive commit acknowledgements.

It would require increasing the poll interval while keeping it shorter than max.poll.interval.ms.

Downstream demand triggers extra polls with Poll(periodic=false), so you could try setting poll-interval to something longer and see how that behaves.
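To make the difference between the two polling modes concrete, here is a small language-neutral model in Python. It is not Alpakka code. It assumes time in milliseconds, that the current mode polls on a fixed `poll_interval` timer plus once per downstream demand signal (as described above), and that the proposed demand-only mode polls only on demand. `poll_schedule` and `longest_gap` are hypothetical helpers for illustration.

```python
def poll_schedule(demand_times, poll_interval, horizon, periodic=True):
    """Return the sorted poll timestamps (ms) in [0, horizon].

    periodic=True models the current behaviour: a timer poll every
    `poll_interval` ms plus an extra poll for each demand signal.
    periodic=False models the hypothetical demand-only mode.
    """
    polls = {t for t in demand_times if 0 <= t <= horizon}
    if periodic:
        polls.update(range(0, horizon + 1, poll_interval))
    return sorted(polls)


def longest_gap(polls, horizon):
    """Longest stretch (ms) without a poll, including the tail up to `horizon`."""
    points = list(polls) + [horizon]
    return max((b - a for a, b in zip(points, points[1:])), default=horizon)


# Demand arrives at 0, 100 and 200 ms, then the stream backpressures.
demand = [0, 100, 200]
periodic = poll_schedule(demand, poll_interval=50, horizon=1000)
on_demand = poll_schedule(demand, poll_interval=50, horizon=1000, periodic=False)
print(longest_gap(periodic, 1000))   # 50
print(longest_gap(on_demand, 1000))  # 800
```

In the periodic mode the longest gap stays bounded by `poll_interval` no matter what downstream does; in the demand-only mode it grows with the backpressure period.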

seglo commented 5 years ago

Hi @szymonm. Can you clarify the following:

While this can be efficient in many scenarios, it prevents Kafka's automatic subscription mechanisms to rebalance, when the consumer does not keep up.

It's my understanding that more frequent polling helps facilitate faster rebalances because the rebalance listener runs on the user's polling thread. It also helps us handle commit acks faster, as @ennru already pointed out.

szymonm commented 4 years ago

That makes a lot of sense, @ennru. It seems I can achieve what I want by increasing max.poll.interval.ms.

> It's my understanding that more frequent polling helps facilitate faster rebalances because the rebalance listener runs on the user's polling thread. It also helps us handle commit acks faster, as @ennru already pointed out.

That's true. On the other hand, Kafka will trigger a rebalance if a consumer has not polled for long enough (longer than max.poll.interval.ms). This can help spread the load more evenly if one of the streams is backpressuring for a long time.
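The liveness rule mentioned above can be sketched as a check on poll gaps. This is a simplified model, assuming Kafka considers a consumer failed as soon as the time since its last `poll()` exceeds `max.poll.interval.ms` (default 300000 ms, i.e. 5 minutes); the real group protocol has more moving parts. `first_eviction` is a hypothetical helper, not part of any Kafka or Alpakka API.

```python
def first_eviction(polls, max_poll_interval_ms, horizon):
    """Return the time (ms) at which the consumer would be considered failed,
    or None if no gap up to `horizon` exceeds max.poll.interval.ms.

    `polls` must be sorted poll timestamps in ms.
    """
    points = list(polls) + [horizon]
    for last_poll, next_point in zip(points, points[1:]):
        deadline = last_poll + max_poll_interval_ms
        if next_point > deadline:
            # No poll arrived before the deadline: the group would rebalance.
            return deadline
    return None


# A stream polls at 0, 1 s and 2 s, then backpressures for 10 minutes.
polls = [0, 1_000, 2_000]
print(first_eviction(polls, 300_000, horizon=602_000))  # 302000
print(first_eviction(polls, 900_000, horizon=602_000))  # None
```

Under this model, a demand-only consumer stalled by backpressure would drop out of the group five minutes after its last poll with the default limit, and raising max.poll.interval.ms postpones that point.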