zio / zio-kafka

A Kafka client for ZIO and ZIO Streams
https://zio.dev/zio-kafka
Apache License 2.0

Consumer.fromJavaConsumer must fail if the passed consumer has enable.auto.commit set to `true` #1289

Closed: tnielens closed this issue 2 months ago.

tnielens commented 2 months ago

From the `KafkaConsumer` Javadoc:

> Note: Using automatic offset commits can also give you "at-least-once" delivery, but the requirement is that you must consume all data returned from each call to `poll(Duration)` before any subsequent calls, or before [closing](https://kafka.apache.org/37/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#close()) the consumer. If you fail to do either of these, it is possible for the committed offset to get ahead of the consumed position, which results in missing records. The advantage of using manual offset control is that you have direct control over when a record is considered "consumed."

zio-kafka is not compatible with `enable.auto.commit` because it pre-polls (prefetches) batches for each partition. If auto-commit is enabled, the consumer can commit the offsets of those batches before the user streams have processed them.
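
To make this failure mode concrete, here is a deliberately simplified model in plain Scala (no Kafka or zio-kafka dependency). It is not zio-kafka's implementation. It assumes the Java client auto-commits the current fetch position on every `poll()`; the real client does so at most once per `auto.commit.interval.ms`. `AutoCommitModel`, `State`, `poll` and `process` are hypothetical names.

```scala
// Simplified model (assumption, not zio-kafka internals): a consumer that
// prefetches records ahead of processing while the client auto-commits.
object AutoCommitModel {
  final case class State(
      position: Long,  // next offset poll() will fetch (the consumer position)
      committed: Long, // last auto-committed offset
      processed: Long  // records 0 until `processed` have been handled by the stream
  )

  // Assumed auto-commit behavior: commit the current position, then fetch more.
  def poll(s: State, batchSize: Int): State =
    s.copy(committed = s.position, position = s.position + batchSize)

  // The user stream handles up to `n` of the already fetched records.
  def process(s: State, n: Int): State =
    s.copy(processed = math.min(s.processed + n, s.position))

  // Prefetching: three polls of 10 records before the stream has processed 5.
  def prefetchScenario(): State = {
    val afterPolls = (1 to 3).foldLeft(State(0L, 0L, 0L))((s, _) => poll(s, 10))
    process(afterPolls, 5)
  }
}
```

In this scenario the committed offset is 20 while only 5 records have been processed. A crash at that point would make the consumer resume at offset 20 on restart, silently skipping records 5 through 19.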

erikvanoosten commented 2 months ago

Thanks for this issue! I hope it was not created because of a hard-learned lesson :(

I mostly, but not entirely, agree with the premise, but I guess in practice it's indeed better to prevent auto commits.

There is one other option: simply ignore the user's setting (by swapping the arguments to `++` on this line: https://github.com/zio/zio-kafka/blob/master/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala#L74).
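
The "swap the arguments" option relies on how Scala's `Map.++` resolves duplicate keys: entries from the right-hand operand win. The linked line of `ConsumerSettings.scala` is not reproduced here; this sketch only illustrates the semantics, and `userProperties` and `forced` are hypothetical names.

```scala
// Hypothetical names: `userProperties` stands for properties supplied by the user,
// `forced` for a value the library wants to impose regardless.
object PropertyMerge {
  val userProperties: Map[String, AnyRef] =
    Map("enable.auto.commit" -> "true", "group.id" -> "my-group")
  val forced: Map[String, AnyRef] = Map("enable.auto.commit" -> "false")

  // With `a ++ b`, entries from `b` replace entries from `a` with the same key.
  val userWins: Map[String, AnyRef]   = forced ++ userProperties
  val forcedWins: Map[String, AnyRef] = userProperties ++ forced
}
```

With `forcedWins`, the user's `enable.auto.commit` value is silently overridden while unrelated keys such as `group.id` survive, which is exactly the "plain ignore" behavior; the trade-off is that the user gets no feedback that their setting was dropped.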

tnielens commented 2 months ago

Thank you @erikvanoosten! No hard lessons here. I started thinking about the auto-commit behavior when I realised it is set to true by default.

I'm not sure I read the PR's changed files correctly, but don't forget to cover the `fromJavaConsumer` constructor, which takes an already constructed Java consumer.

erikvanoosten commented 2 months ago

> Don't forget to cover the fromJavaConsumer constructor which passes an already constructed java consumer.

This is a bit problematic: we can't extract the actual settings from the given consumer. However, you also need to pass a `ConsumerSettings` instance, which must follow the rules set in #1290. To be safe, I'll add some documentation to the method.
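
Because a `KafkaConsumer` does not expose its configuration, any check can only look at a properties map the caller hands over. The sketch below is a hypothetical guard (`AutoCommitGuard` and `check` are illustrative names, not the check added in #1290). It relies on one documented Kafka fact: `enable.auto.commit` defaults to `true` when absent.

```scala
// Hypothetical helper, not zio-kafka's actual validation from #1290.
object AutoCommitGuard {
  val Key = "enable.auto.commit"

  // Returns Left with an error message if auto-commit would be enabled.
  def check(props: Map[String, AnyRef]): Either[String, Unit] = {
    val enabled = props.get(Key) match {
      case None                       => true // Kafka's default for enable.auto.commit is true
      case Some(b: java.lang.Boolean) => b.booleanValue
      // Only an explicit "false" counts as disabled; anything else is treated
      // conservatively as enabled.
      case Some(other)                => !other.toString.trim.equalsIgnoreCase("false")
    }
    if (enabled) Left(s"$Key must be set to false for this consumer")
    else Right(())
  }
}
```

Such a guard cannot protect `fromJavaConsumer`: the Java consumer was built from properties the library never sees, so documentation is the remaining safeguard there.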

erikvanoosten commented 2 months ago

@tnielens I am going to close this issue as can't-fix. Although #1290 now provides some protection, the Java consumer cannot be inspected in a way that would let us fully solve this issue.