LGouellec / streamiz

.NET Stream Processing Library for Apache Kafka 🚀
https://lgouellec.github.io/streamiz/
MIT License

Windowed KTables appear to have a concurrency issue. #382

Open AntonyLittle opened 4 days ago

AntonyLittle commented 4 days ago

Description

I am seeing the following exception when using Windowed KTables in Streamiz 1.6.0:

fail: Streamiz.Kafka.Net.Processors.StreamThread[0]
      stream-thread[urn.menzies.falcona.interface_tableruntobarcodestable-1424c242-4ed5-48a9-927f-c3e6b8abe430-stream-thread-0] Encountered the following error during processing:
      System.InvalidOperationException: Collection was modified; enumeration operation may not execute.
         at System.Collections.Generic.HashSet`1.Enumerator.MoveNext()
         at Streamiz.Kafka.Net.State.InMemory.InMemoryWindowStore.RemoveExpiredData()
         at Streamiz.Kafka.Net.State.InMemory.InMemoryWindowStore.Fetch(Bytes key, Int64 time)
         at Streamiz.Kafka.Net.State.Metered.MeteredWindowStore`2.<>c__DisplayClass20_0.<Fetch>b__0()
         at Streamiz.Kafka.Net.Crosscutting.ActionHelper.MeasureLatency[T](Func`1 actionToMeasure, Sensor sensor)
         at Streamiz.Kafka.Net.Processors.KStreamWindowAggregateProcessor`4.Process(K key, V value)
         at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Forward(IEnumerable`1 processors, Action`1 action)
         at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Forward(K key, V value)
         at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Process(ConsumeResult`2 record)
         at Streamiz.Kafka.Net.Processors.StreamTask.Process()
         at Streamiz.Kafka.Net.Processors.Internal.TaskManager.Process(Int64 now)
         at Streamiz.Kafka.Net.Crosscutting.ActionHelper.MeasureLatency(Action action)
         at Streamiz.Kafka.Net.Processors.StreamThread.Run()

How to reproduce

Create materializer thusly:

        var materializer
            = InMemoryWindows
                .As<TGroupedKey, TValueResult>(tableName)
                .WithKeySerdes((ISerDes<TGroupedKey>)SerDesHelper.GetSerDes<TGroupedKey>(KafkaConfiguration))
                .WithValueSerdes((ISerDes<TValueResult>)SerDesHelper.GetSerDes<TValueResult>(KafkaConfiguration))
                .WithRetention(TimeSpan.FromMilliseconds(_windowSizeMs));

Topology is created like so:


        builder.Stream<TKey, TValue>(TopicName).GroupBy(Grouper).Aggregate(Initializer, Aggregator, materializer)).Build();

Access store thusly:


        _windowStore ??= TableStream.Store(
            StoreQueryParameters.FromNameAndType(Name, QueryableStoreTypes.WindowStore<TGroupedKey, TValueResult>()));
        _windowStore
                .Fetch(
                    key,
                    now.Subtract(TimeSpan.FromMilliseconds(_windowSizeMs + _windowAdvanceMs)),
                    now.Add(TimeSpan.FromMilliseconds(_windowSizeMs)))
                .ToList();

-- OR --

        _windowStore
                .FetchAll(
                    now.Subtract(TimeSpan.FromMilliseconds(_windowSizeMs + _windowAdvanceMs)),
                    now.Add(TimeSpan.FromMilliseconds(_windowSizeMs)))
                .ToList();

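The issue is intermittent and only appears with large volumes of data. Per the stack trace, the exception is thrown on the stream thread inside InMemoryWindowStore.RemoveExpiredData(), which is reached via Fetch(). I suspect the Streamiz stream thread is calling Fetch() on the store at the same time as our code is querying it.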
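To illustrate the suspected failure mode outside of Streamiz, here is a minimal sketch. It is not Streamiz code: `GuardedTimestampSet` and `CollectionModifiedDemo` are hypothetical names made up for this example. The first class shows one common way to make such a set safe (a shared lock plus snapshot reads). The second reproduces the same exception type deterministically on a single thread, by adding to a `HashSet` while it is being enumerated. With two threads, the same thing can happen whenever one thread mutates the set while another is enumerating it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a store's set of window timestamps.
// Illustrative only; this is not how Streamiz implements its store.
public sealed class GuardedTimestampSet
{
    private readonly HashSet<long> _timestamps = new HashSet<long>();
    private readonly object _sync = new object();

    public void Add(long timestamp)
    {
        lock (_sync) { _timestamps.Add(timestamp); }
    }

    // Removes every timestamp older than the cutoff and returns how many were removed.
    public int RemoveExpired(long cutoff)
    {
        lock (_sync) { return _timestamps.RemoveWhere(t => t < cutoff); }
    }

    // Returns a copy, so callers can enumerate without holding the lock.
    public List<long> Snapshot()
    {
        lock (_sync) { return _timestamps.ToList(); }
    }
}

public static class CollectionModifiedDemo
{
    // Adding a new element while a foreach is enumerating the set
    // invalidates the enumerator, so the next MoveNext() throws.
    public static bool UnguardedEnumerationThrows()
    {
        var timestamps = new HashSet<long> { 1, 2, 3 };
        try
        {
            foreach (var t in timestamps)
            {
                timestamps.Add(t + 100);
            }
        }
        catch (InvalidOperationException)
        {
            return true;
        }
        return false;
    }
}
```

A lock in our own calling code would not help here, because the stream thread reaches the store without going through it. Any guard of this kind would have to live inside the store itself.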


LGouellec commented 4 days ago

Hey @AntonyLittle,

Thank you for raising this issue and for your interest in Streamiz. I will try to reproduce it and fix it as soon as possible.

Quick question: if you have a large amount of data, why not use RocksDb as a persistent storage layer?

Best regards,

LGouellec commented 12 hours ago

@AntonyLittle,

Are you able to test with this specific branch? https://github.com/LGouellec/streamiz/tree/fix/concurrent-issue-window

Thanks,