sramirez / spark-MDLP-discretization

Spark implementation of Fayyad's discretizer based on Minimum Description Length Principle (MDLP)
Apache License 2.0
44 stars 27 forks source link

Possible performance issue on 40k row dataset #22

Closed barrybecker4 closed 7 years ago

barrybecker4 commented 8 years ago

I've started doing some testing on larger datasets, and it seems to be taking more time to do the discretization - although I really do not know what to expect. I've added test cases for a 40,000 row dataset with several numeric columns to bin. I ran the tests on a quadcore laptop with 32G of memory and spark in standalone mode, but I saw similar results running on a 24 core server (which is what prompted me to add the tests). In the following results, 6 numeric columns are discretized at the same time (in batch).

srvRequest40000 data (label = churned, maxBins = 100, maxByPart = 10000) --- 98 seconds srvRequest40000 data (label = churned, maxBins = 10, maxByPart = 10000) --- 46 seconds srvRequest40000 data (label = churned, maxBins = 10, maxByPart = 1000000) --- 36 seconds

I noticed that performance improves when I reduce maxBins and increase maxByPart, but overall it seems to be taking too long. I will profile a little and see if I can find any bottlenecks. I would like to know what sort of performance numbers were attained for the 64 million row protein dataset using what hardware.

barrybecker4 commented 8 years ago

I got some interesting results after running more tests - discretizing the columns individually rather than in batch. At first it seemed that the algorithm had trouble with the UniqueKey column. Not surprisingly, it does not find any splits for this column (-Inf, Inf), but it takes a very long time to do it. When I binned 4 of the columns individually, these are the results I got:

UniqueKey = 76 seconds ClosedDate = 30 seconds X Coordinate = 13 seconds Longitude = 11 seconds

However, if I removed UniqueKey from the maxBins=100, maxByPart=10000 test, it still took 86 seconds.

Lastly, I changed the order of running the columns individually. Now, not only does "UniqueKey" take only 1 second to run instead of 76, but "X Coordinate" and "Longitude" produce different splits than they did when run with a different order!

ClosedDate = 31 seconds X Coordinate = 14 seconds Longitude = 12 seconds UniqueKey = 1 second

sramirez commented 8 years ago

The algorithm is optimized for obtaining all splits in a single step. Then, it isn't a good idea to run the algorithm on each column. The performance would degrade a lot using this scheme. I suppose you have used the individual scheme.

I'm pretty sure that the number of distinct points for every attribute is less than 10,000 points. If not, you can increase this value to 100,000. The idea is not to control the number of distinct points to avoid to run the procedure for big attributes.

Could you give more info. about the dataset and the execution?

sramirez commented 8 years ago

About the hardware that we used in the protein experiment, you can find below all the related description:

For all experiments, we have used a cluster composed of 20 computing nodes and 1 master node. The computing nodes hold the following characteristics: 2 processors × Intel Xeon CPU E5-2620, 6 cores per processor, 2.00 GHz, 15 MB cache, QDR InfiniBand Network (40 Gbps), 2 TB HDD, 64 GB RAM. Regarding software, we have used the following configuration: Hadoop 2.5.0-cdh5.3.1 from Cloudera’s open-source Apache Hadoop distribution, Apache Spark and MLlib 1.2.0, 480 cores (24 cores/node), 1040 RAM GB (52 GB/node).

barrybecker4 commented 8 years ago

I realize that the algorithm works best when run against all continuous columns at once. That is demonstrated by the results above, and how I use it in practice. However, several interesting things can be noticed when the columns are discretized individually.

  1. The resulting splits are sometimes different when runs are done individually compared with batch (even though the parameters are the same), and
  2. The order that the columns are run makes a big difference in performance. For example, when binning UniqueId before other columns it took 76 seconds, but only 1 second when binned last. This seems to indicate that there is persisted state carrying over from one run to the next.

I have not yet run on a real cluster, but it seems much slower than a C++ oblivious decision tree implementation on similar hardware. Probably the results will be much different on a cluster as the ODT implementation only runs on a single machine. The splits produced between the two algorithms are very similar.

Most of the numeric data is lat/long coordinates. It is checked in on the branch. Let me know if you need more info.

sramirez commented 8 years ago

Hi again,

In my opinion, the time problem is less relevant than the other problem, because sometimes spark introduces some delays caused by the memory eviction mechanism. A good idea would be to run 10 times the experiment and see what happens with time and accuracy. In my previous experiments, time usually has a large variance.

I've been thinking that maybe your execution is again affected by this non-deterministic behavior. Anyway, I'll also try this test on my own machine. Could you give us a small table with these results to compare?

I've been also reviewing the code and I've realized that:

barrybecker4 commented 8 years ago

When I print the initialCandidates in findSmallThresholds (when running on the 40k dataset) I noticed that NaN values can cause an issue because it thinks they are all unique. The candidates printed by println("cands = " + initialCandidates.collect().map(a => a._1 + " s=" + a._2._1 +" ["+ a._2._2.mkString(", ") +"]").mkString(";\n")) include the following. There probably should not be separate NaN entries. Still investigating.

: 3 s=258124.0 [0, 5, 0, 0, 0, 0]; 3 s=258214.5 [0, 0, 0, 29, 0, 0]; 3 s=258221.5 [0, 1, 0, 0, 0, 0]; 3 s=258459.5 [0, 0, 0, 39, 0, 0]; 3 s=258465.0 [0, 1, 0, 0, 0, 0]; 3 s=258504.5 [0, 0, 0, 13, 0, 0]; 3 s=258510.5 [0, 4, 0, 0, 0, 0]; 3 s=258815.0 [0, 0, 0, 41, 0, 0]; 3 s=258818.5 [0, 1, 0, 0, 0, 0]; 3 s=258978.0 [0, 0, 0, 16, 0, 0]; 3 s=258988.5 [0, 1, 0, 0, 0, 0]; 3 s=NaN [0, 0, 0, 1066, 0, 0]; 3 s=NaN [0, 1, 0, 0, 0, 0]; 3 s=NaN [0, 0, 1, 0, 0, 0]; 3 s=NaN [0, 0, 0, 0, 0, 2]; 3 s=NaN [0, 0, 1, 0, 0, 0]; 3 s=NaN [0, 0, 0, 0, 0, 1]; 3 s=NaN [0, 0, 0, 0, 1, 0]; 3 s=NaN [0, 0, 1, 0, 0, 0]; 3 s=NaN [0, 0, 0, 0, 0, 1]; :

barrybecker4 commented 8 years ago

I think I worked out what the performance issue is. The problem is with this line val nonZeros = featureValues.reduceByKey... When keys are like (0, Float.NaN) they are considered different values, but should not be. I ran some tests and found that reduceByKey works as expected if the keys are Floats with some Foat.NaN's present among the keys. In this case there is only one aggregate produced for the Float.NaN value. However, if the keys are tuples, then every occurrence of (0, Float.NaN) is treated as a unique value. I turned the value into a string and then back to a tuple after calling reduceByKey and the test that took 98 seconds to run, now only takes 26 seconds (3x speedup). Here are the new times for all three cases:

srvRequest40000 data (label = churned, maxBins = 100, maxByPart = 10000) -- 26 seconds (was 98) srvRequest40000 data (label = churned, maxBins = 10, maxByPart = 10000) -- 6 seconds (was 46) srvRequest40000 data (label = churned, maxBins = 10, maxByPart = 1000000) -- 2 seconds (was 36)

So if we increase maxByPart to 1000000, we can get 18x speedup in that last case. I will make a pull request once I clean things up a bit.

sramirez commented 7 years ago

It's a bit weird because Tuple's equal method should notice that all tuples (0, Float.NaN) are equals, and hence, all these tuples should be sent to the same reduce process. But I suppose that it is because the second value is a NaN, and it's undetermined. Anyway, I just merge the PR.

Could I close this PR?

barrybecker4 commented 7 years ago

Yes its strange. I don't think that the workaround I added should be needed if tuple comparison worked as we expected (when tuples contained NaN). Perhaps the behavior will change in a future version of scala - in which case the workaround could be removed. Closing now that it has been merged.