confluentinc / schema-registry

Confluent Schema Registry for Kafka
https://docs.confluent.io/current/schema-registry/docs/index.html
Other
2.23k stars 1.11k forks source link

NullPointerException serializing Avro message in KafkaStreams #524

Closed OscarLinden closed 7 years ago

OscarLinden commented 7 years ago

I'm updating a kafka streams app to change the outputs to Avro. I see a similar NPE as in #389:

org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException
        at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72)
        at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54)

The NPE seems to be related to the internal schemaRegistry object. I have configured a schema registry in my kafka streams config:

config.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema.confluent");

The schema registry is up and responds to /subjects.

I couldn't find an example of using Avro with kafka streams (the WikipediaFeedAvroExample uses a produce client not a stream for writing Avro). Am I missing something?

OscarLinden commented 7 years ago

Looks like the problem was I was creating my avro serdes by hand, so I needed to also create and pass a schema registry client. Setting the URL in the streams config didn't help because streams wasn't making my serde.

ardlema commented 5 years ago

Hi @OscarLinden , would you mind letting me know how did you manage to create the SchemaRegistryClient? I'm trying to make a unit test work and I'm getting the same NullPointerException.

See following my code:

val testDriver = new TopologyTestDriver(TopologyBuilder.createTopology(), kafkaConfig)
        val recordFactory = new ConsumerRecordFactory(new StringSerializer(), new KafkaAvroSerializer())
        val client = new Client("alberto",39, true)
        val consumerRecordFactory = recordFactory.create("input-topic", "a", client, 9999L)
        testDriver.pipeInput(consumerRecordFactory)

Thank you in advance!