LGouellec / kafka-streams-dotnet

.NET Stream Processing Library for Apache Kafka 🚀
https://lgouellec.github.io/kafka-streams-dotnet/
MIT License

System.InvalidOperationException when accessing InMemoryWindowed Store #314

Open rao-mayur opened 4 months ago

rao-mayur commented 4 months ago

Description

After setting up an in-memory windowed store through an aggregation, I want to query it by calling FetchAll() on the windowed store. When I do this, the stream thread crashes with the following error (the complete stack trace comes from Streamiz):

"System.InvalidOperationException", "Collection was modified; enumeration operation may not execute.",
    at System.Collections.Generic.HashSet`1.Enumerator.MoveNext()
    at Streamiz.Kafka.Net.State.InMemory.InMemoryWindowStore.RemoveExpiredData()
    at Streamiz.Kafka.Net.State.InMemory.InMemoryWindowStore.Fetch(Bytes key, Int64 time)
    at Streamiz.Kafka.Net.State.Metered.MeteredWindowStore`2.<>c__DisplayClass17_0.b__0()
    at Streamiz.Kafka.Net.Crosscutting.ActionHelper.MeasureLatency[T](Func`1 actionToMeasure, Sensor sensor)
    at Streamiz.Kafka.Net.Processors.KStreamWindowAggregateProcessor`4.Process(K key, V value)
    at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Forward(IEnumerable`1 processors, Action`1 action)
    at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Forward(IEnumerable`1 processors, Action`1 action)
    at Streamiz.Kafka.Net.Processors.SourceProcessor`2.Process(K key, V value)
    at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Process(ConsumeResult`2 record)
    at Streamiz.Kafka.Net.Crosscutting.ActionHelper.MeasureLatency(Action action)
    at Streamiz.Kafka.Net.Processors.StreamTask.Process()
    at Streamiz.Kafka.Net.Processors.Internal.TaskManager.Process(Int64 now)

This is especially relevant when lag has built up on the source topic.

Can you please review this and let me know whether there is a bug in the library? In particular, is something in RemoveExpiredData() not thread safe, so that a race condition can lead to a crash like the one above?
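For context, here is a minimal sketch of the general .NET behavior behind this exception. It is not Streamiz code, and the variable name `expiredCandidates` is hypothetical. It only shows that mutating a `HashSet<T>` while it is being enumerated invalidates the enumerator. The same failure could presumably occur if one thread enumerates a set while another thread modifies it, which is the scenario I suspect here.

```csharp
using System;
using System.Collections.Generic;

public static class Program
{
    public static void Main()
    {
        // Hypothetical stand-in for a set of timestamps tracked by a store.
        var expiredCandidates = new HashSet<long> { 1, 2, 3 };

        try
        {
            foreach (var ts in expiredCandidates)
            {
                // Adding a new element during enumeration invalidates the
                // enumerator, so the next MoveNext() call throws.
                expiredCandidates.Add(ts + 100);
            }
        }
        catch (InvalidOperationException ex)
        {
            // Same exception type as in the stack trace above.
            Console.WriteLine(ex.Message);
        }
    }
}
```

If the set really is shared between the stream thread and the thread doing the lookup, then guarding both the enumeration and the mutation with the same lock, or enumerating over a snapshot copy, would be the usual ways to avoid this. Whether that applies to the library internals is an assumption on my part.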

I use this guide, https://lgouellec.github.io/kafka-streams-dotnet/stores.html#in-memory-window-store, for samples and best practices when working with Streamiz. Can you please update it, or provide a recommended sample for setting up the windowed store and accessing it for lookups? I want to understand the correct way to call FetchAll() without crashing the stream thread.

How to reproduce

  1. Set up a sample topic and produce some messages to it first (around 80,000 to 90,000), so that the stream thread has lag to process.
  2. Run the sample streams topology in Program.txt.
  3. After some time, the InvalidOperationException should appear and cause the stream to die.

LGouellec commented 4 months ago

Hey @rao-mayur ,

Sorry for the delay. I will try to reproduce the issue and fix it, and will get back to you ASAP. Regards,

LGouellec commented 6 days ago

Sorry for the huge delay @rao-mayur,

Can you please share the snippet of code where you call FetchAll(..)?

rao-mayur commented 5 days ago

Hi @LGouellec

I have attached a file, Program.txt, which contains the snippet that calls FetchAll(). It is in the "How to reproduce" section (point 2). Can you please take a look at the file?