spring-projects / spring-kafka

Provides Familiar Spring Abstractions for Apache Kafka
https://projects.spring.io/spring-kafka
Apache License 2.0

Allow initializing/injecting the Topology object without starting Kafka Streams. #3020

Closed. foal closed this issue 7 months ago

foal commented 9 months ago

Expected Behavior

Get the Topology from StreamsBuilderFactoryBean without starting the streams engine. The Topology has to be created with the properties and the KafkaStreamsInfrastructureCustomizer applied, and it should pick up any future changes to that logic.

This is needed to run tests with the Kafka TopologyTestDriver.

Current Behavior

I can inject the StreamsBuilder, but then I need to create and initialize the topology manually.

Context

Manual workaround:

    @Autowired
    private StreamsBuilder streamBuilder;
    @Autowired
    private KafkaStreamsConfiguration streamConfig;
    @Autowired
    private KafkaStreamsInfrastructureCustomizer infrastructureCustomizer;

///....

        Topology topology = streamBuilder.build(streamConfig.asProperties());
        infrastructureCustomizer.configureTopology(topology);

But of course, this matches only the current version of StreamsBuilderFactoryBean.

sobychacko commented 9 months ago

That is an interesting request. Could you elaborate on the use case that requires it?

artembilan commented 9 months ago

I wonder why you need StreamsBuilderFactoryBean if you create it manually. There is also a PR, https://github.com/spring-projects/spring-kafka/pull/1853, which does something with TopologyTestDriver.

The StreamsBuilderFactoryBean can also be configured with setAutoStartup(false).

So, we do need more info on how to proceed. Perhaps a fully formed test would be a great start to look into.

Thanks

foal commented 9 months ago

I do not create StreamsBuilderFactoryBean manually. In the real application everything is configured automatically. The issue is in the tests. I have three options:

In general, the test looks like this (simplified):

@TestPropertySource(properties = {
    "spring.kafka.bootstrap-servers=mock://appTopologyTest",
    "spring.kafka.streams.application-id=app-test-${random.uuid}",
    "spring.kafka.properties.schema.registry.url=mock://appTopologyTest",
    "spring.kafka.streams.properties.schema.registry.url=mock://appTopologyTest",
    "app.kafka.topic-offer-replicas=1",
    "spring.kafka.streams.auto-startup=false",
    "spring.kafka.listener.auto-startup=false",
    "logging.level.org.springframework.kafka.core.KafkaAdmin=OFF",
    "logging.level.io.confluent.kafka.serializers=WARN",
    "logging.level.org.apache.kafka.clients.admin=WARN",
})
@SpringBootTest
public class AppTopologyTest {

    //There is configuration for input/output topics

    @Autowired
    private StreamsBuilder streamsBuilder;
    @Autowired
    private KafkaStreamsConfiguration kafkaStreamsConfiguration;
    @Autowired
    private KafkaStreamsInfrastructureCustomizer infrastructureCustomizer;

    @ParameterizedTest(name = "{0}")
    @MethodSource
    protected void testProcessing(String info, List<InputNew<?, ?>> input, Matcher<Iterable<TestRecord>> expected, TopicType out) {

        // This is the tricky part: I need to duplicate the logic from
        //https://github.com/spring-projects/spring-kafka/blob/main/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java#L347
        Topology topology = streamsBuilder.build(kafkaStreamsConfiguration.asProperties());
        infrastructureCustomizer.configureTopology(topology);

        try (TopologyTestDriver testDriver = new TopologyTestDriver(topology, kafkaStreamsConfiguration.asProperties())) {

            Map<TopicType, TestInputTopic<?, ?>> inputTopics = new HashMap<>(10);

            input.forEach(i -> send(testDriver, inputTopics, i));

            TestOutputTopic outputTopic = getOutTopic(testDriver, out);

            List<TestRecord> output = outputTopic.readRecordsToList();

            MatcherAssert.assertThat(output, expected);
        }

    }

    @SuppressWarnings("unused")
    private static List<Arguments> testProcessing() {
        int i = 1;
        return List.of(
            testSteps(i++, "Duplicates", OUT, MATCHER_1, IN_MSG_1, IN_MSG_1),
            testSteps(i++, "Single ent", OUT, MATCHER_2, IN_MSG_3),
            testSteps(i++, "Single another ent", OUT, MATCHER_3, IN_MSG_4),
            testSteps(i++, "Comb", OUT,  MATCHER_4, IN_MSG_1, IN_MSG_2, IN_MSG_3, IN_MSG_4),
            //....
            testSteps(i++, "Nothing", OUT, MATCHER_NOTHING));
    }

    private <K, V> void send(TopologyTestDriver testDriver, Map<TopicType, TestInputTopic<?, ?>> inputTopics, InputNew<?, ?> input) {
        TestInputTopic<K, V> topic = getInputTopic(testDriver, inputTopics, input.getTopicType());
        topic.pipeInput(new TestRecord(input.getKey(), input.getValue()));
    }

    @SafeVarargs
    protected static Arguments testSteps(int iteration, String description, TopicType out,
                                         MatcherFn matcher, Function<Integer, InputNew>... inputs) {
        List<InputNew> parameters = StreamEx.of(inputs).map(input -> input.apply(iteration)).toList();
        return Arguments.of(description, parameters, matcher.apply(iteration), out);
    }
}

artembilan commented 9 months ago

OK. So, the idea is to have this logic in the StreamsBuilderFactoryBean:

Topology topol = getObject().build(this.properties); // NOSONAR: getObject() cannot return null
this.infrastructureCustomizer.configureTopology(topol);
this.topology = topol;

out of the start() scope, e.g. in afterPropertiesSet()? And then have the StreamsBuilderFactoryBean not start automatically, so that no KafkaStreams instance is created there.

If this is the plan, we will be happy to accept such a contribution.

Thanks
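
To illustrate the proposal above, here is a minimal sketch in plain Java of building the topology at initialization time and creating the engine only on start. SketchTopology and SketchFactoryBean are hypothetical stand-ins invented for this example; they are not the real Kafka Streams or Spring Kafka classes, and the "input.topic" property is an assumption made only so the stand-in has something to build.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.function.Consumer;

// Hypothetical stand-in for a topology: just an ordered list of node names.
final class SketchTopology {
    final List<String> nodes = new ArrayList<>();
}

// Hypothetical stand-in for a factory bean with an init phase and a start phase.
final class SketchFactoryBean {
    private final Properties properties;
    private final Consumer<SketchTopology> topologyCustomizer;
    private SketchTopology topology;
    private boolean running;

    SketchFactoryBean(Properties properties, Consumer<SketchTopology> topologyCustomizer) {
        this.properties = properties;
        this.topologyCustomizer = topologyCustomizer;
    }

    // Analogue of afterPropertiesSet(): build and customize the topology eagerly.
    void afterPropertiesSet() {
        SketchTopology topol = new SketchTopology();
        // "input.topic" is an assumed property name used only by this sketch.
        topol.nodes.add("source:" + properties.getProperty("input.topic", "in"));
        topologyCustomizer.accept(topol);
        this.topology = topol;
    }

    // Analogue of start(): only here would a streams engine be created.
    void start() {
        if (this.topology == null) {
            throw new IllegalStateException("afterPropertiesSet() was not called");
        }
        this.running = true;
    }

    SketchTopology getTopology() {
        return this.topology;
    }

    boolean isRunning() {
        return this.running;
    }
}
```

With this split, a test could call afterPropertiesSet(), read getTopology(), and hand it to a test driver while isRunning() is still false, which is the behavior requested in this issue.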

foal commented 9 months ago

OK. I will try to provide a PR next week.

foal commented 8 months ago

Grr... Can't load the project into Eclipse due to https://github.com/eclipse-jdt/eclipse.jdt.core/issues/1621

It should be fixed in the next release of Eclipse (4.31), but I am not sure...

chickenchickenlove commented 7 months ago

@artembilan, @sobychacko, @foal Hi, Spring Kafka team and reporter. I'm interested in this issue as well.

If this is on pause, may I handle this issue? I'm willing to do the work once the reporter and the maintainers give their OK.

foal commented 7 months ago

Yes, please!

chickenchickenlove commented 7 months ago

@foal Thanks a lot 🙇‍♂️ I will make a PR for this. 👍

chickenchickenlove commented 7 months ago

https://github.com/spring-projects/spring-kafka/pull/3172 I created a new PR. Please take a look 🙇‍♂️