pmikstacki / SliccDB

Light Embedded Graph Database for .net
MIT License

`.Labels(string)` method contains race condition #9

Closed: holly-hacker closed this issue 1 year ago

holly-hacker commented 2 years ago

In a tight loop where you both query and insert nodes (common in "upsert" scenarios), the parallel processing in the `.Labels(string)` method can throw exceptions. I believe this is because it iterates over all entities with `allEntities.AsParallel().ForAll` and then adds the matching nodes concurrently to a `HashSet<Node>`, which is not thread-safe:

DatabaseConnectionExtensions.cs

    public static HashSet<Node> Labels(this HashSet<Node> allEntities, params string[] labels)
    {
        var entities = new HashSet<Node>();

        allEntities.AsParallel().ForAll(node =>
        {
            var commonLabels = node.Labels.Count(x => labels.Contains(x));
            if (commonLabels == node.Labels.Count)
                entities.Add(node);
        });

        return entities;
    }
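One way to keep the parallel scan and still avoid the race is to let PLINQ run only the (read-only) filter predicate in parallel, then build the `HashSet<Node>` on a single thread from the merged results. The sketch below is not the upstream fix. It assumes a hypothetical minimal `Node` class with only a `Labels` set, and the class name `ParallelLabelFilter` is also made up for illustration. The matching rule is copied unchanged from the method above.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for SliccDB's Node type; only the Labels set matters here.
public class Node
{
    public HashSet<string> Labels { get; } = new HashSet<string>();
}

// Hypothetical helper class, not part of SliccDB.
public static class ParallelLabelFilter
{
    // Same rule as the original method: a node is kept when every one of
    // its labels appears in the requested `labels`.
    public static HashSet<Node> Labels(this HashSet<Node> allEntities, params string[] labels)
    {
        // The predicate runs on several threads, but it only reads shared state.
        var matches = allEntities
            .AsParallel()
            .Where(node => node.Labels.Count(x => labels.Contains(x)) == node.Labels.Count);

        // The HashSet constructor enumerates the ParallelQuery on the calling
        // thread, so the result set is only ever written by one thread.
        return new HashSet<Node>(matches);
    }
}
```

As with the original code, a node that has no labels at all also matches, because both counts are 0.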

The following sample reproduces the error:

var db = new DatabaseConnection("test.db");
db.ClearDatabase();
for (int i = 0; i < 1000; i++)
{
    Console.WriteLine($"Iteration {i}");
    _ = db.Nodes
        .Labels("item")
        // .Properties("id".Value(i.ToString()))
        .FirstOrDefault();

    db.CreateNode(
        properties: new Dictionary<string, string> { { "id", i.ToString() } },
        labels: new HashSet<string> { "item" }
    );
}

... which produces the following output for me:

Iteration 0
Iteration 1
Iteration 2
Iteration 3
Iteration 4
Iteration 5
Iteration 6
Iteration 7
Iteration 8
Iteration 9
Iteration 10
Iteration 11
Iteration 12
Unhandled exception. System.AggregateException: One or more errors occurred. (Index was outside the bounds of the array.)
 ---> System.IndexOutOfRangeException: Index was outside the bounds of the array.
   at System.Collections.Generic.HashSet`1.AddIfNotPresent(T value, Int32& location)
   at System.Collections.Generic.HashSet`1.Add(T item)
   at SliccDB.Fluent.DatabaseConnectionExtensions.<>c__DisplayClass8_0.<Labels>b__0(Node node) in D:\Projects\DotNet\eetool\SliccDB\SliccDB\Fluent\DatabaseConnectionExtensions.cs:line 107
   at System.Linq.Parallel.ForAllOperator`1.ForAllEnumerator`1.MoveNext(TInput& currentElement, Int32& currentKey)
   at System.Linq.Parallel.ForAllSpoolingTask`2.SpoolingWork()
   at System.Linq.Parallel.SpoolingTaskBase.Work()
   at System.Linq.Parallel.QueryTask.BaseWork(Object unused)
   at System.Linq.Parallel.QueryTask.<>c.<.cctor>b__10_0(Object o)
   at System.Threading.Tasks.Task.InnerInvoke()
   at System.Threading.Tasks.Task.<>c.<.cctor>b__271_0(Object obj)
   at System.Threading.ExecutionContext.RunFromThreadPoolDispatchLoop(Thread threadPoolThread, ExecutionContext executionContext, ContextCallback callback, Object state)
--- End of stack trace from previous location ---
   at System.Threading.ExecutionContext.RunFromThreadPoolDispatchLoop(Thread threadPoolThread, ExecutionContext executionContext, ContextCallback callback, Object state)
   at System.Threading.Tasks.Task.ExecuteWithThreadLocal(Task& currentTaskSlot, Thread threadPoolThread)
   --- End of inner exception stack trace ---
   at System.Linq.Parallel.QueryTaskGroupState.QueryEnd(Boolean userInitiatedDispose)
   at System.Linq.Parallel.SpoolingTask.SpoolForAll[TInputOutput,TIgnoreKey](QueryTaskGroupState groupState, PartitionedStream`2 partitions, TaskScheduler taskScheduler)
   at System.Linq.Parallel.DefaultMergeHelper`2.System.Linq.Parallel.IMergeHelper<TInputOutput>.Execute()
   at System.Linq.Parallel.MergeExecutor`1.Execute()
   at System.Linq.Parallel.MergeExecutor`1.Execute[TKey](PartitionedStream`2 partitions, Boolean ignoreOutput, ParallelMergeOptions options, TaskScheduler taskScheduler, Boolean isOrdered, CancellationState cancellationState, Int32 queryId)
   at System.Linq.Parallel.PartitionedStreamMerger`1.Receive[TKey](PartitionedStream`2 partitionedStream)
   at System.Linq.Parallel.ForAllOperator`1.WrapPartitionedStream[TKey](PartitionedStream`2 inputStream, IPartitionedStreamRecipient`1 recipient, Boolean preferStriping, QuerySettings settings)
   at System.Linq.Parallel.UnaryQueryOperator`2.UnaryQueryOperatorResults.ChildResultsRecipient.Receive[TKey](PartitionedStream`2 inputStream)
   at System.Linq.Parallel.ScanQueryOperator`1.ScanEnumerableQueryOperatorResults.GivePartitionedStream(IPartitionedStreamRecipient`1 recipient)
   at System.Linq.Parallel.UnaryQueryOperator`2.UnaryQueryOperatorResults.GivePartitionedStream(IPartitionedStreamRecipient`1 recipient)
   at System.Linq.Parallel.QueryOperator`1.GetOpenedEnumerator(Nullable`1 mergeOptions, Boolean suppressOrder, Boolean forEffect, QuerySettings querySettings)
   at System.Linq.Parallel.ForAllOperator`1.RunSynchronously()
   at System.Linq.ParallelEnumerable.ForAll[TSource](ParallelQuery`1 source, Action`1 action)
   at SliccDB.Fluent.DatabaseConnectionExtensions.Labels(HashSet`1 allEntities, String[] labels) in D:\Projects\DotNet\eetool\SliccDB\SliccDB\Fluent\DatabaseConnectionExtensions.cs:line 103
   at Program.<Main>$(String[] args) in D:\Projects\DotNet\eetool\eetool\Program.cs:line 13
   at Program.<Main>(String[] args)

I am using the latest commit on the master branch, which currently is 9b6972e682b033f6f4daa901106863df7221809f.

pmikstacki commented 2 years ago

Hi! Since I am now working on my thesis, I don't have the time needed to work on SliccDB. I will look into it in my free time, but unfortunately I haven't had much of it recently... Please feel free to create a pull request with a fix, though!

pmikstacki commented 1 year ago

I removed all concurrency from these methods, so they should no longer cause this issue.
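The commit with the fix is not quoted in this thread. Assuming that "removing concurrency" means replacing `AsParallel().ForAll` with an ordinary loop, a sequential version of the method quoted in the issue might look like the sketch below. `Node` is a hypothetical minimal stand-in for SliccDB's type, and `SequentialLabelFilter` is a made-up class name.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for SliccDB's Node type; only the Labels set matters here.
public class Node
{
    public HashSet<string> Labels { get; } = new HashSet<string>();
}

// Hypothetical helper class, not the actual upstream code.
public static class SequentialLabelFilter
{
    public static HashSet<Node> Labels(this HashSet<Node> allEntities, params string[] labels)
    {
        var entities = new HashSet<Node>();

        // Plain foreach: only the calling thread touches `entities`,
        // so concurrent Add calls cannot corrupt its internal arrays.
        foreach (var node in allEntities)
        {
            var commonLabels = node.Labels.Count(x => labels.Contains(x));
            if (commonLabels == node.Labels.Count)
                entities.Add(node);
        }

        return entities;
    }
}
```

The trade-off is that the scan no longer uses multiple cores. In exchange, the result set is only ever written by one thread.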