Open Rajat0705 opened 1 week ago
We are using FTA version 15.7.9.
While processing the job, we are training the data for each column with a TextAnalyzer. With 1 million rows across a 100-column table, the train() method of TextAnalyzer is consuming 40+ GB of memory (~85% of the total), causing an OutOfMemoryError in our Spark environment. Can someone please help with any way to optimize the memory consumption, or suggest an alternative to the train() method?
PFA snapshot of memory consumption.
Attaching stack trace for reference: spark-executor.log
Also attaching the class where the train method is defined: Profile Aggregator.txt (Line: 50)
Please send me 10 rows of obfuscated data, so I can see what the input looks like.
The above data looks like synthetic data, not like the client data. My guess is that you are holding many references to the return value of ProfileAccumulator.merge(), but it is hard to see what is going on without either some data that approximates what is being parsed, some indication of the objects in the heap, or the additional code that is calling ProfileAccumulator.merge(). You should only ever have as many TextAnalyzers as you have columns - in your case ~100. Each TextAnalyzer is allowed to have a maximum of 400 Streams, and you have way too many objects in your heap, as though you were keeping them around after the return from merge().
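To make the expected shape concrete, here is a minimal sketch (the class and helper names are hypothetical, not taken from your code) of what I would expect: one TextAnalyzer per column, with the result of each merge replacing its inputs rather than accumulating alongside them -

import java.util.Map;

import com.cobber.fta.TextAnalyzer;

// Sketch only: ~100 analyzers in total, keyed by column name.
class ColumnProfiles {
    // Combine two partial profiles; the merged analyzer REPLACES the
    // stored one, so the pre-merge analyzers become garbage-collectable.
    static void combine(Map<String, TextAnalyzer> left,
                        Map<String, TextAnalyzer> right) throws Exception {
        for (Map.Entry<String, TextAnalyzer> e : right.entrySet())
            left.put(e.getKey(), TextAnalyzer.merge(left.get(e.getKey()), e.getValue()));
    }
}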
@tsegall this is actual data that we are working on. We currently have only one TextAnalyzer class, which we use for all 100 columns. How can we create a different TextAnalyzer per column? Do you mean a new instance of this class per column? We are using the code below to initialize the TextAnalyzer -
fieldnames.forEach(f -> {
    // One AnalyzerContext/TextAnalyzer per field, all sharing the composite stream names
    AnalyzerContext context = new AnalyzerContext(f, DateTimeParser.DateResolutionMode.Auto,
            "profile-" + f, fieldnames.toArray(new String[fieldnames.size()]));
    TextAnalyzer textAnalyzer = new TextAnalyzer(context);
    accumulator.getAnalyzers().put(f, new TextAnalyzerWrapper(textAnalyzer));
});
We have written a custom aggregator interface in Spark that calls merge() on the accumulator, as shown below -
public interface CustomAggregator<T extends Serializable> extends Serializable {
    T initialize();
    void process(T accumulator, Map<String, Serializable> row);
    T merge(T accumulator1, T accumulator2);
    Map<String, Serializable> complete(T accumulator);
}
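Roughly, our ProfileAggregator implements this interface as follows (a simplified sketch only - getAnalyzer(), toResultMap(), and the Properties type are stand-ins here; the real class is in the attached Profile Aggregator.txt) -

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class ProfileAggregator implements CustomAggregator<ProfileAccumulator> {
    private final List<String> fieldnames;
    private final Properties profileFtaProperties;

    public ProfileAggregator(List<String> fieldnames, Properties profileFtaProperties) {
        this.fieldnames = fieldnames;
        this.profileFtaProperties = profileFtaProperties;
    }

    @Override
    public ProfileAccumulator initialize() {
        // Builds the Map<String, TextAnalyzerWrapper> shown earlier.
        return new ProfileAccumulator(fieldnames, profileFtaProperties);
    }

    @Override
    public void process(ProfileAccumulator acc, Map<String, Serializable> row) {
        // Train each column's analyzer with that column's value from the row.
        for (String f : fieldnames) {
            Serializable value = row.get(f);
            try {
                acc.getAnalyzers().get(f).getAnalyzer()
                   .train(value == null ? null : value.toString());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public ProfileAccumulator merge(ProfileAccumulator a, ProfileAccumulator b) {
        return a.merge(b); // per column, delegates to TextAnalyzer.merge()
    }

    @Override
    public Map<String, Serializable> complete(ProfileAccumulator acc) {
        return acc.toResultMap(); // extracts each column's result
    }
}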
Then we use this in stream.aggregate() as shown -
for (int x = 0; x < fieldnames.size(); x++) { // 100 columns
    cols.add(
        CustomAggregation.builder()
            .fieldname(fieldnames.get(x))
            .outputFieldname(outputFieldnames.get(x))
            .aggregator((CustomAggregator) new ProfileAggregator(fieldnames, profileFtaProperties))
            .build());
}
stream.aggregate(cols);
@tsegall can you please confirm regarding the above query?
Please rerun with 15.10.0 (just released). I have improved the performance of TextAnalyzer.merge(). In my test case, which you can see in TestStrings.issue119(), overall memory usage has decreased by about 6x, and performance has also improved by about 4x. I am still not convinced this is going to help, since none of my test cases showed any memory leakage. My best guess is that you are somehow keeping references around.
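To illustrate what I mean by keeping references around, here is a contrived sketch (hypothetical code, not yours) of the kind of pattern that would produce a heap like the one you describe -

import java.util.ArrayList;
import java.util.List;

class LeakySketch {
    // LEAK: every intermediate merge result is retained for the life of
    // the job, so none of the accumulators can ever be collected.
    private final List<ProfileAccumulator> history = new ArrayList<>();

    ProfileAccumulator merge(ProfileAccumulator a, ProfileAccumulator b) {
        ProfileAccumulator merged = a.merge(b);
        history.add(merged); // intermediates pile up across the whole job
        return merged;
    }

    // The fix is simply not to retain them:
    ProfileAccumulator mergeFixed(ProfileAccumulator a, ProfileAccumulator b) {
        return a.merge(b); // a and b become collectable after the call
    }
}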
Thanks! We will test this out. Just curious to know why you feel we are keeping references around, and which references exactly? @tsegall need your input
@tsegall update: We ran with the latest version of FTA on the same dataset and we are seeing great memory improvements, as shown below [~42 GB down to ~0.6 GB]. But the execution time still hasn't improved!
I would not have expected the execution time to improve in your case, as I assume the FTA overhead is minimal relative to all the other code you are executing. I have another small change that improves performance further (25%) for merge-intensive use cases (e.g. the TestStrings.issue119() test case), which I will add shortly.
So I assume we can close this issue?