sksamuel / avro4s

Avro schema generation and serialization / deserialization for Scala
Apache License 2.0

Using GenericSerde[T] in a Kafka Producer throws runtime error value.serializer: Expected a Class instance or class name. #592

Closed cwhellams closed 3 years ago

cwhellams commented 3 years ago

When I use the README example for GenericSerde[T] as the value serializer in a Kafka producer, I get a runtime error: Expected a Class instance or class name.

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, new GenericSerde[NetPos])

I am invoking the KafkaProducer with new KafkaProducer[String,NetPos](props)

It seems the Kafka producer does not use the instance I created and instead tries to instantiate the class itself, which obviously fails. No doubt I have missed something simple or subtle. I can send a PR updating the README if I can get help making this work. Thanks.

fmsbeekmans commented 3 years ago

Most objects in the Kafka ecosystem work with a zero-argument constructor and a configure method that takes a java.util.Map[String, AnyRef]. Where the configuration asks for a type, it wants a canonical class name. Try props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.sksamuel.avro4s.kafka.GenericSerde")
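To illustrate the zero-argument constructor convention described above, here is a minimal, dependency-free sketch of instantiating a class by name through reflection. Both classes are hypothetical stand-ins, not part of Kafka or avro4s, and this is not Kafka's actual loading code; it only shows why a class whose constructor needs arguments cannot be created from a class name alone.

```scala
import scala.util.Try

object ReflectiveConfigDemo {
  // Hypothetical stand-ins for serializer classes (not Kafka or avro4s types).
  class PlainSerializer                                   // has a zero-argument constructor
  class ParameterisedSerializer(val schemaName: String)   // requires a constructor argument

  // Instantiate a class from its name via its zero-argument constructor,
  // capturing any reflection failure in a Try.
  def instantiate(className: String): Try[Any] =
    Try(Class.forName(className).getDeclaredConstructor().newInstance())

  // Succeeds: a zero-argument constructor exists.
  val ok: Try[Any] = instantiate(classOf[PlainSerializer].getName)

  // Fails with NoSuchMethodException: there is no zero-argument constructor to call.
  val failed: Try[Any] = instantiate(classOf[ParameterisedSerializer].getName)
}
```

GenericSerde[T] needs its type parameter and implicit instances at construction time, so it resembles the second class here rather than the first.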

cwhellams commented 3 years ago

Thanks for the tip. We got this working using the KafkaProducer constructor overloads (https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html). It seems you cannot use generic types in props, at least not with the Kafka 2.6 libraries. Without having checked the Kafka source, I assume it uses reflection to instantiate the serializer and therefore expects a concrete class with a default constructor. A serializer class that takes constructor parameters cannot be passed through props: it will compile but throw at runtime. I'm not sure whether this is by design or a bug; it seems like a bug in Kafka to me.
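If a props-only configuration is still wanted, one possible workaround (an untested sketch, not something confirmed in this thread) is a concrete, non-generic subclass that fixes the type parameter and so ends up with a default constructor. The fields of NetPos and the names NetPosSerde and NetPosProducerByClassName are assumptions made for illustration.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.common.serialization.StringSerializer
import com.sksamuel.avro4s.kafka.GenericSerde

// Hypothetical payload type; the real NetPos fields are not shown in the thread.
case class NetPos(account: String, position: Double)

// Concrete subclass (hypothetical name): the type parameter and the avro4s
// implicits are resolved here at compile time, leaving a zero-argument constructor.
class NetPosSerde extends GenericSerde[NetPos]

object NetPosProducerByClassName {
  def build(bootstrapServers: String): KafkaProducer[String, NetPos] = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    // Class names only: Kafka is left to instantiate the serializers itself.
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[NetPosSerde].getName)
    new KafkaProducer[String, NetPos](props)
  }
}
```

The trade-off is one small class per payload type, in exchange for keeping all serializer wiring in configuration.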

It also seems that you must pass key and value serializers in props. However, you can get GenericSerde[T] to work as expected by passing an instance to the KafkaProducer constructor; whatever is in props is then ignored. So this works:

`new KafkaProducer[String, NetPos](props, new StringSerializer, new GenericSerde[NetPos])`
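For context, a fuller sketch of this constructor-overload approach might look like the following. The NetPos fields, the bootstrap address, and the object name are assumptions; only the KafkaProducer call itself comes from the thread.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.common.serialization.StringSerializer
import com.sksamuel.avro4s.kafka.GenericSerde

// Hypothetical payload type; the real NetPos fields are not shown in the thread.
case class NetPos(account: String, position: Double)

object NetPosProducerByInstance {
  def build(bootstrapServers: String): KafkaProducer[String, NetPos] = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    // The thread reports that serializer entries are still expected in props;
    // the instances passed to the constructor below are the ones actually used.
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[GenericSerde[_]].getName)

    // Serializer instances are built by the caller, so the avro4s implicits for
    // NetPos are resolved at compile time and Kafka never has to construct them.
    new KafkaProducer[String, NetPos](props, new StringSerializer, new GenericSerde[NetPos])
  }
}
```

A caller would then send records with something like `producer.send(new ProducerRecord("netpos", "key-1", NetPos("acc-1", 10.0)))`, where the topic name is again hypothetical.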

GenericSerde[T] passes its unit tests because it works just fine; however, it is rather tricky to use at runtime because of these quirks! I will see whether @sksamuel will accept a docs PR explaining this a bit better when I have time. Closing this issue for now.