rapidsai / spark-examples

[ARCHIVED] Moved to github.com/NVIDIA/spark-xgboost-examples
https://github.com/NVIDIA/spark-xgboost-examples
Apache License 2.0
70 stars 40 forks source link

Multi-GPU run of standalone-python example fails #77

Closed srinivas-varadharajan closed 4 years ago

srinivas-varadharajan commented 4 years ago

Hey Guys,

I'm trying to run the standalone-python example, "Get Started with XGBoost4J-Spark on an Apache Spark Standalone Cluster". I've 2 T4s in the host. I'm setting the master and workers in the same node/host. It runs fine and prints accuracy if I set one worker/executor. The issue is when I try to run on multiple workers. I've also set EXCLUSIVE_PROCESS as mentioned in the document. SPARK_WORKER_INSTANCES=2 is set in spark-env.sh. What am I missing?

More details:

Command line:

export SPARK_MASTER=spark://hostname -f:7077 export SPARK_CORES_PER_WORKER=2 export DATA_PATH=/home/test/workspace/srini/spark_on_gpu/xgboost4j_spark/data export LIBS_PATH=/home/test/workspace/srini/spark_on_gpu/xgboost4j_spark/libs export SPARK_CORES_PER_EXECUTOR=${SPARK_CORES_PER_WORKER} export SPARK_NUM_EXECUTORS=2 export TOTAL_CORES=$((SPARK_CORES_PER_EXECUTOR * SPARK_NUM_EXECUTORS)) export SPARK_DRIVER_MEMORY=4g export SPARK_EXECUTOR_MEMORY=8g export SPARK_PYTHON_ENTRYPOINT=${LIBS_PATH}/main.py export EXAMPLE_CLASS=ai.rapids.spark.examples.mortgage.gpu_main

export SPARK_JARS=${LIBS_PATH}/cudf-0.9.2-cuda10-1.jar,${LIBS_PATH}/xgboost4j_2.x-1.0.0-Beta3.jar,${LIBS_PATH}/xgboost4j-spark_2.x-1.0.0-Beta3.jar

export SPARK_PY_FILES=${LIBS_PATH}/xgboost4j-spark_2.x-1.0.0-Beta3.jar,${LIBS_PATH}/samples.zip

export TREE_METHOD=gpu_hist

spark-submit \ --master ${SPARK_MASTER} \ --driver-memory ${SPARK_DRIVER_MEMORY} \ --executor-memory ${SPARK_EXECUTOR_MEMORY} \ --conf spark.cores.max=${TOTAL_CORES} \ --jars ${SPARK_JARS} \ --py-files ${SPARK_PY_FILES} \ ${SPARK_PYTHON_ENTRYPOINT} \ --mainClass=${EXAMPLE_CLASS} \ --trainDataPath=${DATA_PATH}/mortgage/csv/train/mortgage_train_merged.csv \ --evalDataPath=${DATA_PATH}/mortgage/csv/test/mortgage_eval_merged.csv \ --format=csv \ --numWorkers=${SPARK_NUM_EXECUTORS} \ --treeMethod=${TREE_METHOD} \ --numRound=100 \ --maxDepth=8

Error from stdout

2020-03-31 10:59:16 INFO DAGScheduler:54 - ResultStage 0 (foreachPartition at XGBoost.scala:639) failed in 178.554 s due to Stage cancelled because SparkContext was shut down 2020-03-31 10:59:16 ERROR RabitTracker:91 - Uncaught exception thrown by worker: org.apache.spark.SparkException: Job 0 cancelled because SparkContext was shut down at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:837) at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:835) at scala.collection.mutable.HashSet.foreach(HashSet.scala:78) at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:835) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1900) at org.apache.spark.util.EventLoop.stop(EventLoop.scala:83) at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1813) at org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1931) at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1361) at org.apache.spark.SparkContext.stop(SparkContext.scala:1930) at org.apache.spark.TaskFailedListener$$anon$1$$anonfun$run$1.apply$mcV$sp(SparkParallelismTracker.scala:144) at org.apache.spark.TaskFailedListener$$anon$1$$anonfun$run$1.apply(SparkParallelismTracker.scala:144) at org.apache.spark.TaskFailedListener$$anon$1$$anonfun$run$1.apply(SparkParallelismTracker.scala:144) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at org.apache.spark.TaskFailedListener$$anon$1.run(SparkParallelismTracker.scala:143) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:935) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:933) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:933) at ml.dmlc.xgboost4j.scala.spark.XGBoost$$anonfun$trainDistributedForGpuDataset$1$$anon$1.run(XGBoost.scala:639) 2020-03-31 10:59:16 INFO StandaloneSchedulerBackend:54 - Shutting down all executors 2020-03-31 10:59:16 INFO StandaloneAppClient$ClientEndpoint:54 - Executor added: app-20200331105616-0000/2 on worker-20200331105533-x.x.x.x-46189 (x.x.x.x:46189) with 1 core(s) 2020-03-31 10:59:16 INFO CoarseGrainedSchedulerBackend$DriverEndpoint:54 - Asking each executor to shut down 2020-03-31 10:59:16 INFO StandaloneSchedulerBackend:54 - Granted executor ID app-20200331105616-0000/2 on hostPort x.x.x.x:46189 with 1 core(s), 8.0 GB RAM 2020-03-31 10:59:16 INFO StandaloneAppClient$ClientEndpoint:54 - Executor added: app-20200331105616-0000/3 on worker-20200331105536-x.x.x.x-37651 (x.x.x.x:37651) with 1 core(s) 2020-03-31 10:59:16 INFO StandaloneSchedulerBackend:54 - Granted executor ID app-20200331105616-0000/3 on hostPort x.x.x.x:37651 with 1 core(s), 8.0 GB RAM 2020-03-31 10:59:16 ERROR TransportRequestHandler:209 - Error while invoking RpcHandler#receive() for one-way message. org.apache.spark.SparkException: Could not find CoarseGrainedScheduler. at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:160) at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:140) at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:655) at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:207) at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:113) at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) at java.lang.Thread.run(Thread.java:748) 2020-03-31 10:59:16 INFO MapOutputTrackerMasterEndpoint:54 - MapOutputTrackerMasterEndpoint stopped! 2020-03-31 10:59:16 ERROR TransportRequestHandler:209 - Error while invoking RpcHandler#receive() for one-way message. org.apache.spark.SparkException: Could not find CoarseGrainedScheduler. at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:160) at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:140) at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:655) at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:207) at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:113) at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) at java.lang.Thread.run(Thread.java:748) 2020-03-31 10:59:16 INFO MemoryStore:54 - MemoryStore cleared 2020-03-31 10:59:16 INFO BlockManager:54 - BlockManager stopped 2020-03-31 10:59:16 INFO BlockManagerMaster:54 - BlockManagerMaster stopped 2020-03-31 10:59:16 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped! 2020-03-31 10:59:16 INFO SparkContext:54 - Successfully stopped SparkContext 2020-03-31 10:59:21 INFO RabitTracker$TrackerProcessLogger:61 - Tracker Process ends with exit code 143 2020-03-31 10:59:21 INFO RabitTracker:196 - Tracker Process ends with exit code 143 2020-03-31 10:59:21 INFO XGBoostSpark:645 - GpuDataset Rabit returns with exit code 143 Traceback (most recent call last): File "/home/test/workspace/srini/spark_on_gpu/xgboost4j_spark/libs/main.py", line 18, in main() File "/home/test/workspace/srini/spark_on_gpu/xgboost4j_spark/libs/ai/rapids/spark/examples/main.py", line 21, in main getattr(import_module(args.mainClass), 'main')(args, xgboost_args) File "/home/test/workspace/srini/spark_on_gpu/xgboost4j_spark/libs/ai/rapids/spark/examples/mortgage/gpu_main.py", line 48, in main model = with_benchmark('Training', lambda: classifier.fit(train_data)) File "/home/test/workspace/srini/spark_on_gpu/xgboost4j_spark/libs/ai/rapids/spark/examples/utility/utils.py", line 42, in with_benchmark result = action() File "/home/test/workspace/srini/spark_on_gpu/xgboost4j_spark/libs/ai/rapids/spark/examples/mortgage/gpu_main.py", line 48, in model = with_benchmark('Training', lambda: classifier.fit(train_data)) File "/home/test/workspace/srini/spark_on_gpu/xgboost4j_spark/libs/xgboost4j-spark_2.x-1.0.0-Beta3.jar/sparkxgb/common.py", line 87, in fit File "/opt/spark/python/lib/pyspark.zip/pyspark/ml/base.py", line 132, in fit File "/opt/spark/python/lib/pyspark.zip/pyspark/ml/wrapper.py", line 288, in _fit File "/opt/spark/python/lib/pyspark.zip/pyspark/ml/wrapper.py", line 285, in _fit_java File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in call File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o32.fit. : ml.dmlc.xgboost4j.java.XGBoostError: XGBoostModel training failed at ml.dmlc.xgboost4j.scala.spark.XGBoost$.ml$dmlc$xgboost4j$scala$spark$XGBoost$$postTrackerReturnProcessing(XGBoost.scala:839) at ml.dmlc.xgboost4j.scala.spark.XGBoost$$anonfun$trainDistributedForGpuDataset$1.apply(XGBoost.scala:646) at ml.dmlc.xgboost4j.scala.spark.XGBoost$$anonfun$trainDistributedForGpuDataset$1.apply(XGBoost.scala:628) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.immutable.List.map(List.scala:285) at ml.dmlc.xgboost4j.scala.spark.XGBoost$.trainDistributedForGpuDataset(XGBoost.scala:627) at ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier.fit(XGBoostClassifier.scala:223) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748)

2020-03-31 10:59:21 INFO ShutdownHookManager:54 - Shutdown hook called 2020-03-31 10:59:21 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-bec2789c-0a65-42d4-9a72-27c516a5d535 2020-03-31 10:59:21 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-2b31e0b6-9e54-4ec2-88cb-19afec34757c 2020-03-31 10:59:21 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-2b31e0b6-9e54-4ec2-88cb-19afec34757c/pyspark-5016f151-81a4-4c31-9392-df960b4d51cc

firestarman commented 4 years ago

Hi, For you case, pls make sure two executors were actually launched, and each ran only one task inside.

Try to add the following configs to your command, --conf spark.executor.cores=2 --conf spark.task.cpus=2 \

Besides make sure the total memory size used is less than the size of avaialable memory on your host. I mean SPARK_NUM_EXECUTORS * SPARK_EXECUTOR_MEMORY + SPARK_DRIVER_MEMORY < size of available memory on your host.