flipkart-incubator / databuilderframework

A data driven execution engine

ConcurrentModificationExceptions when using Optimised Multithread Executors #56

Open shadowmanoj opened 4 months ago

shadowmanoj commented 4 months ago

We encountered a ConcurrentModificationException in a multi-threaded environment while obtaining a DataSet object filtered by the accessibility criteria defined for a DataBuilder. The application uses the DataSet class to manage data, which is stored in availableData, a Map<String, Data>. The error occurs when builderLevel is greater than 1.

To provide filtered views of this data, Guava's Maps.filterKeys is used together with Predicates.in, based on the accessible data set declared in DataBuilderMeta. The exception points to a concurrent modification: a collection is being iterated while another thread changes it. Because Maps.filterKeys returns a live view of the original map, any concurrent structural modification of the underlying map (such as an addition or removal) can trigger this exception while the view is being iterated.
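The failure mode described above can be reproduced without the framework or Guava. The following is a minimal sketch using only java.util. The class name LiveViewDemo and the map contents are made up for illustration, and keySet() stands in for the live view that Maps.filterKeys returns. It is single-threaded on purpose so the result is deterministic. In the real executor the modification would come from another thread.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical demo class, not part of databuilderframework.
class LiveViewDemo {

    private static Map<String, String> sampleData() {
        Map<String, String> availableData = new HashMap<>();
        availableData.put("A", "a");
        availableData.put("B", "b");
        return availableData;
    }

    /** Returns true if iterating a live view fails after the backing map changes. */
    static boolean liveViewFails() {
        Map<String, String> availableData = sampleData();
        // keySet() is a live view of the HashMap, like a filtered Guava view.
        Iterator<String> it = availableData.keySet().iterator();
        it.next();
        // Structural modification while the iteration is still in progress.
        availableData.put("C", "c");
        try {
            it.next();
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    /** Returns true if iterating an independent copy is unaffected by later changes. */
    static boolean snapshotSurvives() {
        Map<String, String> availableData = sampleData();
        // The copy has its own storage, so later writes to availableData do not reach it.
        Map<String, String> snapshot = new HashMap<>(availableData);
        Iterator<String> it = snapshot.keySet().iterator();
        it.next();
        availableData.put("C", "c");
        it.next(); // no exception: the snapshot was not modified
        return snapshot.size() == 2;
    }

    public static void main(String[] args) {
        System.out.println("live view fails: " + liveViewFails());
        System.out.println("snapshot survives: " + snapshotSurvives());
    }
}
```

Note that with several threads, taking the copy is itself an iteration over the shared map, so it would also need to be synchronized with writers or use a concurrent map.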

Please let us know whether this issue has been encountered before (and perhaps fixed in another branch), or whether we are using the executors and DataSets incorrectly.

Executor code:

    OptimizedMultiThreadedDataFlowExecutor executor =
            new OptimizedMultiThreadedDataFlowExecutor(Executors.newFixedThreadPool(10));

Source code that triggered the exception:

    public DataSet getDataSet(DataBuilder builder) {
        Preconditions.checkNotNull(builder.getDataBuilderMeta(), "No metadata present in this builder");
        return new DataSet(
                Maps.filterKeys(
                        Utils.sanitize(dataSet.getAvailableData()),
                        Predicates.in(Utils.sanitize(builder.getDataBuilderMeta().getAccessibleDataSet()))));
    }
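One possible direction, offered only as a sketch and not as the framework's actual fix: instead of handing each builder a live filtered view, copy the accessible entries into a new map. The helper below (snapshotAccessible and the class AccessibleDataSnapshot are hypothetical names) uses only the JDK. It assumes the shared map could be a ConcurrentHashMap, whose get is safe under concurrent writes. Whether DataSet's availableData can be swapped for one is an assumption we have not verified.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of databuilderframework.
class AccessibleDataSnapshot {

    /**
     * Copies the entries whose keys are in 'accessible' into a new, independent map.
     * Only point lookups are made on the shared map, so it is never iterated.
     * Assumption: 'availableData' is safe for concurrent reads (e.g. ConcurrentHashMap).
     */
    static <V> Map<String, V> snapshotAccessible(Map<String, V> availableData,
                                                 Set<String> accessible) {
        Map<String, V> copy = new HashMap<>();
        for (String key : accessible) {
            V value = availableData.get(key);
            if (value != null) {
                copy.put(key, value);
            }
        }
        return copy;
    }

    public static void main(String[] args) {
        Map<String, String> availableData = new ConcurrentHashMap<>();
        availableData.put("A", "a");
        availableData.put("B", "b");
        availableData.put("C", "c");

        Set<String> accessible = new HashSet<>(Arrays.asList("A", "C", "Z"));
        Map<String, String> view = snapshotAccessible(availableData, accessible);

        // Later writes to the shared map do not affect the copy.
        availableData.put("Z", "z");
        System.out.println(view.keySet());
    }
}
```

The trade-off is that a builder no longer sees data added after its snapshot was taken, and each call pays for a copy proportional to the size of the accessible set.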

Complete stack trace

at java.base/java.util.HashMap$HashIterator.nextNode(HashMap.java:1510)
at java.base/java.util.HashMap$EntryIterator.next(HashMap.java:1543)
at java.base/java.util.HashMap$EntryIterator.next(HashMap.java:1541)
at com.google.common.collect.Iterators.indexOf(Iterators.java:806)
at com.google.common.collect.Iterators.any(Iterators.java:698)
at com.google.common.collect.Iterables.any(Iterables.java:634)
at com.google.common.collect.Collections2$FilteredCollection.isEmpty(Collections2.java:176)
at com.google.common.collect.Maps$AbstractFilteredMap.isEmpty(Maps.java:2957)
at com.flipkart.databuilderframework.engine.Utils.isEmpty(Utils.java:38)
at com.flipkart.databuilderframework.engine.Utils.sanitize(Utils.java:54)
at com.flipkart.databuilderframework.engine.DataBuilderContext.getDataSet(DataBuilderContext.java:42)
at com.flipkart.databuilderframework.engine.OptimizedMultiThreadedDataFlowExecutor$BuilderRunner.call(OptimizedMultiThreadedDataFlowExecutor.java:249)
at com.flipkart.databuilderframework.engine.OptimizedMultiThreadedDataFlowExecutor$BuilderRunner.call(OptimizedMultiThreadedDataFlowExecutor.java:206)
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)