LGouellec / kafka-streams-dotnet

.NET Stream Processing Library for Apache Kafka 🚀
https://lgouellec.github.io/kafka-streams-dotnet/
MIT License

RocksDB.Range() and All(): A concurrent update was performed on this collection and corrupted its state. #320

Closed (tsuz closed this issue 2 months ago)

tsuz commented 2 months ago

Description

When the state store is accessed from multiple threads, the error below is thrown. It comes from here, in code that appears to track open handlers to RocksDB. I want to know whether this is safe to ignore.

This is consistent with the error also occurring with these APIs:

but it does not occur with

The error occurs only with RocksDBStore; it does not occur with InMemoryStore.

I tried using locks and thread pools, but neither had any effect.

      Connection id "0HN2NOS4VPFJU", Request id "0HN2NOS4VPFJU:000000F9": An unhandled exception was thrown by the application.
      System.InvalidOperationException: Operations that change non-concurrent collections must have exclusive access. A concurrent update was performed on this collection and corrupted its state. The collection's state is no longer correct.
         at System.Collections.Generic.HashSet`1.AddIfNotPresent(T value, Int32& location)
         at System.Collections.Generic.HashSet`1.Add(T item)
         at Streamiz.Kafka.Net.State.RocksDb.RocksDbKeyValueStore.Range(Bytes from, Bytes to, Boolean forward)
         at Streamiz.Kafka.Net.State.Metered.MeteredKeyValueStore`2.Range(Func`3 enumerator, K from, K to)
         at Streamiz.Kafka.Net.State.Internal.ReadOnlyKeyValueStoreFacade`2.Range(K from, K to)
         at System.Linq.Enumerable.SelectArrayIterator`2.ToList()
         at Streamiz.Kafka.Net.State.Enumerator.CompositeKeyValueEnumerator`2..ctor(IEnumerable`1 enumerable)
         at Streamiz.Kafka.Net.State.Internal.CompositeReadOnlyKeyValueStore`2.Range(K from, K to)

How to reproduce

  1. Run the code below
  2. Run `dotnet build && dotnet run --urls="http://localhost:5004"`
  3. Run JMeter with 100 concurrency and many requests
  4. The failure message is shown intermittently
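If JMeter is not at hand, step 3 can be approximated with a small console program. This is only a sketch: `LoadDriver` and `RunAsync` are hypothetical names, and the total of 10,000 requests is an assumption, since the report only says "many requests". The URL and the concurrency of 100 come from the steps above and the `/range` endpoint in the sample code.

```csharp
using System;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

static class LoadDriver
{
    // Runs `total` requests with at most `concurrency` in flight and
    // returns how many of them threw or reported failure.
    public static async Task<int> RunAsync(Func<Task<bool>> request, int concurrency, int total)
    {
        using var gate = new SemaphoreSlim(concurrency);
        int failures = 0;

        var tasks = Enumerable.Range(0, total).Select(async _ =>
        {
            await gate.WaitAsync();
            try
            {
                if (!await request()) Interlocked.Increment(ref failures);
            }
            catch (Exception)
            {
                Interlocked.Increment(ref failures);
            }
            finally
            {
                gate.Release();
            }
        }).ToArray();

        await Task.WhenAll(tasks);
        return failures;
    }

    static async Task Main()
    {
        using var client = new HttpClient();

        // An unhandled exception in the endpoint surfaces as a non-success status.
        int failures = await RunAsync(async () =>
        {
            using var response = await client.GetAsync("http://localhost:5004/range");
            return response.IsSuccessStatusCode;
        }, concurrency: 100, total: 10_000);

        Console.WriteLine($"failed requests: {failures}");
    }
}
```

Because the failure is intermittent, a run with zero failed requests does not show that the problem is gone; the server log is still the place to look for the stack trace.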
Sample code
```csharp
using System;
using System.Text.Json;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.SerDes;
using Streamiz.Kafka.Net.State;
using Streamiz.Kafka.Net.Stream;
using Streamiz.Kafka.Net.Table;
using Streamiz.Kafka.Net.Metrics.Prometheus;
using Streamiz.Kafka.Net.Processors.Public;
using Streamiz.Kafka.Net.State.RocksDb;

var config = new StreamConfig();
config.ApplicationId = "test-app-rocksdbconcurrency";
config.UsePrometheusReporter(9098, true);
config.MetricsRecording = Streamiz.Kafka.Net.Metrics.MetricsRecordingLevel.DEBUG;
config.StateDir = "./state-store";
config.AutoOffsetReset = AutoOffsetReset.Latest;

var isRunningState = false;

StreamBuilder builder = new StreamBuilder();
var storeName = "mystore2";
var store = RocksDb.As(storeName);
builder.GlobalTable("input-topic", store);
Topology t = builder.Build();
KafkaStream stream = new KafkaStream(t, config);

stream.StateChanged += (old, @new) =>
{
    Console.WriteLine($"old state: {old}, new state: {@new}");
    if (@new.Equals(KafkaStream.State.RUNNING))
    {
        isRunningState = true;
        Console.WriteLine("Running now");
    }
};

await stream.StartAsync();

while (!isRunningState)
{
    Thread.Sleep(500);
    Console.WriteLine("Waiting for running state");
}

if (isRunningState)
{
    createWebServer(stream);
}

void createWebServer(KafkaStream stream)
{
    var builder = WebApplication.CreateBuilder(args);
    var app = builder.Build();

    app.MapGet("/range", async() =>
    {
        var queryableStoreType = QueryableStoreTypes.KeyValueStore();
        IReadOnlyKeyValueStore store = stream.Store(StoreQueryParameters.FromNameAndType(storeName, queryableStoreType));
        var iterator = store.Range("A", "Z");
        while (iterator.MoveNext())
        {
            Console.WriteLine(iterator.Current);
        }
        return "OK";
    });

    app.Run();
}
```
csproj file
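```xml
<PropertyGroup>
  <OutputType>Exe</OutputType>
  <TargetFramework>net7.0</TargetFramework>
  <Nullable>enable</Nullable>
  <ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup>
```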

Checklist

Please provide the following information: