LGouellec / kafka-streams-dotnet

.NET Stream Processing Library for Apache Kafka 🚀
https://lgouellec.github.io/kafka-streams-dotnet/
MIT License

Add state stores through StreamBuilder #284

Closed: ppilev closed this 8 months ago

ppilev commented 9 months ago

Changes:

NOTE: A state store must be connected to an IProcessor<K, V> or ITransformer<K, V, K1, V1> before it can be used.

Example:

  1. Add a state store

        var builder = new StreamBuilder();
        var storeBuilder = Stores.KeyValueStoreBuilder<K, V>(
            Stores.InMemoryKeyValueStore("output-store"),
            new JsonSerDes<K>(),
            new JsonSerDes<V>());
    
        builder.AddStateStore(storeBuilder);

  2. Connect the state store to the transformer or processor by passing the store name to the storeNames parameter of the Process, Transform, or TransformValues method:

        builder.Stream<K, V>("topic-name")
            .Transform(
                TransformerBuilder.New<K, V, K1, V1>().Transformer<TTransformer>().Build(),
                storeNames: "output-store");

  3. The state store can now be retrieved in TTransformer.Init and used later when TTransformer.Process is called:

        public void Init(ProcessorContext<K, V> context)
        { 
            stateStore = (IKeyValueStore<K, V>)context.GetStateStore("output-store");
        }
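
To make step 3 concrete, here is a minimal sketch of what a complete transformer using the connected store might look like. The class name `LastValueTransformer` is hypothetical. The namespaces, the `Record<K, V>.Create` factory, and `Get` returning `null` for a missing key are assumptions about the library rather than something this PR states. Input and output types are both `<string, string>` so that the `Init` signature matches the one shown above. A store typed `<string, string>` and named "output-store" is assumed to have been registered and connected as in steps 1 and 2.

```csharp
// Assumed namespaces; adjust to match the library version in use.
using Streamiz.Kafka.Net.Processors.Public;
using Streamiz.Kafka.Net.State;

// Hypothetical transformer: remembers the last value seen for each key
// and emits "previous -> current" for every incoming record.
public class LastValueTransformer : ITransformer<string, string, string, string>
{
    private IKeyValueStore<string, string> stateStore;

    public void Init(ProcessorContext<string, string> context)
    {
        // The store is only available here if its name was passed
        // through the storeNames parameter when building the topology.
        stateStore = (IKeyValueStore<string, string>)context.GetStateStore("output-store");
    }

    public Record<string, string> Process(Record<string, string> record)
    {
        // Assumption: Get returns null when the key has not been stored yet.
        var previous = stateStore.Get(record.Key);
        var shown = previous ?? "<none>";

        // Remember the current value for the next record with this key.
        stateStore.Put(record.Key, record.Value);

        return Record<string, string>.Create(record.Key, shown + " -> " + record.Value);
    }

    public void Close()
    {
        // Nothing to release in this sketch.
    }
}
```

With this class, step 2 would pass `LastValueTransformer` as the `TTransformer` type argument and keep `storeNames: "output-store"` unchanged.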

LGouellec commented 9 months ago

Hi @ppilev,

Thanks for your contribution. Could you please add unit tests that cover this feature?

Best regards,