monix / monix-kafka

Monix integration with Kafka
Apache License 2.0

KafkaConsumerObservable and its subclasses use KafkaConsumer (implementation) instead of Consumer (interface) #244

Closed: girishkolantra closed this issue 3 years ago

girishkolantra commented 3 years ago

KafkaConsumerObservable exposes the implementation (KafkaConsumer) rather than the interface (Consumer):

trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] {
  protected def config: KafkaConsumerConfig
  protected def consumer: Task[KafkaConsumer[K, V]]

This makes integration with tracing libraries such as Zipkin more difficult. Using the Consumer interface instead would allow for easy integration with tracing libraries.

Avasil commented 3 years ago

How would that work?

I think none of the Consumer interface methods are actually implemented by KafkaConsumerObservable; everything is delegated to the actual KafkaConsumer.

There is a constructor that lets the caller pass in a KafkaConsumer:

def apply[K, V](
  cfg: KafkaConsumerConfig,
  consumer: Task[KafkaConsumer[K, V]]): KafkaConsumerObservable[K, V, ConsumerRecord[K, V]]

I haven't used these tracing libraries, but if they wrap the Kafka Consumer, couldn't we pass the wrapped tracing consumer to KafkaConsumerObservable and get tracing on poll, commit, etc.?

girishkolantra commented 3 years ago

The Zipkin tracing library has a tracing consumer that decorates a Consumer[K, V]. So if the apply method in KafkaConsumerObservable accepted the interface, the integration would be seamless.
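To illustrate the typing problem, here is a minimal, self-contained sketch. The `Consumer`, `KafkaConsumer`, and `TracingConsumer` types below are simplified, hypothetical stand-ins written for this example (the real Kafka and Zipkin types have many more methods), and `fromImplementation` / `fromInterface` are made-up names that only mirror the shape of the two possible `apply` signatures.

```scala
// Hypothetical stand-in for the Kafka Consumer interface (one method only).
trait Consumer[K, V] {
  def poll(): List[(K, V)]
}

// Hypothetical stand-in for the concrete KafkaConsumer implementation.
final class KafkaConsumer[K, V](records: List[(K, V)]) extends Consumer[K, V] {
  def poll(): List[(K, V)] = records
}

// A decorator in the style of a tracing consumer: it wraps any Consumer and
// is itself a Consumer, but it is NOT a KafkaConsumer.
final class TracingConsumer[K, V](delegate: Consumer[K, V]) extends Consumer[K, V] {
  var pollCount: Int = 0 // stands in for "record a span on every poll"
  def poll(): List[(K, V)] = {
    pollCount += 1
    delegate.poll()
  }
}

object InterfaceVsImplementation {
  // Shape of the signature the issue complains about: only the concrete class is accepted.
  def fromImplementation[K, V](c: KafkaConsumer[K, V]): List[(K, V)] = c.poll()

  // Shape of the proposed signature: any Consumer is accepted, including decorators.
  def fromInterface[K, V](c: Consumer[K, V]): List[(K, V)] = c.poll()

  def main(args: Array[String]): Unit = {
    val real   = new KafkaConsumer[String, String](List("k" -> "v"))
    val traced = new TracingConsumer(real)

    // fromImplementation(traced) // would not compile: TracingConsumer is not a KafkaConsumer
    println(fromInterface(traced))
    println(traced.pollCount)
  }
}
```

With the interface-based signature, the decorated consumer is passed through unchanged, so every delegated call goes through the wrapper first.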

Avasil commented 3 years ago

Ahh, sorry, I misunderstood.

Yes, in this case, the change would be very useful! It's not binary compatible, but we don't provide a guarantee here, so it's fine.

Avasil commented 3 years ago

@girishkolantra @arun0009

I've released a SNAPSHOT with your changes: 1.0.0-RC6-c7231f9-SNAPSHOT

Let me know if that works out for you, or if you need anything more to be included in a proper release.

Thank you for the PR!