Closed barrybecker4 closed 7 years ago
First of all, nice work Barry! Thanks again.
Now I understand why the range partitioner causes non-determinism when generating boundary points. If this partitioner were deterministic, I would choose the original solution even though it includes some non-boundary points. Your solution is far more exact and accurate than mine. The problem here is that your new partitioner introduces a new shuffling phase, and much more information needs to be sent across the network (since you include a long index for each distinct value). It can degrade the performance of MDLP when facing big datasets. Have you tried some tests with big datasets?
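For reference, a minimal sketch of the overhead being described, assuming the deterministic approach attaches its index with zipWithIndex (the names and types here are illustrative, not the actual MDLP code):

```scala
import org.apache.spark.rdd.RDD

// Hypothetical illustration: pairing every distinct (featureIdx, value)
// point with a Long index. zipWithIndex launches an extra Spark job to
// compute per-partition offsets, and the extra 8-byte index then travels
// with each record through the subsequent shuffle.
def indexDistinctPoints(points: RDD[(Int, Float)]): RDD[((Int, Float), Long)] =
  points.zipWithIndex()
```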
About the coalesce call, it is performed to allow the maximum parallelism when processing "big" features with too many boundary points. Why does different partitioning affect the entropy computations? The only requirement in this function is that the elements are ordered.
I have not done much performance testing with big data, but plan to soon. I don't have baseline numbers from before these changes, but will have some numbers with these changes soon. It's too bad about the additional shuffling, but I don't know of an alternative. I don't really know why the coalesce call was influencing the entropy calculations. Just for fun I tried putting it back, and I immediately see test failures like this:
org.scalatest.exceptions.TestFailedException: Expected "...98.5, 999451.5, 1000[563.5, 1001776.0, 1002918.5, 1003944.5, 1004384.0, 1004505.5, 1007262.5, 1008990.0, 1010339.5, 1013073.0, 1013876.5, 1018213.0, 1018448.5, 1020292.0, 1021113.0, 1021618.5, 1022342.5, 1023333.5, 1026862.5, 1032483.5, 1035048.5, 1042767.0, 1044203.5, Infinity;-Infinity, 147927.5, 150303.0, 155430.0, 156562.5, 158145.5, 161622.0, 172141.0, 175366.5, 179003.5, 186795.0, 190705.5, 195083.0, 196383].5, 199703.5, 202225...", but got "...98.5, 999451.5, 1000[449.5, 1001776.0, 1002918.5, 1003944.5, 1004394.0, 1004527.0, 1007262.5, 1008990.0, 1010339.5, 1013073.0, 1013876.5, 1018213.0, 1018448.5, 1020292.0, 1021090.0, 1021495.5, 1022251.5, 1023193.0, 1026862.5, 1032956.0, 1035120.0, 1042767.0, 1044203.5, Infinity;-Infinity, 147927.5, 150303.0, 155430.0, 156562.5, 158145.5, 161622.0, 172141.0, 175366.5, 179003.5, 186795.0, 190705.5, 194703.0, 195749.5, 196399].5, 199703.5, 202225..."
for the Run MDLPD on all columns in srvRequest40000 data (label = churned, maxBins = 100, maxByPart = 10000) test
OK, we'll wait for the performance results. I'll try to figure out what is happening with coalesce. I still think that a non-deterministic MDLP could fit as an approximate big data solution. The original design will be much faster than your solution for sure, and may be more attractive to practitioners for that reason. We could manage two versions of DMDLP, but maybe that could be problematic.
Perhaps we can have an "approximate" option that allows it to run with the faster but less accurate approach. Spark does that for other things.
I created some unit tests in a branch for a 100k and a 10m row dataset. The 10m one is probably too big to add to GitHub since it's about 1G on disk. When I was doing some profiling, I noticed that we can get some performance improvement from using !isEmpty instead of calling count in a couple of places.
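To illustrate why this helps (a plain-Scala sketch, not the actual MDLP code): RDD.count is a full job that visits every partition, while RDD.isEmpty only needs to look at the first element. The same effect can be simulated with a lazy iterator:

```scala
// `size` (the analogue of count) must consume every element, while
// `nonEmpty` (the analogue of !isEmpty) stops after checking whether a
// first element exists at all.
var touched = 0
def elems: Iterator[Int] = Iterator.tabulate(1000000) { i => touched += 1; i }

val viaCount = elems.size > 0      // touches all 1,000,000 elements
println(s"count-style check touched $touched elements")

touched = 0
val viaIsEmpty = elems.nonEmpty    // touches at most one element
println(s"isEmpty-style check touched $touched elements")
```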
Here are some results on my 4 core laptop 10m rows, 4 continuous columns discretized together.
maxBins 50, maxByPart 50000, default par default - 25 min
maxBins 50, maxByPart 5000, default par default - 53 min
maxBins 50, maxByPart 100000, default par 4 - 19 min
maxBins 50, maxByPart 10000, default par 4 - 39 min
After replacing count with !isEmpty (not quite as much speedup as I had hoped for):
maxBins 50, maxByPart 100000, default par 4 - 17 min
maxBins 50, maxByPart 10000, default par 4 - 35 min
100k rows
maxBins = 50, maxByPart = 1000, default parallelism = 4 - 57s
maxBins = 50, maxByPart = 10000, default parallelism = 4 - 32s (29s after using !isEmpty instead of count)
maxBins = 50, maxByPart = 20000, default parallelism = 4 - 10s
maxBins = 50, maxByPart = 100000, default parallelism = 4 - 10s
Did you find a way to reintroduce coalesce into ManyValuesThresholdFinder without bringing back the non-determinism?
Below is a summary of the timings when running MDLP on the 1G serrverX dataset on a machine with 24 cores. Since maxByPart here is 10,000, and it should really be 100,000 for better performance, these numbers are only useful for understanding where the bottlenecks might be.
- 6s for collect at MDLPDiscretizer: 47
- 1s for isEmpty at MDLPDiscretizer: 76
- 33s for sortByKey at MDLPDiscretizer: 153 (160/160 tasks). This can take a very long time and even run out of memory if the target has a huge number of distinct values. I think this is a Spark bug; see https://issues.apache.org/jira/browse/SPARK-18181. Here it has 5.
- 9s for countByValue at InitialThresholdsFinder: 118
- 2s for zipWithIndex at InitialThresholdsFinder: 61
- 13s for countByKey at MDLPDiscretizer: 181
- There are a whole lot of jobs for count, min, and runJob in ManyValuesThresholdFinder. Most of these entries have 1/1 (5 skipped) in the Stages column and 1045/1045 (1285 skipped) in the Tasks (for all stages) column.
  - All of the count jobs seem to take 1, 2, or 3 seconds, and there are a lot of them. The total is 120 seconds for count at ManyValuesThresholdFinder: 54 and 125. This is a significant bottleneck and can be sped up by using !isEmpty instead of count.
  - All of the min jobs take 2 or 3 seconds. The total is 28 seconds for min at ManyValuesThresholdFinder: 125.
  - All of the runJob jobs take 1 or 2 seconds. The total is 51 seconds for runJob at ManyValuesThresholdFinder: 83.
- 1s for collect at MDLPDiscretizer.scala:191
I cannot reproduce the coalesce problem yet in my ScalaIDE. I've tried several times, but the result is always the same: the coalesce generates the same partitions. I think coalesce can generate different partitions since the parent partitioner for cands is RangePartitioner. However, I cannot understand why it affects the final results. Does it output a different threshold in each trial, or does it just generate different partitions? Could you add some prints near the coalesce in order to find out where the problem is?
Yes, I remember that I added some isEmpty calls to avoid these empty and useless counts.
I'm surprised that you cannot reproduce the coalesce problem by just running the MDLPDiscretizerBigSuite. I get different threshold results depending on whether or not I do the coalesce. I don't know why the thresholds in the results are different. There is a small chance it might be related to https://issues.apache.org/jira/browse/SPARK-14393. The other odd thing I noticed is that using the coalesce actually makes things run slower (in the cases I looked at). In one case, it went from 29 seconds to 52 seconds. I don't fully understand it, but it seems odd to be doing all those coalesces inside the loop. I added a print statement in the loop and see many lines like this:

coalescing to 2 from 27 nCands = 10549
coalescing to 1 from 27 nCands = 124
coalescing to 1 from 27 nCands = 2932
:

Why not just keep the data on the original partitions? Did you do any tests to see if the coalescing gives a performance advantage? My experiments so far indicate that it does not.
I did some rough profiling of Spark-MDLP performance using a 9.8 million row dataset running on my laptop. The dataset is too large to check in, but I put a version at https://app.box.com/s/mrepegkdjv7im0iq1f97slp8c88h6rwj and will add a commented-out test for it. It takes about 3.9 minutes to bin 4 continuous columns if the target has 5 values. But if I bin only 1 column with that label, it takes 2.3 minutes. It takes only 2 minutes to bin 4 continuous columns with a different label which has only 3 values. I think this was faster because there was more correlation with the other columns. Also, this case did not print nearly as many managed memory leak lines.
Below is a rough profiling of what is happening during the test, and how much time is spent in each part.
reading and cleaning - 8 seconds
fit. about to call train
get the labeled input data (1 seconds)
its cached, now calling train (training is running MDLP on all continuous cols)
in MDLPDiscretizer.runAll
num labels = 5 (10 seconds )
labels = 0.0 -> 0, 1.0 -> 1, 2.0 -> 2, 3.0 -> 4, 4.0 -> 3
Compute classDistrib = (5 seconds)
classDistrib = 0 -> 1953125, 1 -> 1953125, 2 -> 1953125, 3 -> 1953125, 4 -> 1953125
now sorting distinct values. partitions = 15 {
start sortByKey. numpartitions = 30
16/11/16 10:40:18 ERROR Executor: Managed memory leak detected; size = 78727902 bytes, TID = 152
This looks like a spark memory leak bug. See https://issues.apache.org/jira/browse/SPARK-18181
This shows the distribution of label values for each split.
((3,64.65005),5, 0, 0)
((2,351.14984),9, 0, 0)
((3,55.925964),0, 1, 0)
((3,88.87153),1, 0, 0)
((3,51.176464),0, 1, 0)
((3,81.87806),2, 0, 0)
((3,60.07228),0, 2, 0)
((2,325.05286),0, 8, 0)
((3,62.296955),0, 1, 0)
((3,87.25891),1, 0, 0) .......
repartition. numparts = 30
done sortByKey (~50 seconds just for sortByKey)
16/11/16 10:40:38 ERROR Executor: Managed memory leak detected; size = 38036528 bytes, TID = 213
} done sorting distinct values in (63 seconds - expensive)
about to find initial thresholds
about to create featureInfo list
featureInfo = (0,5,0,5,1,0), (1,5,5,5,1,1), (2,1632699,10,96042,17,2), (3,5249987,1632709,99057,53,19)
totalParts = 72
points parts = 30
partitionPts parts = 72 count = 6,882,696
done computing initial thresholds (26 seconds)
about to find all thresholds
the first value of part 0 is 30.0
the first value of part 3 is 320.82028
:
To summarize: finding the unique values and sorting can take more than half the time. Finding the initial thresholds could take up to about a quarter of the time. Finally, finding the thresholds using MDLP can take anywhere from 5% to 60%, depending on a lot of data-dependent factors.
The coalesce operation is done to homogenize the number of points in each partition. In the worst case, the previous filter may only select points in a single partition. Coalesce is needed to take advantage of the maximum level of parallelism in clusters. I think what you show here is that the time spent on coalescing is much longer than the time gained by using the maximum level of parallelism vs. the previous partitioning scheme. For a small number of elements (27 nCands), coalescing is heavier than maintaining the original scheme; however, I remember that the threshold for "big" features was set to a higher value (10,000?).
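For context, a hedged sketch of the pattern under discussion (maxByPart is taken from the thread; the function itself is illustrative, not the actual MDLP code). After a filter, the surviving candidates may be concentrated in a few of the parent's partitions, so the partition count is shrunk to roughly match the remaining data volume:

```scala
import org.apache.spark.rdd.RDD

// Shrink the number of partitions so each holds at most ~maxByPart
// candidate points. coalesce avoids a full shuffle but still has a cost,
// which is what the timings above are measuring.
def rebalance[T](cands: RDD[T], nCands: Long, maxByPart: Int): RDD[T] = {
  val nParts = math.max(1, math.ceil(nCands.toDouble / maxByPart).toInt)
  if (nParts < cands.getNumPartitions) cands.coalesce(nParts) else cands
}
```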
About the determinism, maybe we can add a new parameter that allows us to switch between versions: the original (non-deterministic) one and your version. The exact version can be useful when we want accurate results, whereas my version can be used when performance matters more. What do you think? This solution could make the merge possible.
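A sketch of what such a switch might look like as a Spark ML-style param (the trait and param names are hypothetical, not the actual MDLP API):

```scala
import org.apache.spark.ml.param.{BooleanParam, Params}

// Hypothetical param: when true, use the original, faster but
// non-deterministic partitioning; when false, the exact deterministic one.
trait HasApproximate extends Params {
  final val approximate: BooleanParam = new BooleanParam(this, "approximate",
    "whether to use the faster, non-deterministic partitioning scheme")
  setDefault(approximate -> false)
  final def getApproximate: Boolean = $(approximate)
}
```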
Yes, I think a new param to allow switching would be good, but I don't know when I will have a chance to add it. The non-deterministic version is very hard to have test cases for.
Yes, it's true. But I don't want to lose the non-deterministic version, because I think it's much faster. Maybe we can offer the non-deterministic version without test cases, or consider a test passed if there are only small differences between the output and the deterministic results. In other words, just relax the conditions to pass a test.
What do you think?
Barry, Spark release 2.1.0 is out. Could you rerun your tests in order to know whether our problem has been fixed or not, please? As you know, I've not been able to reproduce the problem on my computer.
Here you can see the notes:
I am running on Spark 2.1.0 right now using the mdlp/spark 2.x branch that I created. Yes, it works and is deterministic. It was not any change in 2.x that fixed the non-determinism problem, though.
Do you mean that you tried out the non-deterministic version and it isn't working?
I have not tried the non-deterministic version on 2.1.
I made three changes to fix the non-deterministic behavior:
1) When finding the midpoint, check if one of the values is null. If so, take the other as the midpoint. This avoids having more than one NaN in the split list.
2) Partition by featureIdx when finding initialThresholds. This avoids the need to manage frequencies across boundaries.
3) In ManyValuesFinder, removed the call to coalesce. Changing the partitioning within the while loop was causing different splits to be generated. If it is necessary for some reason, an alternative will need to be found.
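A minimal sketch of change (1), under the assumption that the check is effectively for NaN endpoints (the method name midPoint is illustrative, not necessarily the one in the code base):

```scala
// If one endpoint is NaN, return the other value instead of averaging,
// since (a + b) / 2 would itself be NaN; this keeps extra NaNs out of
// the split list.
def midPoint(a: Float, b: Float): Float =
  if (a.isNaN) b
  else if (b.isNaN) a
  else (a + b) / 2
```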
I ran the tests a couple of dozen times both from within the IDE and on the cmd line on two different computers. They seem to always return consistent results now.