microsoft / SynapseML

Simple and Distributed Machine Learning
http://aka.ms/spark
MIT License

[LightGBM] Train Lambdamart failed with "org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1" #879

Closed ce39906 closed 4 years ago

ce39906 commented 4 years ago

Describe the bug Hi @imatiach-msft, I'm using mmlspark 0.18.1 to train a ranking job. Below is the main training flow.

def run(self):
        train_df = self.load_svm_rank_data()
        df = train_df.repartition(8, 'query_id')
        model = LightGBMRanker(
            parallelism='data_parallel',
            #  parallelism='voting_parallel',
            objective='lambdarank',
            boostingType='gbdt',
            numIterations=500,
            learningRate=0.1,
            # For recall task, 511,8 is enough.
            # numLeaves=511,
            # maxDepth=8,
            numLeaves=1023,
            maxDepth=10,
            earlyStoppingRound=0,
            maxPosition=20,
            #minSumHessianInLeaf=0.0005,
            minSumHessianInLeaf=0.001,
            lambdaL1=0.01,
            lambdaL2=0.01,
            isProvideTrainingMetric=True,
            #  baggingSeed=3,
            #  boostFromAverage=True,
            #  categoricalSlotIndexes=None,
            #  categoricalSlotNames=None,
            defaultListenPort=49650,
            #  defaultListenPort=12400,
            featuresCol='features',
            groupCol='query_id',
            #  initScoreCol=None,
            labelCol='label',
            #  labelGain=[],
            #  modelString='',
            numBatches=0,
            #  predictionCol='prediction',
            timeout=600000.0,
            #  useBarrierExecutionMode=False,
            #  validationIndicatorCol=None,
            verbosity=1,
            #  weightCol=None,
        ).fit(df)

Below is the Spark job config.

/opt/meituan/spark-2.2/bin/spark-submit     --deploy-mode cluster --queue root.zw03_training.hadoop-map.training --executor-cores 40 --num-executors 8 --master yarn --driver-memory 8G --files /opt/meituan/spark-2.2/conf/hive-site.xml --executor-memory 16G --files /opt/tmp/etl/remote_file/session_D5E23EBC14BCEA4F_pysparkjar_00877de2cbfbf624dca5ac527f415c9e/city_province_list --repositories http://pixel.sankuai.com/repository/group-releases,http://pixel.sankuai.com/repository/mtdp --conf spark.yarn.maxAppAttempts=1 --conf spark.task.cpus=40 --conf spark.sql.autoBroadcastJoinThreshold=-1 --conf spark.kryoserializer.buffer.max=1024m --conf spark.driver.maxResultSize=10G --conf spark.executor.instances=8 --conf spark.hadoop.parquet.enable.summary-metadata=false --conf spark.executor.heartbeatInterval=30s --conf spark.default.parallelism=1024 --conf spark.sql.hive.metastorePartitionPruning=true --conf spark.yarn.driver.memoryOverhead=8096 --conf spark.sql.orc.filterPushdown=true --conf spark.sql.parquet.filterPushdown=true --conf spark.sql.shuffle.partitions=1024 --conf spark.sql.orc.splits.include.file.footer=true --conf spark.jars.packages=com.microsoft.ml.spark:mmlspark_2.11:0.18.1 --conf spark.sql.orc.cache.stripe.details.size=10000 --conf spark.sql.parquet.mergeSchema=false --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.yarn.executor.memoryOverhead=60G --conf spark.yarn.am.extraJavaOptions="-DappIdentify=hope_3375504 -Dport=AppMaster " --conf spark.driver.extraJavaOptions="-DappIdentify=hope_3375504 -Dport=Driver -XX:PermSize=128M -XX:MaxPermSize=256M " --conf spark.executor.extraJavaOptions="-DappIdentify=hope_3375504 -Dport=Executor "          --name huobaochong:/opt/meituan/20200616/topk_train_v2/shanghai/topk_train/topk_train.hope     --conf spark.job.owner=huobaochong     --conf spark.client.host=zw02-data-msp-launcher13.mt     --conf spark.job.type=mtmsp     --conf spark.flowid=D5E23EBC14BCEA4F     --conf spark.yarn.app.tags.flowid=D5E23EBC14BCEA4F     --conf spark.yarn.app.tags.schedulejobid=cantor-6177712     --conf spark.yarn.app.tags.scheduleinstanceid=     --conf spark.yarn.app.tags.scheduleplanid=     --conf spark.yarn.app.tags.onceexecid=once-exec-6163959     --conf spark.yarn.app.tags.rm.taskcode=hope:huobaochong:/opt/meituan/20200616/topk_train_v2/shanghai/topk_train/topk_train.hope     --conf spark.yarn.app.tags.rm.taskname=huobaochong:/opt/meituan/20200616/topk_train_v2/shanghai/topk_train/topk_train.hope     --conf spark.yarn.app.tags.rm.tasktype=hope     --conf spark.yarn.app.tags.mtmspCompileVersion=0     --conf spark.yarn.job.priority=1     --conf spark.hive.mt.metastore.audit.id=SPARK-MTMSP-D5E23EBC14BCEA4F     --conf spark.hadoop.hive.mt.metastore.audit.id=SPARK-MTMSP-D5E23EBC14BCEA4F     --conf spark.hbo.enabled=true     --conf spark.executor.cantorEtlIncreaseMemory.enabled=true     /opt/tmp/etl/remote_file/session_D5E23EBC14BCEA4F_pysparkjar_00877de2cbfbf624dca5ac527f415c9e/topk_train.py     20200615190316-v0.0.3_china-20200505-20200520-common-staging shangha

Below is the error info (screenshot attached); stdout from the driver node:

py4j.protocol.Py4JJavaError: An error occurred while calling o149.fit. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 1.1 failed 4 times, most recent failure: Lost task 3.3 in stage 1.1 (TID 13502, zw03-data-hdp-dn-cpu0244.mt, executor 9): java.net.ConnectException: Connection refused (Connection refused) at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:589) at java.net.Socket.connect(Socket.java:538) at java.net.Socket.(Socket.java:434) at java.net.Socket.(Socket.java:211) at com.microsoft.ml.spark.lightgbm.TrainUtils$.getNetworkInitNodes(TrainUtils.scala:324) at com.microsoft.ml.spark.lightgbm.TrainUtils$$anonfun$15.apply(TrainUtils.scala:398) at com.microsoft.ml.spark.lightgbm.TrainUtils$$anonfun$15.apply(TrainUtils.scala:393) at com.microsoft.ml.spark.core.env.StreamUtilities$.using(StreamUtilities.scala:28) at com.microsoft.ml.spark.lightgbm.TrainUtils$.trainLightGBM(TrainUtils.scala:392) at com.microsoft.ml.spark.lightgbm.LightGBMBase$$anonfun$6.apply(LightGBMBase.scala:85) at com.microsoft.ml.spark.lightgbm.LightGBMBase$$anonfun$6.apply(LightGBMBase.scala:85) at org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(objects.scala:196) at org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(objects.scala:193) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:834) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:834) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:43) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:43) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:89) at org.apache.spark.scheduler.Task.run(Task.scala:110) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:363) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1576) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1564) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1563) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1563) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:822) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:822) at scala.Option.foreach(Option.scala:257) at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:822) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1794) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1746) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1735) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:634) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2060) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2157) at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1033) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) at org.apache.spark.rdd.RDD.reduce(RDD.scala:1015) at org.apache.spark.sql.Dataset.reduce(Dataset.scala:1460) at com.microsoft.ml.spark.lightgbm.LightGBMBase$class.innerTrain(LightGBMBase.scala:90) at com.microsoft.ml.spark.lightgbm.LightGBMRanker.innerTrain(LightGBMRanker.scala:25) at com.microsoft.ml.spark.lightgbm.LightGBMBase$class.train(LightGBMBase.scala:38) at com.microsoft.ml.spark.lightgbm.LightGBMRanker.train(LightGBMRanker.scala:25) at com.microsoft.ml.spark.lightgbm.LightGBMRanker.train(LightGBMRanker.scala:25) at org.apache.spark.ml.Predictor.fit(Predictor.scala:118) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:280) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:745) Caused by: java.net.ConnectException: Connection refused (Connection refused) at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:589) at java.net.Socket.connect(Socket.java:538) at java.net.Socket.(Socket.java:434) at java.net.Socket.(Socket.java:211) at com.microsoft.ml.spark.lightgbm.TrainUtils$.getNetworkInitNodes(TrainUtils.scala:324) at com.microsoft.ml.spark.lightgbm.TrainUtils$$anonfun$15.apply(TrainUtils.scala:398) at com.microsoft.ml.spark.lightgbm.TrainUtils$$anonfun$15.apply(TrainUtils.scala:393) at com.microsoft.ml.spark.core.env.StreamUtilities$.using(StreamUtilities.scala:28) at com.microsoft.ml.spark.lightgbm.TrainUtils$.trainLightGBM(TrainUtils.scala:392) at com.microsoft.ml.spark.lightgbm.LightGBMBase$$anonfun$6.apply(LightGBMBase.scala:85) at com.microsoft.ml.spark.lightgbm.LightGBMBase$$anonfun$6.apply(LightGBMBase.scala:85) at org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(objects.scala:196) at 
org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(objects.scala:193) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:834) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:834) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:43) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:43) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:89) at org.apache.spark.scheduler.Task.run(Task.scala:110) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:363) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ... 1 more

Errors from the executors look like this (screenshot attached).


imatiach-msft commented 4 years ago

@ce39906 based on the error (screenshot attached):

I think this is an out of memory error: https://stackoverflow.com/questions/28901123/why-do-spark-jobs-fail-with-org-apache-spark-shuffle-metadatafetchfailedexceptio

I'm not sure if LightGBM is running out of memory or if it is a cluster configuration issue, but I will guess/assume that it is the former.

Could you try running on the latest version? I believe there were some memory optimizations. I will need to look into more ways to reduce memory. Are you perchance running LightGBM several times over?

imatiach-msft commented 4 years ago

Here is a link to the latest version, if it helps:

Maven Coordinates
com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc1-96-fce3c952-SNAPSHOT
Maven Resolver
https://mmlspark.azureedge.net/maven
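
For example, a minimal PySpark sketch of pulling that snapshot build into a session (assuming the coordinates and resolver are exactly those listed above, and that the cluster can reach the resolver):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("lightgbm-ranker")
         # Snapshot coordinates and custom resolver from the comment above.
         .config("spark.jars.packages",
                 "com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc1-96-fce3c952-SNAPSHOT")
         .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven")
         .getOrCreate())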
ce39906 commented 4 years ago

Here is a link to the latest version, if it helps:

Maven Coordinates
com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc1-96-fce3c952-SNAPSHOT
Maven Resolver
https://mmlspark.azureedge.net/maven

Our cluster can only connect to the internal network, as in https://github.com/Azure/mmlspark/issues/818. Could you post a direct link for me to download the jar?

imatiach-msft commented 4 years ago

@ce39906 sure, I think this is it: https://mmlspark.azureedge.net/maven/com/microsoft/ml/spark/mmlspark_2.11/1.0.0-rc1-96-fce3c952-SNAPSHOT/mmlspark_2.11-1.0.0-rc1-96-fce3c952-SNAPSHOT.jar

And you will need the newer version of lightgbm: https://repo.maven.apache.org/maven2/com/microsoft/ml/lightgbm/lightgbmlib/2.3.180/lightgbmlib-2.3.180.jar
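
For a cluster that cannot reach external repositories, a minimal sketch (the local paths are hypothetical) of wiring the two downloaded jars into a session via spark.jars instead of spark.jars.packages:

from pyspark.sql import SparkSession

# Hypothetical local paths to the jars downloaded from the links above.
jars = ",".join([
    "/opt/jars/mmlspark_2.11-1.0.0-rc1-96-fce3c952-SNAPSHOT.jar",
    "/opt/jars/lightgbmlib-2.3.180.jar",
])

spark = (SparkSession.builder
         .config("spark.jars", jars)  # shipped to the driver and all executors
         .getOrCreate())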

imatiach-msft commented 4 years ago

@ce39906 If it is still failing, can you give me more information on the dataset size? What is the dimensionality of the dataset - the number of rows and columns? Is the dataset dense or sparse? This might help diagnose the out of memory issue and help me figure out if there is some bug in the scala layer.

ce39906 commented 4 years ago

@ce39906 If it is still failing, can you give me more information on the dataset size? What is the dimensionality of the dataset - the number of rows and columns? Is the dataset dense or sparse? This might help diagnose the out of memory issue and help me figure out if there is some bug in the scala layer.

The dataset is about 160 GB, with 101 columns and 112,441,295 rows, and it is dense. My spark conf is below.

/opt/meituan/spark-2.2/bin/spark-submit --deploy-mode cluster --queue root.zw03_training.hadoop-map.training --executor-cores 40 --num-executors 8 --master yarn --driver-memory 8G --files /opt/meituan/spark-2.2/conf/hive-site.xml --executor-memory 16G --files /opt/tmp/etl/remote_file/session_D5E23EBC14BCEA4F_pysparkjar_00877de2cbfbf624dca5ac527f415c9e/city_province_list --repositories http://pixel.sankuai.com/repository/group-releases,http://pixel.sankuai.com/repository/mtdp --conf spark.yarn.maxAppAttempts=1 --conf spark.task.cpus=40 --conf spark.sql.autoBroadcastJoinThreshold=-1 --conf spark.kryoserializer.buffer.max=1024m --conf spark.driver.maxResultSize=10G --conf spark.executor.instances=8 --conf spark.hadoop.parquet.enable.summary-metadata=false --conf spark.executor.heartbeatInterval=30s --conf spark.default.parallelism=1024 --conf spark.sql.hive.metastorePartitionPruning=true --conf spark.yarn.driver.memoryOverhead=8096 --conf spark.sql.orc.filterPushdown=true --conf spark.sql.parquet.filterPushdown=true --conf spark.sql.shuffle.partitions=1024 --conf spark.sql.orc.splits.include.file.footer=true --conf spark.jars.packages=com.microsoft.ml.spark:mmlspark_2.11:0.18.1 --conf spark.sql.orc.cache.stripe.details.size=10000 --conf spark.sql.parquet.mergeSchema=false --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.yarn.executor.memoryOverhead=60G --conf spark.yarn.am.extraJavaOptions="-DappIdentify=hope_3375504 -Dport=AppMaster " --conf spark.driver.extraJavaOptions="-DappIdentify=hope_3375504 -Dport=Driver -XX:PermSize=128M -XX:MaxPermSize=256M " --conf spark.executor.extraJavaOptions="-DappIdentify=hope_3375504 -Dport=Executor " --name huobaochong:/opt/meituan/20200616/topk_train_v2/shanghai/topk_train/topk_train.hope --conf spark.job.owner=huobaochong --conf spark.client.host=zw02-data-msp-launcher13.mt --conf spark.job.type=mtmsp --conf spark.flowid=D5E23EBC14BCEA4F --conf spark.yarn.app.tags.flowid=D5E23EBC14BCEA4F --conf spark.yarn.app.tags.schedulejobid=cantor-6177712 --conf spark.yarn.app.tags.scheduleinstanceid= --conf spark.yarn.app.tags.scheduleplanid= --conf spark.yarn.app.tags.onceexecid=once-exec-6163959 --conf spark.yarn.app.tags.rm.taskcode=hope:huobaochong:/opt/meituan/20200616/topk_train_v2/shanghai/topk_train/topk_train.hope --conf spark.yarn.app.tags.rm.taskname=huobaochong:/opt/meituan/20200616/topk_train_v2/shanghai/topk_train/topk_train.hope --conf spark.yarn.app.tags.rm.tasktype=hope --conf spark.yarn.app.tags.mtmspCompileVersion=0 --conf spark.yarn.job.priority=1 --conf spark.hive.mt.metastore.audit.id=SPARK-MTMSP-D5E23EBC14BCEA4F --conf spark.hadoop.hive.mt.metastore.audit.id=SPARK-MTMSP-D5E23EBC14BCEA4F --conf spark.hbo.enabled=true --conf spark.executor.cantorEtlIncreaseMemory.enabled=true /opt/tmp/etl/remote_file/session_D5E23EBC14BCEA4F_pysparkjar_00877de2cbfbf624dca5ac527f415c9e/topk_train.py 20200615190316-v0.0.3_china-20200505-20200520-common-staging shangha

Should I increase the executor memory or executor.memoryOverhead?

ce39906 commented 4 years ago

Another question: must I repartition the DataFrame so that the number of partitions equals the number of executors before training?

imatiach-msft commented 4 years ago

@ce39906 Great question! That is a large dataset for the given cluster size; as a lower bound it is at least 112,441,295 rows × 101 columns × 8 bytes per double value ≈ 90.8 GB in memory. On top of that, Spark needs memory for scheduling. For LightGBM, we need to load all of the data in memory for training; the data is already repartitioned prior to training. The DataFrame in Scala is converted to the native representation, and there is around 2X the original dataset size in overhead for this, so the total size in memory should be about 3X. I think it is possible to make this 2X in the future, so that we only need to hold the data in native memory and in the original DataFrame.

Besides that, we also need to configure the cluster well, otherwise it can effectively have a lot less memory than what the cluster can actually provide. Increasing executor memory will definitely help, if there is unused memory. On YARN, I always pay a lot of attention to the number of cores, the amount of memory per machine, and the number of machines, and based on that I try to make a good estimate for the number of executors, especially the number of executors per machine. If you have one executor per machine, and it has just 4 GB of memory when the machine has 64 GB available, then obviously you would be using just 6% of the cluster's available memory, so you have to be very careful with the configuration.
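
As a rough back-of-the-envelope check, a sketch of the arithmetic above (a lower bound only, not an exact accounting of Spark's memory model):

rows, cols = 112441295, 101
bytes_per_double = 8

raw_gb = rows * cols * bytes_per_double / 1e9
print(round(raw_gb))      # ~91 GB of dense doubles, in line with the ~90.8 GB estimate above
print(round(3 * raw_gb))  # ~273 GB with the ~3x overhead described above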

imatiach-msft commented 4 years ago

These parameters seem very important:

--executor-cores 40 --num-executors 8 --master yarn --driver-memory 8G --files /opt/meituan/spark-2.2/conf/hive-site.xml --executor-memory 16G

I also see you have a very high spark.yarn.executor.memoryOverhead=60G. This seems a bit high to me, based on:

https://spark.apache.org/docs/latest/configuration.html The default is:

executorMemory * 0.10, with minimum of 384 

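For comparison, a quick sketch of that documented default (the larger of 10% of executor memory and 384 MB):

def default_memory_overhead_mb(executor_memory_mb):
    # Spark's documented default for the executor memory overhead.
    return max(int(executor_memory_mb * 0.10), 384)

print(default_memory_overhead_mb(16 * 1024))  # 16 GB executors -> 1638 MB, far below the 60G configured here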

imatiach-msft commented 4 years ago

Can you send me more info on your cluster configuration? How many machines do you have, how much RAM does each machine have and how many cores are there?

ce39906 commented 4 years ago

@ce39906 sure, I think this is it: https://mmlspark.azureedge.net/maven/com/microsoft/ml/spark/mmlspark_2.11/1.0.0-rc1-96-fce3c952-SNAPSHOT/mmlspark_2.11-1.0.0-rc1-96-fce3c952-SNAPSHOT.jar

And you will need the newer version of lightgbm: https://repo.maven.apache.org/maven2/com/microsoft/ml/lightgbm/lightgbmlib/2.3.180/lightgbmlib-2.3.180.jar

Hi @imatiach-msft, I tried your new version of the jar, and it uses less memory. I configured executor memory to 35g, executor.memoryOverhead to 20g, and num-executors to 8. The first run succeeded, but when I tried a second time the job failed with the memory issue. I think the memory config is not right yet, and I will try more configurations.

ce39906 commented 4 years ago

Can you send me more info on your cluster configuration? How many machines do you have, how much RAM does each machine have and how many cores are there?

The training queue in our company is managed by another team; the queue is limited to 1048 CPUs and about 4000 GB of memory. While running the jobs, I saw two kinds of executor machines: 40 CPUs with 128 GB of memory, and 64 CPUs with 256 GB of memory. Another question: should I try a large executor memory and a small memoryOverhead? Is the native part running in off-heap memory or on-heap memory? And what is the relationship between off-heap memory and executor.memoryOverhead?

ce39906 commented 4 years ago

(screenshots attached)

Hi @imatiach-msft, another confusing question: I configured 4 executors, so why is there only one task in the training stage?

imatiach-msft commented 4 years ago

@ce39906 great questions!

Another question: should I try a large executor memory and a small memoryOverhead?

I may be wrong about my interpretation of those parameters (I have contributed to Apache Spark, but only to SparkML), but my understanding is that memoryOverhead is not used for the pipeline, so you actually want to keep it as small as possible. For example, the Spark configuration page states:

https://spark.apache.org/docs/latest/configuration.html

This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%). This option is currently supported on YARN and Kubernetes. 

My understanding is that you really want to keep spark.executor.memory as high as possible on the machines.

Is the native part running in off-heap memory or on-heap memory? And what is the relationship between off-heap memory and executor.memoryOverhead?

The native part is allocated mostly on the native heap; it's definitely not on the stack. Hmm, I'm not sure about the relationship between off-heap memory and executor.memoryOverhead. In practice, I have always tried to maximize executor memory and minimize memoryOverhead, but different cluster types (YARN, Mesos, Spark standalone) may use these variables differently. The doc even mentions:

This option is currently supported on YARN and Kubernetes. 
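
As an illustration of that advice, a minimal sketch with illustrative numbers only (assuming one executor per 128 GB / 40-core YARN node; not tuned for this specific job):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         # Give most of the node's RAM to the executor heap...
         .config("spark.executor.memory", "100g")
         # ...and keep the YARN overhead comparatively small, as discussed above.
         .config("spark.yarn.executor.memoryOverhead", "20g")
         .config("spark.executor.cores", "40")
         .getOrCreate())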
ce39906 commented 4 years ago

@ce39906 great questions!

Another question: should I try a large executor memory and a small memoryOverhead?

I may be wrong about my interpretation of those parameters (I have contributed to Apache Spark, but only to SparkML), but my understanding is that memoryOverhead is not used for the pipeline, so you actually want to keep it as small as possible. For example, the Spark configuration page states:

https://spark.apache.org/docs/latest/configuration.html

This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%). This option is currently supported on YARN and Kubernetes. 

My understanding is that you really want to keep spark.executor.memory as high as possible on the machines.

Is the native part running in off-heap memory or on-heap memory? And what is the relationship between off-heap memory and executor.memoryOverhead?

The native part is allocated mostly on the native heap; it's definitely not on the stack. Hmm, I'm not sure about the relationship between off-heap memory and executor.memoryOverhead. In practice, I have always tried to maximize executor memory and minimize memoryOverhead, but different cluster types (YARN, Mesos, Spark standalone) may use these variables differently. The doc even mentions:

This option is currently supported on YARN and Kubernetes. 

@imatiach-msft, thanks for your reply; that's a great help to me.

ce39906 commented 4 years ago

(screenshots attached)

Hi @imatiach-msft, another confusing question: I configured 4 executors, so why is there only one task in the training stage?

Hi @imatiach-msft, I found issue https://github.com/Azure/mmlspark/issues/747, which I think is similar to mine. I think the problem happens when requesting large-memory machines, which can take more time to allocate. So when the method below runs,

val numExecutorCores = ClusterUtil.getNumExecutorCores(dataset, numCoresPerExec, log) 

the requested machines are still pending, and numExecutorCores gets set to 1. Like the approach mentioned in https://github.com/Azure/mmlspark/issues/747, I want to control the number of workers myself, based on your latest version (com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc1-96-fce3c952-SNAPSHOT), which made a lot of memory optimizations. Where can I get the source code of the latest version? I would be very grateful.
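
One way to sanity-check from PySpark how many executors have actually registered before calling fit (a sketch that relies on the internal _jsc handle and SparkContext.getExecutorMemoryStatus; the returned map also contains the driver's block manager, hence the minus one):

# Hypothetical pre-flight check before calling .fit(df).
status = spark.sparkContext._jsc.sc().getExecutorMemoryStatus()
num_executors = status.size() - 1  # subtract the driver's own entry
print("registered executors:", num_executors)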

imatiach-msft commented 4 years ago

Hi @ce39906, I know there was a recent PR from @ocworld related to getNumExecutorCores, though I'm not sure whether it would affect your setup: https://github.com/Azure/mmlspark/pull/855. It does seem like you should have more tasks; perhaps getNumExecutorCores is not returning the correct value in your case. I agree it would be good to add a parameter to control the number of tasks.

"Where can I get the source code of the latest version?"

The source code is on the GitHub master branch; the latest package should be the same as posted above:

Maven Coordinates
com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc1-96-fce3c952-SNAPSHOT

Maven Resolver
https://mmlspark.azureedge.net/maven

Hmm, let me see if I can implement something for you quickly to control the number of tasks, as it does seem like a useful parameter.

imatiach-msft commented 4 years ago

@ce39906 I've created a PR here to allow users to override the number of tasks as you and #747 suggested:

https://github.com/Azure/mmlspark/pull/881. I will update the Maven coordinates here once I get a green build.

imatiach-msft commented 4 years ago

@ce39906 new build finished for the PR:

Maven Coordinates

com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc1-91-a13271d7-SNAPSHOT
Maven Resolver

https://mmlspark.azureedge.net/maven
ocworld commented 4 years ago

@ce39906 Can you show me your driver log around ClusterUtils, so we can check getNumExecutorCores?

ce39906 commented 4 years ago

@ce39906 Can you show me your driver log around ClusterUtils, so we can check getNumExecutorCores?

Hi @ocworld, here is the evidence (screenshot attached).

ocworld commented 4 years ago

@ce39906 Thank you for sharing your logs.

I found the reason in your configuration: spark.task.cpus=40 is set.

"spark.task.cpus" means "Number of cores to allocate for each task." (https://spark.apache.org/docs/2.4.5/configuration.html)

The number of workers per executor equals the number of tasks per executor. So, if you want to create 40 workers per executor, spark.task.cpus=1 and spark.executor.cores=40 should be set.

If it still reproduces after setting spark.task.cpus=1, please let me know.

ocworld commented 4 years ago

In spark, the number of tasks per executor = spark.executor.cores / spark.task.cpus
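
Applied to the settings in this job, the arithmetic works out as follows (a small sketch of the formula above):

executor_cores = 40

# Original setting: spark.task.cpus = 40 -> 1 task per executor.
# Suggested setting: spark.task.cpus = 1 -> 40 tasks per executor.
for task_cpus in (40, 1):
    print(task_cpus, executor_cores // task_cpus)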

ocworld commented 4 years ago

In addition, about the number of executors: "Could not retrieve executors from blockmanager" and "Using default case = 1 executors" appear in the logs. @imatiach-msft, I think getExecutors() cannot retrieve the executors from the Spark session in this setup.

ce39906 commented 4 years ago

@ce39906 new build finished for the PR:

Maven Coordinates

com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc1-91-a13271d7-SNAPSHOT
Maven Resolver

https://mmlspark.azureedge.net/maven

Hi @imatiach-msft, what's the corresponding LightGBM version? I'm using com.microsoft.ml.lightgbm:lightgbmlib:2.3.180 and got a core dump like the one below (screenshot attached):

ce39906 commented 4 years ago

In spark, the number of tasks per executor = spark.executor.cores / spark.task.cpus

I want to configure one task per executor, using all 40 CPUs of the executor.

ocworld commented 4 years ago

In spark, the number of tasks per executor = spark.executor.cores / spark.task.cpus

I want to configure one task per executor, using all 40 CPUs of the executor.

@ce39906 Ok, "ClusterUtils calculated...." is right in the last red bock. In my undestanding, numCores in ClusterUtils means num of tasks, while NumExecutorCores is executor cores.

Anyway, your second red block looks strange to me. I assume those lines are printed by Spark.

ce39906 commented 4 years ago

In spark, the number of tasks per executor = spark.executor.cores / spark.task.cpus

I want to configure one task per executor, using all 40 CPUs of the executor.

@ce39906 Ok, "ClusterUtils calculated...." is right in the last red bock. In my undestanding, numCores in ClusterUtils means num of tasks, while NumExecutorCores is executor cores.

Anyway, your second red block looks strange to me. I assume those lines are printed by Spark.

Yes, the log in the second red block is printed by Spark.

imatiach-msft commented 4 years ago

@ce39906 "what 's the version of the corresponding LightGBM? I'm using com.microsoft.ml.lightgbm:lightgbmlib:2.3.180 "

Yes, that should be the new version of LightGBM. I'm not sure what is causing the error, though. Is there any way I could try to reproduce this issue? What cluster are you using, is the dataset private, and is there some way I could get the code to reproduce this?

imatiach-msft commented 4 years ago

@ce39906 I took a closer look at the logs and @ocworld is exactly right, the spark.task.cpus is set to 40, so you are using 40 cpus in a single task - which explains why you are only seeing one task (with 40 CPUs in it). Maybe LightGBM is not able to use as much parallelism in this scenario, I will need to take a closer look at the native code, but I would recommend trying spark.task.cpus=1, which would create 40 tasks instead.

imatiach-msft commented 4 years ago

@ce39906 for reference, please see the logic here: https://github.com/Azure/mmlspark/blob/master/src/main/scala/com/microsoft/ml/spark/core/utils/ClusterUtil.scala#L25 Also see the relevant config docs: https://spark.apache.org/docs/latest/configuration.html and the relevant Stack Overflow thread: https://stackoverflow.com/questions/37545069/what-is-the-difference-between-spark-task-cpus-and-executor-cores

ce39906 commented 4 years ago

@ce39906 I took a closer look at the logs and @ocworld is exactly right, the spark.task.cpus is set to 40, so you are using 40 cpus in a single task - which explains why you are only seeing one task (with 40 CPUs in it). Maybe LightGBM is not able to use as much parallelism in this scenario, I will need to take a closer look at the native code, but I would recommend trying spark.task.cpus=1, which would create 40 tasks instead.

Ok, I will try it.

ce39906 commented 4 years ago

the spark.task.cpus is set to 40, so you are using 40 cpus in a single task - which explains why you are only seeing one task (with 40 CPUs in it)

I don't think seeing only one task is caused by this. Seeing only one task is caused by

val numExecutorCores = ClusterUtil.getNumExecutorCores(dataset, numCoresPerExec, log)

this method returns 1 when the requested machines are pending.

imatiach-msft commented 4 years ago

@ce39906 It still returns one? Can you post the logs? Perhaps the setting change "spark.task.cpus=1" is somehow not getting picked up?

imatiach-msft commented 4 years ago

@ce39906 can you post your spark conf like above again?

My spark conf is below.
...
imatiach-msft commented 4 years ago

@ce39906 also, did you try the numTasks parameter I added in the new PR I sent you?

https://github.com/Azure/mmlspark/pull/881

Did that change the number of tasks?

ce39906 commented 4 years ago

@ce39906 also, did you try the numTasks parameter I added in the new PR I sent you?

#881

Did that change the number of tasks?

@imatiach-msft, sorry for the late reply; these days are the Dragon Boat Festival holidays. Here is my main spark conf:

master = yarn-cluster
driver-memory = 8G
driver-cores = 4
executor-memory = 100G
executor-cores = 40
is_dynamic_allocation = false
num-executors = 4

[option_env_args]
spark.executor.instances = 4
spark.task.cpus = 1
spark.jars.packages = com.microsoft.ml.lightgbm:lightgbmlib:2.3.180,com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc1-91-a13271d7-SNAPSHOT

spark.yarn.maxAppAttempts = 1
spark.driver.maxResultSize = 10G
spark.yarn.driver.memoryOverhead = 4096
spark.yarn.executor.memoryOverhead = 120G
spark.sql.orc.filterPushdown = true
spark.sql.orc.splits.include.file.footer = true
spark.sql.orc.cache.stripe.details.size = 10000
spark.sql.hive.metastorePartitionPruning = true

spark.hadoop.parquet.enable.summary-metadata = false
spark.sql.parquet.mergeSchema = false
spark.sql.parquet.filterPushdown = true
spark.sql.hive.metastorePartitionPruning = true

spark.sql.autoBroadcastJoinThreshold = -1

spark.sql.shuffle.partitions = 2048
spark.default.parallelism = 2048
spark.serializer = org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max = 1024m
spark.executor.heartbeatInterval = 30s
spark.network.timeout = 800s
spark.executor.extraJavaOptions="-XX:+UseG1GC -XX:-UseGCOverheadLimit"

did you try the numTasks parameter I added in the new PR I sent you?
Yes, I set the numTasks parameter to 40 (cores per executor) × 4 (number of executors) = 160, and the training stage had 160 tasks. The job with these parameters succeeds. I'm now trying other memory configurations for executor.memory and executor.memoryOverhead.
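
For reference, a minimal sketch of how the new parameter might be used here (assuming the numTasks parameter added in PR #881 and the import path of recent mmlspark builds; df is the repartitioned training DataFrame from the original script):

from mmlspark.lightgbm import LightGBMRanker

ranker = LightGBMRanker(
    objective='lambdarank',
    featuresCol='features',
    labelCol='label',
    groupCol='query_id',
    numTasks=160,  # 4 executors x 40 cores per executor, matching the successful run above
)
model = ranker.fit(df)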