confluentinc / schema-registry

Confluent Schema Registry for Kafka
https://docs.confluent.io/current/schema-registry/docs/index.html
Other
2.23k stars 1.11k forks source link

Unconfigured SpecificAvroSerializer throws an opaque error when used with TopologyTestDriver #1085

Closed MichaelDrogalis closed 3 years ago

MichaelDrogalis commented 5 years ago

I have a topic with events serialized in Avro and Schema Registry, so I'm using the SpecificAvroSerializer and SpecificAvroDeserializer serdes. But when I use the TopologyTestDriver, I encountered the following error:

 org.apache.kafka.common.errors.SerializationException: Error serializing Avro message

        Caused by:
        java.lang.NullPointerException
            at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:82)
            at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
            at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65)
            at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38)
            at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:60)
            at org.apache.kafka.streams.test.ConsumerRecordFactory.create(ConsumerRecordFactory.java:184)
            at org.apache.kafka.streams.test.ConsumerRecordFactory.create(ConsumerRecordFactory.java:290)
            at org.apache.kafka.streams.test.ConsumerRecordFactory.create(ConsumerRecordFactory.java:324)
            at org.apache.kafka.streams.test.ConsumerRecordFactory.create(ConsumerRecordFactory.java:304)
            at io.confluent.developer.FilterEventsTest.testFilter(FilterEventsTest.java:96)

I eventually found out that this is because I need to manually configure the serdes to use a Schema Registry URL, since the test driver skips through the code path to connect to Schema Registry on its own:

    public SpecificAvroSerializer<User> makeSerializer(Properties envProps) {
        SpecificAvroSerializer<User> serializer = new SpecificAvroSerializer<>();
        serializer.configure(new HashMap<String, String>() {{
            put("schema.registry.url", envProps.getProperty("schema.registry.url"));
        }}, false);

        return serializer;
    }

    public SpecificAvroDeserializer<User> makeDeserializer(Properties envProps) {
        SpecificAvroDeserializer<User> deserializer = new SpecificAvroDeserializer<>();
        deserializer.configure(new HashMap<String, String>() {{
            put("schema.registry.url", envProps.getProperty("schema.registry.url"));
        }}, false);

        return deserializer;
    }

We should either change the error message to something more helpful, or make changes to avoid having users configure their serdes in this scenario.

slominskir commented 3 years ago

This problem is not just when using the TopologyTestDriver - the NullPointerException occurs when running against a real Kafka server too if you forget to configure the Schema Registry URL. This is an incredibly confusing and unhelpful exception that I keep re-discovering as I prototype code and then waste a ton of time trying various fixes before remembering that its the registry URL config missing!