quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0
13.85k stars 2.7k forks source link

TopologyTestDriver does not pick up serde defaults #31887

Open msillence opened 1 year ago

msillence commented 1 year ago

Description

In my application.properties I have:

quarkus.kafka-streams.schema-registry-url=${kafka.schema.registry.url}
quarkus.kafka-streams.default.key.serde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
quarkus.kafka-streams.default.value.serde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

in my application-test.properties I have:

quarkus.apicurio-registry.devservices.port=18081

kafka.schema.registry.url=http://localhost:18081/apis/ccompat/v6
schema.registry.url=${kafka.schema.registry.url}

new TopologyTestDriver(topology) - does not pick these up instead I need to manually create the properties and pass them in:

   @ConfigProperty(name="quarkus.kafka-streams.default.key.serde")
   String keySerde;

   @ConfigProperty(name="quarkus.kafka-streams.default.value.serde")
   String valueSerde;

   @ConfigProperty(name="schema.registry.url")
   String schemaRegistry;   

   @Test
   public void myTest() {

       // using DSL
       StreamsBuilder builder = new StreamsBuilder();
      .... build thing to test
       Topology topology = builder.build();

       // create test driver
       Properties p = new Properties();
       p.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerde);
       p.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerde);
       p.setProperty("schema.registry.url", schemaRegistry);

       try (TopologyTestDriver testDriver = new TopologyTestDriver(topology, p)) {

### Implementation ideas

It would be great if it could pick up the same properties I use to configure the app and I didn't have manually add in the right mappings.

i.e.

quarkus.kafka-streams.schema-registry-url quarkus.kafka-streams.default.key.serde quarkus.kafka-streams.default.value.serde

rquinio commented 1 year ago

@msillence do you build a different topology for the test itself ? In quickstart https://github.com/quarkusio/quarkus-quickstarts/blob/415210003ec505d53b3c6f5c31bc1e075eddfa53/kafka-streams-quickstart/aggregator/src/test/java/org/acme/kafka/streams/aggregator/streams/TopologyProducerTest.java#L52-L55 the one produced by https://github.com/quarkusio/quarkus-quickstarts/blob/415210003ec505d53b3c6f5c31bc1e075eddfa53/kafka-streams-quickstart/aggregator/src/main/java/org/acme/kafka/streams/aggregator/streams/TopologyProducer.java#L33 is injected.

Maybe there could be TopologyTestDriverResource QuarkusTestResource that would create a TopologyTestDriver bean.

geoand commented 1 year ago

Maybe there could be TopologyTestDriverResource QuarkusTestResource that would create a TopologyTestDriver bean.

Sounds reasonable to me