azhur / kafka-serde-scala

Implicitly converts typeclass encoders to kafka Serializer, Deserializer, Serde.
https://github.com/azhur/kafka-serde-scala
Apache License 2.0
57 stars 9 forks source link

How to use this with a Producer/Consumer (not KafkaStreams) #220

Closed fr-ser closed 3 years ago

fr-ser commented 3 years ago

Hi,

I have been using the implicit Serdes for a KafkaStreams app very nicely, but for the integration tests, I also need to manually produce some messages. As the producer actually requires a string I have been wondering how to achieve this.

The code snippet below shows the use case (which errors with a runtime error - not a build error) Error: Invalid value com.example.reading_filter.models.JsonSerde$$anon$1@3aaf4f07 for configuration key.serializer: Expected a Class instance or class name

object KafkaHelper {
  private def fooSerializer()(implicit serde: Serde[Foo]) = serde.deserializer()

  def getProducer: KafkaProducer[Foo, Foo] = {
    val props = new Properties
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, appConfig.bootstrapServers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, fooSerializer)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, fooSerializer)
    new KafkaProducer(props)
}

...
KafkaHelper.getProducer

I also tried fooSerializer.getClass, but this results in Could not find a public no-argument constructor for com.example.reading_filter.models.JsonSerde$$anon$1

azhur commented 3 years ago

Hi @fr-ser, KafkaProducer has the constructor which accepts key serializer and value serializer as well:

public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer)

so you can use that constructor to instantiate your KafkaProducer:

def getProducer[K, V](implicit keySerializer: Serializer[K], valueSerializer: Serializer[V]): KafkaProducer[Foo, Foo] = {
    val props = new Properties
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, appConfig.bootstrapServers)
    // other possible props you need

    new KafkaProducer(props, keySerializer, valueSerializer)
}
fr-ser commented 3 years ago

I didn't know that one. Thanks a lot. This works nicely!