LGouellec / kafka-streams-dotnet

.NET Stream Processing Library for Apache Kafka 🚀
https://lgouellec.github.io/kafka-streams-dotnet/
MIT License

Topic record timestamp extractor #329

Closed by ppilev 1 week ago

ppilev commented 2 weeks ago

Provides control over the timestamp of records published to a Kafka output topic.

Some context below:

Q: How is this functionality different from ITimestampExtractor applied to a source stream, or IKStream<K, V>.WithRecordTimestamp applied to an existing stream?

A: In simple scenarios, either method can be used to provide timestamps for the records sent to the output topic. For example, ITimestampExtractor provides the initial value for RecordContext.Timestamp, whereas IKStream<K, V>.WithRecordTimestamp can override that value later. However, in some scenarios you might need finer-grained control over the timestamp applied to the output topic records, and want to change that timestamp without affecting the RecordContext.Timestamp of the underlying stream.

Example with pseudo code which WON'T WORK:

var joinedStream = stream1.WithRecordTimestamp(m1 => m1.Timestamp) LEFT JOIN stream2.WithRecordTimestamp(m2 => m2.Timestamp);
joinedStream.WithRecordTimestamp(utcNow).To("output-topic")

This will break your join. Applying joinedStream.WithRecordTimestamp(utcNow) overrides the timestamp in RecordContext.Timestamp, and because all topology processors are executed first, the timestamp that KStreamJoinWindowProcessor stores in the window store will be utcNow:

        public override void Process(K key, V value)
        {
            if (key != null)
            {
                Forward(key, value);
                // Context.Timestamp is read only after Forward(...) has returned
                window.Put(key, value, Context.Timestamp);
            }
        }

The solution I came up with is to pass the desired timestamp directly to SinkProcessor, without touching the value in RecordContext.Timestamp at all.

Example with pseudo code which WORKS with changes proposed by this PR:

var joinedStream = stream1.WithRecordTimestamp(m1 => m1.Timestamp) LEFT JOIN stream2.WithRecordTimestamp(m2 => m2.Timestamp);
joinedStream.To("output-topic", extractor => utcNow)
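
To make the difference concrete, here is a minimal, self-contained sketch of the two approaches. It does not use the library: SketchContext, SinkTimestampSketch, ForwardThenStore, OverrideInContext and ExtractAtSink are hypothetical names invented for illustration, and the only thing borrowed from the real code is the order of operations in the Process method quoted above (forward first, then store with the context timestamp).

```csharp
using System;

// Hypothetical stand-in for the record context; NOT the library's type.
public sealed class SketchContext
{
    public long Timestamp { get; set; }
}

public static class SinkTimestampSketch
{
    // Mirrors the order in the quoted Process method: forward downstream first,
    // then store the record with whatever timestamp the shared context holds.
    private static long ForwardThenStore(SketchContext context, Action<SketchContext> downstream)
    {
        downstream(context);        // downstream steps run to completion here
        return context.Timestamp;   // timestamp the window store would receive
    }

    // Variant 1: a downstream step overwrites the shared context timestamp,
    // in the spirit of joinedStream.WithRecordTimestamp(utcNow).
    public static (long WindowTs, long OutputTs) OverrideInContext(long eventTime, long utcNow)
    {
        var context = new SketchContext { Timestamp = eventTime };
        long outputTs = 0;
        long windowTs = ForwardThenStore(context, ctx =>
        {
            ctx.Timestamp = utcNow;     // mutates the shared context
            outputTs = ctx.Timestamp;   // the sink publishes the context timestamp
        });
        return (windowTs, outputTs);
    }

    // Variant 2: the sink computes its own timestamp and leaves the context
    // alone, in the spirit of To("output-topic", extractor => utcNow).
    public static (long WindowTs, long OutputTs) ExtractAtSink(long eventTime, Func<long> sinkExtractor)
    {
        var context = new SketchContext { Timestamp = eventTime };
        long outputTs = 0;
        long windowTs = ForwardThenStore(context, _ =>
        {
            outputTs = sinkExtractor(); // only the output record is affected
        });
        return (windowTs, outputTs);
    }
}
```

With an event time of 1000 and a "now" of 5000, OverrideInContext(1000, 5000) yields a window timestamp of 5000 and an output timestamp of 5000, whereas ExtractAtSink(1000, () => 5000) keeps the window timestamp at 1000 while the output record still gets 5000.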

ppilev commented 2 weeks ago

hey @LGouellec

I know spare time is a luxury, but could you please review this when you get some? What are the chances of this change being available in the upcoming 1.6.0 release?

Regards, Plamen

LGouellec commented 2 weeks ago

Hi @ppilev,

I will take a look before the end of next week! There is a good chance it will be merged before 1.6.