sramirez / spark-MDLP-discretization

Spark implementation of Fayyad's discretizer based on Minimum Description Length Principle (MDLP)
Apache License 2.0

few values threshold method does not seem to be deterministic #14

Open barrybecker4 opened 8 years ago

barrybecker4 commented 8 years ago

This is a strange one, and not that easy to reproduce. If I run the tests a bunch of times, I have noticed that on occasion, I get an extra split for the fare attribute in the "Run MDLPD on all columns in titanic data (label = embarked)" test.

Sometimes the splits are

-Infinity, 7.1833496, 7.2396, 7.6875, 7.7625, 13.20835, 15.3729, 15.74585, 56.7125, Infinity

but usually they are

-Infinity, 6.6229, 7.1833496, 7.2396, 7.6875, 7.7625, 13.20835, 15.3729, 15.74585, 56.7125, Infinity

I can't think why this might be unless something is non-deterministic or maintaining state between runs. I just ran a bunch of times and always get the second result, but know I have sometimes seen the first.

barrybecker4 commented 8 years ago

I've seen this a few times now. It's frustrating because it's hard to reproduce and makes that one unit test occasionally fail. I wonder if this is a concurrency issue, where the results could differ depending on the order in which some of the data is processed.

sramirez commented 8 years ago

Is it now fixed in the referenced PR or not?

barrybecker4 commented 8 years ago

Unfortunately, the issue is still there (I think). I see it only occasionally on one or two of the unit tests. If you have any thoughts about how it might occur, please let me know.

This may not be the right forum, but I have a couple of other general questions:

sramirez commented 8 years ago

Hi again,

It's certainly weird, it's supposed to be fully deterministic. I'm going to check the code to look for a possible problem with concurrency. About the other topics:

  1. The client is supposed to persist the data however he/she wants. Most of the MLlib methods I have reviewed use this message to warn about data persistence, and I think it's good to follow that approach. It would therefore be nice to include a .cache() call in the unit tests, because the algorithm is designed to be iterative in some parts.
  2. Yes, it's clearly influenced by the fact that the data is not cached, so it is re-loaded in Spark every time an RDD variable is used more than once.
  3. I normally use broadcast when the variable to be shared is more complex than a simple type like an integer or a string. These variables are mainly data structures such as arrays or vectors. Otherwise Spark would replicate the variable in each function instance, which implies a lot of data replication. In contrast, broadcasting sends only one copy to each machine. I think this "philosophy" has been adopted by all Spark developers, so maybe it is redundant to show this information in the code.

Thanks for your help again!

sramirez commented 8 years ago

About the determinism:

I got the following results from one Eclipse execution of the unit tests:

Tests ok: 1-5, 9.

Failing tests:

Test 6: org.scalatest.exceptions.TestFailedException: Expected "...Infinity;-Infinity, [6.6229, 6.9624996, ]7.1833496, 7.2396, 7...", but got "...Infinity;-Infinity, []7.1833496, 7.2396, 7..."

the rest:

org.scalatest.exceptions.TestFailedException: Expected "...ty;-Infinity, 1.4435[9817]E12, Infinity", but got "...ty;-Infinity, 1.4435[658]E12, Infinity"

The last one seems to be a precision problem. Test 6 always outputs the same result in my environment, so maybe there is a problem with yours. Have you checked the correct solution for test 6 using a sequential implementation of MDLP?

barrybecker4 commented 8 years ago

I will look into 6 some more. I think that is the one I have seen fail intermittently as well. Sometimes I see it give the same result many times in a row - more than if it were just random, then for no reason it will start giving a result with an extra split.

barrybecker4 commented 8 years ago

I just ran the tests 7 times in a row. They all passed except for the second time when I got

[info] - Run MDLPD on all columns in titanic2 data (label = embarked) *** FAILED ***
[info] Expected "...Infinity;-Infinity, []7.175, 7.2396, 7.687...", but got "...Infinity;-Infinity, [6.6229, ]7.175, 7.2396, 7.687..." (MDLPDiscretizerSuite.scala:205)

Note that it gave an extra split on that particular run. I am running on Windows, but it should not matter.

sramirez commented 8 years ago

After running each test 10 times, I get the following result in test 6:

"...Infinity;-Infinity, []7.1833496, 7.2396, 7..."

So, maybe there is a problem related to your environment. Could you check it on another OS and/or machine, please?

I'm using Debian 8.4 and Scala IDE 4.1.0 (I directly run the JUnit tests from here).

barrybecker4 commented 8 years ago

I just ran the tests 10 times in a row after pulling and building in my Ubuntu Linux VM and there were no failures at all. I have also had streaks like this on Windows, but then occasionally it will start streaking the other way (with failures). It may also have to do with what else is running on the machine at the time that I run the tests. It's definitely a strange issue.

I run the tests from the command line (using sbt assembly) and from within Intellij (using scala plugin) on windows and linux (4 different configurations total).

barrybecker4 commented 8 years ago

I think this issue still persists - though it is hard to reproduce. I don't know where the source of the non-determinism is, but I think it is premature to close this issue without more analysis. I've also noticed that there can be different splits produced when an attribute is binned by itself as opposed to when it is processed with a collection of numeric features.

barrybecker4 commented 8 years ago

I really think that this issue should be reopened. Sometimes inserting print statements in the code can cause it to run at different rates and give different splits.

sramirez commented 7 years ago

Ok, we need a third person to give more insight into this topic, because I haven't been able to reproduce this problem in my experiments yet.

barrybecker4 commented 7 years ago

I will need to take a closer look at this soon. I just got a new machine and tried running the MDLP unit tests and found that one of them was failing because of this. Just yesterday, one of our automated tests started failing (I think because of this). It seems that there are some sources of non-determinism in Spark for some typical operations. See http://stackoverflow.com/questions/34188834/sources-of-non-determinism-of-apache-spark

barrybecker4 commented 7 years ago

I haven't solved this yet, but I think I have narrowed it down. I made some local changes so that findSmallThresholds was called for all features. I noticed that I often got different splits when running the tests. In particular, I seemed to get different splits when running the tests through Intellij versus running with sbt on the command line. Then I made local changes so that findBigThresholds was called for all features. Then the unit tests always produced consistent results. My conclusion is that the source of the non-determinism is most likely in the FewValuesThresholdFinder. My next step will be to run unit tests on just FewValuesThresholdFinder in isolation to see if I can reproduce the non-determinism. This should be easy now that that logic is in a separate class.

barrybecker4 commented 7 years ago

I added some unit tests for the Few and ManyValuesThresholdFinders in my fork. Though I am seeing intermittent failures in the other tests, the tests I added for the Threshold finders are producing consistent results.

barrybecker4 commented 7 years ago

I was reading about an interesting case in scala where parallel collections can lead to non-deterministic results if a variable declared outside a closure is accumulated in the closure.

scala> var sum = 0
sum: Int = 0
scala> val list = (1 to 1000).toList.par
scala> list.foreach(sum += _); sum
res01: Int = 467766
scala> var sum = 0
sum: Int = 0
scala> list.foreach(sum += _); sum
res02: Int = 457073

Could it be that something like this is happening in the evalThresholds method? I'm not sure. I didn't think a parallel collection was being used, but I don't think I fully understand what calculations are happening in different threads yet. I'll keep looking.
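
The hazard described above does not need parallel collections to show up: any unsynchronised update of a captured var from several threads has it. The following is a minimal sketch, not code from this project. `SharedCounterSketch`, `racySum` and `atomicSum` are hypothetical names, and plain JVM threads are used as a stand-in for a parallel collection.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical illustration; none of these names exist in spark-MDLP-discretization.
object SharedCounterSketch {

  // Split the input into roughly `threads` chunks (each chunk has at least one element).
  private def chunksOf(xs: Seq[Int], threads: Int): Vector[Seq[Int]] =
    xs.grouped(math.max(1, xs.size / math.max(1, threads))).toVector

  // Racy: every thread performs an unsynchronised read-modify-write on `sum`,
  // so updates can be lost and the result may differ from run to run.
  def racySum(xs: Seq[Int], threads: Int): Int = {
    var sum = 0
    val workers = chunksOf(xs, threads).map(c => new Thread(() => c.foreach(sum += _)))
    workers.foreach(_.start())
    workers.foreach(_.join())
    sum
  }

  // Deterministic: the shared accumulator is updated atomically.
  def atomicSum(xs: Seq[Int], threads: Int): Int = {
    val sum = new AtomicInteger(0)
    val workers = chunksOf(xs, threads).map(c => new Thread(() => c.foreach(x => sum.addAndGet(x))))
    workers.foreach(_.start())
    workers.foreach(_.join())
    sum.get
  }
}
```

With a single thread both functions agree. With several threads only `atomicSum` is guaranteed to return the true total, which is the kind of symptom to look for when hunting a shared accumulator inside a closure.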

sramirez commented 7 years ago

I haven't used parallel collections in this project. So, I think we can discard this hypothesis, unless Scala and/or Spark uses parallel collections internally.

About Spark's non-determinism, if the problem comes from FewValuesThresholdsFinders, the problem has to be caused by scala, because there is no Spark code in this class.

barrybecker4 commented 7 years ago

Another possibility might be in the initialThresholds method, but I currently don't have any theories as to the source. I see the intermittent failures fairly often, but unfortunately they are not reliably reproducible. I tried things like adding some sleep calls to change the timing, but that has not helped to make it more reproducible.

barrybecker4 commented 7 years ago

I found one problem. Unfortunately, it does not seem to be the only one. I think there may be two issues with the initialThresholds method. The one I identified today again has to do with the presence of nulls. I started printing out the thresholds found and saw that there were some cases where there were sequences like this

... (0,72.5), (0,NaN), (0,NaN), (1,2.00625), (1,4.50625), ....

There should not be thresholds with duplicates. If there are duplicates, the sort order will not necessarily be unique. The reason that there are duplicates is that whenever we try to find the midpoint between NaN and some other value, the result is NaN. The solution, I found, was to always take the non-null value as the midpoint if one of the two was NaN. Things seemed to work better after this change, but then I still noticed some occasional inconsistency. This change required some unit test updates.
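
The NaN handling described above can be sketched as follows. This is an illustration of the idea only, under the assumption that values are Floats and nulls are encoded as NaN. `MidpointSketch` and `midpoint` are hypothetical names, not the project's actual code.

```scala
// Hypothetical sketch of a NaN-aware midpoint; not the project's implementation.
object MidpointSketch {
  // If exactly one side is NaN, return the other side instead of NaN,
  // so that several NaN-adjacent candidates do not collapse into duplicate NaN thresholds.
  def midpoint(x1: Float, x2: Float): Float =
    if (x1.isNaN) x2          // also yields NaN when both sides are NaN
    else if (x2.isNaN) x1
    else (x1 + x2) / 2.0f
}
```

The plain arithmetic mean `(x1 + x2) / 2` is NaN whenever either operand is NaN, which is what produced the repeated `(0,NaN)` entries shown above.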

Next, I printed out the initial candidate thresholds found for each attribute. They were definitely different between runs. Sometimes there are extra thresholds in the list. I have a couple of thoughts.
1) The code block in mapPartitionsWithIndex works concurrently on the partitions, so we cannot rely on the partitions being executed in order. I thought maybe we were relying on the order, but it does not seem to be the case. I get 4 partitions when running, but it depends on the Spark configuration.
2) Each time you run, the partitioning can be different for the same dataset. Could it be that the firstElements found outside of initialThresholds are determined using a different partitioning scheme than within? This doesn't seem to be the case, but I'm not sure yet. Any other thoughts?

Separate question: Why do we need to broadcast variables that are small, like "arr", which is just an array of booleans whose size is the number of features? I thought broadcasting would only be needed for variables that took a lot of memory.

barrybecker4 commented 7 years ago

I think I know what is happening and it's not 1) or 2) above. For a given run, the partitions stay constant, but since the partitions may be different between runs, there can sometimes be extra candidate splits that should not be there. The problem is that the splits do not seem to be determined correctly across a partition boundary. If the label frequencies do not vary, unique points are skipped until they do, and a split is created. However, across a partition boundary, a candidate is always added, whether it should be or not. In the for loop of initialThresholds, there is a call to isBoundary used to tell if a candidate should be added. This call is not made for the lastPoint, but I think it should be. Unfortunately, that may be difficult because we do not know the freqs for bcFirsts.value(index + 1). Probably the label frequency information needs to be part of bcFirsts. Sound right?
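
The effect described above can be reproduced without Spark. The sketch below is a simplified model, not the project's initialThresholds: `BoundarySketch`, `candidates` and this version of `isBoundary` are assumptions made for illustration. Partitions are modelled as fixed-size chunks of the sorted points, and a candidate is added unconditionally at every chunk edge.

```scala
// Simplified, hypothetical model of candidate generation across partitions.
object BoundarySketch {
  type Point = (Double, Vector[Int]) // (distinct value, per-class label counts)

  // Assumed boundary test: two neighbours form a boundary only if,
  // taken together, they contain more than one class.
  def isBoundary(f1: Vector[Int], f2: Vector[Int]): Boolean =
    f1.zip(f2).map { case (a, b) => a + b }.count(_ != 0) > 1

  def candidates(points: Vector[Point], partitionSize: Int): Vector[Double] = {
    val parts = points.grouped(partitionSize).toVector
    parts.zipWithIndex.flatMap { case (part, i) =>
      // Inside a partition, a midpoint is emitted only at class boundaries.
      val inside = part.sliding(2).collect {
        case Vector((v1, f1), (v2, f2)) if isBoundary(f1, f2) => (v1 + v2) / 2
      }.toVector
      // At a partition edge, the midpoint to the next partition is always emitted.
      val atEdge =
        if (i + 1 < parts.length) Vector((part.last._1 + parts(i + 1).head._1) / 2)
        else Vector.empty[Double]
      inside ++ atEdge
    }
  }
}
```

For four points with classes A, A, B, B at values 1, 2, 3, 4, a single partition yields only the cut at 2.5. Chunks of three yield 2.5 and an extra cut at 3.5, because the edge between 3 and 4 falls inside a run of identical labels. Chunks of two happen to place the edge on the true boundary and yield 2.5 again, so the output depends on where the partition edges fall.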

barrybecker4 commented 7 years ago

Sergio, can you reproduce the problem by using setMaster("local[1]") in TestHelper instead of setMaster("local[4]")? When I switch to local[1], it splits the RDD into 2 partitions instead of 4, and I get different results when running the tests. In theory, the results should not change when the number of threads is changed.

barrybecker4 commented 7 years ago

I included the frequencies with the bcFirsts, and added the isBoundary check for the lastPoint as I describe above, but it's not sufficient. I still get inconsistencies because the frequencies need to be accumulated across the partition boundary in the case when it's not a new feature and isBoundary is false. I don't know how to pass accumFreqs to the next partition since all the partitions are being processed concurrently, so I am stuck again. Any thoughts?

barrybecker4 commented 7 years ago

There is one potential issue with my pull request that I just thought of. The partitioner I use when finding the initialThresholds creates partitions for each feature. However, if the initialThresholds for a single feature are very large, they may not fit into memory on a node. It would likely be better to create a more sophisticated partitioner that will at least create partitions for each feature, but will further split large feature partitions as needed. There would be two difficulties in implementing this correctly: 1) it needs to split those feature partitions consistently, and 2) it should preserve the accumFreqs across splits. Probably 1) is good enough, and 2) (which may not be possible with RDDs) would be nice to have. One possible way to implement this custom partitioner would be to determine the max and min values for a feature, then partition the sorted values into ranges to create additional partitions.

sramirez commented 7 years ago

I know that this last condition is not entirely consistent with the idea that initialThresholds should generate only boundary points. However, it is known (I'm going to look for the paper) that most discretizers are based on measures that mostly select boundary points, even when other points are among those evaluated. The last point (or the midpoint between the last point and the next one) is always added in order to avoid more communication between partitions. I don't think the final result is affected enough for this to be considered a big problem. Maybe we should check whether the classification results change much. In my experiments, I didn't notice that. Actually, I got the same cut points as those generated by the original algorithm (developed by Fayyad).

barrybecker4 commented 7 years ago

I don't mind if the initialThresholds include a few extra thresholds that don't need to be there, but we do need to figure out a way to make the results deterministic. My current pull request is deterministic, but it may not work well for attributes that have so many distinct values that they do not fit on a node. Were you able to reproduce the non-determinism by changing N in setMaster("local[N]") in TestHelper?

sramirez commented 7 years ago

I still think that this kind of non-determinism is not too bad, and it avoids all the considerations we are taking into account here. But maybe we can find a proper solution that improves the original design of this algorithm.

> I don't know how to pass accumFreqs to the next partition since all the partitions are being processed concurrently

It can be done as a preliminary step, but that requires launching another Spark operation over all the data. Communication between partitions implies more than one stage.

> It would likely be better to create a more sophisticated partitioner that will at least create partitions for each feature, but will further split large feature partitions as needed.

SortPartitioner was the best choice when I designed this algorithm, as it generates (more or less) equal-sized partitions. The problem is that you have to deal with points from different attributes. The solution I took was to allow these non-boundary points between partitions.

I've also performed some experiments, which you can find below:

sramirez commented 7 years ago

Maybe this link can be useful:

http://stackoverflow.com/questions/23127329/how-to-define-custom-partitioner-for-spark-rdds-of-equally-sized-partition-where#23228151

barrybecker4 commented 7 years ago

Sergio, I fixed the scalability issue that my previous changes (to fix the non-determinism problem) had introduced. I now use a more sophisticated partitioner that will subdivide the initial thresholds into smaller partitions if their number exceeds maxByPart. Now, no partition will ever exceed maxByPart. The unit tests still run approximately as fast as they did before this change. I added unit tests for the logic which finds the initial thresholds. Please review and merge the pull request when you can. If all looks good, I think you could now create the release.
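
One way such a subdividing assignment could work is sketched below. This is a guess at the general shape, not the code from the pull request: `FeaturePartitionSketch`, `offsets` and `partitionOf` are hypothetical names, and the sketch assumes that the number of sorted candidate points per feature is known up front. The partition of a point then depends only on the data and on maxByPart, not on how many cores or executors Spark happens to use.

```scala
// Hypothetical sketch; names and logic are assumptions, not the code from the pull request.
object FeaturePartitionSketch {

  // counts(f) is the number of sorted candidate points of feature f.
  // Result: the first partition id of each feature, with the total partition count as the last entry.
  def offsets(counts: Vector[Long], maxByPart: Int): Vector[Int] =
    counts.scanLeft(0) { (acc, n) =>
      val parts = ((n + maxByPart - 1) / maxByPart).toInt // ceil(n / maxByPart)
      acc + math.max(1, parts)                            // an empty feature still gets one partition
    }

  // Partition id of the point with the given 0-based rank inside its feature.
  def partitionOf(feature: Int, rank: Long, counts: Vector[Long], maxByPart: Int): Int =
    offsets(counts, maxByPart)(feature) + (rank / maxByPart).toInt
}
```

With counts of 5 and 2 and maxByPart = 2, feature 0 occupies partitions 0 to 2 and feature 1 occupies partition 3, so no partition holds more than two points. The sketch does not address how accumulated frequencies are carried across the sub-partitions of one feature.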

barrybecker4 commented 7 years ago

I was running some performance tests with larger datasets. Things seemed to scale ok when using the FewValuesFinder, but not the ManyValuesFinder. I upgraded to spark 2.1.0 and tried restoring the coalesce statement in ManyValuesFinder, but now unit tests are failing again with non-deterministic results. It seems there is still a problem with that coalesce statement when using spark 2.1.0.

barrybecker4 commented 7 years ago

When I added back the coalesce, it took 440 seconds to induce the NB model for my test dataset. It was only taking 164 seconds before I added back the coalesce. In addition, the unit tests fail intermittently with the coalesce in. For now, I am leaving it out.

sramirez commented 7 years ago

Ok, I'll try to create a snippet to isolate the problem with coalesce. If you already have such code, it would be nice to have it.

About the classification time, have you repeated the runs and averaged the execution time? Maybe the result is due to chance. Anyway, the time should depend on the number of intervals generated. More intervals imply a longer learning time.

barrybecker4 commented 7 years ago

I did run it a few times, and it seemed fairly consistent. It took at least 2x longer with the coalesce on a 4 node cluster. Curious to see if you see similar result. I don't have any special code - just comment out or uncomment the coalesce line.

sramirez commented 7 years ago

@barrybecker4, I've accepted your PR and merged both versions, but I've noticed that 4 out of 24 of your unit tests in MDLPDiscretizerSuite are failing. I've checked with your fork and got the same results. All of them failed because they expect this:

Expected "...ty;-Infinity, 1.4435[9817]E12, Infinity", but got "...ty;-Infinity, 1.4435[658]E12, Infinity"

Any clue about what's happening there?

barrybecker4 commented 7 years ago

What version of spark are you running on? Are they always failing with the same result? Maybe they just need to be updated with the new result after a change in spark. Looking at the difference it does not look very significant. I will try running it myself on 2.1.1 sometime in the next few days.
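
If the differing cut points really are numerical noise, one common way to make such assertions less brittle is to compare splits with a relative tolerance instead of comparing formatted strings. The sketch below is an assumption about how that could look, not what the test suite does. `SplitCompareSketch`, `approxEqual` and `sameSplits` are hypothetical names, and the tolerance values are arbitrary.

```scala
// Hypothetical helper for tolerance-based comparison of split lists.
object SplitCompareSketch {

  def approxEqual(a: Double, b: Double, relTol: Double): Boolean =
    if (a == b) true                              // also covers matching infinities
    else if (a.isInfinite || b.isInfinite) false  // an infinity never matches a finite value
    else math.abs(a - b) <= relTol * math.max(math.abs(a), math.abs(b))

  // Two split lists match if they have the same length and match element by element.
  def sameSplits(xs: Seq[Double], ys: Seq[Double], relTol: Double): Boolean =
    xs.length == ys.length && xs.zip(ys).forall { case (a, b) => approxEqual(a, b, relTol) }
}
```

The two values in the failure above, 1.44359817E12 and 1.4435658E12, differ by roughly 2e-5 in relative terms, so they match at a tolerance of 1e-4 but not at 1e-6. Whether a tolerance that loose is acceptable depends on whether the difference is rounding noise or a genuinely different cut point.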

barrybecker4 commented 7 years ago

I think I have many other changes other than the ones that were in that PR on my master branch of my fork. When I do a diff with your master, I see a lot of differences. I will try merging in your changes and see if it all works. Before merging all the tests are passing. I am running on spark 2.1.1.

barrybecker4 commented 7 years ago

@sramirez, I have merged in your changes from your master branch, but when I diff your master branch and mine, I still see some significant differences that you might want to review. In particular, I'm not sure what the significance of origAttrGroup in transformSchema is. I upgraded to spark 2.1.1. I updated tests that were failing and pushed to my master branch in my fork. I ran on 3 different machines, but am seeing some intermittent test failures in blockbuster, titanic, and srvRequest40000. I will look and see if I can find the source of the non-determinism. It should be using the deterministic approach in the unit tests since that is the default and it is not explicitly being set.

barrybecker4 commented 7 years ago

Did you run any benchmarks to see how much slower the deterministic approach is than the non-deterministic approach, if at all?

barrybecker4 commented 7 years ago

Once I fixed my minor merge issue, the tests are passing consistently on all 3 of my machines (using spark 2.1.1). Thanks for merging in my deterministic approach.

barrybecker4 commented 7 years ago

Performance-wise, running the deterministic approach seems to be about 35%-50% slower. On one of my machines the time to run all the unit tests goes from 1 min 29 sec to 2 min. On my faster machine, it goes from 38 sec to 56 sec.

sramirez commented 7 years ago

Nice work, Barry. Regarding my adaptation to Spark 2.1.X, origAttrGroup is useful to complete metadata. Concretely, to retrieve the name and the index of continuous features. Calls to OldLabeledPoint and other related functions are no longer needed with your adaptation using Dataset in mllib/MDLPDiscretization. This issue is related to #29, created and solved by @MarcKaminski.

I prefer to keep the non-deterministic version as the default because, from my point of view, faster performance is more attractive than exactness. The deterministic version could certainly be set as the default in the unit tests.

In summary, all your changes are quite valuable for DMDLP. So, could you include the metadata in your fork and create the PR, please? I will accept it and resolve any merge conflicts.

Tomorrow I will test time performance in my workstation, and will send you some feedback about that.

sramirez commented 7 years ago

I've just merged your fork and added a further commit to introduce all the metadata information included in #29. Feel free to merge my master branch with yours. Tomorrow I will run some tests concerning time performance and discretization results. If it's all ok, I think we can close this issue.

sramirez commented 7 years ago

After some minor changes performed in MDLPDiscretizationSuite regarding precision, all tests have run correctly. However, I got 3/4 errors in the big suite. For example, in the second case I got the following result:

org.scalatest.exceptions.TestFailedException: Expected "...nity;-Infinity, 1.45[602196E12, 1.45611135E12, 1.45641701E12, 1.45642855E12, 1.4564372E12, 1.45644808E12, 1.45647757E12, 1.45649959E12, 1.45650955E12, 1.45651453E12, 1.4565224E12, 1.45654704E12, 1.45673683E12, 1.45675518E12, 1.4567654E12, 1.45677615E12, 1.45679371E12, 1.45681626E12, 1.45684483E12, 1.45686371E12, 1.45687917E12, 1.45691011E12, 1.4569316E12, 1.45699635E12, 1.45700684E12, 1.4570244E12, 1.45704354E12, 1.45708286E12, 1.45711405E12, 1.45711982E12, 1.45716701E12, 1.4571712E12, 1.45720056E12, 1.45722678E12, 1.45728052E12, 1.45730961E12, 1.4573222E12, 1.45734251E12, 1.45742771E12, 1.45743885E12, 1.45745183]E12, Infinity;-Infin...", but got "...nity;-Infinity, 1.45[598946E12, 1.45607885E12, 1.45638451E12, 1.45639604E12, 1.45640496E12, 1.4564157E12, 1.45644559E12, 1.45647862E12, 1.45648281E12, 1.45649015E12, 1.45651453E12, 1.45670432E12, 1.45672267E12, 1.45673303E12, 1.45674365E12, 1.45676147E12, 1.45678402E12, 1.45681233E12, 1.4568312E12, 1.45684693E12, 1.4568776E12, 1.45689936E12, 1.45696358E12, 1.45696463E12, 1.45699032E12, 1.45701103E12, 1.45705035E12, 1.45708168E12, 1.45708732E12, 1.45713463E12, 1.45713883E12, 1.45716832E12, 1.45719532E12, 1.45724814E12, 1.45727711E12, 1.45728995E12, 1.45731014E12, 1.45739534E12, 1.45741237E12, 1.45742011]E12, Infinity;-Infin..."

It seems to be another precision problem in the results, but a more severe one. Does it also happen to you?

Concerning performance tests, the time to run all the unit tests with the exact version is 105s (3 errors), whereas for the approximate version it is 87s (5 errors). All errors in the exact version come from "srvRequest40000".

barrybecker4 commented 7 years ago

I'm not sure I understand. What changes were made to cause the test failures? Are the test failures consistent or intermittent? How do you know which set of results for the srvRequest40000 tests is correct? Maybe they just need to be updated if the new cutpoints are better. They look very similar.

sramirez commented 7 years ago

My current master is failing these unit tests, but your master is also failing. The test failures are consistent. In fact, the current failures are the same ones I got before the previous merge. I do not know if the discretization scheme in srvRequest is correct, I just took it from your fork. What sequential version of MDLP did you run?

barrybecker4 commented 7 years ago

I ran the tests in my fork's master branch, but did not get any failures. I merged in a few changes from your upstream master and ran the tests 2 more times on linux. Still no failures. I also ran on windows and did not get any failures.

sramirez commented 7 years ago

I ran the tests in your fork's master branch from a unix shell and I got 3 errors from the normal and big suites. I'll try on my workstation tomorrow, but I think I also got failures on that machine. It's quite weird. Is there something non-deterministic that you have added in the last versions? Is it possible that data partitioning can still cause this problem? Finally, do your test results come from a sequential version of MDLP?