LGouellec / kafka-streams-dotnet

.NET Stream Processing Library for Apache Kafka 🚀
https://lgouellec.github.io/kafka-streams-dotnet/
MIT License

Tumbling windowing causes Out of range exception #73

Closed 0x1D-1983 closed 3 years ago

0x1D-1983 commented 3 years ago

Description

Creating a tumbling window from a timestamp causes a value out of range exception in the Streamiz DateTimeExtensions class.

How to reproduce

Build the following stream:

StreamBuilder builder = new StreamBuilder();

builder.Stream<string, ObjectA, StringSerDes, SchemaAvroSerDes<ObjectA>>(_config.InputTopicName)
    .Map((key, value) => new KeyValuePair<string, ObjectA>(value.symbol, value))
    .GroupByKey()
    .WindowedBy(TumblingWindowOptions.Of(TimeSpan.FromMinutes(5)))
    .Aggregate<ObjectB, SchemaAvroSerDes<ObjectB>>(
        () => new ObjectB(),
        (key, ObjectA, ObjectB) => _ObjectBHelper.CreateObjectB(key, ObjectA, ObjectB))
    .ToStream()
    .Map((key, ObjectB) => new KeyValuePair<string, ObjectB>(key.Key, ObjectB))
    .To<StringSerDes, SchemaAvroSerDes<ObjectB>>(_config.OutputTopicName);

Running it throws this exception:

Unhandled exception. Streamiz.Kafka.Net.Errors.StreamsException: Value to add was out of range. (Parameter 'value')
 ---> System.ArgumentOutOfRangeException: Value to add was out of range. (Parameter 'value')
   at System.DateTime.Add(Double value, Int32 scale)
   at System.DateTime.AddMilliseconds(Double value)
   at Streamiz.Kafka.Net.Crosscutting.DateTimeExtensions.FromMilliseconds(Int64 epoch)
   at Streamiz.Kafka.Net.Stream.Window..ctor(Int64 startMs, Int64 endMs)
   at Streamiz.Kafka.Net.Stream.TimeWindowOptions.WindowsFor(Int64 timestamp)
   at Streamiz.Kafka.Net.Processors.KStreamWindowAggregateProcessor`4.Process(K key, V value)
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Forward[K1,V1](K1 key, V1 value)
   at Streamiz.Kafka.Net.Processors.KStreamMapProcessor`4.Process(K key, V value)
   at Streamiz.Kafka.Net.Processors.SourceProcessor`2.Process(K key, V value)
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Process(Object key, Object value)
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Process(ConsumeResult`2 record)
   at Streamiz.Kafka.Net.Processors.StreamTask.Process()
   at Streamiz.Kafka.Net.Processors.Internal.TaskManager.Process(Int64 now)
   at Streamiz.Kafka.Net.Processors.StreamThread.Run()
   --- End of inner exception stack trace ---
   at Streamiz.Kafka.Net.Processors.StreamThread.TreatException(Exception exception)
   at Streamiz.Kafka.Net.Processors.StreamThread.Run()
   at System.Threading.ThreadHelper.ThreadStart_Context(Object state)
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
--- End of stack trace from previous location ---
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
   at System.Threading.ThreadHelper.ThreadStart()

I was able to fix the issue by modifying the FromMilliseconds method as follows:

public static DateTime FromMilliseconds(this long epoch)
{
    //return Jan1St1970.AddMilliseconds(epoch);
    return new DateTime(epoch);
}

Basically, the long epoch is the ticks of the StartMS DateTime and not a ticks delta as the existing implementation expects.

Checklist

"Streamiz.Kafka.Net" Version="1.1.5"
"Streamiz.Kafka.Net.SchemaRegistry.SerDes.Avro" Version="1.1.5"
Kafka docker image: confluentinc/cp-kafka:5.3.0
Schema registry: confluentinc/cp-schema-registry:6.0.0
.NET 5.0 on macOS

LGouellec commented 3 years ago

Hi @0x1D-1983,

Do you have more information about this out-of-range exception, such as the values of startMs and endMs? I just want to understand the problem.

What is your local time zone?

Thanks for your feedback!

0x1D-1983 commented 3 years ago

Hi, thanks for the prompt reply.

Here are the details:

epoch: 637542516600000000

startMs: 637542516600000000
endMs: 637542516600300000

windowStart: 637542516600000000
size: 300000 (<-- 5 Min tumbling window)

timeStamp from value.Bean: "2021-04-17T09:21:00-0000"

DateTime.Parse("2021-04-17T09:21:00-0000").Ticks
637542516600000000

The value comes from the payload. The tumbling window is based on this value.
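The numbers above can be checked with a small sketch. It assumes, based on the commented-out line in the proposed fix, that FromMilliseconds adds its argument as milliseconds to a Jan1St1970 constant equal to the Unix epoch; the class name EpochDemo and the method FitsInDateTime are hypothetical and exist only for illustration.

```csharp
using System;

public static class EpochDemo
{
    // Assumption: Jan1St1970 in Streamiz is the Unix epoch in UTC.
    private static readonly DateTime Jan1St1970 =
        new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);

    // Mirrors the original body quoted in this issue: the input is
    // interpreted as milliseconds elapsed since 1970-01-01.
    public static DateTime FromMilliseconds(long epochMs) =>
        Jan1St1970.AddMilliseconds(epochMs);

    // Hypothetical guard: true when the offset still lands inside the
    // range that System.DateTime can represent.
    public static bool FitsInDateTime(long epochMs)
    {
        double minMs = (DateTime.MinValue - Jan1St1970).TotalMilliseconds;
        double maxMs = (DateTime.MaxValue - Jan1St1970).TotalMilliseconds;
        return epochMs >= minMs && epochMs <= maxMs;
    }
}
```

Under these assumptions, EpochDemo.FitsInDateTime(637542516600000000) is false, because that many milliseconds after 1970 is far beyond DateTime.MaxValue, while EpochDemo.FitsInDateTime(1618651260000) is true and EpochDemo.FromMilliseconds(1618651260000) yields 2021-04-17 09:21:00 UTC.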

My local culture is the following:

System.Globalization.CultureInfo.CurrentCulture
{en-US}
    Calendar: {System.Globalization.GregorianCalendar}
    CompareInfo: {System.Globalization.CompareInfo}
    CultureTypes: System.Globalization.CultureTypes.SpecificCultures
    DateTimeFormat: {System.Globalization.DateTimeFormatInfo}
    DisplayName: "English (United States)"
    EnglishName: "English (United States)"
    IetfLanguageTag: "en-US"
    IsNeutralCulture: false
    IsReadOnly: true
    KeyboardLayoutId: 1033
    LCID: 1033
    Name: "en-US"
    NativeName: "English (United States)"
    NumberFormat: {System.Globalization.NumberFormatInfo}
    OptionalCalendars: {System.Globalization.Calendar[1]}
    Parent: {en}
    TextInfo: {TextInfo - en-US}
    ThreeLetterISOLanguageName: "eng"
    ThreeLetterWindowsLanguageName: "ENU"
    TwoLetterISOLanguageName: "en"
    UseUserOverride: true
    Static members: 
    Non-Public members:

LGouellec commented 3 years ago

Hi @0x1D-1983 ,

I don't understand why your startMs is equal to 637542516600000000. Normally, startMs is a Unix epoch timestamp taken from the Kafka record metadata.

Are you perhaps using a timestamp extractor in your configuration?

The Kafka .NET package uses Unix timestamps in milliseconds, and so does Streamiz. If you use a timestamp extractor, it must return a Unix timestamp in milliseconds as a long. You can use a helper class to convert a DateTime to a Unix timestamp in milliseconds:

DateTime dt = DateTime.Parse("2021-04-17T09:21:00-0000");
long unixTimestampMs = Confluent.Kafka.Timestamp.DateTimeToUnixTimestampMs(dt);

Regards,
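As an alternative to the Confluent helper, the same conversion can be sketched with the .NET base class library alone. The class PayloadTimestamp and its method are hypothetical names, not part of Streamiz or Confluent.Kafka; the sketch assumes the payload carries an ISO 8601 timestamp and that a timestamp extractor would return the resulting long.

```csharp
using System;
using System.Globalization;

public static class PayloadTimestamp
{
    // Hypothetical helper: parses an ISO 8601 timestamp from the payload
    // and returns milliseconds since the Unix epoch (UTC).
    public static long ToUnixTimestampMs(string isoTimestamp)
    {
        // AssumeUniversal only applies when the string carries no offset;
        // an explicit offset or "Z" in the input takes precedence.
        DateTimeOffset parsed = DateTimeOffset.Parse(
            isoTimestamp,
            CultureInfo.InvariantCulture,
            DateTimeStyles.AssumeUniversal);

        return parsed.ToUnixTimeMilliseconds();
    }
}
```

For example, PayloadTimestamp.ToUnixTimestampMs("2021-04-17T09:21:00Z") returns 1618651260000. Parsing into a DateTimeOffset rather than a DateTime keeps the result independent of the machine's local time zone.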

0x1D-1983 commented 3 years ago

Hi @LGouellec

Yes, I'm using a timestamp extractor that extracts the timestamp value from the payload. Sorry, I forgot to mention this explicitly, but that's what I meant by saying the window is based on the value from the payload.

And indeed the value was DateTime ticks instead of Unix timestamp. I was expecting the Ticks to return a Unix timestamp 😅

Converting the ticks to Unix timestamp fixed this issue.

Thank you for your help!
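To see what the window looks like once the extractor returns milliseconds, here is a minimal sketch of tumbling-window alignment. It assumes the start is the timestamp rounded down to a multiple of the window size, as in Kafka Streams; this is not taken from the Streamiz source, but the rule does reproduce the startMs and endMs values reported earlier in this thread. TumblingWindowDemo and WindowFor are hypothetical names.

```csharp
using System;

public static class TumblingWindowDemo
{
    // Assumed alignment rule: the window start is the timestamp rounded
    // down to a multiple of the window size; the end is start + size.
    public static (long StartMs, long EndMs) WindowFor(long timestampMs, long sizeMs)
    {
        if (timestampMs < 0)
            throw new ArgumentOutOfRangeException(nameof(timestampMs));
        if (sizeMs <= 0)
            throw new ArgumentOutOfRangeException(nameof(sizeMs));

        long startMs = timestampMs - (timestampMs % sizeMs);
        return (startMs, startMs + sizeMs);
    }
}
```

With the corrected timestamp, TumblingWindowDemo.WindowFor(1618651260000, 300000) gives (1618651200000, 1618651500000), that is, the window from 09:20:00 to 09:25:00 UTC. With the ticks value, WindowFor(637542516600000000, 300000) gives (637542516600000000, 637542516600300000), which matches the reported startMs and endMs that could not be converted to a DateTime.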

LGouellec commented 3 years ago

Great!

I wrote some unit tests that illustrate this situation, if you are interested:

No problem :) ++