LGouellec / kafka-streams-dotnet

.NET Stream Processing Library for Apache Kafka 🚀
https://lgouellec.github.io/kafka-streams-dotnet/
MIT License

InMemoryWindowStore System.NullReferenceException #271

Closed: mayur-rao closed this issue 11 months ago

mayur-rao commented 11 months ago

Description

When I reprocess messages from the earliest offset of a topic that populates an InMemoryWindowStore, the library crashes with a NullReferenceException while restoring state, at the point where it skips a record for an expired segment. In InMemoryWindowStore.cs, method Put(), line 351, context.Timestamp is null:

    if (windowStartTimestamp <= observedStreamTime - retention.TotalMilliseconds)
    {
        expiredRecordSensor.Record(1.0, context.Timestamp);
        logger.LogWarning("Skipping record for expired segment");
    }
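Until an upgrade is possible, the expired-segment branch above can be made null-safe along these lines. This is a minimal sketch, not the Streamiz source: it assumes the null comes from a record/processor context that is unavailable during state restoration, and `ExpiredSegmentGuard`, `TrySkipExpired`, and the `recordMetric` delegate (a stand-in for `expiredRecordSensor.Record`) are hypothetical names. The library's actual fix may differ.

```csharp
using System;

// Hypothetical sketch (not the Streamiz source): a null-safe version of the
// expired-segment branch. The nullable contextTimestamp stands in for a
// processor context that may be missing while state is being restored.
public static class ExpiredSegmentGuard
{
    // Returns true when the record must be skipped because its window start
    // is at or before (observedStreamTime - retention), as in the quoted check.
    public static bool TrySkipExpired(
        long windowStartTimestamp,
        long observedStreamTime,
        TimeSpan retention,
        long? contextTimestamp,
        Action<double, long> recordMetric)
    {
        if (windowStartTimestamp > observedStreamTime - (long)retention.TotalMilliseconds)
            return false; // still within retention: the caller stores the record

        // No record context (e.g. during restoration): fall back to the
        // observed stream time instead of dereferencing a null context.
        recordMetric(1.0, contextTimestamp ?? observedStreamTime);
        return true;
    }
}
```

Falling back to the observed stream time keeps the metric timestamp on the same clock the retention check uses; wall-clock time would be an equally defensible choice.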

How to reproduce

  1. Set up a source topic called "topic" and produce messages to it with historical timestamps: some in February 2023, some in March 2023, some in April 2023, and finally some with the current timestamp.
  2. Set up a simple windowed aggregation and write its results to an InMemoryWindows store.
  3. Set up an application ID.
  4. Run the app and let it process up to the latest offset.
  5. Reset the offsets of the application ID/consumer group to the earliest offset.
  6. Restart the application so it reprocesses the messages. (Attachment: Program.txt)
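The reproduction hinges on the retention check quoted above: a record whose window start lies at least the retention period behind the observed stream time takes the skip path, which is where the crash occurs. The sketch below simulates that condition for a sequence of window starts. It assumes observed stream time is the highest window start seen so far (modeled on Kafka Streams' in-memory window store, not confirmed for Streamiz); `ReplaySimulation` and `ExpiredWindows` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical simulation: which window starts, in arrival order, would hit
// the "Skipping record for expired segment" branch. Assumes observed stream
// time is the maximum window start seen so far.
public static class ReplaySimulation
{
    public static List<DateTimeOffset> ExpiredWindows(
        IEnumerable<DateTimeOffset> windowStartsInArrivalOrder,
        TimeSpan retention)
    {
        var expired = new List<DateTimeOffset>();
        long observedStreamTime = long.MinValue;

        foreach (var start in windowStartsInArrivalOrder)
        {
            long startMs = start.ToUnixTimeMilliseconds();
            // Stream time only moves forward.
            observedStreamTime = Math.Max(observedStreamTime, startMs);

            // Same condition as the quoted Put() code.
            if (startMs <= observedStreamTime - (long)retention.TotalMilliseconds)
                expired.Add(start);
        }
        return expired;
    }
}
```

For example, with a one-day retention, window starts of 1 Feb, 1 Mar, 1 Apr and 1 Jun 2023 followed by 1 Feb 2023 again (historical data arriving after current data) flag only the second 1 Feb window as expired.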


LGouellec commented 11 months ago

Fixed in version 1.5.0-RC2.

mayur-rao commented 11 months ago

Hi @LGouellec, I appreciate the quick fix. I upgraded the package to 1.5.0-RC2 and tested it in an app that has multiple stream topologies as part of its processing. While testing, I am running into an AggregateException related to threads. Can you please take a look?

Exception:

    info: Streamiz.Kafka.Net.Processors.GlobalStreamThread[0]
          global-stream-thread processor-1bc32f19-59fa-4c9d-922f-ddd6d20b2121-GlobalStreamThread Shutting down
    Unhandled exception. System.AggregateException: One or more errors occurred. (One or more errors occurred. (Thread has not been started.))
     ---> System.AggregateException: One or more errors occurred. (Thread has not been started.)
     ---> System.Threading.ThreadStateException: Thread has not been started.
       at System.Threading.Thread.Join(Int32 millisecondsTimeout)
       at System.Threading.Thread.Join()
       at Streamiz.Kafka.Net.Processors.GlobalStreamThread.Dispose(Boolean waitForThread)
       at Streamiz.Kafka.Net.KafkaStream.Close()
       at Streamiz.Kafka.Net.KafkaStream.b26_0()
       at System.Threading.Tasks.Task.InnerInvoke()
       at System.Threading.Tasks.Task.<>c.<.cctor>b__272_0(Object obj)
       at System.Threading.ExecutionContext.RunFromThreadPoolDispatchLoop(Thread threadPoolThread, ExecutionContext executionContext, ContextCallback callback, Object state)
    --- End of stack trace from previous location ---
       at System.Threading.ExecutionContext.RunFromThreadPoolDispatchLoop(Thread threadPoolThread, ExecutionContext executionContext, ContextCallback callback, Object state)
       at System.Threading.Tasks.Task.ExecuteWithThreadLocal(Task& currentTaskSlot, Thread threadPoolThread)
       --- End of inner exception stack trace ---
       at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions)
       at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken)
       at System.Threading.Tasks.Task.Wait(TimeSpan timeout)
       at Streamiz.Kafka.Net.KafkaStream.Dispose()
       at Streamiz.Kafka.Net.KafkaStream.b25_0()
       at System.Threading.CancellationToken.<>c.b__12_0(Object obj)
       at System.Threading.CancellationTokenSource.Invoke(Delegate d, Object state, CancellationTokenSource source)
       at System.Threading.CancellationTokenSource.CallbackNode.<>c.b__9_0(Object s)
       at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
    --- End of stack trace from previous location ---
       at System.Threading.CancellationTokenSource.CallbackNode.ExecuteCallback()
       at System.Threading.CancellationTokenSource.ExecuteCallbackHandlers(Boolean throwOnFirstException)
       --- End of inner exception stack trace ---
       at System.Threading.CancellationTokenSource.ExecuteCallbackHandlers(Boolean throwOnFirstException)
       at System.Threading.CancellationTokenSource.NotifyCancellation(Boolean throwOnFirstException)
       at System.Threading.CancellationTokenSource.TimerCallback(Object state)
       at System.Threading.TimerQueueTimer.CallCallback(Boolean isThreadPool)
       at System.Threading.TimerQueueTimer.Fire(Boolean isThreadPool)
       at System.Threading.TimerQueue.FireNextTimers()
       at System.Threading.TimerQueue.AppDomainTimerCallback(Int32 id)
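The innermost exception is standard .NET behavior: Thread.Join() throws ThreadStateException when called on a thread that is still in the Unstarted state, which is what the trace shows inside GlobalStreamThread.Dispose. Any close path that can run before the thread has started needs to guard the join. The following is a minimal sketch of such a guard; `GuardedWorker` is a hypothetical class, not the Streamiz implementation, and the library's actual fix may take a different approach.

```csharp
using System;
using System.Threading;

// Hypothetical wrapper (not Streamiz's GlobalStreamThread): Dispose() joins
// the background thread only if Start() actually started it.
public sealed class GuardedWorker : IDisposable
{
    private readonly object gate = new object();
    private readonly Thread thread;
    private bool started;
    private bool disposed;

    public GuardedWorker(ThreadStart body)
    {
        thread = new Thread(body) { IsBackground = true };
    }

    public void Start()
    {
        lock (gate)
        {
            // Idempotent, and a no-op once disposed.
            if (started || disposed) return;
            thread.Start();
            started = true; // set only after Start() succeeded
        }
    }

    public bool WasStarted
    {
        get { lock (gate) { return started; } }
    }

    public void Dispose()
    {
        bool mustJoin;
        lock (gate)
        {
            disposed = true;
            mustJoin = started;
        }
        // Joining an unstarted thread would throw ThreadStateException.
        // Join outside the lock so the worker body cannot deadlock on gate.
        if (mustJoin) thread.Join();
    }
}
```

Setting `started` only after `Thread.Start()` returns, under the same lock that `Dispose()` reads, means a dispose racing a start never sees a "started" flag for a thread that has not actually started.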

LGouellec commented 11 months ago

@mayur-rao,

Could you share the full log of your application, please?

Does it happen when you shut down the app?

mayur-rao commented 11 months ago

It happens when I start the app, which has multiple topologies (summarized in the logs). During execution of the first topology, it seems to try to close/stop a thread and runs into the exception above. I did not see this with NuGet version 1.4.2.

I'm attaching the full logs as a text file: streams-issue-comment.txt

LGouellec commented 11 months ago

@mayur-rao,

Can you share the GlobalTable topology, please?

mayur-rao commented 11 months ago

Hi @LGouellec

I restarted my application and am no longer running into the thread exception, so please close this issue. I will report back if I see it again.

Again, thank you for your quick response and the time you spent.