microsoft / SynapseML


java.net.ConnectException: Connection refused (Connection refused) with LightGBMClassifier in Databricks #609

Closed. emnajaoua closed this issue 5 years ago.

emnajaoua commented 5 years ago

I am trying to run this example with my own dataset on Databricks: https://github.com/microsoft/recommenders/blob/master/notebooks/02_model/mmlspark_lightgbm_criteo.ipynb. My cluster scales from 2 to 10 worker nodes; the worker type has 28 GB of memory and 8 cores. At the beginning of my notebook I set the following properties:

```python
spark.conf.set("spark.executor.memory", "80g")
spark.conf.set("spark.driver.maxResultSize", "6g")
```

but they do not seem to affect the notebook environment.
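One thing I can check from the notebook is what the cluster was actually started with (a sketch; on Databricks, settings like executor memory have to go into the cluster's Spark config before startup, so runtime spark.conf.set calls come too late for them):

```python
# Read the SparkConf the context was created with, not the runtime overrides
print(spark.sparkContext.getConf().get("spark.executor.memory", "<unset>"))
print(spark.sparkContext.getConf().get("spark.driver.maxResultSize", "<unset>"))
```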

For the LightGBMClassifier I am using the library Azure:mmlspark:0.16. My dataset has 1,502,306 rows and 9 columns. It is a Spark dataframe, the result of 3 joins between 3 SQL tables (converted to Spark dataframes with spark.sql()). I apply a feature_processor step to encode the categorical columns, and after setting the LightGBMClassifier parameters I train the model. My parameters are:

```python
NUM_LEAVES = 8
NUM_ITERATIONS = 20
LEARNING_RATE = 0.1
FEATURE_FRACTION = 0.8
EARLY_STOPPING_ROUND = 5

# Model name
MODEL_NAME = 'lightgbm_criteo.mml'

lgbm = LightGBMClassifier(
    labelCol="kategorie1",
    featuresCol="features",
    objective="multiclass",
    isUnbalance=True,
    boostingType="gbdt",
    boostFromAverage=True,
    baggingSeed=3,  # previously 42
    numLeaves=NUM_LEAVES,
    numIterations=NUM_ITERATIONS,
    learningRate=LEARNING_RATE,
    featureFraction=FEATURE_FRACTION,
    earlyStoppingRound=EARLY_STOPPING_ROUND,
    timeout=1200.0,
    parallelism='data_parallel'
)
```

I applied the repartition trick as well before training the model:

```python
train = train.repartition(50)
train.rdd.getNumPartitions()
```

Then when I run `model = lgbm.fit(train)` I get the following error:

```
Py4JJavaError: An error occurred while calling o1125.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 36.0 failed 4 times, most recent failure: Lost task 10.3 in stage 36.0 (TID 3493, 10.139.64.10, executor 7): java.net.ConnectException: Connection refused (Connection refused)
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:589)
	at java.net.Socket.connect(Socket.java:538)
	at java.net.Socket.<init>(Socket.java:434)
	at java.net.Socket.<init>(Socket.java:211)
	at com.microsoft.ml.spark.TrainUtils$.getNodes(TrainUtils.scala:178)
	at com.microsoft.ml.spark.TrainUtils$$anonfun$5.apply(TrainUtils.scala:211)
	at com.microsoft.ml.spark.TrainUtils$$anonfun$5.apply(TrainUtils.scala:205)
	at com.microsoft.ml.spark.StreamUtilities$.using(StreamUtilities.scala:29)
	at com.microsoft.ml.spark.TrainUtils$.trainLightGBM(TrainUtils.scala:204)
	at com.microsoft.ml.spark.LightGBMClassifier$$anonfun$3.apply(LightGBMClassifier.scala:83)
	at com.microsoft.ml.spark.LightGBMClassifier$$anonfun$3.apply(LightGBMClassifier.scala:83)
	at org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$5.apply(objects.scala:200)
	at org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$5.apply(objects.scala:197)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:852)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:852)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
	at org.apache.spark.scheduler.Task.run(Task.scala:112)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1481)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```

Driver stacktrace:

```
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2355)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2343)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2342)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2342)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1096)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1096)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1096)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2574)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2510)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:893)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2240)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2338)
	at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1051)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:379)
	at org.apache.spark.rdd.RDD.reduce(RDD.scala:1033)
	at org.apache.spark.sql.Dataset$$anonfun$reduce$1.apply(Dataset.scala:1650)
	at org.apache.spark.sql.Dataset$$anonfun$withNewRDDExecutionId$1.apply(Dataset.scala:3409)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:99)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:228)
	at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:85)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:158)
	at org.apache.spark.sql.Dataset.withNewRDDExecutionId(Dataset.scala:3405)
	at org.apache.spark.sql.Dataset.reduce(Dataset.scala:1649)
	at com.microsoft.ml.spark.LightGBMClassifier.train(LightGBMClassifier.scala:85)
	at com.microsoft.ml.spark.LightGBMClassifier.train(LightGBMClassifier.scala:27)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
	at py4j.Gateway.invoke(Gateway.java:295)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:251)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:589)
	at java.net.Socket.connect(Socket.java:538)
	at java.net.Socket.<init>(Socket.java:434)
	at java.net.Socket.<init>(Socket.java:211)
	at com.microsoft.ml.spark.TrainUtils$.getNodes(TrainUtils.scala:178)
	at com.microsoft.ml.spark.TrainUtils$$anonfun$5.apply(TrainUtils.scala:211)
	at com.microsoft.ml.spark.TrainUtils$$anonfun$5.apply(TrainUtils.scala:205)
	at com.microsoft.ml.spark.StreamUtilities$.using(StreamUtilities.scala:29)
	at com.microsoft.ml.spark.TrainUtils$.trainLightGBM(TrainUtils.scala:204)
	at com.microsoft.ml.spark.LightGBMClassifier$$anonfun$3.apply(LightGBMClassifier.scala:83)
	at com.microsoft.ml.spark.LightGBMClassifier$$anonfun$3.apply(LightGBMClassifier.scala:83)
	at org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$5.apply(objects.scala:200)
	at org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$5.apply(objects.scala:197)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:852)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:852)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
	at org.apache.spark.scheduler.Task.run(Task.scala:112)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1481)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
```

I really want to understand the reason behind this error and try any suggestions you can offer. I have been stuck on this problem for 2 weeks. I have read about many similar errors and implemented some of the suggestions, like increasing the cluster memory, configuring spark.executor.memory, and repartitioning the data, but I still cannot train the LightGBMClassifier on my input data.

gramhagen commented 5 years ago

A couple of questions: are you running this on Databricks or on another managed cluster service? The reason I ask is that you will need to make sure the auto-scaling options are disabled for this algorithm to work properly.

Also, have you tried MMLSpark version 0.17? There were improvements to the efficiency of LightGBM that might help.

@imatiach-msft any other ideas for debugging?

emnajaoua commented 5 years ago

Hi, thank you for your reply. I am running this on Databricks. Concerning auto-scaling, I didn't find it as an option on the LightGBMClassifier: https://mmlspark.azureedge.net/docs/pyspark/LightGBMClassifier.html

I will try again with MMLSpark version 0.17.

imatiach-msft commented 5 years ago

hi @emnajaoua, sorry about the trouble you are having.

> from 2 until 10 worker nodes

Please make sure to disable dynamic allocation, as that is not supported yet.
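A quick sanity check from the notebook (a sketch; the setting itself has to go into the cluster's Spark config before startup):

```python
# Should print 'false' once dynamic allocation / auto-scaling is off
print(spark.conf.get("spark.dynamicAllocation.enabled", "false"))
```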

gramhagen commented 5 years ago

Sorry for the confusion, I meant auto-scaling as an option on the Databricks cluster. See info here: https://docs.databricks.com/user-guide/clusters/sizing.html

Make sure Enable Auto-Scaling is disabled.

emnajaoua commented 5 years ago

I did exactly as you recommended: auto-scaling is disabled. Now I am working with 10 worker nodes, and I have also updated MMLSpark to version 0.17. The model has been training for 45 minutes and I am still waiting for the result: [screenshot] It is stuck at this step. My question: is my repartitioning correct? The train data previously had 200 partitions, and I decreased that to 40 partitions.

Also, are 10 worker nodes with 28 GB memory each enough in my case?

Thank you in advance :)

imatiach-msft commented 5 years ago

@emnajaoua you have only 10 executors or tasks? It looks like you have 40 tasks. How many rows/columns do you have? What is the current lightgbm debug output in the log4j logs - has it gotten past the network init stage? If it hasn't gotten past network init then it may be stuck and time out (the driver might be waiting to get all of the workers and there may be fewer workers than it is expecting).

imatiach-msft commented 5 years ago

Lightgbm repartitions the data to the number of possible workers/tasks on the cluster, so if you have 200 partitions it will repartition the data before doing training.

emnajaoua commented 5 years ago

@imatiach-msft since I have 10 worker nodes, I assume that I have 10 executors, or is that not the case? I have 10 columns (9 feature columns plus the features column produced by the feature_processor step) and 1,502,306 rows in my train data.

Concerning the driver logs, you will find the log file attached; these are the last lines I am getting:

```
19/07/08 15:32:29 INFO ContextCleaner: Cleaned accumulator 1096 (name: number of output rows)
19/07/08 15:32:29 INFO ContextCleaner: Cleaned accumulator 1061 (name: number of files read)
19/07/08 15:32:29 INFO ContextCleaner: Cleaned accumulator 1370 (name: internal.metrics.output.bytesWritten)
19/07/08 15:32:29 INFO ContextCleaner: Cleaned accumulator 1100 (name: collision rate (min, med, max))
19/07/08 15:32:29 INFO ContextCleaner: Cleaned accumulator 1308 (name: internal.metrics.input.sampledBytesRead)
19/07/08 15:32:29 INFO ContextCleaner: Cleaned accumulator 1063 (name: dynamic partition pruning time total (min, med, max))
19/07/08 15:32:29 INFO ContextCleaner: Cleaned accumulator 1032 (name: spill write time total (min, med, max))
19/07/08 15:32:29 INFO ContextCleaner: Cleaned accumulator 1079 (name: number of output rows)
19/07/08 15:32:29 INFO ContextCleaner: Cleaned accumulator 1271 (name: internal.metrics.shuffle.read.recordsRead)
19/07/08 15:32:29 INFO BlockManagerInfo: Removed broadcast_33_piece0 on 10.139.64.4:36295 in memory (size: 46.3 KB, free: 9.4 GB)
19/07/08 15:32:29 INFO ContextCleaner: Cleaned shuffle 38
19/07/08 15:32:29 INFO ContextCleaner: Cleaned accumulator 1003 (name: dynamic partition pruning time total (min, med, max))
19/07/08 15:32:29 INFO ContextCleaner: Cleaned accumulator 1176 (name: internal.metrics.shuffle.read.localBlocksFetched)
19/07/08 15:32:29 INFO ContextCleaner: Cleaned accumulator 1141 (name: internal.metrics.peakExecutionMemory)
19/07/08 15:32:29 INFO BlockManagerInfo: Removed broadcast_33_piece0 on 10.139.64.5:46621 in memory (size: 46.3 KB, free: 9.4 GB)
19/07/08 15:32:29 INFO BlockManagerInfo: Removed broadcast_33_piece0 on 10.139.64.7:34257 in memory (size: 46.3 KB, free: 9.4 GB)
19/07/08 15:32:29 INFO BlockManagerInfo: Removed broadcast_33_piece0 on 10.139.64.15:36649 in memory (size: 46.3 KB, free: 9.4 GB)
19/07/08 15:32:29 INFO BlockManagerInfo: Removed broadcast_33_piece0 on 10.139.64.9:38729 in memory (size: 46.3 KB, free: 9.4 GB)
19/07/08 15:37:26 INFO HiveMetaStore: 2: get_database: default
19/07/08 15:37:26 INFO audit: ugi=root ip=unknown-ip-addr cmd=get_database: default
19/07/08 15:37:26 INFO DriverCorral: Metastore health check ok
19/07/08 15:37:26 INFO DriverCorral: DBFS health check ok
19/07/08 15:42:26 INFO HiveMetaStore: 2: get_database: default
19/07/08 15:42:26 INFO audit: ugi=root ip=unknown-ip-addr cmd=get_database: default
19/07/08 15:42:26 INFO DriverCorral: Metastore health check ok
19/07/08 15:42:26 INFO DriverCorral: DBFS health check ok
19/07/08 15:47:26 INFO HiveMetaStore: 2: get_database: default
19/07/08 15:47:26 INFO audit: ugi=root ip=unknown-ip-addr cmd=get_database: default
19/07/08 15:47:26 INFO DriverCorral: Metastore health check ok
19/07/08 15:47:26 INFO DriverCorral: DBFS health check ok
19/07/08 15:52:26 INFO HiveMetaStore: 2: get_database: default
19/07/08 15:52:26 INFO audit: ugi=root ip=unknown-ip-addr cmd=get_database: default
19/07/08 15:52:26 INFO DriverCorral: Metastore health check ok
19/07/08 15:52:26 INFO DriverCorral: DBFS health check ok
```

emnajaoua commented 5 years ago

I apologize, I don't know exactly how to attach the log file that I got from the driver logs, but I think it is not stuck at the init process. Concerning the repartitioning: do you think it is useless to repartition the train data before training the model?

imatiach-msft commented 5 years ago

@emnajaoua how many CPUs do you have per worker, and can you make sure it is set to 1 per task? That was a bug fixed on latest master (to allow multiple CPUs per task). Are you running binary or multiclass classification, and how many classes? Does each partition have at least one instance of each label?
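You can check it from the notebook with something like this sketch:

```python
# 'spark.task.cpus' defaults to 1 when unset
print(spark.conf.get("spark.task.cpus", "1"))
```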

imatiach-msft commented 5 years ago

@emnajaoua you can also try running the build from latest master which has several fixes: --packages com.microsoft.ml.spark:mmlspark_2.11:0.17.dev27 and --repositories https://mmlspark.azureedge.net/maven

You can also try using barrier execution mode as I recommended here: https://github.com/Azure/mmlspark/issues/600

You can set `.setUseBarrierExecutionMode(true)` in Scala or `useBarrierExecutionMode=True` in Python.

See the doc for more info on barrier execution mode, which was introduced in Spark 2.4: https://github.com/Azure/mmlspark/blob/master/docs/lightgbm.md#barrier-execution-mode
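A minimal sketch in Python (assuming the 0.17 flat namespace and the column names from this thread):

```python
from mmlspark import LightGBMClassifier

# Barrier execution mode schedules all LightGBM tasks together (Spark 2.4+)
lgbm = LightGBMClassifier(
    labelCol="kategorie1",
    featuresCol="features",
    objective="multiclass",
    useBarrierExecutionMode=True,
)
model = lgbm.fit(train)
```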

emnajaoua commented 5 years ago

@imatiach-msft Thank you for your help. I don't know exactly how many CPUs per worker I have, but I would like to find out. Here is an image of my cluster configuration: [screenshot] I am not sure how to set 1 CPU per worker; is it a spark.conf.set property?

I am running a multiclass classification with 13 classes. To be honest, I am not sure that each partition has at least one instance of each label/class, but I would like to know how to check that, or whether you have an example.
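One check I could run is a sketch like the following, listing the distinct labels present in each partition:

```python
# Sketch: list the distinct label values in each partition of the train data
def labels_per_partition(idx, rows):
    yield (idx, sorted({row["kategorie1"] for row in rows}))

print(train.select("kategorie1").rdd
           .mapPartitionsWithIndex(labels_per_partition)
           .collect())
```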

imatiach-msft commented 5 years ago

@emnajaoua it looks like you are using Databricks, which by default has 1 CPU per task, so I think you are good unless you explicitly set `spark.task.cpus 2` in the cluster's Spark config, e.g.: [screenshot]

emnajaoua commented 5 years ago

> You can also try running the build from latest master which has several fixes (`--packages com.microsoft.ml.spark:mmlspark_2.11:0.17.dev27` and `--repositories https://mmlspark.azureedge.net/maven`), and try barrier execution mode as recommended in #600.

I will try again with useBarrierExecutionMode=True and the build from the latest master. I will let you know tomorrow how it went :)

emnajaoua commented 5 years ago

> it looks like you are using databricks, which by default has 1 CPU per task, so I think you are good unless you explicitly set `spark.task.cpus 2`

In my Spark config there is no setting for spark.task.cpus, so I guess it is one CPU per task.

imatiach-msft commented 5 years ago

Can you also do a count of the distinct classes?

https://stackoverflow.com/questions/30218140/spark-how-to-translate-countdistinctvalue-in-dataframe-apis

You can use countDistinct as one way to find out how many of each class you have, but not per partition.
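For example, a sketch:

```python
from pyspark.sql.functions import countDistinct

# Number of distinct classes overall
train.select(countDistinct("kategorie1")).show()

# Rows per class (across the whole dataframe, not per partition)
train.groupBy("kategorie1").count().orderBy("count").show()
```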

emnajaoua commented 5 years ago

@imatiach-msft I applied countDistinct on the raw data (before the feature_processor step) to find the number of classes, as you recommended. As I mentioned before, here are the 13 classes: [screenshot]

emnajaoua commented 5 years ago

@imatiach-msft as you recommended, I installed the latest mmlspark build and set useBarrierExecutionMode=True. The model is still training (it has been 3 hours on this step): [screenshot]

According to the logs (stdout) of one worker, it is still training: [screenshot]

imatiach-msft commented 5 years ago

@emnajaoua if you have gotten to that point then you must be training, and you have gotten past the network initialization stage, which is where you were getting the connection refused error. When you look at the workers, did any of them have status failure or success? Sometimes a worker may exit early; I've seen that happen when a partition has only one label, but I think that is unlikely in your scenario. Also, with 13 classes training might take longer than usual, but based on your dataset size I don't think it should take 3 hours. Are you seeing iterations progress in the debug output? I think it might be easier to just discuss over Skype or Teams; I was able to resolve an issue like this yesterday. I can send you a Teams link.

imatiach-msft commented 5 years ago

I wonder if the multiclass classifier is having issues with different numbers of labels on different partitions, since some of the labels have very few instances (e.g. 1 or 2). Maybe I can add a test and see if I can reproduce this issue.

emnajaoua commented 5 years ago

@imatiach-msft concerning whether the workers have failure or success status: I stopped the training for now, but if it is okay with you we can schedule a Skype meeting today. Before that I will run my notebook again, so we can check the worker statuses and adjust from there :)

imatiach-msft commented 5 years ago

@emnajaoua here is a teams link if that works for you: https://teams.microsoft.com/l/meetup-join/19%3ameeting_NmU1NjgyODEtYjBkNS00YTg2LTk2MDYtNzdkNTc2NWUxZDU2%40thread.v2/0?context=%7b%22Tid%22%3a%2272f988bf-86f1-41af-91ab-2d7cd011db47%22%2c%22Oid%22%3a%227ac33778-88d2-407b-bfe1-d64366fff0e4%22%7d


Join Microsoft Teams Meeting: +1 347-991-7781 (United States, New York City, toll) or (866) 641-7188 (toll-free), Conference ID: 703 977 44#


when would you prefer to discuss?

emnajaoua commented 5 years ago

@imatiach-msft I trained the algorithm again, this time without repartitioning the training data. This is the error I got (an org.apache.spark.scheduler.BarrierJobUnsupportedRDDChainException):

```
Py4JJavaError                             Traceback (most recent call last)
in ()
----> 1 model = lgbm.fit(train)

/databricks/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
    130                 return self.copy(params)._fit(dataset)
    131             else:
--> 132                 return self._fit(dataset)
    133         else:
    134             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

/databricks/spark/python/pyspark/ml/wrapper.py in _fit(self, dataset)
    293
    294     def _fit(self, dataset):
--> 295         java_model = self._fit_java(dataset)
    296         model = self._create_model(java_model)
    297         return self._copyValues(model)

/databricks/spark/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
    290         """
    291         self._transfer_params_to_java()
--> 292         return self._java_obj.fit(dataset._jdf)
    293
    294     def _fit(self, dataset):

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258
   1259         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o433.fit.
: org.apache.spark.scheduler.BarrierJobUnsupportedRDDChainException: [SPARK-24820][SPARK-24821]: Barrier execution mode does not allow the following pattern of RDD chain within a barrier stage:
1. Ancestor RDDs that have different number of partitions from the resulting RDD (eg. union()/coalesce()/first()/take()/PartitionPruningRDD). A workaround for first()/take() can be barrierRdd.collect().head (scala) or barrierRdd.collect()[0] (python).
2. An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2)).
	at org.apache.spark.scheduler.DAGScheduler.checkBarrierStageWithRDDChainPattern(DAGScheduler.scala:510)
	at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:585)
	at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1132)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2531)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2510)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:893)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2240)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2338)
	at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1051)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:379)
	at org.apache.spark.rdd.RDD.reduce(RDD.scala:1033)
	at com.microsoft.ml.spark.LightGBMBase$class.innerTrain(LightGBMBase.scala:87)
	at com.microsoft.ml.spark.LightGBMClassifier.innerTrain(LightGBMClassifier.scala:24)
	at com.microsoft.ml.spark.LightGBMBase$class.train(LightGBMBase.scala:37)
	at com.microsoft.ml.spark.LightGBMClassifier.train(LightGBMClassifier.scala:24)
	at com.microsoft.ml.spark.LightGBMClassifier.train(LightGBMClassifier.scala:24)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
	at py4j.Gateway.invoke(Gateway.java:295)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:251)
	at java.lang.Thread.run(Thread.java:748)
```

emnajaoua commented 5 years ago

> here is a teams link if that works for you [...] when would you prefer to discuss?

I am available now if you like :)

imatiach-msft commented 5 years ago

@emnajaoua sure we can meet right now

imatiach-msft commented 5 years ago

Debugged on a call. This issue looks similar to https://github.com/Azure/mmlspark/issues/569; the workers seem to be getting out of sync. I wonder if it is due to unbalanced classes, since some classes have only 1 or 2 instances.

emnajaoua commented 5 years ago

As discussed yesterday, I will try the first solution, which is removing the classes that have fewer instances. This is the list of classes that I have now: [screenshot] They have more or fewer instances compared to the previous dataframe.

So the classes now have more instances than before. I used the same configuration as discussed in yesterday's meeting, but I received the connection refused error again. Here are screenshots from the worker stderr: [screenshot] [screenshot]

Could it be that another notebook in Databricks is using the same resources, or can that not be the reason? I will test again tomorrow and update you here.

imatiach-msft commented 5 years ago

@emnajaoua "due to another notebook in databricks using the same resources or this cannot be the reason" I don't think that would be the reason. The iterations are in order here though, which looks much better than when we looked at it. How many iterations did you set, and how long did you run it for? Also, did you shuffle the dataset beforehand (I believe the classes should be distributed across all partitions such that on each partition at least one instance of each class should appear, but I could be wrong)?

imatiach-msft commented 5 years ago

Also, if you remove classes 12, 3, 5, 7, do you still see the connection refused error? What if you keep just classes 2 and 4 and use binary classification? Also, are you using a string indexer on the label column? The labels need to run from 0 to n. I'm also wondering if you could try the multiclassova objective; although it would be much slower, it's worth trying out: https://github.com/microsoft/LightGBM/blob/master/docs/Parameters.rst#objective
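A minimal sketch of that last suggestion, reusing the column names from this thread (other parameters as before):

```python
# multiclassova trains one-vs-all binary models per class; slower, but
# sometimes more robust with skewed label distributions
lgbm = LightGBMClassifier(
    labelCol="kategorie1",
    featuresCol="features",
    objective="multiclassova",
)
```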

imatiach-msft commented 5 years ago

hi @emnajaoua I was able to reproduce the issue for the multiclass classifier; I've put the test case in a branch:

https://github.com/imatiach-msft/mmlspark/commit/c2568b11ab6e4f74a7f349d84cb8358437eed2b2

I've confirmed the following:

1. If the labels on a partition skip a value, e.g. [0 to j] inclusive and [j+2 to k], the LightGBM multiclass classifier gets stuck.
2. If partitions have different labels but some have fewer than others, e.g. one has 0 to k and another has 0 to k+1, the LightGBM multiclass classifier finishes.

My recommendation is to ensure that all partitions have all labels from 0 to the total number of labels.

In your image above you have labels 12, 1, 3, 5, 4, 8, 7, 2, but you are missing labels 0, 6, 9, 10, 11. Can you try running StringIndexer or ValueIndexer, as I do in the multiclass lightgbm tests:

```scala
import com.microsoft.ml.spark.featurize.ValueIndexer

val labelizer = new ValueIndexer().setInputCol(labelColumnName).setOutputCol(labelColumnName).fit(tmpTrainData)
val labelizedData = labelizer.transform(tmpTrainData)
```

imatiach-msft commented 5 years ago

It looks like there is a stratified sampler by key in Spark; I think that could be used as a workaround for now: https://spark.apache.org/docs/2.4.0/api/python/pyspark.html#pyspark.RDD.sampleByKey

emnajaoua commented 5 years ago

> How many iterations did you set, and how long did you run it for?

Yes, the iterations look in order. I ran it for 10 minutes and then it gave me the connection error again. I didn't shuffle the dataset beforehand; I will start by trying that and let you know afterwards. I want to try your suggestions one by one. Thank you so much for your help so far :)

emnajaoua commented 5 years ago

> hi @emnajaoua I was able to reproduce the issue for the multiclass classifier [...] Can you try running StringIndexer or ValueIndexer, as I do in the multiclass lightgbm tests?

I tried this solution using Python:

```python
from pyspark.ml.feature import ValueIndexer

labelizer = ValueIndexer().setInputCol('kategorie1').setOutputCol('kategorie1').fit(train)
labelizedData = labelizer.transform(train)
```

but I get this error: ImportError: cannot import name 'ValueIndexer', although I checked that ValueIndexer exists here: http://mmlspark.azureedge.net/docs/pyspark/ValueIndexer.html?highlight=valueindexer#module-ValueIndexer

imatiach-msft commented 5 years ago

It's under the mmlspark import namespace, not pyspark. Also you can try using StringIndexer.
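A sketch of both options (assuming the 0.17 flat Python namespace for mmlspark):

```python
# mmlspark's ValueIndexer lives under the mmlspark namespace, not pyspark
from mmlspark import ValueIndexer

# Alternatively, Spark's built-in StringIndexer (note the output column must
# differ from the input column here)
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="kategorie1", outputCol="label_idx")
labelized = indexer.fit(train).transform(train)
```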


emnajaoua commented 5 years ago

So, I tried shuffling the data; it gives this error: [screenshot] It didn't even get to the training iterations; the error occurs just after initialization. The code I used for shuffling the data is:

```python
from pyspark.sql.functions import rand

shuffledDF = projectYFeaturesTransformed_df_less_classes.orderBy(rand())
```

emnajaoua commented 5 years ago

> hi @emnajaoua I was able to reproduce the issue for the multiclass classifier [...] Can you try running StringIndexer or ValueIndexer, as I do in the multiclass lightgbm tests?

I tried this with only the following classes: [screenshot]

The code is:

```python
labelizer = ValueIndexer(inputCol="kategorie1", outputCol="kategorie1").fit(train)
labelizedData = labelizer.transform(train)
```

I got this error again in the stderr of one of the workers: [screenshot] and then the famous connection refused error.

emnajaoua commented 5 years ago

@imatiach-msft I have good news. When I used stratified sampling with only the classes [1, 2, 3, 4, 5, 7, 8, 12], as you suggested previously, it worked: my model was able to train and transform, and it reached 0.67 accuracy. The exact data processing code that I used is the following:

```python
projectYFeaturesTransformed_df_less_classes = projectYFeaturesTransformed_df.filter(
    col("kategorie1").isin([12, 1, 3, 5, 4, 8, 7, 2]))
raw_train, raw_test = spark_random_split(projectYFeaturesTransformed_df_less_classes,
                                         ratio=0.8, seed=42)
columns = [c for c in projectYFeaturesTransformed_df_less_classes.columns
           if c != 'kategorie1']

feature_processor = FeatureHasher(inputCols=columns, outputCol='features')
train = feature_processor.transform(raw_train)
test = feature_processor.transform(raw_test)

fractions = train.select("kategorie1").distinct().withColumn("fraction", lit(0.3)).rdd.collectAsMap()
print(fractions)

sampled_df = train.stat.sampleBy("kategorie1", fractions, seed=12)
```

Now I need to use all the classes, not just the ones that have more instances!

emnajaoua commented 5 years ago

@imatiach-msft I am afraid I couldn't enjoy the success for long: when I tried with more data, using the same process, I received the connection error again. This is the reason shown in the stderr: [screenshot]

imatiach-msft commented 5 years ago

@emnajaoua I think the problem may be that some labels have only 1-2 instances, which is not enough across 40 partitions; we need at least 1 instance per partition. sampleBy is not with replacement, so the learner still gets stuck.
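A sketch of a check that may help spot those labels:

```python
from pyspark.sql.functions import col

# Classes with fewer rows than partitions cannot appear in every partition
num_parts = train.rdd.getNumPartitions()
train.groupBy("kategorie1").count().where(col("count") < num_parts).show()
```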

imatiach-msft commented 5 years ago

@emnajaoua sorry, maybe we can debug on Teams again? '[1, 2, 3, 4, 5, 7, 8, 12]' shouldn't work; the labels need to be ordered from 0 to n across all partitions. Did you use StringIndexer or ValueIndexer? I don't see that in the code above.

imatiach-msft commented 5 years ago

@emnajaoua We can use the same link if that works for you:

https://teams.microsoft.com/l/meetup-join/19%3ameeting_NmU1NjgyODEtYjBkNS00YTg2LTk2MDYtNzdkNTc2NWUxZDU2%40thread.v2/0?context=%7b%22Tid%22%3a%2272f988bf-86f1-41af-91ab-2d7cd011db47%22%2c%22Oid%22%3a%227ac33778-88d2-407b-bfe1-d64366fff0e4%22%7d

(Same dial-in details as above.)

when would you prefer to discuss?

emnajaoua commented 5 years ago

> I think the problem may be that some labels have only 1-2 instances, which is not enough across 40 partitions.

I don't think that this is the problem, because I am still training my model with just the classes mentioned before. I used ValueIndexer, as I mentioned before in the code :) The ValueIndexer suggestion didn't work; what worked is applying the stratified sampler. Please look at the code above. Give me some time and I will call you on Teams so I can show you the progress.

imatiach-msft commented 5 years ago

" shuffledDF = projectYFeaturesTransformed_df_less_classes.orderBy(rand())" that should work, I'm not sure why that is running out of memory

imatiach-msft commented 5 years ago

@emnajaoua I think you need to use both ValueIndexer and stratified sampler. That was the only thing that worked for me consistently.

emnajaoua commented 5 years ago

" shuffledDF = projectYFeaturesTransformed_df_less_classes.orderBy(rand())" that should work, I'm not sure why that is running out of memory

this was the first thing that I tried for today but it didn#t work unfortunately

emnajaoua commented 5 years ago

> I think you need to use both ValueIndexer and stratified sampler. That was the only thing that worked for me consistently.

I will try the combination of both then: the ValueIndexer and then the stratified sampler, and I will let you know the result. For now, the stratified sampler works! The thing is that I was always working with a part of my dataset (select where date is between date1 and date2), but when I did the select and then model.fit(train), it didn't work!

emnajaoua commented 5 years ago

In fact, I am confused about the correct value to use in the stratified sampling here:

```python
fractions = train.select("kategorie1").distinct().withColumn("fraction", lit(0.3)).rdd.collectAsMap()
```

Is there any formula for that?

imatiach-msft commented 5 years ago

If you get the distinct values, e.g. 0 through 9 inclusive, you can then decide the fraction for each, e.g. 1/n, which would be 0.1 for each value in this case. When you have some labels with only one or two rows, using 1/n is probably unreasonable. I agree that it's difficult to figure out, and it partly depends on the use case: do you want the classes with only one or two rows to be very accurate? In that case you can oversample more and increase the fraction to get better accuracy for those labels, possibly at the cost of accuracy for other labels.

imatiach-msft commented 5 years ago

I think using 1/n is a good starting point though, and you can iterate from there. Using 0.3 above won't work because the total won't add up to 1; e.g., if you have 10 labels the total will be 0.3 * 10 = 3, but the sum needs to be 1.
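A sketch of that 1/n starting point (column name from this thread):

```python
# Equal per-class sampling fractions of 1/n over the distinct labels
labels = [row[0] for row in train.select("kategorie1").distinct().collect()]
fractions = {label: 1.0 / len(labels) for label in labels}
sampled_df = train.stat.sampleBy("kategorie1", fractions, seed=12)
```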

imatiach-msft commented 5 years ago

@emnajaoua I added a stratified repartition stage in my branch; I will try to get a build to you by Monday with a combination of that transform and early termination in LightGBM when labels are incorrect on one of the partitions (instead of getting stuck): https://github.com/imatiach-msft/mmlspark/commits/ilmat/lgbm-multiclass-stuck

I validated that using that transformer fixed the issue for me locally.