LGouellec / kafka-streams-dotnet

.NET Stream Processing Library for Apache Kafka 🚀
https://lgouellec.github.io/kafka-streams-dotnet/
MIT License
455 stars · 74 forks

Questions: How to provide serializer to the WrappedWindowStore #74

Closed 0x1D-1983 closed 3 years ago

0x1D-1983 commented 3 years ago

Description

When creating tumbling windows the result object's SerDes is not passed to the inner window store.

How to reproduce

Consider the following stream topology:

StreamBuilder builder = new StreamBuilder();

builder.Stream<string, ObjectA, StringSerDes, SchemaAvroSerDes<ObjectA>>(_config.InputTopicName)
    .Map((key, value) => new KeyValuePair<string, ObjectA>(value.symbol, value))
    .GroupByKey()
    .WindowedBy(TumblingWindowOptions.Of(TimeSpan.FromMinutes(5)))
    .Aggregate<ObjectB, SchemaAvroSerDes<ObjectB>>(
        () => new ObjectB(),
        (key, value, aggregate) => _ObjectBHelper.CreateObjectB(key, value, aggregate))
    .ToStream()
    .Map((key, value) => new KeyValuePair<string, ObjectB>(key.Key, value))
    .To<StringSerDes, SchemaAvroSerDes<ObjectB>>(_config.OutputTopicName);

Running it throws the following exception:

Streamiz.Kafka.Net.Errors.StreamsException: SchemaSerDes<ObjectB> is not initialized !
   at Streamiz.Kafka.Net.SchemaRegistry.SerDes.SchemaSerDes`1.Serialize(T data, SerializationContext context) in /Users/oxid/code/kafka-streams-dotnet/serdes/Streamiz.Kafka.Net.SchemaRegistry.SerDes/SchemaSerDes.cs:line 68
   at Streamiz.Kafka.Net.SerDes.ValueAndTimestampSerDes`1.Serialize(ValueAndTimestamp`1 data, SerializationContext context) in /Users/oxid/code/kafka-streams-dotnet/core/SerDes/ValueAndTimestampSerDes.cs:line 56
   at Streamiz.Kafka.Net.State.Internal.WrappedWindowStore`2.GetValueBytes(V value) in /Users/oxid/code/kafka-streams-dotnet/core/State/Internal/WrappedWindowStore.cs:line 48
   at Streamiz.Kafka.Net.State.Internal.WrappedWindowStore`2.Put(K key, V value, Int64 windowStartTimestamp) in /Users/oxid/code/kafka-streams-dotnet/core/State/Internal/WrappedWindowStore.cs:line 77
   at Streamiz.Kafka.Net.Processors.KStreamWindowAggregateProcessor`4.Process(K key, V value) in /Users/oxid/code/kafka-streams-dotnet/core/Processors/KStreamWindowAggregateProcessor.cs:line 74
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Forward[K1,V1](K1 key, V1 value) in /Users/oxid/code/kafka-streams-dotnet/core/Processors/AbstractProcessor.cs:line 83
   at Streamiz.Kafka.Net.Processors.KStreamMapProcessor`4.Process(K key, V value) in /Users/oxid/code/kafka-streams-dotnet/core/Processors/KStreamMapProcessor.cs:line 20
   at Streamiz.Kafka.Net.Processors.SourceProcessor`2.Process(K key, V value) in /Users/oxid/code/kafka-streams-dotnet/core/Processors/SourceProcessor.cs:line 57
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Process(Object key, Object value) in /Users/oxid/code/kafka-streams-dotnet/core/Processors/AbstractProcessor.cs:line 200
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Process(ConsumeResult`2 record) in /Users/oxid/code/kafka-streams-dotnet/core/Processors/AbstractProcessor.cs:line 194
   at Streamiz.Kafka.Net.Processors.StreamTask.Process() in /Users/oxid/code/kafka-streams-dotnet/core/Processors/StreamTask.cs:line 311
   at Streamiz.Kafka.Net.Processors.Internal.TaskManager.Process(Int64 now) in /Users/oxid/code/kafka-streams-dotnet/core/Processors/Internal/TaskManager.cs:line 217

If I remove the windowing steps and keep only the input/output Map steps, ObjectB records are written to the output topic correctly, so the SerDes itself works fine. But when the windowing aggregation step is added, the SerDes is lost for the output object.

Modifying the ValueAndTimestampSerDes class to initialize the inner SerDes seems to work, but not completely: the SerializationContext passed to the serializer still refers to the input topic rather than the output topic.

InnerSerdes.Initialize(serDesContext);

How can I properly feed the output schema SerDes and topic name into the following method?

public override byte[] Serialize(ValueAndTimestamp<V> data, SerializationContext context)
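For reference, my partial modification was roughly the following (a sketch against the 1.1.5 sources; the exact base-class and method signatures in ValueAndTimestampSerDes may differ slightly from what is shown here):

```csharp
// Sketch of the partial workaround in core/SerDes/ValueAndTimestampSerDes.cs:
// propagate initialization to the wrapped inner SerDes so the
// schema-registry client is configured before Serialize is called.
public override void Initialize(SerDesContext serDesContext)
{
    base.Initialize(serDesContext);
    // InnerSerdes is the SerDes for the raw value V wrapped by ValueAndTimestamp<V>
    InnerSerdes.Initialize(serDesContext);
}
```

This gets past the "is not initialized" exception, but the SerializationContext reaching Serialize still carries the input topic, so the registry subject resolved from it is wrong.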

Checklist

"Streamiz.Kafka.Net" Version="1.1.5"
"Streamiz.Kafka.Net.SchemaRegistry.SerDes.Avro" Version="1.1.5"
Kafka docker image: confluentinc/cp-kafka:5.3.0
Schema registry: confluentinc/cp-schema-registry:6.0.0
.NET 5.0 on macOS

LGouellec commented 3 years ago

Hi @0x1D-1983,

You are right! The SerDes is not initialized from the Schema Registry. If you modify ValueAndTimestampSerDes to initialize the inner SerDes, it may appear to work, but it won't work reliably in all cases.

Why?

At the moment there is no changelog topic. When the Schema Registry client needs a schema, either (option 1) it finds it in the local cache, which works, or (option 2) it is not in the cache and the client must contact the Schema Registry with a subject (by default topicName-key or topicName-value). Since there is no changelog topic yet, there is no topic name to build that subject from. Changelog topics should arrive in the next release, 1.2.0, soon I hope.

That is also why it works when you only source and sink data with SchemaAvroSerdes: in those cases I can set a topic name.

A good workaround while waiting for the 1.2.0 release is to use a JSON formatter (without the registry) for intermediate stateful processors.
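For example, a minimal registry-free JSON SerDes for the intermediate state could look like this (a sketch: the class name IntermediateJsonSerDes is hypothetical, and AbstractSerDes<T> plus the Serialize/Deserialize signatures are assumed from the Streamiz 1.1.x API, matching the signatures visible in the stack trace above):

```csharp
using System.Text;
using System.Text.Json;
using Confluent.Kafka;
using Streamiz.Kafka.Net.SerDes;

// Registry-free JSON SerDes for intermediate state stores: serializes the
// value as UTF-8 JSON, so no Schema Registry subject (topic name) is needed.
public class IntermediateJsonSerDes<T> : AbstractSerDes<T>
    where T : class
{
    public override byte[] Serialize(T data, SerializationContext context)
        => data == null
            ? null
            : Encoding.UTF8.GetBytes(JsonSerializer.Serialize(data));

    public override T Deserialize(byte[] data, SerializationContext context)
        => data == null
            ? null
            : JsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(data));
}
```

Used as .Aggregate<ObjectB, IntermediateJsonSerDes<ObjectB>>(...), while the final .To<StringSerDes, SchemaAvroSerDes<ObjectB>>(...) sink can keep the Avro SerDes, since a real topic name is available there.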

If you have any other question, do not hesitate.

0x1D-1983 commented 3 years ago

Thank you for the answer. I will try Option1 (local cache) and if it doesn't work I will check the JSON formatter workaround. Cheers!

0x1D-1983 commented 3 years ago

It works fine with the JSON formatter. 🎉 I will keep an eye on future updates regarding the changelog topic. Happy to close this issue.