quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0

Determine quarkus.kafka-streams.topics automatically #8909

Open rquinio opened 4 years ago

rquinio commented 4 years ago

Description

Today the mandatory property quarkus.kafka-streams.topics must be manually kept in sync in case topic names are overridden at runtime (which we do to ensure multi-tenancy when the same broker is shared across namespaces and/or when deploying the same microservice multiple times against different topics). But in the most common streaming microservice case (1 input and 1 output topic), the topics are known at startup inside the Kafka Streams Topology, so the property becomes redundant and error-prone.

Proposed feature would be to:

Implementation ideas

Kafka Streams APIs to discover sources and sinks:

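import org.apache.kafka.streams.TopologyDescription.Node;
import org.apache.kafka.streams.TopologyDescription.Sink;
import org.apache.kafka.streams.TopologyDescription.Source;
import org.apache.kafka.streams.TopologyDescription.Subtopology;

// "topology" is the application's org.apache.kafka.streams.Topology
for (Subtopology subtopology : topology.describe().subtopologies()) {
  for (Node node : subtopology.nodes()) {
    if (node instanceof Source) {
      Source source = (Source) node;
      // Sources subscribed by regex expose source.topicPattern() instead (not handled here)
      System.out.println(source.topicSet());
    } else if (node instanceof Sink) {
      Sink sink = (Sink) node;
      // Sinks with dynamic routing expose sink.topicNameExtractor() instead (not handled here)
      System.out.println(sink.topic());
    }
  }
}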

cc @gunnarmorling

gunnarmorling commented 4 years ago

> Today the mandatory property quarkus.kafka-streams.topics must be manually kept in sync in case topic names are overridden at runtime

You can prevent that by externalizing all topic names into other properties which then are referenced by quarkus.kafka-streams.topics and from within your topology producer. E.g. that's what I'm doing in the Debezium audit log example.

That being said, I really like the idea of defaulting the property to all topic names used in the topology. I wasn't aware of this possibility, it's definitely worth pursuing. Will comment on the PR on the impl.

rquinio commented 4 years ago

Interesting, I didn't know it was possible to reference another property via ${} ! I'm using Maven properties + filtering application.properties to avoid the duplication today. Does it work even if the reference is overridden at runtime via environment variables for instance AUDIT_VEGETABLES_TOPIC=dbserver2.inventory.vegetable ?

audit.context.data.topic=dbserver1.inventory.transaction_context_data
audit.vegetables.topic=dbserver1.inventory.vegetable
quarkus.kafka-streams.topics=${audit.context.data.topic},${audit.vegetables.topic}

But yeah, the less configuration the easier !

gunnarmorling commented 4 years ago

> Does it work even if the reference is overridden at runtime via environment variables for instance AUDIT_VEGETABLES_TOPIC=dbserver2.inventory.vegetable ?

Worth trying, but yes, it'd be my expectation that it works.
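For readers following along, the behaviour being asked about can be modelled with a small toy resolver. This is only a sketch of the expected semantics, not the actual Quarkus or SmallRye Config implementation, and it does not show how the Kafka Streams extension really reads the property. The class and method names (ExpansionSketch, envName, lookup) are made up for illustration; the environment variable naming (non-alphanumeric characters replaced by underscores, then upper-cased) follows the MicroProfile Config convention.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Toy model of ${...} expansion with environment overrides (hypothetical, for illustration only). */
final class ExpansionSketch {

    private static final Pattern REFERENCE = Pattern.compile("\\$\\{([^}]+)\\}");

    /** MicroProfile Config style mapping: audit.vegetables.topic becomes AUDIT_VEGETABLES_TOPIC. */
    static String envName(String property) {
        return property.replaceAll("[^A-Za-z0-9]", "_").toUpperCase(Locale.ROOT);
    }

    /** Resolves a property, letting an environment variable win over the properties file. */
    static String lookup(String name, Map<String, String> file, Map<String, String> env) {
        String raw = env.getOrDefault(envName(name), file.get(name));
        if (raw == null) {
            throw new IllegalArgumentException("No value for " + name);
        }
        // References are expanded at lookup time, so an override of a referenced
        // property is visible in every property that refers to it.
        Matcher matcher = REFERENCE.matcher(raw);
        StringBuilder resolved = new StringBuilder();
        while (matcher.find()) {
            String value = lookup(matcher.group(1), file, env);
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    public static void main(String[] args) {
        Map<String, String> file = new HashMap<>();
        file.put("audit.context.data.topic", "dbserver1.inventory.transaction_context_data");
        file.put("audit.vegetables.topic", "dbserver1.inventory.vegetable");
        file.put("quarkus.kafka-streams.topics",
                "${audit.context.data.topic},${audit.vegetables.topic}");

        // Simulated environment, standing in for System.getenv()
        Map<String, String> env = Map.of("AUDIT_VEGETABLES_TOPIC", "dbserver2.inventory.vegetable");

        System.out.println(lookup("quarkus.kafka-streams.topics", file, env));
    }
}
```

Under this model the second topic would resolve to the dbserver2 name. Whether the real extension behaves the same way has to be checked against an actual deployment.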

shnplr commented 3 years ago

This enhancement could also help support topic patterns, e.g.:

KStream<String, byte[]> raw = builder.stream(Pattern.compile("input-.*"));
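With a pattern there is no fixed topic list to put in the property, so an automatic approach would presumably have to resolve the pattern against the topics that exist on the broker. A minimal sketch of that resolution step, using only java.util.regex: the class and method names are hypothetical, and the list of existing topics is hard-coded here, whereas in practice it might come from something like the Kafka AdminClient's listTopics().

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

/** Hypothetical helper: resolves a source topic pattern to concrete topic names. */
final class TopicPatternSketch {

    /** Returns the existing topics whose full name matches the pattern, sorted by name. */
    static Set<String> matchingTopics(Pattern pattern, List<String> existingTopics) {
        Set<String> matched = new TreeSet<>();
        for (String topic : existingTopics) {
            // matches() requires the whole topic name to match, not just a substring
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        // Assumed broker state, for illustration only
        List<String> existing = List.of("input-a", "output-a", "input-b", "my-input-c");
        System.out.println(matchingTopics(Pattern.compile("input-.*"), existing));
    }
}
```

Here "my-input-c" is excluded because the whole name has to match the pattern.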

JayGhiya commented 3 years ago

> Interesting, I didn't know it was possible to reference another property via ${} ! I'm using Maven properties + filtering application.properties to avoid the duplication today. Does it work even if the reference is overridden at runtime via environment variables for instance AUDIT_VEGETABLES_TOPIC=dbserver2.inventory.vegetable ?
>
> audit.context.data.topic=dbserver1.inventory.transaction_context_data
> audit.vegetables.topic=dbserver1.inventory.vegetable
> quarkus.kafka-streams.topics=${audit.context.data.topic},${audit.vegetables.topic}
>
> But yeah, the less configuration the easier !

This does not work actually. Tried!