spring-cloud / spring-cloud-stream-binder-kafka

Spring Cloud Stream binders for Apache Kafka and Kafka Streams
Apache License 2.0

Unknown Autoconfigured KafkaAvroSerializer and KafkaAvroDeSerializers #1224

Closed bjeSSAdev closed 1 year ago

bjeSSAdev commented 1 year ago

I'm using version 2.8 of the binder and define several Serde beans in the application context. As you can see, the auto.register.schemas property is set to false in all the SpecificAvroSerdes.

@Configuration
public class SerdeConfiguration {

private static final String SCHEMA_REGISTRY_URL_CONFIG = "schema.registry.url";
private static final String AUTO_REGISTER_SCHEMAS = "auto.register.schemas";

@Bean
public Serde<UnreportedTipsKey> tipsKeySerde(
        @Value("${spring.kafka.properties.schema.registry.url}") String schemaRegistryUrl) {

    final Map<String, Object> serdeConfig = new HashMap<>();
    serdeConfig.put(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
    serdeConfig.put(AUTO_REGISTER_SCHEMAS, false);

    final Serde<UnreportedTipsKey> keySpecificRecordSerde = new SpecificAvroSerde<>();
    keySpecificRecordSerde.configure(serdeConfig, true);

    return keySpecificRecordSerde;
}

@Bean
public Serde<UnreportedWagesKey> wagesKeySerde(
        @Value("${spring.kafka.properties.schema.registry.url}") String schemaRegistryUrl) {

    final Map<String, Object> serdeConfig = new HashMap<>();
    serdeConfig.put(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
    serdeConfig.put(AUTO_REGISTER_SCHEMAS, false);

    final Serde<UnreportedWagesKey> keySpecificRecordSerde = new SpecificAvroSerde<>();
    keySpecificRecordSerde.configure(serdeConfig, true);

    return keySpecificRecordSerde;
}

@Bean
public Serde<SpecificRecord> specificRecordValueSerde(
        @Value("${spring.kafka.properties.schema.registry.url}") String schemaRegistryUrl) {

    final Map<String, Object> serdeConfig = new HashMap<>();
    serdeConfig.put(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
    serdeConfig.put(AUTO_REGISTER_SCHEMAS, false);
    serdeConfig.put("use.latest.version", true);
    serdeConfig.put("latest.compatibility.strict", false);
    serdeConfig.put("normalize.schemas", true);

    final Serde<SpecificRecord> specificRecordValueSerde = new SpecificAvroSerde<>();
    specificRecordValueSerde.configure(serdeConfig, false);

    return specificRecordValueSerde;

}

// JSON serde for the aggregated tips value kept in the state store.
public static Serde<UnreportedTips> unreportedTipsSerde() {
    return new JsonSerde<>(new ObjectMapper().constructType(UnreportedTips.class));
}

// JSON serde for the aggregated wages value kept in the state store.
public static Serde<UnreportedWages> unreportedWagesSerde() {
    return new JsonSerde<>(new ObjectMapper().constructType(UnreportedWages.class));
}

}
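
The three bean methods above build nearly identical configuration maps. As a minimal sketch, assuming nothing beyond the JDK, the shared settings could be factored into one helper. The class name `SerdeConfigSketch`, the method `baseSerdeConfig`, and the placeholder URL are hypothetical and are not part of the binder or the Confluent libraries.

```java
import java.util.HashMap;
import java.util.Map;

class SerdeConfigSketch {

    // Hypothetical helper: returns the settings that every Avro serde
    // in the configuration class above has in common.
    static Map<String, Object> baseSerdeConfig(String schemaRegistryUrl) {
        Map<String, Object> config = new HashMap<>();
        config.put("schema.registry.url", schemaRegistryUrl);
        config.put("auto.register.schemas", false);
        return config;
    }

    public static void main(String[] args) {
        // Placeholder URL for illustration only.
        Map<String, Object> valueConfig = baseSerdeConfig("http://localhost:8081");
        // The value serde adds its extra settings on top of the shared ones.
        valueConfig.put("use.latest.version", true);
        valueConfig.put("latest.compatibility.strict", false);
        valueConfig.put("normalize.schemas", true);
        System.out.println(valueConfig);
    }
}
```

Each bean method would then pass the returned map to `configure(...)` exactly as it does now, with the same `isKey` flag.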

I have the following stream processors that use these serdes.

@Bean
@Autowired
public Consumer<KStream<UnreportedTipsKey, SpecificRecord>> tipsProcess(
        @Value("${kafka.store.tips}") String unreportedTipsStore,
        Serde<UnreportedTipsKey> tipsKeySerde, Serde<SpecificRecord> specificSerde,
        EventPersister eventPersister, @Value("${kafka.topic.tips}") String topic,
        TransformerSupplier<UnreportedTipsKey, SpecificRecord, KeyValue<UnreportedTipsKey, SpecificRecord>> tipsTransformerSupplier,
        @Value("${MDE_DEPLOYMENT_STAGE}") String env) {

    return input -> input.transform(tipsTransformerSupplier)
            .groupByKey(Grouped.with(tipsKeySerde, specificSerde))
            .aggregate(UnreportedTips::new,
                    new UnreportedTipsAggregator(
                            eventPersister, topic, env),
                    Materialized
                            .<UnreportedTipsKey, UnreportedTips, KeyValueStore<Bytes, byte[]>>as(
                                    unreportedTipsStore)
                            .withKeySerde(tipsKeySerde).withValueSerde(
                                    SerdeConfiguration.unreportedTipsSerde()));
}

@Bean
public Consumer<KStream<UnreportedWagesKey, SpecificRecord>> wagesProcess(
        @Value("${kafka.store.wages}") String unreportedWagesStore,
        Serde<UnreportedWagesKey> wagesKeySerde, Serde<SpecificRecord> specificSerde,
        EventPersister eventPersister, @Value("${kafka.topic.wages}") String topic,
        TransformerSupplier<UnreportedWagesKey, SpecificRecord, KeyValue<UnreportedWagesKey, SpecificRecord>> wagesTransformerSupplier,
        @Value("${MDE_DEPLOYMENT_STAGE}") String env) {
    return input -> input.transform(wagesTransformerSupplier)
            .groupByKey(Grouped.with(wagesKeySerde, specificSerde))
            .aggregate(UnreportedWages::new,
                    new UnreportedWagesAggregator(eventPersister, topic, env),
                    Materialized
                            .<UnreportedWagesKey, UnreportedWages, KeyValueStore<Bytes, byte[]>>as(
                                    unreportedWagesStore)
                            .withKeySerde(wagesKeySerde).withValueSerde(
                                    SerdeConfiguration.unreportedWagesSerde()));
}

And these are the properties I'm using.

spring.kafka.properties.schema.registry.url=private
spring.kafka.properties.value.subject.name.strategy=gov.ssa.irsrptg.configuration.UnqualifiedRecordNameStrategy

spring.cloud.function.definition=tipsProcess;wagesProcess
spring.cloud.stream.kafka.streams.binder.brokers=${KAFKA_BROKERS}

# applications should not auto create topics
spring.cloud.stream.kafka.binder.autoCreateTopics=false

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler=logAndContinue

spring.cloud.stream.kafka.streams.bindings.tipsProcess.consumer.startOffset=latest
spring.cloud.stream.kafka.streams.bindings.wagesProcess.consumer.startOffset=latest

spring.cloud.stream.kafka.streams.binder.functions.tipsProcess.applicationId=utw-${MDE_DEPLOYMENT_STAGE}-t
spring.cloud.stream.kafka.streams.binder.functions.wagesProcess.applicationId=utw-${MDE_DEPLOYMENT_STAGE}-w

spring.cloud.stream.bindings.tipsProcess-in-0.destination=${kafka.topic.tips}
spring.cloud.stream.bindings.wagesProcess-in-0.destination=${kafka.topic.wages}

To my surprise, KafkaAvroSerializer and KafkaAvroDeserializer are configured with the following properties (some properties are omitted for brevity).

**auto.register.schemas = true**
**schema.registry.url = [http://awse-d-kafka-devsch2.aci.is.cl.ssa.gov]**
**value.subject.name.strategy = class gov.ssa.irsrptg.configuration.UnqualifiedRecordNameStrategy**

I discovered this before setting the spring.kafka.properties.value.subject.name.strategy=gov.ssa.irsrptg.configuration.UnqualifiedRecordNameStrategy property. After setting it, I realized that during the transform step of the topology the Serdes I configured are not being used; instead, another Avro serde is being configured. However, the documentation says the following:

https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#_outbound_serialization:

"Value serdes are inferred using the same rules used for inbound deserialization. First it matches to see if the outbound type is from a provided bean in the application. If not, it checks to see if it matches with a Serde exposed by Kafka such as - Integer, Long, Short, Double, Float, byte[], UUID and String. If that doesn't work, then it falls back to JsonSerde provided by the Spring Kafka project, but first look at the default Serde configuration to see if there is a match. Keep in mind that all these happen transparently to the application. If none of these work, then the user has to provide the Serde to use by configuration."
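
To make the quoted lookup order concrete, here is a small JDK-only model of it. It is a sketch of the documented order as I read it, not the binder's actual code: the class `SerdeInferenceSketch`, the `resolve` method, and the string labels it returns are all hypothetical, and "default Serde configuration" is simplified to a single optional name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

class SerdeInferenceSketch {

    // Types the quoted documentation lists as having a Serde exposed by Kafka.
    static final Set<Class<?>> KAFKA_BUILT_IN = Set.of(
            Integer.class, Long.class, Short.class, Double.class,
            Float.class, byte[].class, UUID.class, String.class);

    // Hypothetical model of the documented order:
    // 1. a Serde bean provided by the application for this type,
    // 2. a Serde that Kafka exposes for this type,
    // 3. the default Serde configuration, if one is set (simplified),
    // 4. the JsonSerde fallback.
    static String resolve(Class<?> type, Map<Class<?>, String> serdeBeans, String defaultSerde) {
        String bean = serdeBeans.get(type);
        if (bean != null) {
            return "bean:" + bean;
        }
        if (KAFKA_BUILT_IN.contains(type)) {
            return "kafka:" + type.getSimpleName();
        }
        if (defaultSerde != null) {
            return "default:" + defaultSerde;
        }
        return "JsonSerde";
    }

    public static void main(String[] args) {
        Map<Class<?>, String> beans = new HashMap<>();
        // StringBuilder stands in for an application type that has a Serde bean.
        beans.put(StringBuilder.class, "customSerde");

        System.out.println(resolve(StringBuilder.class, beans, null));
        System.out.println(resolve(Long.class, beans, null));
        System.out.println(resolve(Object.class, beans, null));
    }
}
```

Under this reading, a type with a matching bean never reaches the later steps, which is the behavior I expected for my own Serde beans.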

Based on this documentation, only the Serde beans I configured should be used. However, something else is configuring another one using properties under spring.kafka.properties instead of the spring.cloud.stream.kafka. prefix.

bjeSSAdev commented 1 year ago

I apologize if that's too much information but I wanted to make sure I provided enough configuration to properly demonstrate the issue.

bjeSSAdev commented 1 year ago

The framework is calling "configure" on the inferred serdes, which causes the serde information to be logged to the console again. This also overwrites any custom configuration with defaults, unless the custom configuration is also declared in the properties file.
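
The effect described above can be modeled without any Kafka dependency. This is a simplified sketch, not the real serde or binder code: `RecordingSerde` is a hypothetical stand-in that only records its settings, and it assumes that each `configure` call rebuilds the settings from defaults plus whatever map is passed in, with `auto.register.schemas` defaulting to true as in the logged values.

```java
import java.util.HashMap;
import java.util.Map;

class ReconfigureSketch {

    // Hypothetical stand-in for a configurable serde; it only stores settings.
    static class RecordingSerde {
        private Map<String, Object> settings = new HashMap<>();

        // Assumption: every call starts again from defaults, so settings from
        // an earlier call survive only if the new map repeats them.
        void configure(Map<String, ?> configs, boolean isKey) {
            Map<String, Object> effective = new HashMap<>();
            effective.put("auto.register.schemas", true); // assumed default
            effective.putAll(configs);
            this.settings = effective;
        }

        Object get(String key) {
            return settings.get(key);
        }
    }

    public static void main(String[] args) {
        RecordingSerde serde = new RecordingSerde();

        // What the bean method does: explicit custom configuration.
        serde.configure(Map.of("auto.register.schemas", false), false);
        System.out.println("after bean configure: " + serde.get("auto.register.schemas"));

        // A later configure call with properties that omit the custom key.
        serde.configure(Map.of("schema.registry.url", "http://localhost:8081"), false);
        System.out.println("after second configure: " + serde.get("auto.register.schemas"));

        // The same later call when the properties also declare the custom key.
        serde.configure(Map.of("schema.registry.url", "http://localhost:8081",
                "auto.register.schemas", false), false);
        System.out.println("with key in properties: " + serde.get("auto.register.schemas"));
    }
}
```

In terms of my setup, this suggests also declaring the custom settings under the spring.kafka.properties prefix, for example spring.kafka.properties.auto.register.schemas=false. I am inferring that from the logged values, where schema.registry.url and value.subject.name.strategy were picked up from that prefix; I have not confirmed it against the binder source.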