bootique / bootique-kafka

Provides integration of Kafka client and streams with Bootique
http://bootique.io
Apache License 2.0

User-friendly consumer interface #23

Closed: andrus closed this issue 6 years ago

andrus commented 6 years ago

It is kinda hard to consume data from Kafka using its standard client (Consumer). Let's provide a simple Iterator/Stream-based interface that takes care of polling and shutdown management, so the caller doesn't have to do either manually. This borrows ideas from the recently implemented kafka-streams. E.g.:

@Inject
KafkaConsumerFactory factory;

KafkaConsumerRunner<byte[], String> runner = factory
    .charValueConsumer()
    .autoOffsetReset(OffsetReset.earliest)
    .pollInterval(Duration.ofMillis(100)) // assumes java.time.Duration; the millisecond unit is a guess
    .topic("mytopic")
    .create();

// 1. iterator consumption
for(ConsumerRecord<byte[], String> r : runner) {
 ...
}

// 2. stream consumption
runner.stream().map(r -> ...)....
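To illustrate how polling and shutdown could be hidden behind an Iterator/Stream, here is a minimal, Kafka-free sketch. It is not the bootique-kafka implementation: PollingRunner, poller and onClose are hypothetical names, and the poller function merely stands in for a call such as Consumer.poll(Duration).

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical sketch: wraps a poll loop so callers only see Iterable/Stream.
class PollingRunner<T> implements Iterable<T>, AutoCloseable {

    // Stand-in for the underlying client's poll call (assumption, not a real Kafka type).
    private final Function<Duration, List<T>> poller;
    private final Duration pollInterval;
    // Stand-in for releasing the underlying client.
    private final Runnable onClose;
    private final AtomicBoolean closed = new AtomicBoolean();

    PollingRunner(Function<Duration, List<T>> poller, Duration pollInterval, Runnable onClose) {
        this.poller = poller;
        this.pollInterval = pollInterval;
        this.onClose = onClose;
    }

    @Override
    public Iterator<T> iterator() {
        return new Iterator<T>() {
            private Iterator<T> batch = Collections.emptyIterator();

            @Override
            public boolean hasNext() {
                // Poll until a non-empty batch arrives or the runner is closed.
                // Records already fetched are still delivered after close().
                while (!batch.hasNext()) {
                    if (closed.get()) {
                        return false;
                    }
                    batch = poller.apply(pollInterval).iterator();
                }
                return true;
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return batch.next();
            }
        };
    }

    // Lazy, sequential stream over the same iterator; unbounded until close().
    public Stream<T> stream() {
        return StreamSupport.stream(spliterator(), false);
    }

    @Override
    public void close() {
        // Idempotent: the shutdown hook runs at most once.
        if (closed.compareAndSet(false, true)) {
            onClose.run();
        }
    }
}
```

With a shape like this, a for-each loop or a stream pipeline simply blocks in hasNext() between batches, and iteration ends once close() is called (from another thread or a shutdown hook) and the current batch is drained.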

Upgrade Notes:

  1. Consumers can no longer be obtained via KafkaClientFactory. Instead, inject KafkaConsumerFactory and use its consumer builder methods.

  2. Producers can no longer be obtained via KafkaClientFactory. Instead, inject KafkaProducerFactory and use its producer builder methods.

  3. kafkaclient.consumer.autoCommitIntervalMs is renamed to kafkaclient.consumer.autoCommitInterval and is now a duration, so you can use readable values like "1ms". The same applies to kafkaclient.consumer.sessionTimeoutMs, which is renamed to kafkaclient.consumer.sessionTimeout.