bootique / bootique-kafka

Provides integration of Kafka client and streams with Bootique
http://bootique.io
Apache License 2.0

Replace consumer iterator with callback #30

Closed: andrus closed this issue 3 years ago

andrus commented 3 years ago

KafkaConsumerRunner implements an iterator over Kafka messages. It was a reasonable attempt, but Kafka polling does not fit the iterator model. For example, the iterator can only work in auto-commit mode, which does not allow "at least once" semantics and may result in data loss.
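To illustrate the data-loss concern, here is a minimal, Kafka-free sketch. `ToyLog` and `runUntilCrash` are hypothetical names made up for this example and are not part of bootique-kafka or the Kafka client. The sketch only mimics one way things can go wrong: an offset gets committed before the records it covers have been processed, and the consumer then crashes mid-batch.

```java
import java.util.ArrayList;
import java.util.List;

public class CommitOrderDemo {

    // A toy stand-in for a topic partition: records plus one committed offset.
    static final class ToyLog {
        final List<String> records;
        int committed = 0; // offset of the next record delivered after a restart

        ToyLog(List<String> records) {
            this.records = records;
        }

        // Returns up to 'max' records starting at the committed offset.
        List<String> poll(int max) {
            int end = Math.min(records.size(), committed + max);
            return new ArrayList<>(records.subList(committed, end));
        }
    }

    // Runs one consumer session that "crashes" while handling 'failOn'.
    // Returns the records that were fully processed before the crash.
    static List<String> runUntilCrash(ToyLog log, boolean commitBeforeProcessing, String failOn) {
        List<String> processed = new ArrayList<>();
        List<String> batch = log.poll(3);
        if (commitBeforeProcessing) {
            log.committed += batch.size(); // mimics an offset committed too early
        }
        for (String r : batch) {
            if (r.equals(failOn)) {
                return processed; // simulated crash: nothing after this runs
            }
            processed.add(r);
        }
        if (!commitBeforeProcessing) {
            log.committed += batch.size(); // commit only after the whole batch succeeded
        }
        return processed;
    }

    public static void main(String[] args) {
        ToyLog early = new ToyLog(List.of("a", "b", "c"));
        runUntilCrash(early, true, "b");
        // prints [] : "b" and "c" were never processed and are not redelivered
        System.out.println("commit first, redelivered after restart: " + early.poll(3));

        ToyLog late = new ToyLog(List.of("a", "b", "c"));
        runUntilCrash(late, false, "b");
        // prints [a, b, c] : "a" is seen twice, but nothing is lost (at least once)
        System.out.println("commit last, redelivered after restart: " + late.poll(3));
    }
}
```

Committing only after a batch has been handled trades duplicates for safety, which is what "at least once" means. That ordering requires the processing code to control the commit.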

So KafkaConsumerRunner is replaced with two alternative methods:

@Inject
KafkaConsumerFactory factory;

// using Kafka API
Consumer<byte[], byte[]> c  = factory.binaryConsumer()
                .autoCommit(false)
                .cluster("X")
                .topics("topic1")
                .group("group1")
                .createConsumer();

// using KafkaConsumerCallback lambda
KafkaPollingTracker<String, String> p = factory.charConsumer()
                .autoCommit(false)
                .cluster("X")
                .topics("topic1")
                .group("group1")
                .consume((c, data) -> data.forEach(r -> System.out.println(r.value())));

Upgrade Notes

If you were using KafkaConsumerRunner, you will have to switch to consume(KafkaConsumerCallback, Duration). The callback is invoked once per poll, with the batch of records that poll returned.
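The "one callback per poll" contract can be sketched without any Kafka dependency. Everything below (`BatchCallback`, `OffsetHandle`, `pollAll`) is a hypothetical stand-in written for illustration. It is not the bootique-kafka API and it ignores poll intervals and threading. It shows only the shape of the loop: each poll produces a batch, the callback receives that batch together with a handle it can use to commit, and the caller decides when to commit.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchCallbackSketch {

    // Hypothetical callback shape: a commit handle plus the batch from one poll.
    interface BatchCallback<T> {
        void consume(OffsetHandle handle, List<T> batch);
    }

    // Hypothetical offset tracker; 'pending' is the end offset of the current batch.
    static final class OffsetHandle {
        int committed = 0;
        int pending = 0;

        void commit() {
            committed = pending;
        }
    }

    // Splits 'source' into polls of at most 'maxPerPoll' records and invokes the
    // callback once per poll. Returns the number of callback invocations.
    static <T> int pollAll(List<T> source, int maxPerPoll, OffsetHandle handle, BatchCallback<T> callback) {
        int calls = 0;
        for (int start = 0; start < source.size(); start += maxPerPoll) {
            int end = Math.min(source.size(), start + maxPerPoll);
            handle.pending = end;
            callback.consume(handle, source.subList(start, end));
            calls++;
        }
        return calls;
    }

    public static void main(String[] args) {
        OffsetHandle handle = new OffsetHandle();
        List<String> seen = new ArrayList<>();

        int calls = pollAll(List.of("r1", "r2", "r3", "r4", "r5"), 2, handle, (h, batch) -> {
            seen.addAll(batch); // process the whole batch first
            h.commit();         // then commit, so a failure above leaves the offset untouched
        });

        System.out.println(calls + " callbacks, committed offset " + handle.committed + ", records " + seen);
    }
}
```

Because the callback sees a whole batch, code migrated from an iterator-style loop would typically move its per-record body into a `forEach` over the batch, as in the lambda example in the issue description.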