sramirez / spark-MDLP-discretization

Spark implementation of Fayyad's discretizer based on Minimum Description Length Principle (MDLP)
Apache License 2.0

Upgrade to spark 2.x #27

Closed barrybecker4 closed 7 years ago

barrybecker4 commented 7 years ago

Currently the MDLP code runs fine with Spark 1.6.x, but changes are needed in order for it to work with Spark 2.x. There probably needs to be a separate versioned release of MDLP for Spark 2.x.

barrybecker4 commented 7 years ago

I made changes on this branch so that the MDLP lib will build with Spark 2.0.1: https://github.com/barrybecker4/spark-MDLP-discretization/tree/spark-2.0.1-upgrade-bb

sramirez commented 7 years ago

Is it also compatible with version 1.6.x? If so, we can just merge it into the master branch.

barrybecker4 commented 7 years ago

I doubt that it is compatible with 1.6.x, but I have not tried it.

sramirez commented 7 years ago

I think the Dataset API was first introduced in version 1.6, so it has to be compatible.

sramirez commented 7 years ago

Barry, do you think we could merge your branch for Spark 2 into the master branch without conflicts? It would be nice because I'm already planning to upload a new version of this project to spark-packages.

barrybecker4 commented 7 years ago

Probably, but I'm not sure. I think it has the correctness changes that you wanted to avoid until we had another parameter to switch between fast/inaccurate/non-deterministic and slow/accurate/deterministic behavior. I also do not have performance numbers that tell me whether the performance difference is significant. I have been running some larger datasets with lots of continuous columns on a cluster; they work, but take longer than I was expecting. I have lots of other pressing issues on my plate right now, but I will try to get back to this in the coming weeks.

sramirez commented 7 years ago

OK, what I can do is compare your current master with your 2.0 branch and apply those changes to my master. I've seen that the difference is quite small. By doing so we have more time to decide on the other important topic.

MarcKaminski commented 7 years ago

Hi, I would like to use the Spark 2.1 adaptation mentioned by @barrybecker4 (https://github.com/esi-mineset/spark-MDLP-discretization), but I am struggling to integrate it into my Spark pipeline.

First of all, I noticed that some of the tests are failing for the linked package. To be exact:

- Run MDLPD on all columns in srvRequest40000 data (label = churned, maxBins = 100, maxByPart = 10000) *** FAILED *** [...] (MDLPDiscretizerBigSuite.scala:404)
- Run MDLPD on all columns in titanic data (label = sex) *** FAILED ***
[info]   Expected "...ty;-Infinity, 1.4435[9817]E12, Infinity", but got "...ty;-Infinity, 1.4435[658]E12, Infinity" (MDLPDiscretizerSuite.scala:266)
- Run MDLPD on all columns in titanic data (label = sex, maxByPart = 100) *** FAILED ***
[info]   Expected "...ty;-Infinity, 1.4435[9817]E12, Infinity", but got "...ty;-Infinity, 1.4435[658]E12, Infinity" (MDLPDiscretizerSuite.scala:300)
- Run MDLPD on all columns in titanic data (label = sibsp) *** FAILED ***
[info]   Expected "...ty;-Infinity, 1.4435[9817]E12, Infinity", but got "...ty;-Infinity, 1.4435[658]E12, Infinity" (MDLPDiscretizerSuite.scala:337)

I don't know exactly whether those tests are supposed to pass, are flaky, or just fail because I am using Spark 2.1.1, but maybe this is of interest for my actual problem.

My data consists of ~2000 features and 100,000 rows. The DataFrame that is fed into the MDLPDiscretizer consists of two columns: a label column that has been indexed by a StringIndexer and a feature column that has been created by a VectorAssembler.
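
For reference, here is a minimal sketch of how that pipeline is wired up. Column names, parameter values, and the DataFrame `rawDf` are placeholders for my data; the MDLPDiscretizer setters follow the project's README example:

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{MDLPDiscretizer, StringIndexer, VectorAssembler}

// Placeholders: "churned" is the raw label column, featureCols lists the
// ~2000 continuous input columns, and rawDf is the source DataFrame.
val featureCols: Array[String] = rawDf.columns.filter(_ != "churned")

val labelIndexer = new StringIndexer()
  .setInputCol("churned")
  .setOutputCol("label")

val assembler = new VectorAssembler()
  .setInputCols(featureCols)
  .setOutputCol("features")

val discretizer = new MDLPDiscretizer()
  .setMaxBins(10)
  .setMaxByPart(10000)
  .setInputCol("features")
  .setOutputCol("discretizedFeatures")
  .setLabelCol("label")

val pipeline = new Pipeline().setStages(Array(labelIndexer, assembler, discretizer))
val model = pipeline.fit(rawDf)   // fit() is where the exception described below is thrown
```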

When fitting my Pipeline, a java.lang.IndexOutOfBoundsException is thrown in mllib/feature/MDLPDiscretizer (line 194, in findAllThresholds); see the full stack trace below.

Did any of you ever run into a similar problem, and do you have an idea how to fix it?

Besides that: I noticed that in your unit tests the Int/Date columns are cast to Double. Because of the huge number of features, it would be very inconvenient for me to recast all NumberType columns to Double, so I would like to know whether this is a requirement of the MDLPDiscretizer stage and whether it could be the reason for the exception.
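
Just to illustrate what that recast would look like on my side, a rough sketch (the skipped label column name and the DataFrame are placeholders):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{DoubleType, NumericType}

// Cast every numeric column to Double in one pass, leaving the label column as-is.
def castNumericToDouble(df: DataFrame, skip: Set[String] = Set("label")): DataFrame =
  df.schema.fields.foldLeft(df) { (acc, field) =>
    if (!skip.contains(field.name) && field.dataType.isInstanceOf[NumericType])
      acc.withColumn(field.name, col(field.name).cast(DoubleType))
    else acc
  }
```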

Thanks in advance for an answer. Let me know if I should create a new bug for this issue!

Cheers, Marc

Stack trace:

building model failed: org.apache.spark.SparkException: Job aborted due to stage failure: Task 42 in stage 69.0 failed 4 times, most recent failure: Lost task 42.3 in stage 69.0 (TID 2797, 10.180.251.3, executor 0): java.lang.IndexOutOfBoundsException: 744
    at scala.collection.LinearSeqOptimized$class.apply(LinearSeqOptimized.scala:65)
    at scala.collection.immutable.List.apply(List.scala:84)
    at org.apache.spark.mllib.feature.InitialThresholdsFinder$FeaturePartitioner$1.getPartition(InitialThresholdsFinder.scala:69)
    at org.apache.spark.util.collection.ExternalSorter.getPartition(ExternalSorter.scala:103)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$countByKey$1.apply(PairRDDFunctions.scala:375)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$countByKey$1.apply(PairRDDFunctions.scala:375)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:374)
    at org.apache.spark.mllib.feature.MDLPDiscretizer.findAllThresholds(MDLPDiscretizer.scala:194)
    at org.apache.spark.mllib.feature.MDLPDiscretizer.runAll(MDLPDiscretizer.scala:135)
    at org.apache.spark.mllib.feature.MDLPDiscretizer$.train(MDLPDiscretizer.scala:325)
    at org.apache.spark.ml.feature.MDLPDiscretizer.fit(MDLPDiscretizer.scala:135)
    at org.apache.spark.ml.feature.MDLPDiscretizer.fit(MDLPDiscretizer.scala:96)
    at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:153)
    at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:149)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableViewLike$Transformed$class.foreach(IterableViewLike.scala:44)
    at scala.collection.SeqViewLike$AbstractTransformed.foreach(SeqViewLike.scala:37)
    at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:149)
    at main.model.Modeler.runLR(Modeler.scala:246)
    at main.model.Modeler.run(Modeler.scala:105)
    at main.main$.doEval(main.scala:262)
    at main.main$$anonfun$run$1$$anonfun$apply$1$$anonfun$apply$2.apply(main.scala:208)
    at main.main$$anonfun$run$1$$anonfun$apply$1$$anonfun$apply$2.apply(main.scala:201)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at main.main$$anonfun$run$1$$anonfun$apply$1.apply(main.scala:201)
    at main.main$$anonfun$run$1$$anonfun$apply$1.apply(main.scala:193)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at main.main$$anonfun$run$1.apply(main.scala:193)
    at main.main$$anonfun$run$1.apply(main.scala:150)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at main.main$.run(main.scala:150)
    at main.main$.main(main.scala:78)
    at main.main.main(main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.IndexOutOfBoundsException: 744
    at scala.collection.LinearSeqOptimized$class.apply(LinearSeqOptimized.scala:65)
    at scala.collection.immutable.List.apply(List.scala:84)
    at org.apache.spark.mllib.feature.InitialThresholdsFinder$FeaturePartitioner$1.getPartition(InitialThresholdsFinder.scala:69)
    at org.apache.spark.util.collection.ExternalSorter.getPartition(ExternalSorter.scala:103)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
barrybecker4 commented 7 years ago

I think what you are seeing is a manifestation of https://github.com/sramirez/spark-MDLP-discretization/issues/14. I have fixed this problem in my fork https://github.com/esi-mineset/spark-MDLP-discretization, but Sergio was reluctant to take the PR because he felt that it might adversely affect performance. Which fork and branch are you using to build? I believe the unit tests were working when I built with 2.1.0, but I have not checked recently.

CarreauClement commented 7 years ago

Hello, I needed to use the MDLP discretizer on Spark 2.x, so I wanted to use yours, but it didn't compile. Anyway, here's what I did to make it work.

In the pom.xml:

- Changed the Spark version to 2.x (instead of 1.6.2)

In src/main/scala/org/apache/spark/ml/feature/MDLPDiscretizer.scala:

- Changed "import org.apache.spark.mllib.linalg._" to "import org.apache.spark.ml.linalg._"
- Added "import org.apache.spark.mllib.linalg.{Vectors => OldVectors}"
- Changed "val discOp = udf { discModel.transform _ }" to "val discOp = udf( (mlVector: Vector) => discModel.transform(OldVectors.fromML(mlVector)).asML )"

I think that's all. Now it compiles, works with Spark 2.x, and I can use it in my pipelines (or so it seems ☺).
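
Condensed, the UDF change looks roughly like this (a sketch only; I'm assuming the project's mllib-side DiscretizerModel class with a transform(Vector) method, as the surrounding code suggests):

```scala
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
import org.apache.spark.mllib.feature.DiscretizerModel
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

// In Spark 2.x, DataFrame vector columns hold ml.linalg vectors, while the fitted
// discretizer model still works on mllib.linalg vectors, so convert on the way in
// (fromML) and back on the way out (asML).
def discretizeUdf(discModel: DiscretizerModel): UserDefinedFunction =
  udf { (mlVector: Vector) =>
    discModel.transform(OldVectors.fromML(mlVector)).asML
  }
```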

Can you tell me if this problem is on my side?

MarcKaminski commented 7 years ago

Thanks for your replies!

@barrybecker4
I was accidentally using the branch "spark-2.0.1-upgrade-bb" from your fork, but have now switched to master. Fitting the model now works! For transform() to work, I had to implement @CarreauClement's changes; otherwise I was getting an exception:

error from UDF that gets an element of a vector: argument 1 requires vector type, however, '_column_' is of vector type

@CarreauClement Thanks for your comment - it helped me resolve my issue very quickly!

Some of the tests still fail, but it seems that this is due to issue #14.
One further test fails in the fork ("Run MDLPD on all columns in red_train data (label = outcome, maxBins = 100) *** FAILED ***") because the file "red_train.data" is missing; I could not find it in the repository either.

Thanks again for your support and the good work! :)

sramirez commented 7 years ago

Yes, it seems that many changes were introduced with Spark 2.1.x, as I remember my code worked properly with Spark 2.0.x. Anyway, I've made some changes, including those pointed out by @MarcKaminski and @CarreauClement (thank you, guys), which have been implemented in the master branch. If you notice anything weird, please let me know. Almost all unit tests ran successfully.

Here you can find a Stack Overflow question about this problem: https://stackoverflow.com/questions/38818879/matcherror-while-accessing-vector-column-in-spark-2-0#38819323
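
For anyone hitting that MatchError, the mismatch between the two vector types looks roughly like this (the DataFrame and the "features" column are just placeholders):

```scala
import org.apache.spark.ml.linalg.Vector   // NOT org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.{DataFrame, Row}

// In Spark 2.x, vector columns produced by ml transformers (e.g. VectorAssembler)
// hold ml.linalg vectors, so pattern matching a row against the old mllib Vector
// type fails with a MatchError like the one in the linked question.
def sumFeatures(df: DataFrame): Double =
  df.select("features").rdd.map { case Row(v: Vector) => v.toArray.sum }.sum()
```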

Apart from that, I'm going to upload a new version to spark-packages.

sramirez commented 7 years ago

New version uploaded, with all the problems fixed. In addition, the PR made by @MarcKaminski related to metadata has been included.