nomisRev / kotlin-kafka

Kafka bindings for Kotlin `suspend`, and Kafka streaming operators for KotlinX Flow.
https://nomisRev.github.io/kotlin-kafka/
Apache License 2.0
103 stars 10 forks source link

Getting metrics from KafkaReceiver #196

Open gavvvr opened 2 months ago

gavvvr commented 2 months ago

Hello

The io.github.nomisRev.kafka.publisher.KafkaPublisher interface in this library exposes metrics() function to get metrics from the underlying org.apache.kafka.clients.producer.KafkaProducer. Unfortunately, the io.github.nomisRev.kafka.receiver.KafkaReceiver interface does not expose anything similar to obtain metrics.

This is a feature request to make it possible to obtain metrics from the underlying org.apache.kafka.clients.consumer.KafkaConsumer. As for now, I don't even see any workaround to get consumer metrics, because KafkaConsumer gets created on each receive() call and is very well encapsulated.

There are different extensions on org.apache.kafka.clients.consumer.*-types (including raw KafkaConsumer) in Consumer.kt and I could probably refactor current code to use them, but all of them are marked as deprecated with a recommendation to use io.github.nomisRev.kafka.receiver.KafkaReceiver instead.

I guess that receive() method on KafkaReceiver should rather return some advanced combined type, which has both traits: 1) the Flow itself + 2) other interfaces with additional capabilities (for example, for accessing metrics)