Closed: marecabo closed this 6 months ago
Why can't we merge this PR ❓
> Why can't we merge this PR ❓
@steffenaxer I ran a version of Gelato from this branch against our Paris East model soon after the PR was created, and it failed. I will document the exact failure here when I try again.
I then ran this branch against a different, larger existing model, and it failed there, too, albeit for a different reason.
I am very keen to merge this branch, but before we do that, I need to understand fully why it failed with these two models and what can be done to fix those errors.
But don't worry, I haven't forgotten about it 👍
When I run this version of Gelato against my (admittedly old) Paris East outputs:
```
2024-04-29T12:55:34,432 INFO TablesawKpiCalculator:907 Adding costs to legs table. Legs table has 2673030 rows, personModeScores table has 3751146 rows, moneyLog has 0 entries
2024-04-29T12:55:50,746 INFO MemoryObserver:42 used RAM: 7028 MB free: 3931 MB total: 10960 MB
2024-04-29T12:55:55,832 INFO TablesawKpiCalculator:933 Adding contribution from person money events
2024-04-29T12:55:56,518 INFO TablesawKpiCalculator:947 Iterating over the money log
java.util.ConcurrentModificationException: java.util.ConcurrentModificationException
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
at java.base/java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:562)
at java.base/java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:591)
at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:689)
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:765)
at com.arup.cml.abm.kpi.tablesaw.TablesawKpiCalculator.addCostToLegs(TablesawKpiCalculator.java:953)
at com.arup.cml.abm.kpi.tablesaw.TablesawKpiCalculator.readLegs(TablesawKpiCalculator.java:119)
at com.arup.cml.abm.kpi.tablesaw.TablesawKpiCalculator.<init>(TablesawKpiCalculator.java:75)
at com.arup.cml.abm.kpi.matsim.run.MatsimKpiGenerator.run(MatsimKpiGenerator.java:90)
at picocli.CommandLine.executeUserObject(CommandLine.java:2026)
at picocli.CommandLine.access$1500(CommandLine.java:148)
at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2461)
at picocli.CommandLine$RunLast.handle(CommandLine.java:2453)
at picocli.CommandLine$RunLast.handle(CommandLine.java:2415)
at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2273)
at picocli.CommandLine$RunLast.execute(CommandLine.java:2417)
at picocli.CommandLine.execute(CommandLine.java:2170)
at com.arup.cml.abm.kpi.matsim.run.MatsimKpiGenerator.main(MatsimKpiGenerator.java:44)
Caused by: java.util.ConcurrentModificationException
at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1221)
at com.arup.cml.abm.kpi.data.MoneyLog.getPersonLog(MoneyLog.java:29)
at com.arup.cml.abm.kpi.data.MoneyLog.getMoneyLogData(MoneyLog.java:19)
at com.arup.cml.abm.kpi.tablesaw.TablesawKpiCalculator.lambda$addCostToLegs$9(TablesawKpiCalculator.java:958)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:754)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
```
Under the covers, parallel streaming over the legs table via `legs.stream().parallel().forEach(row -> {` launches multiple threads, each of which accesses the `MoneyLog`. `MoneyLog` now contains this method, which is called from inside the legs iteration loop:
```java
public double getMoneyLogData(String personID, double departureTime, double arrivalTime) {
    return getPersonLog(personID).entrySet().stream()
            .filter(e -> e.getKey() > departureTime && e.getKey() <= arrivalTime)
            .mapToDouble(Entry::getValue)
            .sum();
}
```
The `getPersonLog` method called from `getMoneyLogData` looks like this:
```java
private Map<Double, Double> getPersonLog(String personID) {
    return moneyLogData.computeIfAbsent(personID, k -> new HashMap<>());
}
```
So, we have multiple threads calling `getMoneyLogData` concurrently. `getMoneyLogData` streams (iterates) over the entries of the underlying hash map in order to filter them, but also calls the `getPersonLog` method, which potentially modifies the same hash map. Given enough leg rows, at some point the threads will intertwine such that the hash map is being iterated over in one thread whilst an attempt is made to modify it in a different thread. This is not thread-safe, so `HashMap` detects and forbids it, hence the `java.util.ConcurrentModificationException`.
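To make the race concrete, here is a minimal, self-contained sketch (not Gelato code; all names here are invented) that recreates the same shape: many parallel tasks calling `HashMap.computeIfAbsent` on a shared map whilst iterating and populating the per-person maps. Being a race, the outcome is nondeterministic: a run may throw `java.util.ConcurrentModificationException`, silently corrupt the map, or occasionally complete cleanly.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.IntStream;

// Standalone demo of the MoneyLog race; nothing here is Gelato code.
public class MoneyLogRaceDemo {
    // Shared, non-thread-safe map, shaped like MoneyLog's moneyLogData.
    private static final Map<String, Map<Double, Double>> moneyLogData = new HashMap<>();

    private static Map<Double, Double> getPersonLog(String personID) {
        // HashMap.computeIfAbsent checks modCount after running the mapping
        // function; a concurrent structural modification by another thread
        // can trip that check (the line seen in the stack trace above).
        return moneyLogData.computeIfAbsent(personID, k -> new HashMap<>());
    }

    public static void main(String[] args) {
        // Many parallel "leg rows", as in legs.stream().parallel().forEach(...).
        IntStream.range(0, 1_000_000).parallel().forEach(i -> {
            String person = "person-" + (i % 1_000);
            // One thread iterates a per-person map...
            double ignored = getPersonLog(person).entrySet().stream()
                    .mapToDouble(Map.Entry::getValue)
                    .sum();
            // ...while another may be modifying it, or the outer map.
            getPersonLog(person).put((double) i, 1.0);
        });
    }
}
```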
When I change this line:

`legs.stream().parallel().forEach(row -> {`

to move from parallel streaming across multiple threads to serial streaming in a single thread:

`legs.stream().forEach(row -> {`
A few things happen:

1 - The concurrent modification error disappears.

2 - The `MatsimKpiGeneratorIntegrationTest.testAppWithDrt` test fails when asserting the value of one of the KPIs:

```
Expected :3042939934L
Actual   :2105941526L
```

This is quite surprising; the answer should be the same whether we're iterating over the legs in the old way, the new way in parallel, or the new way in series. I haven't investigated this at all.

3 - (Possibly) The performance probably slows compared to the parallel version (but is still faster than the version without the other changes in this branch). I don't have timings for all three scenarios because I cannot avoid the concurrent modification exception for the parallel streaming version with the model I'm using, but for `main`, Gelato processed the model outputs in 25m 21s, and for this branch with serial streaming over the legs table, the same model took 21m 26s. A decent improvement, but likely not as dramatic as we would see if we were streaming over the legs table in parallel.
Remember that every call to `getPersonLog` goes through `computeIfAbsent` (`return moneyLogData.computeIfAbsent(personID, k -> new HashMap<>());`), making the likelihood of concurrent modification much higher. Keeping the parallel streaming, but swapping `MoneyLog`'s underlying `HashMap` for a `ConcurrentHashMap`, or maybe a synchronized map via `Collections.synchronizedMap`, might fix the concurrency issues without resorting to serial streaming. However, using `Collections.synchronizedMap` in particular might introduce so much locking overhead that all of the benefits of parallelism disappear and we are effectively running in serial.
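For illustration, here is a sketch of the `ConcurrentHashMap` variant. This is an untested suggestion rather than code from this PR, although the field and method names mirror the snippets quoted above:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: MoneyLog backed by ConcurrentHashMap so that parallel
// streaming over the legs table can call getMoneyLogData safely.
public class MoneyLog {
    // Both the outer and the per-person maps need to be thread-safe,
    // because the per-person maps are iterated and populated concurrently.
    private final Map<String, Map<Double, Double>> moneyLogData = new ConcurrentHashMap<>();

    public double getMoneyLogData(String personID, double departureTime, double arrivalTime) {
        return getPersonLog(personID).entrySet().stream()
                .filter(e -> e.getKey() > departureTime && e.getKey() <= arrivalTime)
                .mapToDouble(Map.Entry::getValue)
                .sum();
    }

    private Map<Double, Double> getPersonLog(String personID) {
        // Unlike HashMap's, ConcurrentHashMap.computeIfAbsent is atomic
        // and safe to call from multiple threads.
        return moneyLogData.computeIfAbsent(personID, k -> new ConcurrentHashMap<>());
    }
}
```

One caveat: iteration over a `ConcurrentHashMap` is weakly consistent, so it never throws `ConcurrentModificationException`, but it also silently tolerates concurrent writes. If the legs processing both reads and writes the same person's log, the summed values could still depend on thread timing.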
@mfitz Thank you for your investigation and great comment. I think ~I~ we did two things at once here, which should be split into two PRs. Both should reproduce the same results on their own.
I would suggest removing the `parallel()` first and investigating why the results differ. Then we can consider parallelizing it in a later step. Could you share your output folder where these different results occur?
@steffenaxer I reverted your `parallel()` commit. @mfitz Could you check again whether you get the expected results?
> Could you share your output folder where these different results occur?
Are you asking about the different results from the DRT integration test? You can see that by changing `legs.stream().parallel().forEach(row -> {` to `legs.stream().forEach(row -> {` and running `MatsimKpiGeneratorIntegrationTest`. That fails consistently on my machine. If you set a breakpoint in `testAppWithDrt` just before the assertions, you will be able to inspect the KPI files in the local temp directory created by the test.
I didn't actually compare the results of my Paris East model using `main` versus this-branch-but-without-parallel-streaming to check for different KPI values (I only compared running times), but I will do that at some point. I think there's a reasonable chance that some of the KPI values differ. Do you want the model data for that? It's 61 million events, around 1.5 gigs for all of the input files, and everything except the MATSim config file is compressed.
> You can see that by changing `legs.stream().parallel().forEach(row -> {` to `legs.stream().forEach(row -> {` and running `MatsimKpiGeneratorIntegrationTest`.
Perfect, then we can use that test to check it. I'll update this branch with the latest master first, though.
@mfitz I merged the latest `main` and all my tests work locally. However, I cannot rerun the checks. Can you, by any chance?
We have just hit a monthly Arup-wide limit of paid GitHub Actions minutes. It will reset tomorrow, so I doubt our admins will add more credits today, but it means we can't run any CI builds in GitHub until tomorrow 😞
I'll grab the latest changes and do some local investigation, though.
Hey there :) Steffen pointed out some code to me and I looked into it performance-wise.
Accessing a Tablesaw column by `String` name is rather expensive, we learned. This is because Tablesaw does a string comparison (`equalsIgnoreCase`) to identify the desired column. The critical path is:

`legs.doubleColumn("monetaryCostOfTravel")` → `Relation.doubleColumn(String columnName)` → `Relation.columnIndex(String columnName)` → `String.equalsIgnoreCase(String anotherString)`
@steffenaxer In this case, I moved getting the columns out of the loop; it should help a bit for this part.
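As an illustrative sketch of that change (only the table and column names come from this thread; the loop body is invented):

```java
import tech.tablesaw.api.DoubleColumn;
import tech.tablesaw.api.Table;

class ColumnHoistingSketch {
    static double totalCost(Table legs) {
        // One name-based lookup (and its equalsIgnoreCase scan), hoisted
        // out of the loop instead of being paid on every row.
        DoubleColumn cost = legs.doubleColumn("monetaryCostOfTravel");
        double total = 0.0;
        for (int row = 0; row < legs.rowCount(); row++) {
            // Per-row access is now a cheap index into the column.
            total += cost.getDouble(row);
        }
        return total;
    }
}
```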
I also changed some type definitions from implementation (`HashMap`) to interface (`Map`) and reduced queries for map entries a bit in `MoneyLog`, which should also help performance slightly.
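For illustration, something along these lines; the actual diff isn't quoted in this thread, so the shape and names below are only indicative:

```java
import java.util.HashMap;
import java.util.Map;

class MoneyLogStyleSketch {
    // Declared against the Map interface rather than the HashMap
    // implementation, so the backing map can be swapped (for example,
    // for a ConcurrentHashMap) without touching any callers.
    private final Map<String, Map<Double, Double>> moneyLogData = new HashMap<>();

    double entryAt(String personID, double time) {
        // A single query into the outer map, reused for both the null
        // check and the read, instead of calling get(personID) twice.
        Map<Double, Double> personLog = moneyLogData.get(personID);
        return personLog == null ? 0.0 : personLog.getOrDefault(time, 0.0);
    }
}
```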