smallrye / smallrye-reactive-messaging

SmallRye Reactive Messaging
http://www.smallrye.io/smallrye-reactive-messaging/
Apache License 2.0
241 stars 179 forks source link

KafkaCompanion does not configure Serializer/Deserializer #2756

Open diversit opened 1 month ago

diversit commented 1 month ago

The KafkaCompanion does not properly configure a registered Serde, Serializer and/or Deserialiser which causes issues with certain implementations.

E.g. When using Confluents io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde (and it's serializer/deserializer) not properly calling configure causes a test to fail with:

java.lang.AssertionError: Expected a completion event but got a failure: org.apache.kafka.common.errors.InvalidConfigurationException: You must configure() before serialize() or use serializer constructor with SchemaRegistryClient

This behaviour can be verified by adding a test case to SerdesTest:

    @Test
    void testRegisteredSerdeShouldBeConfigured() {
        // Custom Serde which Serialize/Deserializer throw exception if not configured
        companion.registerSerde(Person.class, new Serde<>() {

            // Custom Deserializer which throws an exception if not configured
            private PersonDeserializer personDeserializer = new PersonDeserializer() {
                private boolean isConfigured = false;

                @Override
                public void configure(Map<String, ?> configs, boolean isKey) {
                    isConfigured = true;
                }

                @Override
                public Person deserialize(String s, byte[] bytes) {
                    if (isConfigured) {
                        return super.deserialize(s, bytes);
                    } else {
                        throw new SerializationException("Deserializer not configured");
                    }
                }
            };

            // Custom Serializer which throws an exception if not configured
            private PersonSerializer personSerializer = new PersonSerializer() {
                private boolean isConfigured = false;

                @Override
                public void configure(Map<String, ?> configs, boolean isKey) {
                    isConfigured = true;
                }

                @Override
                public byte[] serialize(String s, Person person) {
                    if (isConfigured) {
                        return super.serialize(s, person);
                    } else {
                        throw new SerializationException("Serializer not configured");
                    }
                }
            };

            @Override
            public Serializer<Person> serializer() {
                return personSerializer;
            }

            @Override
            public Deserializer<Person> deserializer() {
                return personDeserializer;
            }

            @Override
            public void configure(Map<String, ?> configs, boolean isKey) {
                // Configure the serializer and deserializer
                personSerializer.configure(configs, isKey);
                personDeserializer.configure(configs, isKey);
            }
        });

        companion.produce(Person.class).fromRecords(
                new ProducerRecord<>(topic, new Person("1", 30)),
                new ProducerRecord<>(topic, new Person("2", 25)),
                new ProducerRecord<>(topic, new Person("3", 18))).awaitCompletion();

        ConsumerBuilder<String, Person> consumer = companion.consumeWithDeserializers(PersonDeserializer.class.getName());
        ConsumerTask<String, Person> task = consumer.fromTopics(topic, 3).awaitCompletion();
        assertThat(task.getRecords()).hasSize(3);
    }

which will currently fail with

java.lang.AssertionError: Expected a completion event but got a failure: org.apache.kafka.common.errors.SerializationException: Serializer not configured

KafkaCompanion should be updated to call configure on the Serde so the Serializer and Deserialiser properly get configured before a message is published or consumed.

Due to this issue, other serialisers like the StringSerializer, UUIDSerializer, ListSerializer, etc could also give unexpected results in tests.

Provided a fix in PR #2757

ozangunalp commented 1 month ago

@diversit Thank you for opening this detailed issue and the PR.

This was intentional, following the typical Kafka client behavior. If you pass the serde type name, configure will be called by the client constructor. I believe there was a javadoc note on that.

That being said, I think configure could've called the for creators from registered serde. Would that make sense for you ?

diversit commented 1 month ago

@ozangunalp Thanks for your reply.

I understand your point. I was in the understanding the SerDe would required configuration provided by the KafaCompanion and I thought the getCommonClientConfig() would not be sufficient.

From the docs registering custom serdes I had not understood I had to configure the SerDe myself. And I'm having trouble finding the Javadocs for the SmallRye projects since it does not seem to be linked from the SmallRye site or Github repo's.

Could you please point me to the Javadoc note your mentioned?

After better reading the docs of SpecificAvroSerde I know understand I have to configure it myself with the appropriate config, before registering it with the KafkaCompanion. Next challenge will then be how to get the url from the Schema Registry from Quarkus in dev/test modes.

So I guess this issue can be closed then.

ozangunalp commented 1 month ago

You can access the javadoc from here : https://javadoc.io/doc/io.smallrye.reactive/smallrye-reactive-messaging-kafka-test-companion/latest/io/smallrye/reactive/messaging/kafka/companion/ConsumerBuilder.html

Next challenge will then be how to get the url from the Schema Registry from Quarkus in dev/test modes.

If you use the KafkaCompanionResource it is done in here : https://github.com/quarkusio/quarkus/blob/50f2d3f1bd14b0b8f518d8d346422e0dc7199898/test-framework/kafka-companion/src/main/java/io/quarkus/test/kafka/KafkaCompanionResource.java#L28