We hit a massive issue with memory leaking over time when we tried to use BulkExecutor in our ETL pipeline for data loading. There were 4 VMs performing data ingestion in batches, each batch containing 2000-5000 elements. Constantly, one or two of the VMs went down due to out-of-memory errors. The situation got much worse when multiple bulk operations ran on the same VM simultaneously.
Investigation
Analysing the decompiled code of the BulkInsert method, I noticed a strange usage of the ConcurrentBag collection. A simplified piece of that code is shown below:
```csharp
...
var documentsToImportByPartition =
    new ConcurrentDictionary<string, ConcurrentBag<string>>();
var miniBatchesToImportByPartition =
    new ConcurrentDictionary<string, List<List<string>>>();

foreach (string partitionKeyRangeId in this.partitionKeyRangeIds)
{
    documentsToImportByPartition[partitionKeyRangeId] = new ConcurrentBag<string>();
    miniBatchesToImportByPartition[partitionKeyRangeId] = new List<List<string>>();
}
...

Parallel.ForEach<string>(
    GetNextPage<string>(sourceEnumerator, pageSize),
    documentString => documentsToImportByPartition[partitionKeyId].Add(documentString));
...
```
As you can see, the ConcurrentBag instance in the documentsToImportByPartition dictionary gets populated but is never consumed. Since ConcurrentBag uses ThreadLocal storage to keep a separate list of items per thread, the items will only be garbage collected once the ThreadLocal finalizer runs. Given that bag creation and population happened frequently, we ended up with a huge amount of garbage retained by ThreadLocal.
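To illustrate the retention mechanism, here is a small sketch of mine (not the library's code): draining a bag with TryTake removes the items from its per-thread lists, so nothing roots them anymore and the GC can reclaim them without waiting for the ThreadLocal finalizer.

```csharp
using System;
using System.Collections.Concurrent;

class DrainSketch
{
    static void Main()
    {
        var bag = new ConcurrentBag<string>();
        bag.Add("doc-1");
        bag.Add("doc-2");

        // TryTake removes items from the bag's per-thread lists, so the
        // ThreadLocal storage no longer holds references to them.
        while (bag.TryTake(out _)) { }

        Console.WriteLine(bag.IsEmpty); // prints "True"
    }
}
```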
Reproduction
The following code can be used to demonstrate the problem:
```csharp
[TestFixture]
public class ConcurrentBagTests
{
    private class TestPayload
    {
        private int[] _data = new int[1024 * 10];
    }

    [Test]
    public async Task ConcurrentBag_MultiThreadPopulation()
    {
        for (int i = 0; i < 100; i++)
        {
            await PopulateBag();
        }
    }

    private async Task PopulateBag()
    {
        var bag = new ConcurrentBag<TestPayload>();
        for (int i = 0; i < 1000; i++)
        {
            var routines = new List<Task>
            {
                Task.Run(() => bag.Add(new TestPayload())),
                Task.Run(() => bag.Add(new TestPayload())),
                Task.Run(() => bag.Add(new TestPayload())),
                Task.Run(() => bag.Add(new TestPayload())),
                Task.Run(() => bag.Add(new TestPayload())),
            };
            await Task.WhenAll(routines);
        }
    }
}
```
Here is a memory diagram showing 1.2 GB of memory usage at the peak:
If we rewrite the previous test using a simple List object:
```csharp
private async Task PopulateList()
{
    // Note: List<T> is not thread-safe, so the concurrent Add calls below are
    // racy; this variant is only meant to compare memory behaviour against
    // ConcurrentBag, not to be correct concurrent code.
    var list = new List<TestPayload>();
    for (int i = 0; i < 1000; i++)
    {
        var routines = new List<Task>
        {
            Task.Run(() => list.Add(new TestPayload())),
            Task.Run(() => list.Add(new TestPayload())),
            Task.Run(() => list.Add(new TestPayload())),
            Task.Run(() => list.Add(new TestPayload())),
            Task.Run(() => list.Add(new TestPayload()))
        };
        await Task.WhenAll(routines);
    }
}
```
The memory diagram then looks like the one depicted below (only 485 MB allocated at the peak):
Possible fixes
There are multiple options:
- clean up the allocated ConcurrentBag objects after the bulk operation finishes; this shortens the lifetime of the items so the GC can collect them;
- use a different collection, for instance ConcurrentQueue.
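As an illustration of the second option, here is the reproduction's PopulateBag rewritten against ConcurrentQueue (a sketch of mine, reusing the TestPayload type from the test above). ConcurrentQueue stores items in shared segments rather than in per-thread ThreadLocal lists, so a dropped queue does not stay rooted through thread-local state.

```csharp
// Sketch: same population pattern as PopulateBag, but with ConcurrentQueue<T>.
private async Task PopulateQueue()
{
    var queue = new ConcurrentQueue<TestPayload>();
    for (int i = 0; i < 1000; i++)
    {
        var routines = new List<Task>
        {
            Task.Run(() => queue.Enqueue(new TestPayload())),
            Task.Run(() => queue.Enqueue(new TestPayload())),
            Task.Run(() => queue.Enqueue(new TestPayload())),
            Task.Run(() => queue.Enqueue(new TestPayload())),
            Task.Run(() => queue.Enqueue(new TestPayload())),
        };
        await Task.WhenAll(routines);
    }
}
```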
Looking forward to the issue being addressed. I'm happy to assist with further investigation and a fix.
Context
Package version:
2.3.0-preview2