spring-projects / spring-kafka

Provides Familiar Spring Abstractions for Apache Kafka
https://projects.spring.io/spring-kafka
Apache License 2.0

KafkaStreamsConfiguration is not used to create the StreamBuilder instance #3180

Closed yoniwg closed 6 months ago

yoniwg commented 6 months ago

In what version(s) of Spring for Apache Kafka are you seeing this issue?

For example:

3.0.10

Describe the bug

The configuration set via the KafkaStreamsConfiguration bean is not passed into the StreamsBuilder instance and is therefore ignored while the topology is built. Specifically, I am trying to set default.dsl.store=in_memory, but when I use Materialized.as("Store-Name") the resulting MaterializedInternal object (which is used during the topology build process) is created with storeType=ROCKS_DB. The code that determines the store type in MaterializedInternal is:

// if store type is not configured during creating Materialized, then try to get the topologyConfigs from nameProvider
// otherwise, set to default rocksDB
if (storeType == null) {
    storeType = StoreType.ROCKS_DB;
    if (nameProvider instanceof InternalStreamsBuilder) {
        final TopologyConfig topologyConfig = ((InternalStreamsBuilder) nameProvider).internalTopologyBuilder.topologyConfigs();
        if (topologyConfig != null) {
            storeType = topologyConfig.parseStoreType();
        }
    }
}

So it is capable of taking the configuration from the InternalStreamsBuilder. But when StreamsBuilderFactoryBean creates the StreamsBuilder here:

@Override
protected synchronized StreamsBuilder createInstance() {
    if (this.autoStartup) {
        Assert.state(this.properties != null,
                "streams configuration properties must not be null");
    }
    StreamsBuilder builder = new StreamsBuilder(); // <<<<<<<<<<<<<< HERE
    this.infrastructureCustomizer.configureBuilder(builder);
    return builder;
}

it does not use the StreamsBuilder(TopologyConfig) constructor, which would allow passing the topology configuration that is, in turn, used to create the InternalStreamsBuilder.
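A minimal sketch of what the factory bean could do instead, using the public StreamsBuilder(TopologyConfig) constructor from Kafka Streams. This is an illustration, not the actual StreamsBuilderFactoryBean code; the class and method names here are hypothetical:

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyConfig;

// Illustrative sketch only (not the real StreamsBuilderFactoryBean):
// build the StreamsBuilder from the configured properties so that
// topology-level settings such as default.dsl.store take effect.
public class TopologyAwareBuilderFactory {

    static StreamsBuilder createInstance(Properties streamsProperties) {
        // Wrap the configured properties in a TopologyConfig and use the
        // StreamsBuilder(TopologyConfig) constructor, so that the resulting
        // InternalStreamsBuilder sees a non-null topologyConfigs() at build
        // time and parseStoreType() returns the configured store type.
        TopologyConfig topologyConfig =
                new TopologyConfig(new StreamsConfig(streamsProperties));
        return new StreamsBuilder(topologyConfig);
    }
}
```

With this wiring, the MaterializedInternal snippet quoted above would pick up the configured store type instead of falling back to ROCKS_DB.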

To Reproduce

  1. Use the DSL with Materialized (for example: KTable<K, V>.filter(Predicate<? super K, ? super V> ,Materialized<K, V, KeyValueStore<Bytes, byte[]>>))
  2. Set the property:
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
      Map<String, Object> kafkaProperties = new HashMap<>();
      // other kafka properties
      kafkaProperties.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, StreamsConfig.IN_MEMORY);
      return new KafkaStreamsConfiguration(kafkaProperties);
    }
  3. Observe the creation of the state store (by debugging, or by looking at the state.dir folder) and find that RocksDB is still in use.

Expected behavior

After passing kafkaProperties.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, StreamsConfig.IN_MEMORY), all DSL state stores should be of the IN_MEMORY type even when Materialized is used (unless explicitly configured otherwise via the Materialized object).
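As an aside, the per-store escape hatch mentioned above can be expressed with the public Materialized API; this sketch pins one store to in-memory explicitly (store name and generic types are illustrative):

```java
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class InMemoryStoreExample {

    // Illustrative workaround: set the store type explicitly on the
    // Materialized instance instead of relying on default.dsl.store,
    // which bypasses the ROCKS_DB fallback in MaterializedInternal.
    static Materialized<String, Long, KeyValueStore<Bytes, byte[]>> inMemoryStore() {
        return Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("Store-Name")
                .withStoreType(Materialized.StoreType.IN_MEMORY);
    }
}
```

This only covers stores you materialize by hand; the global default.dsl.store setting still needs the factory bean fix to apply everywhere.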

sobychacko commented 6 months ago

@yoniwg I believe this is a duplicate of https://github.com/spring-projects/spring-kafka/issues/3176. This has been addressed recently.

sobychacko commented 6 months ago

Closing the issue. Feel free to reopen it if you still see issues with the applied fix on https://github.com/spring-projects/spring-kafka/issues/3176.