dmlc / xgboost

Scalable, Portable and Distributed Gradient Boosting (GBDT, GBRT or GBM) Library, for Python, R, Java, Scala, C++ and more. Runs on single machine, Hadoop, Spark, Dask, Flink and DataFlow
https://xgboost.readthedocs.io/en/stable/
Apache License 2.0

XGBoost failing with Rabit error on AWS EMR Serverless #10647

Open lsli8888 opened 1 month ago

lsli8888 commented 1 month ago

Environment

AWS EMR Serverless 7.0.0, PySpark 3.5.0, XGBoost 2.0.3

I’m using XGBoost for regression, specifically the SparkXGBRegressor. I’m able to use it without issues on my local machine. However, I get the following Rabit-related error when executing it on AWS EMR Serverless. Any ideas? Is it related to this?:

    2024-05-28 21:54:33.922 [INFO] XGBoost-PySpark: Running xgboost-2.0.3 on 1 workers with
    booster params: {'objective': 'reg:squarederror', 'device': 'cpu', 'learning_rate': 0.03, 'max_depth': 5, 'random_state': 0, 'subsample': 1.0, 'eval_metric': 'rmse', 'nthread': 8}
    train_call_kwargs_params: {'early_stopping_rounds': 50, 'verbose_eval': True, 'num_boost_round': 10}
    dmatrix_kwargs: {'nthread': 8, 'missing': 0.0}
    Traceback (most recent call last):
    File "/tmp/spark-86c93d08-da51-4c1d-9834-689be49aad15/train_model.py", line 436, in <module>
    main()
    File "/home/hadoop/environment/lib64/python3.9/site-packages/my_project/utils/logger_utils.py", line 38, in wrapper
    return func(*args, **kwargs)
    File "/home/hadoop/environment/lib64/python3.9/site-packages/click/core.py", line 1157, in __call__
    return self.main(*args, **kwargs)
    File "/home/hadoop/environment/lib64/python3.9/site-packages/click/core.py", line 1078, in main
    rv = self.invoke(ctx)
    File "/home/hadoop/environment/lib64/python3.9/site-packages/click/core.py", line 1434, in invoke
    return ctx.invoke(self.callback, **ctx.params)
    File "/home/hadoop/environment/lib64/python3.9/site-packages/click/core.py", line 783, in invoke
    return __callback(*args, **kwargs)
    File "/tmp/spark-86c93d08-da51-4c1d-9834-689be49aad15/train_model.py", line 326, in main
    train_model(
    File "/home/hadoop/environment/lib64/python3.9/site-packages/my_project/utils/logger_utils.py", line 55, in timed
    result: Callable = method(*args, **kw)
    File "/home/hadoop/environment/lib64/python3.9/site-packages/my_project/utils/logger_utils.py", line 38, in wrapper
    return func(*args, **kwargs)
    File "/tmp/spark-86c93d08-da51-4c1d-9834-689be49aad15/train_model.py", line 410, in train_model
    model_training_strategy.train_model(
    File "/home/hadoop/environment/lib64/python3.9/site-packages/my_project/pyspark_utils/model_processor/xgboost_pipeline_model_training_strategy.py", line 38, in train_model
    return model_training_pipeline.fit(my_project_dataframe)
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/base.py", line 205, in fit
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/pipeline.py", line 134, in _fit
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/base.py", line 205, in fit
    File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/spark/core.py", line 1136, in _fit
    (config, booster) = _run_job()
    File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/spark/core.py", line 1122, in _run_job
    ret = rdd_with_resource.collect()[0]
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1833, in collect
    File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 179, in deco
    File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Could not recover from a failed barrier ResultStage. Most recent failure reason: Stage failed because barrier task ResultTask(314, 0) finished unsuccessfully.
    org.apache.spark.api.python.PythonException: Traceback (most recent call last):
    File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/spark/core.py", line 1067, in _train_booster
    _rabit_args = _get_rabit_args(context, num_workers)
    File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/spark/utils.py", line 77, in _get_rabit_args
    env = _start_tracker(context, n_workers)
    File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/spark/utils.py", line 66, in _start_tracker
    rabit_context = RabitTracker(host_ip=host, n_workers=n_workers)
    File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/tracker.py", line 208, in __init__
    sock.bind((host_ip, port))
    OSError: [Errno 99] Cannot assign requested address

at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:118)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:86)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:80)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:751)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1962)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)

at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3067)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3003)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3002)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3002)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:2326)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3265)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3205)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3194)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1041)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2406)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2427)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2446)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2471)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1046)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1045)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:195)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:840)

2024-05-28 21:54:39.439 [INFO] py4j.clientserver: Closing down clientserver connection

trivialfis commented 1 month ago

We obtain the host address from the Spark barrier context (https://github.com/dmlc/xgboost/blob/778751a1bbc9a6ed539b54a36676351f1d185e63/python-package/xgboost/spark/utils.py#L57). I'm not sure why the address cannot be assigned.
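For context on the error itself: `[Errno 99] Cannot assign requested address` (`EADDRNOTAVAIL`) is what `bind()` reports when the requested IP is not configured on any local network interface. Below is a minimal sketch that checks this for a given address, assuming plain IPv4. `try_bind` is a hypothetical helper, not part of XGBoost, and `192.0.2.1` is only a documentation address that should not exist on the local machine.

```python
import socket


def try_bind(host: str, port: int = 0):
    """Return None if `host` can be bound locally, else the errno of the failure.

    Port 0 asks the OS for any free port, so only the address is being tested.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, port))
        except OSError as exc:
            return exc.errno
    return None


if __name__ == "__main__":
    # Loopback is always local, so this bind should succeed.
    print("127.0.0.1 ->", try_bind("127.0.0.1"))
    # A non-local address is expected to fail with an address-related errno.
    print("192.0.2.1 ->", try_bind("192.0.2.1"))
```

Running `try_bind` inside a task with the address that the barrier context reports would show whether that address actually belongs to the machine the tracker starts on.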

wbo4958 commented 1 month ago

Hi @lsli8888, could you try 2.1.0, which has the new Rabit implementation?

lsli8888 commented 1 month ago

Hi @wbo4958 - unfortunately, I got the same error. I used EMR Serverless 7.1.0/XGBoost 2.1.1 this time.

2024-08-05 15:04:28.150 [INFO] XGBoost-PySpark: Running xgboost-2.1.1 on 1 workers with
    booster params: {'objective': 'reg:squarederror', 'device': 'cpu', 'eval_metric': 'rmse', 'learning_rate': 0.2, 'max_depth': 15, 'random_state': 0, 'subsample': 1.0, 'nthread': 2}
    train_call_kwargs_params: {'early_stopping_rounds': 50, 'verbose_eval': True, 'num_boost_round': 1000}
    dmatrix_kwargs: {'nthread': 2, 'missing': 0.0}
Traceback (most recent call last):
  File "/tmp/spark-de893419-2160-4632-8c50-ce608e2f38fe/train_model.py", line 471, in <module>
    main()
  File "/home/hadoop/environment/lib64/python3.9/site-packages/my_project_utils/utils/logger_utils.py", line 38, in wrapper
    return func(*args, **kwargs)
  File "/home/hadoop/environment/lib64/python3.9/site-packages/click/core.py", line 1157, in __call__
    return self.main(*args, **kwargs)
  File "/home/hadoop/environment/lib64/python3.9/site-packages/click/core.py", line 1078, in main
    rv = self.invoke(ctx)
  File "/home/hadoop/environment/lib64/python3.9/site-packages/click/core.py", line 1434, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/hadoop/environment/lib64/python3.9/site-packages/click/core.py", line 783, in invoke
    return __callback(*args, **kwargs)
  File "/tmp/spark-de893419-2160-4632-8c50-ce608e2f38fe/train_model.py", line 351, in main
    train_model(
  File "/home/hadoop/environment/lib64/python3.9/site-packages/my_project_utils/utils/logger_utils.py", line 55, in timed
    result: Callable = method(*args, **kw)
  File "/home/hadoop/environment/lib64/python3.9/site-packages/my_project_utils/utils/logger_utils.py", line 38, in wrapper
    return func(*args, **kwargs)
  File "/tmp/spark-de893419-2160-4632-8c50-ce608e2f38fe/train_model.py", line 441, in train_model
    model_training_strategy.train_model(
  File "/home/hadoop/environment/lib64/python3.9/site-packages/my_project/pyspark_utils/model_processor/xgboost_pipeline_model_training_strategy.py", line 40, in train_model
    return model_training_pipeline.fit(my_dataframe), my_dataframe
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/base.py", line 205, in fit
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/pipeline.py", line 134, in _fit
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/base.py", line 205, in fit
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/spark/core.py", line 1125, in _fit
    (config, booster) = _run_job()
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/spark/core.py", line 1111, in _run_job
    ret = rdd_with_resource.collect()[0]
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1833, in collect
  File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 179, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Could not recover from a failed barrier ResultStage. Most recent failure reason: Stage failed because barrier task ResultTask(213, 0) finished unsuccessfully.
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/spark/core.py", line 1054, in _train_booster
    _rabit_args = _get_rabit_args(context, num_workers)
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/spark/utils.py", line 69, in _get_rabit_args
    env = _start_tracker(context, n_workers)
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/spark/utils.py", line 58, in _start_tracker
    tracker = RabitTracker(n_workers=n_workers, host_ip=host, sortby="task")
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/tracker.py", line 73, in __init__
    _check_call(_LIB.XGTrackerCreate(args, ctypes.byref(handle)))
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/core.py", line 284, in _check_call
    raise XGBoostError(py_str(_LIB.XGBGetLastError()))
xgboost.core.XGBoostError: [15:04:50] /workspace/src/collective/result.cc:78: 
- [socket.h:89|15:04:50]: Failed to bind socket. system error:Cannot assign requested address
Stack trace:
  [bt] (0) /home/hadoop/environment/lib64/python3.9/site-packages/xgboost/lib/libxgboost.so(+0x22dbbc) [0x7f57217b9bbc]
  [bt] (1) /home/hadoop/environment/lib64/python3.9/site-packages/xgboost/lib/libxgboost.so(+0x2ce6c1) [0x7f572185a6c1]
  [bt] (2) /home/hadoop/environment/lib64/python3.9/site-packages/xgboost/lib/libxgboost.so(+0x2db3a9) [0x7f57218673a9]
  [bt] (3) /home/hadoop/environment/lib64/python3.9/site-packages/xgboost/lib/libxgboost.so(XGTrackerCreate+0x6b0) [0x7f57217eb550]
  [bt] (4) /lib64/libffi.so.8(+0x7d16) [0x7f57451b5d16]
  [bt] (5) /lib64/libffi.so.8(+0x456e) [0x7f57451b256e]
  [bt] (6) /lib64/libffi.so.8(ffi_call+0x123) [0x7f57451b5373]
  [bt] (7) /usr/lib64/python3.9/lib-dynload/_ctypes.cpython-39-x86_64-linux-gnu.so(+0x8e5d) [0x7f57451c2e5d]
  [bt] (8) /usr/lib64/python3.9/lib-dynload/_ctypes.cpython-39-x86_64-linux-gnu.so(+0x8742) [0x7f57451c2742]

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
    at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:118)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:86)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:80)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
    at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:751)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1962)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)

    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3067)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3003)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3002)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3002)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:2326)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3265)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3205)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3194)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1041)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2406)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2427)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2446)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2471)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1046)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1045)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:195)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:840)

2024-08-05 15:04:50.852 [INFO] py4j.clientserver: Closing down clientserver connection

wbo4958 commented 1 month ago

I'm not sure whether the port that Rabit tried to bind lacked sufficient permissions.

lsli8888 commented 1 month ago

Thanks @wbo4958. I've asked the same question on the AWS re:Post site, so I hope someone there can provide some help.

sgonullu commented 3 weeks ago

Hey folks,

Just wanted to share a quick fix I found for a similar issue. I noticed RabitTracker was using spark.driver.host to bind to. By default, this is set to the local hostname, but in my case, it was pointing to a different IP.

The solution: I changed spark.driver.host to my local IP address, and it worked!

Quick thought: Wouldn't it make more sense for RabitTracker to use spark.driver.bindAddress instead? Just throwing it out there.

Hope this helps someone!

lsli8888 commented 3 weeks ago

Hi @sgonullu - thanks for the info! What Python code did you use to get the local IP address? Something like this?

socket.gethostbyname(socket.gethostname())
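One caveat with `socket.gethostbyname(socket.gethostname())` is that it depends on how the hostname resolves, and on some hosts it returns a loopback address. A commonly used alternative, sketched here under assumptions, asks the OS which local address it would use for an outbound route. `guess_local_ip` and the probe address `8.8.8.8` are illustrative choices, not something from this thread.

```python
import socket


def guess_local_ip(probe_host: str = "8.8.8.8", probe_port: int = 80) -> str:
    """Best-effort guess of the local IPv4 address used for outbound traffic."""
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            # connect() on a UDP socket sends no packets; it only selects a route,
            # after which getsockname() reports the chosen local address.
            sock.connect((probe_host, probe_port))
            return sock.getsockname()[0]
    except OSError:
        pass
    try:
        # Fall back to hostname resolution, as in the one-liner above.
        return socket.gethostbyname(socket.gethostname())
    except OSError:
        # Last resort when neither routing nor name resolution works.
        return "127.0.0.1"


if __name__ == "__main__":
    print(guess_local_ip())
```

The result could then be passed as `spark.driver.host`, although whether that is the right value depends on the network the executors see.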

wbo4958 commented 3 weeks ago

Hi @sgonullu, currently RabitTracker is launched on the executor side, and we get the IP address from the TaskContext info, which comes from Spark. It looks like XGBoost PySpark doesn't use spark.driver.bindAddress or spark.driver.host.

wbo4958 commented 3 weeks ago

Hi @lsli8888, @trivialfis, I once had a PR to launch the RabitTracker on the driver side (https://github.com/dmlc/xgboost/pull/10281). I don't know whether it can help in this case.

lsli8888 commented 3 weeks ago

@wbo4958 I definitely got farther in my XGBoost training job on EMR Serverless, but it eventually fails with the following:

24/08/23 02:14:48 INFO ExecutorContainerAllocator: Going to request 8 executors for ResourceProfile Id: 0, target: 8 already provisioned: 0.
24/08/23 02:14:48 INFO DefaultEmrServerlessRMClient: Creating containers with container role SPARK_EXECUTOR and keys: Set(67, 66, 68, 72, 65, 71, 69, 70)
24/08/23 02:14:48 INFO DefaultEmrServerlessRMClient: Containers created with container role SPARK_EXECUTOR. key to container id map: Map(72 -> a8c8be81-ea98-f69e-73ae-6196e31cb572, 66 -> 36c8be81-ea98-b22f-dc37-d9b5912f0849, 69 -> 0ac8be81-eabc-8726-454f-71bd9fc78ee7, 71 -> 8ec8be81-eab6-b6f4-e57c-b60faa55494a, 68 -> 64c8be81-ea98-b44e-086a-46064575efa5, 65 -> 1cc8be81-eaa0-4037-c5ac-9561f7d5511f, 70 -> 6ac8be81-ea98-c6a3-69eb-750181e47089, 67 -> bcc8be81-eab9-160e-76c9-8e66c481d641)
24/08/23 02:14:53 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/08/23 02:15:08 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/08/23 02:15:23 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/08/23 02:15:38 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/08/23 02:15:53 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/08/23 02:16:08 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/08/23 02:16:23 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/08/23 02:16:38 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/08/23 02:16:48 WARN ExecutorContainerStoreImpl: Executor(s) 66,70,67,71,68,72,69,65 exited unexpectedly with exit code 1.
24/08/23 02:16:49 INFO ExecutorContainerAllocator: Going to request 8 executors for ResourceProfile Id: 0, target: 8 already provisioned: 0.
24/08/23 02:16:49 INFO DefaultEmrServerlessRMClient: Creating containers with container role SPARK_EXECUTOR and keys: Set(73, 78, 77, 79, 76, 80, 75, 74)
24/08/23 02:16:49 INFO DefaultEmrServerlessRMClient: Containers created with container role SPARK_EXECUTOR. key to container id map: Map(78 -> 6ec8be82-d562-e587-3a10-0e67f81810d1, 74 -> d2c8be82-d562-1c0e-73d4-ecf320291621, 80 -> dcc8be82-d561-f74a-35d1-02d07ae3dd71, 77 -> ecc8be82-d558-04a7-56e9-0bfc209399c6, 79 -> eac8be82-d558-7826-0a21-82a35cd22aea, 73 -> 0ac8be82-d558-0e2c-5365-14b762c38e6e, 76 -> a4c8be82-d561-05bf-cfe2-bed04501ad17, 75 -> 12c8be82-d558-62a8-a26f-ca8f6884daff)
24/08/23 02:16:53 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/08/23 02:17:08 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/08/23 02:17:23 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/08/23 02:17:38 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/08/23 02:17:53 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/08/23 02:18:08 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/08/23 02:18:18 WARN ExecutorContainerStoreImpl: Executor(s) 73,77 exited unexpectedly with exit code 1.
24/08/23 02:18:19 INFO ExecutorContainerAllocator: Going to request 2 executors for ResourceProfile Id: 0, target: 8 already provisioned: 6.
24/08/23 02:18:19 INFO DefaultEmrServerlessRMClient: Creating containers with container role SPARK_EXECUTOR and keys: Set(81, 82)
24/08/23 02:18:19 INFO DefaultEmrServerlessRMClient: Containers created with container role SPARK_EXECUTOR. key to container id map: Map(82 -> dec8be83-855a-e8c7-847a-62639d6ff0ad, 81 -> b2c8be83-855a-8e30-92bb-636e720e5829)
24/08/23 02:18:23 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/08/23 02:18:38 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/08/23 02:18:48 WARN ExecutorContainerStoreImpl: Executor(s) 78,74,75,79,76,80 exited unexpectedly with exit code 1.
24/08/23 02:18:49 INFO ExecutorContainerAllocator: Going to request 6 executors for ResourceProfile Id: 0, target: 8 already provisioned: 2.
24/08/23 02:18:49 INFO DefaultEmrServerlessRMClient: Creating containers with container role SPARK_EXECUTOR and keys: Set(84, 88, 83, 87, 86, 85)
24/08/23 02:18:49 INFO DefaultEmrServerlessRMClient: Containers created with container role SPARK_EXECUTOR. key to container id map: Map(84 -> 2ec8be83-c019-85b4-f9b3-2992972587e5, 87 -> 00c8be83-c020-aa9a-ac04-e93496fb13da, 86 -> 02c8be83-c019-a9dd-b655-05c739cf4e95, 83 -> dec8be83-c022-5139-87fe-c30139c90c34, 85 -> 58c8be83-c020-e199-977c-37d94e14055c, 88 -> 98c8be83-c019-c53f-b2c1-1e3aaeda0fa0)
24/08/23 02:18:53 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/08/23 02:19:08 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/08/23 02:19:23 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/08/23 02:19:38 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/08/23 02:19:53 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/08/23 02:20:08 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/08/23 02:20:18 ERROR EmrServerlessClusterSchedulerBackend: Abandoning job due to no executor being launched within 1200000ms after driver starts.

These warnings/errors repeat quite a few times. I have 256 max concurrent vCPUs for my account and I can see it ramps up to at least 56% of the available vCPUs.

I have the following settings for EMR Serverless as well:

spark_driver_memory = "16g"
spark_emr_serverless_driver_disk = "50g"
spark_emr_serverless_executor_disk = "50g"
spark_emr_serverless_memoryoverheadfactor = 0.1
spark_executor_cores = 16
spark_executor_memory = "16g"
spark_executor_instances = 8
spark_dynamic_allocation_enabled = "true"
spark_dynamic_allocation_max_executors = 24
spark_task_cpus = 2

I would assume a max of 24 executors at 16 GB RAM for each is enough? Or do I need to allocate even more?

Just for comparison - I am able to train on my M1 Mac which has 16 GB of RAM and a sampling of 0.02. The training run above uses sampling of 0.1 - so that's why I think I have more than enough Spark executors.
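As a rough check on the numbers above, here is a small sketch of the vCPU arithmetic. It assumes one vCPU per executor core and ignores the driver's own cores, which are not listed in the settings. The variable names are illustrative only.

```python
# Values copied from the settings and account limit quoted above.
executor_cores = 16
executor_instances = 8
max_executors = 24
task_cpus = 2
account_vcpu_limit = 256


def executor_vcpus(n_executors: int, cores: int = executor_cores) -> int:
    """vCPUs requested by `n_executors` executors (driver not included)."""
    return n_executors * cores


initial = executor_vcpus(executor_instances)  # 8 * 16 = 128
peak = executor_vcpus(max_executors)          # 24 * 16 = 384
slots_per_executor = executor_cores // task_cpus  # 16 // 2 = 8

print(f"initial executors: {initial} vCPUs ({initial / account_vcpu_limit:.0%} of limit)")
print(f"max executors:     {peak} vCPUs (limit is {account_vcpu_limit})")
print(f"concurrent tasks per executor: {slots_per_executor}")
```

Under these assumptions the initial 8 executors already ask for half of the 256-vCPU limit, and the dynamic-allocation maximum of 24 executors would ask for more than the limit allows.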

sgonullu commented 3 weeks ago

> Hi @sgonullu - thanks for the info! What Python code did you use to get the local IP address? Something like this?
>
> socket.gethostbyname(socket.gethostname())

I apologize for the confusion earlier. To clarify, I was troubleshooting the issue and submitted a test Spark job with --master local[*], which resulted in the same bind error. However, when I changed the spark.driver.host setting, it worked with --master local[*]. I assumed that the tracker would always start on the driver.

After @wbo4958 mentioned that the tracker starts on the executor, I reviewed the code to understand how the IP address is determined. I tested with the following code snippet:

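from pyspark import BarrierTaskContext

def get_addr_list(iterator):
    # Runs inside a barrier task and returns the address of every task in the stage.
    context = BarrierTaskContext.get()
    addr_list = [info.address for info in context.getTaskInfos()]
    yield addr_list

# `rdd` is any existing RDD; barrier mode needs a free slot for every partition.
executors_addr = rdd.barrier().mapPartitions(get_addr_list).collect()

print(executors_addr)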

Interestingly, when using --master local[*], the IP address returned was the spark.driver.host value. However, when using --master yarn, the IP address returned was not the expected worker IP address, but rather a network address (e.g. 10.42.0.0, I am not on EMR) that corresponds to my client's network. Note that my client is running in a container and is not on the same network as the executors.

I'm unsure whether this is due to a Spark configuration issue or something else. Any further guidance would be appreciated.

wbo4958 commented 3 weeks ago

@lsli8888, from the logs, the executors exit with an unexpected error when Spark tries to create them.

24/08/23 02:16:48 WARN ExecutorContainerStoreImpl: Executor(s) 66,70,67,71,68,72,69,65 exited unexpectedly with exit code 1.

You should check the reason here.

To debug this kind of issue, I would suggest:

1. Check whether the job works with only 1 executor.
2. Reduce the dataset to see whether XGBoost works on 1 executor with only a few rows.

wbo4958 commented 3 weeks ago

Hi @sgonullu, does XGBoost work with --master yarn? The BarrierTaskContext of each Spark task should hold the IP address of each task, which is assigned by the Spark driver, so the IP should be accessible from the driver.
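One way to test that reachability assumption is a plain TCP connection attempt from the driver to an address reported by the barrier context. This is a minimal sketch using only the standard library. `can_connect` is a hypothetical helper, and the port to probe is an assumption, since it must be a port on which something is actually listening on the executor.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable networks.
        return False


if __name__ == "__main__":
    # Self-check against a listener opened on loopback.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.bind(("127.0.0.1", 0))
        server.listen(1)
        print(can_connect("127.0.0.1", server.getsockname()[1]))
```

A `False` result for an executor address would suggest a routing or security-group problem rather than an XGBoost one, though it cannot say which.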

lsli8888 commented 3 weeks ago

@wbo4958 Good thoughts - I forgot to check the Spark executor logs; I had only checked the driver logs. I tried it with a single Spark driver and a single Spark executor (the maximum was also set to 1 executor), with 16 GB of RAM each and a sampling rate of 0.0001, which is 1000 records. Unfortunately, I got the same error, but here are the Spark executor logs, which indicate a connection error. 10.0.176.163 is the spark.driver.host value that I got by invoking socket.gethostbyname(socket.gethostname()). Should I be setting spark.executor.host as well?

24/08/26 07:09:17 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 36@ip-10-0-142-117.us-east-2.compute.internal
24/08/26 07:09:17 INFO SignalUtils: Registering signal handler for TERM
24/08/26 07:09:17 INFO SignalUtils: Registering signal handler for HUP
24/08/26 07:09:17 INFO SignalUtils: Registering signal handler for INT
24/08/26 07:09:17 INFO SecurityManager: Changing view acls to: hadoop
24/08/26 07:09:17 INFO SecurityManager: Changing modify acls to: hadoop
24/08/26 07:09:17 INFO SecurityManager: Changing view acls groups to: 
24/08/26 07:09:17 INFO SecurityManager: Changing modify acls groups to: 
24/08/26 07:09:17 INFO SecurityManager: SecurityManager: authentication enabled; ui acls disabled; users with view permissions: hadoop; groups with view permissions: EMPTY; users with modify permissions: hadoop; groups with modify permissions: EMPTY
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1915)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:61)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:478)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:467)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult: 
    at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$9(CoarseGrainedExecutorBackend.scala:498)
    at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
    at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985)
    at scala.collection.immutable.Range.foreach(Range.scala:158)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:984)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$7(CoarseGrainedExecutorBackend.scala:496)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:62)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:61)
    at java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
    at java.base/javax.security.auth.Subject.doAs(Subject.java:439)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
    ... 4 more
Caused by: java.io.IOException: Failed to connect to /10.0.176.163:36391
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:294)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:214)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:226)
    at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:204)
    at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:202)
    at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:198)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /10.0.176.163:36391
Caused by: java.net.ConnectException: Connection refused
    at java.base/sun.nio.ch.Net.pollConnect(Native Method)
    at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:672)
    at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:946)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:337)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:776)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:840)
wbo4958 commented 3 weeks ago

@lsli8888,

1. Is the sampling rate of 0.0001 an XGBoost parameter? If so, the Spark tasks still need to read all the data and do the sampling inside XGBoost. Given your configuration, I suspect an OOM. I would suggest limiting the DataFrame itself:

    train_df = train_df.limit(10)  # head(10) would return a list of Rows, not a DataFrame

2. socket.gethostbyname(socket.gethostname())

    Why did you do this?

    Can you get the sample code below working without changing any XGBoost code?

    >>> from xgboost.spark import SparkXGBClassifier
    >>> from pyspark.ml.linalg import Vectors
    >>> df_train = spark.createDataFrame([
    ...     (Vectors.dense(1.0, 2.0, 3.0), 0, False, 1.0),
    ...     (Vectors.sparse(3, {1: 1.0, 2: 5.5}), 1, False, 2.0),
    ...     (Vectors.dense(4.0, 5.0, 6.0), 0, True, 1.0),
    ...     (Vectors.sparse(3, {1: 6.0, 2: 7.5}), 1, True, 2.0),
    ... ], ["features", "label", "isVal", "weight"])
    >>> df_test = spark.createDataFrame([
    ...     (Vectors.dense(1.0, 2.0, 3.0), ),
    ... ], ["features"])
    >>> xgb_classifier = SparkXGBClassifier(max_depth=5, missing=0.0,
    ...     validation_indicator_col='isVal', weight_col='weight',
    ...     early_stopping_rounds=1, eval_metric='logloss')
    >>> xgb_clf_model = xgb_classifier.fit(df_train)
    >>> xgb_clf_model.transform(df_test).show()
lsli8888 commented 3 weeks ago

@wbo4958,

I set the spark.driver.host to:

socket.gethostbyname(socket.gethostname())

on the suggestion of @sgonullu. This was mentioned earlier in the thread. Let me know if you have a different opinion on this. I did find that setting spark.driver.host this way got me further in the training, albeit with the errors I showed above.
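For reference, here is a minimal sketch of how that value is obtained and sanity-checked. The helper names `resolve_driver_host` and `is_routable_candidate` are hypothetical, not part of Spark or XGBoost. The loopback check is there because, on some hosts, the hostname resolves to a 127.x.x.x address, which other machines cannot reach.

```python
import ipaddress
import socket


def resolve_driver_host() -> str:
    """Return the IPv4 address that the local hostname resolves to."""
    try:
        return socket.gethostbyname(socket.gethostname())
    except OSError:
        # Hostname is not resolvable; return loopback so the check below flags it.
        return "127.0.0.1"


def is_routable_candidate(ip: str) -> bool:
    """Loopback addresses are only visible on the same machine."""
    return not ipaddress.ip_address(ip).is_loopback


host = resolve_driver_host()
print(f"spark.driver.host candidate: {host}")
if not is_routable_candidate(host):
    print("warning: hostname resolves to a loopback address")
```

Passing this check does not guarantee that executors can reach the address; it only rules out one common failure.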

I also limited the training data using:

train_df = train_df.limit(10)

Unfortunately, I got the same result as before, namely the "Caused by: java.io.IOException: Failed to connect to" error that I showed above.

I will give your sample code a try soon.

lsli8888 commented 3 weeks ago

@wbo4958

I tried your code sample on EMR Serverless. Interestingly, it gave the exact same errors as I was seeing before, i.e. the "Caused by: java.io.IOException: Failed to connect to" error. The code did run locally, so there's something wrong with my EMR Serverless application that doesn't work with XGBoost. Again, my configuration works with the Spark GBTRegressor just fine on EMR Serverless (although the performance and results are quite bad - that's why I'm hoping to use XGBoost).

wbo4958 commented 3 weeks ago

@lsli8888, I see, so the Spark GBTRegressor works?

But from the log, something seems wrong with the Spark cluster: the executor couldn't connect to the driver. If so, the Spark GBTRegressor shouldn't work either.
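One way to test that hypothesis independently of XGBoost is a plain TCP probe against the driver's host and port. The sketch below uses only the standard library. `can_connect` is a hypothetical helper, and the self-check runs against a throwaway local listener rather than a real driver.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and unreachable networks.
        return False


# Self-check against a throwaway local listener.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))  # port 0 lets the OS pick a free port
server.listen(1)
port = server.getsockname()[1]
up = can_connect("127.0.0.1", port)    # listener is accepting
server.close()
down = can_connect("127.0.0.1", port)  # listener is gone
print(up, down)
```

On the cluster, the call would be something like `can_connect("10.0.176.163", 36391)` run from inside an executor task, using the address and port from the executor log above. "Connection refused" there would mean the host is reachable but nothing is listening on that port.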

lsli8888 commented 3 weeks ago

Hi @wbo4958 - let me clarify. Here are my test scenarios:

SparkXGBClassifier (your sample code, spark.driver.host set to socket.gethostbyname(socket.gethostname())): I get the connection failed error: Caused by: java.io.IOException: Failed to connect to /x.x.x.x:xxxx

SparkXGBClassifier (your sample code, spark.driver.host not explicitly set):

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/spark/core.py", line 1054, in _train_booster
    _rabit_args = _get_rabit_args(context, num_workers)
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/spark/utils.py", line 69, in _get_rabit_args
    env = _start_tracker(context, n_workers)
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/spark/utils.py", line 58, in _start_tracker
    tracker = RabitTracker(n_workers=n_workers, host_ip=host, sortby="task")
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/tracker.py", line 73, in __init__
    _check_call(_LIB.XGTrackerCreate(args, ctypes.byref(handle)))
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/core.py", line 284, in _check_call
    raise XGBoostError(py_str(_LIB.XGBGetLastError()))
xgboost.core.XGBoostError: [05:47:20] /workspace/src/collective/result.cc:78: 
- [socket.h:89|05:47:20]: Failed to bind socket. system error:Cannot assign requested address
Stack trace:
  [bt] (0) /home/hadoop/environment/lib64/python3.9/site-packages/xgboost/lib/libxgboost.so(+0x22dbbc) [0x7fbf64686bbc]
  [bt] (1) /home/hadoop/environment/lib64/python3.9/site-packages/xgboost/lib/libxgboost.so(+0x2ce6c1) [0x7fbf647276c1]
  [bt] (2) /home/hadoop/environment/lib64/python3.9/site-packages/xgboost/lib/libxgboost.so(+0x2db3a9) [0x7fbf647343a9]
  [bt] (3) /home/hadoop/environment/lib64/python3.9/site-packages/xgboost/lib/libxgboost.so(XGTrackerCreate+0x6b0) [0x7fbf646b8550]
  [bt] (4) /lib64/libffi.so.8(+0x7d16) [0x7fbf88087d16]
  [bt] (5) /lib64/libffi.so.8(+0x456e) [0x7fbf8808456e]
  [bt] (6) /lib64/libffi.so.8(ffi_call+0x123) [0x7fbf88087373]
  [bt] (7) /usr/lib64/python3.9/lib-dynload/_ctypes.cpython-39-x86_64-linux-gnu.so(+0x8e5d) [0x7fbf88094e5d]
  [bt] (8) /usr/lib64/python3.9/lib-dynload/_ctypes.cpython-39-x86_64-linux-gnu.so(+0x8742) [0x7fbf88094742]

GbtClassifier (your sample code, spark.driver.host set to socket.gethostbyname(socket.gethostname())): I get the connection failed error: Caused by: java.io.IOException: Failed to connect to /x.x.x.x:xxxx

GbtClassifier (slight modification of your sample code to use GBT classifier instead, spark.driver.host not explicitly set):

In summary, setting spark.driver.host to socket.gethostbyname(socket.gethostname()) for SparkXGBClassifier seems to get things slightly further along, but it still fails. Interestingly, setting spark.driver.host to socket.gethostbyname(socket.gethostname()) for GbtClassifier seems to mess things up...
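For context on the second scenario: "Cannot assign requested address" is the message for the OS error EADDRNOTAVAIL, which bind() raises when no local network interface owns the requested IP. A minimal sketch to check whether a given address is bindable on the current machine follows. `can_bind` is a hypothetical helper and expects an IP literal, not a hostname.

```python
import errno
import ipaddress
import socket


def can_bind(host: str) -> bool:
    """Return True if this machine can bind a TCP socket to the IP literal `host`."""
    version = ipaddress.ip_address(host).version
    family = socket.AF_INET6 if version == 6 else socket.AF_INET
    with socket.socket(family, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, 0))  # port 0 lets the OS choose a free port
            return True
        except OSError as exc:
            if exc.errno == errno.EADDRNOTAVAIL:
                # "Cannot assign requested address": no local interface owns this IP.
                return False
            raise


print(can_bind("127.0.0.1"))
```

Running this inside an executor task with the host value that the tracker is given would show whether that address actually belongs to the executor. That the tracker host does not match a local interface is an assumption here, not something the logs confirm.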

wbo4958 commented 3 weeks ago

@lsli8888, I see.

The issue is back to the original one: Cannot assign requested address.

I would suggest applying https://github.com/dmlc/xgboost/pull/10281 to xgboost. This PR launches the RabitTracker on the driver side instead of the executor side, which may help.

Then try the code below after applying it:

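from pyspark.ml.linalg import Vectors
from xgboost.callback import EvaluationMonitor
from xgboost.spark import SparkXGBRegressor

# Assumes an active SparkSession named `spark`.
df_train = spark.createDataFrame(
    [
        (Vectors.dense(1.0, 2.0, 3.0), 0, False, 1.0),
        (Vectors.sparse(3, {1: 1.0, 2: 5.5}), 1, False, 2.0),
        (Vectors.dense(4.0, 5.0, 6.0), 0, True, 1.0),
        (Vectors.sparse(3, {1: 6.0, 2: 7.5}), 1, True, 2.0),
    ]
    * 100,  # repeat the four rows to get 400 rows
    ["features", "label", "isVal", "weight"],
)

callbacks = EvaluationMonitor()
xgb_regressor = SparkXGBRegressor(
    num_workers=5,
    callbacks=[callbacks],
    tracker_on_driver=True,  # parameter introduced by the PR above
    validation_indicator_col="isVal",
)
xgb_reg_model = xgb_regressor.fit(df_train)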
lsli8888 commented 3 weeks ago

@wbo4958

Hi, unfortunately, I was not able to build xgboost with the changes from the commit:

https://github.com/dmlc/xgboost/commit/6d24bbd74c8fa1ed184124ea1c2229442f4e0bb6

Of note, GitHub shows the warning: "This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository."

So I couldn't check out this commit hash directly.

I then tried checking out parent commit:

https://github.com/dmlc/xgboost/commit/a5a58102e5e82fa508514c34cd8e5f408dcfd3e1

I then applied the changes from commit 6d24bbd on top of it. When I tried building via make, I got the error below:

In file included from /app/xgboost/src/c_api/c_api.cc:22:
/app/xgboost/src/c_api/../common/io.h:51:8: error: conflicting return type specified for 'virtual void xgboost::common::MemoryFixSizeBuffer::Write(const void*, std::size_t)'
   51 |   void Write(const void *ptr, std::size_t size) override {
      |        ^~~~~
In file included from /app/xgboost/dmlc-core/include/dmlc/data.h:14,
                 from /app/xgboost/include/xgboost/data.h:11,
                 from /app/xgboost/include/xgboost/predictor.h:12,
                 from /app/xgboost/src/c_api/../common/api_entry.h:10,
                 from /app/xgboost/src/c_api/c_api.cc:18:
/app/xgboost/dmlc-core/include/dmlc/./io.h:45:18: note: overridden function is 'virtual size_t dmlc::Stream::Write(const void*, size_t)'
   45 |   virtual size_t Write(const void* ptr, size_t size) = 0;
      |                  ^~~~~
In file included from /app/xgboost/src/c_api/c_api.cc:22:
/app/xgboost/src/c_api/../common/io.h:94:8: error: conflicting return type specified for 'virtual void xgboost::common::MemoryBufferStream::Write(const void*, size_t)'
   94 |   void Write(const void *ptr, size_t size) override {
      |        ^~~~~
In file included from /app/xgboost/dmlc-core/include/dmlc/data.h:14,
                 from /app/xgboost/include/xgboost/data.h:11,
                 from /app/xgboost/include/xgboost/predictor.h:12,
                 from /app/xgboost/src/c_api/../common/api_entry.h:10,
                 from /app/xgboost/src/c_api/c_api.cc:18:
/app/xgboost/dmlc-core/include/dmlc/./io.h:45:18: note: overridden function is 'virtual size_t dmlc::Stream::Write(const void*, size_t)'
   45 |   virtual size_t Write(const void* ptr, size_t size) = 0;
      |                  ^~~~~
In file included from /app/xgboost/src/c_api/c_api.cc:22:
/app/xgboost/src/c_api/../common/io.h:130:8: error: conflicting return type specified for 'virtual void xgboost::common::PeekableInStream::Write(const void*, size_t)'
  130 |   void Write(const void*, size_t) override {
      |        ^~~~~
In file included from /app/xgboost/dmlc-core/include/dmlc/data.h:14,
                 from /app/xgboost/include/xgboost/data.h:11,
                 from /app/xgboost/include/xgboost/predictor.h:12,
                 from /app/xgboost/src/c_api/../common/api_entry.h:10,
                 from /app/xgboost/src/c_api/c_api.cc:18:
/app/xgboost/dmlc-core/include/dmlc/./io.h:45:18: note: overridden function is 'virtual size_t dmlc::Stream::Write(const void*, size_t)'
   45 |   virtual size_t Write(const void* ptr, size_t size) = 0;
      |                  ^~~~~
[ 19%] Building CXX object src/CMakeFiles/objxgboost.dir/collective/comm_group.cc.o
[ 20%] Building CXX object src/CMakeFiles/objxgboost.dir/collective/in_memory_handler.cc.o
[ 21%] Building CXX object src/CMakeFiles/objxgboost.dir/collective/loop.cc.o
gmake[2]: *** [src/CMakeFiles/objxgboost.dir/build.make:76: src/CMakeFiles/objxgboost.dir/c_api/c_api.cc.o] Error 1
gmake[2]: *** Waiting for unfinished jobs....
gmake[1]: *** [CMakeFiles/Makefile2:216: src/CMakeFiles/objxgboost.dir/all] Error 2
gmake: *** [Makefile:156: all] Error 2

I may be doing something wrong... so let me know!

wbo4958 commented 3 weeks ago

@lsli8888, I just updated https://github.com/dmlc/xgboost/pull/10281 to the latest. You don't need to compile XGBoost; instead, you can just copy the changed Python files from https://github.com/dmlc/xgboost/pull/10281 to replace the installed xgboost Python files.

lsli8888 commented 2 weeks ago

@wbo4958

I copied the changed Python files to the xgboost library in site-packages to replace the three XGBoost Python files (core.py, estimator.py, utils.py). Hopefully, I did this correctly!
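For anyone repeating this, the file swap can be scripted so that the originals are kept for rollback. This is only a sketch: `replace_with_backup` is a hypothetical helper, and the dry run below works in a temporary directory rather than the real site-packages.

```python
import shutil
import tempfile
from pathlib import Path
from typing import Iterable, List


def replace_with_backup(src_dir: Path, dst_dir: Path, names: Iterable[str]) -> List[Path]:
    """Copy each named file from src_dir over dst_dir, keeping a .bak of the original."""
    backups = []
    for name in names:
        src, dst = src_dir / name, dst_dir / name
        if not src.is_file():
            raise FileNotFoundError(src)
        if dst.exists():
            backup = dst.with_name(dst.name + ".bak")
            shutil.copy2(dst, backup)  # preserve the original for rollback
            backups.append(backup)
        shutil.copy2(src, dst)
    return backups


# Dry run in a temporary directory instead of the real site-packages.
with tempfile.TemporaryDirectory() as tmp:
    patched, installed = Path(tmp, "patched"), Path(tmp, "installed")
    for directory, text in ((patched, "new"), (installed, "old")):
        directory.mkdir()
        (directory / "core.py").write_text(text)
    made = replace_with_backup(patched, installed, ["core.py"])
    print((installed / "core.py").read_text(), [p.name for p in made])
```

In my case the destination would be the xgboost/spark directory under site-packages, assuming all three files live there, and the names would be core.py, estimator.py and utils.py.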

Anyway, I then ran EMR Serverless with these changes and the updated SparkXGBRegressor code you suggested above. I initially ran the job with one Spark executor and 4 GB of RAM each for the driver and executor, 2 cores allocated for each task, and 4 driver and executor cores. I thought this would be more than enough resources, but the job failed with the following errors:

24/08/28 07:09:07 INFO DAGScheduler: ShuffleMapStage 0 (javaToPython at NativeMethodAccessorImpl.java:0) finished in 73.239 s
24/08/28 07:09:07 INFO DAGScheduler: looking for newly runnable stages
24/08/28 07:09:07 INFO DAGScheduler: running: Set()
24/08/28 07:09:07 INFO DAGScheduler: waiting: Set()
24/08/28 07:09:07 INFO DAGScheduler: failed: Set()
24/08/28 07:09:07 INFO SparkContext: Starting job: fit at /tmp/spark-61ee1a8b-2892-4143-83f9-b0d29080b620/train_model.py:430
24/08/28 07:09:07 WARN DAGScheduler: Barrier stage in job 1 requires 5 slots, but only 2 are available. Will retry up to 40 more times
24/08/28 07:09:22 WARN DAGScheduler: Barrier stage in job 1 requires 5 slots, but only 2 are available. Will retry up to 39 more times
...
24/08/28 07:11:02 INFO EmrServerlessClusterSchedulerBackend$EmrServerlessDriverEndpoint: No executor found for 2600:1f16:3b4:4e02:360f:ee62:383a:b8c1:45484
24/08/28 07:11:07 WARN DAGScheduler: Barrier stage in job 1 requires 5 slots, but only 2 are available. Will retry up to 32 more times
...
24/08/28 07:17:52 WARN DAGScheduler: Barrier stage in job 1 requires 5 slots, but only 2 are available. Will retry up to 5 more times
24/08/28 07:18:07 WARN DAGScheduler: Barrier stage in job 1 requires 5 slots, but only 2 are available. Will retry up to 4 more times
24/08/28 07:18:22 WARN DAGScheduler: Barrier stage in job 1 requires 5 slots, but only 2 are available. Will retry up to 3 more times
24/08/28 07:18:37 WARN DAGScheduler: Barrier stage in job 1 requires 5 slots, but only 2 are available. Will retry up to 2 more times
24/08/28 07:18:52 WARN DAGScheduler: Barrier stage in job 1 requires 5 slots, but only 2 are available. Will retry up to 1 more times
24/08/28 07:19:07 WARN DAGScheduler: Barrier stage in job 1 requires 5 slots, but only 2 are available. Will retry up to 0 more times
24/08/28 07:19:07 INFO DAGScheduler: Job 1 failed: fit at /tmp/spark-61ee1a8b-2892-4143-83f9-b0d29080b620/train_model.py:430, took 600.047621 s
24/08/28 07:19:08 INFO SparkContext: Invoking stop() from shutdown hook
24/08/28 07:19:08 INFO SparkContext: SparkContext is stopping with exitCode 0.
24/08/28 07:19:08 INFO SparkUI: Stopped Spark web UI at http://[2600:1f16:3b4:4e02:bd93:7f0d:b127:4719]:4040
24/08/28 07:19:08 INFO EmrServerlessClusterSchedulerBackend: Shutting down all executors
24/08/28 07:19:08 INFO EmrServerlessClusterSchedulerBackend$EmrServerlessDriverEndpoint: Asking each executor to shut down
24/08/28 07:19:08 INFO TimeBasedRotatingEventLogFilesWriter: Renaming file:/var/log/spark/apps/eventlog_v2_00flvmvbrhnrc80f/00flvmvbrhnrc80f.inprogress to file:/var/log/spark/apps/eventlog_v2_00flvmvbrhnrc80f/events_1_00flvmvbrhnrc80f
24/08/28 07:19:08 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
24/08/28 07:19:08 INFO MemoryStore: MemoryStore cleared
24/08/28 07:19:08 INFO BlockManager: BlockManager stopped
24/08/28 07:19:08 INFO BlockManagerMaster: BlockManagerMaster stopped
24/08/28 07:19:08 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
24/08/28 07:19:10 INFO SparkContext: Successfully stopped SparkContext
24/08/28 07:19:10 INFO ShutdownHookManager: Shutdown hook called

I then ran the job with one Spark executor and 8 GB of RAM each for the driver and executor, 2 cores allocated for each task, and 8 driver and executor cores. It failed again with a similar error:

...
Barrier stage in job 1 requires 5 slots, but only 4 are available. Will retry up to 5 more times
...
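The slot counts in these two warnings are consistent with a simple calculation, assuming the number of concurrent task slots is executors × floor(executor cores / CPUs per task), i.e. spark.executor.cores divided by spark.task.cpus. A short sketch of that arithmetic follows; `barrier_slots` is a hypothetical helper, not a Spark API.

```python
def barrier_slots(num_executors: int, executor_cores: int, task_cpus: int) -> int:
    """Number of tasks that can run at the same time across all executors."""
    return num_executors * (executor_cores // task_cpus)


NUM_WORKERS = 5  # num_workers passed to SparkXGBRegressor in the sample code

for cores in (4, 8):
    slots = barrier_slots(num_executors=1, executor_cores=cores, task_cpus=2)
    status = "enough" if slots >= NUM_WORKERS else "too few"
    print(f"{cores} executor cores -> {slots} slots ({status} for {NUM_WORKERS} workers)")
```

Under that assumption, a barrier stage with 5 workers at 2 cores per task needs at least 10 executor cores in total, so the failures here look like a slot-count issue rather than a memory issue.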

With this info, I then upped the resources again to one Spark executor and 16 GB of RAM each for the driver and executor, 2 cores allocated for each task, and 16 driver and executor cores. I then got the following error:

24/08/28 08:18:25 INFO BarrierCoordinator: Current barrier epoch for Stage 2 (Attempt 0) is 0.
24/08/28 08:18:25 INFO BarrierCoordinator: Barrier sync epoch 0 from Stage 2 (Attempt 0) received update from Task 17, current progress: 1/5.
24/08/28 08:18:25 INFO BarrierCoordinator: Current barrier epoch for Stage 2 (Attempt 0) is 0.
24/08/28 08:18:25 INFO BarrierCoordinator: Barrier sync epoch 0 from Stage 2 (Attempt 0) received update from Task 19, current progress: 2/5.
24/08/28 08:18:25 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 20) ([2600:1f16:3b4:4e00:7237:adba:2141:134c] executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/spark/core.py", line 1054, in _train_booster
    _rabit_args = _get_rabit_args(context, num_workers)
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/spark/utils.py", line 69, in _get_rabit_args
    env = _start_tracker(context, n_workers)
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/spark/utils.py", line 58, in _start_tracker
    tracker = RabitTracker(n_workers=n_workers, host_ip=host, sortby="task")
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/tracker.py", line 73, in __init__
    _check_call(_LIB.XGTrackerCreate(args, ctypes.byref(handle)))
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/core.py", line 284, in _check_call
    raise XGBoostError(py_str(_LIB.XGBGetLastError()))
xgboost.core.XGBoostError: [08:18:25] /workspace/src/collective/result.cc:78: 
- [socket.h:89|08:18:25]: Failed to bind socket. system error:Cannot assign requested address
Stack trace:
  [bt] (0) /home/hadoop/environment/lib64/python3.9/site-packages/xgboost/lib/libxgboost.so(+0x22dbbc) [0x7f2022d05bbc]
  [bt] (1) /home/hadoop/environment/lib64/python3.9/site-packages/xgboost/lib/libxgboost.so(+0x2ce6c1) [0x7f2022da66c1]
  [bt] (2) /home/hadoop/environment/lib64/python3.9/site-packages/xgboost/lib/libxgboost.so(+0x2db3a9) [0x7f2022db33a9]
  [bt] (3) /home/hadoop/environment/lib64/python3.9/site-packages/xgboost/lib/libxgboost.so(XGTrackerCreate+0x6b0) [0x7f2022d37550]
  [bt] (4) /lib64/libffi.so.8(+0x7d16) [0x7f204668bd16]
  [bt] (5) /lib64/libffi.so.8(+0x456e) [0x7f204668856e]
  [bt] (6) /lib64/libffi.so.8(ffi_call+0x123) [0x7f204668b373]
  [bt] (7) /usr/lib64/python3.9/lib-dynload/_ctypes.cpython-39-x86_64-linux-gnu.so(+0x8e5d) [0x7f2046698e5d]
  [bt] (8) /usr/lib64/python3.9/lib-dynload/_ctypes.cpython-39-x86_64-linux-gnu.so(+0x8742) [0x7f2046698742]

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
    at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:118)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:86)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:80)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
    at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:751)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1962)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)

24/08/28 08:18:25 INFO DAGScheduler: Marking ResultStage 2 (fit at /tmp/spark-5efa145b-ef07-496a-8825-3d2e2f37acb7/train_model.py:430) as failed due to a barrier task failed.
24/08/28 08:18:25 INFO TaskSchedulerImpl: Killing all running tasks in stage 2: Task ResultTask(2, 0) from barrier stage ResultStage 2 (fit at /tmp/spark-5efa145b-ef07-496a-8825-3d2e2f37acb7/train_model.py:430) failed.
24/08/28 08:18:25 INFO DAGScheduler: ResultStage 2 (fit at /tmp/spark-5efa145b-ef07-496a-8825-3d2e2f37acb7/train_model.py:430) failed in 25.776 s due to Stage failed because barrier task ResultTask(2, 0) finished unsuccessfully.
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/spark/core.py", line 1054, in _train_booster
    _rabit_args = _get_rabit_args(context, num_workers)
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/spark/utils.py", line 69, in _get_rabit_args
    env = _start_tracker(context, n_workers)
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/spark/utils.py", line 58, in _start_tracker
    tracker = RabitTracker(n_workers=n_workers, host_ip=host, sortby="task")
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/tracker.py", line 73, in __init__
    _check_call(_LIB.XGTrackerCreate(args, ctypes.byref(handle)))
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/core.py", line 284, in _check_call
    raise XGBoostError(py_str(_LIB.XGBGetLastError()))
xgboost.core.XGBoostError: [08:18:25] /workspace/src/collective/result.cc:78: 
- [socket.h:89|08:18:25]: Failed to bind socket. system error:Cannot assign requested address
Stack trace:
  [bt] (0) /home/hadoop/environment/lib64/python3.9/site-packages/xgboost/lib/libxgboost.so(+0x22dbbc) [0x7f2022d05bbc]
  [bt] (1) /home/hadoop/environment/lib64/python3.9/site-packages/xgboost/lib/libxgboost.so(+0x2ce6c1) [0x7f2022da66c1]
  [bt] (2) /home/hadoop/environment/lib64/python3.9/site-packages/xgboost/lib/libxgboost.so(+0x2db3a9) [0x7f2022db33a9]
  [bt] (3) /home/hadoop/environment/lib64/python3.9/site-packages/xgboost/lib/libxgboost.so(XGTrackerCreate+0x6b0) [0x7f2022d37550]
  [bt] (4) /lib64/libffi.so.8(+0x7d16) [0x7f204668bd16]
  [bt] (5) /lib64/libffi.so.8(+0x456e) [0x7f204668856e]
  [bt] (6) /lib64/libffi.so.8(ffi_call+0x123) [0x7f204668b373]
  [bt] (7) /usr/lib64/python3.9/lib-dynload/_ctypes.cpython-39-x86_64-linux-gnu.so(+0x8e5d) [0x7f2046698e5d]
  [bt] (8) /usr/lib64/python3.9/lib-dynload/_ctypes.cpython-39-x86_64-linux-gnu.so(+0x8742) [0x7f2046698742]

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
    at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:118)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:86)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:80)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
    at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:751)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1962)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)

24/08/28 08:18:25 INFO DAGScheduler: Resubmitting ResultStage 2 (fit at /tmp/spark-5efa145b-ef07-496a-8825-3d2e2f37acb7/train_model.py:430) due to barrier stage failure.

So ultimately, the job still failed. However, I hope this extra info helps debug this issue. Thanks!

lsli8888 commented 2 weeks ago

@wbo4958

Here's the standard output for the configuration with one Spark executor, 16 GB of RAM each for the driver and executor, 2 cores allocated for each task, and 16 driver and executor cores. Do I need to allocate even more resources when running the Spark job? If so, that wouldn't seem to make sense, as the script is so simple and the initial dataframe is so small...

2024-08-28 07:33:38.492 [WARNING] SparkXGBRegressor: The num_workers 5 set for xgboost distributed training is greater than current max number of concurrent spark task slots, you need wait until more task slots available or you need increase spark cluster workers.
2024-08-28 07:33:39.327 [INFO] XGBoost-PySpark: Running xgboost-2.1.1 on 5 workers with
    booster params: {'objective': 'reg:squarederror', 'device': 'cpu', 'tracker_on_driver': True, 'nthread': 2}
    train_call_kwargs_params: {'callbacks': [<xgboost.callback.EvaluationMonitor object at 0x7f539ec9dc40>], 'verbose_eval': True, 'num_boost_round': 100}
    dmatrix_kwargs: {'nthread': 2, 'missing': nan}
Traceback (most recent call last):
  File "/tmp/spark-b02a269f-dd3e-4b60-97ca-a68e9758c344/train_model.py", line 511, in <module>
    main()
  File "/home/hadoop/environment/lib64/python3.9/site-packages/my_utils/utils/logger_utils.py", line 43, in wrapper
    return func(*args, **kwargs)
  File "/home/hadoop/environment/lib64/python3.9/site-packages/click/core.py", line 1157, in __call__
    return self.main(*args, **kwargs)
  File "/home/hadoop/environment/lib64/python3.9/site-packages/click/core.py", line 1078, in main
    rv = self.invoke(ctx)
  File "/home/hadoop/environment/lib64/python3.9/site-packages/click/core.py", line 1434, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/hadoop/environment/lib64/python3.9/site-packages/click/core.py", line 783, in invoke
    return __callback(*args, **kwargs)
  File "/tmp/spark-b02a269f-dd3e-4b60-97ca-a68e9758c344/train_model.py", line 365, in main
    train_model(
  File "/home/hadoop/environment/lib64/python3.9/site-packages/my_utils/utils/logger_utils.py", line 60, in timed
    result: Callable = method(*args, **kw)
  File "/home/hadoop/environment/lib64/python3.9/site-packages/my_utils/utils/logger_utils.py", line 43, in wrapper
    return func(*args, **kwargs)
  File "/tmp/spark-b02a269f-dd3e-4b60-97ca-a68e9758c344/train_model.py", line 430, in train_model
    xgb_reg_model = xgb_regressor.fit(df_train)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/base.py", line 205, in fit
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/spark/core.py", line 1125, in _fit
    (config, booster) = _run_job()
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/spark/core.py", line 1111, in _run_job
    ret = rdd_with_resource.collect()[0]
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1833, in collect
  File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 179, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.scheduler.BarrierJobSlotsNumberCheckFailed: [SPARK-24819]: Barrier execution mode does not allow run a barrier stage that requires more slots than the total number of slots in the cluster currently. Please init a new cluster with more resources(e.g. CPU, GPU) or repartition the input RDD(s) to reduce the number of slots required to run this barrier stage.
    at org.apache.spark.errors.SparkCoreErrors$.numPartitionsGreaterThanMaxNumConcurrentTasksError(SparkCoreErrors.scala:235)
    at org.apache.spark.scheduler.DAGScheduler.checkBarrierStageWithNumSlots(DAGScheduler.scala:609)
    at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:687)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1355)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3214)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3205)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3194)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1041)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2406)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2427)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2446)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2471)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1046)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1045)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:195)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:840)
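
The SPARK-24819 message above says the barrier stage needs more task slots than the cluster currently offers. A rough sketch of that arithmetic follows, assuming CPU cores are the only limiting resource. The function names and the example numbers are hypothetical; `num_workers` stands for the SparkXGBRegressor parameter that determines how many barrier tasks are launched.

```python
def barrier_slots(num_executors: int, executor_cores: int, task_cpus: int) -> int:
    """Approximate number of tasks the cluster can run at the same time.

    Assumption: only CPU cores limit concurrency (no GPU or custom resources).
    """
    return num_executors * (executor_cores // task_cpus)


def can_run_barrier_stage(num_workers: int, num_executors: int,
                          executor_cores: int, task_cpus: int) -> bool:
    """A barrier stage must start all of its tasks at once, so it needs one slot per worker."""
    return num_workers <= barrier_slots(num_executors, executor_cores, task_cpus)


# Hypothetical example: 4 workers requested, but one 4-core executor with
# 2 CPUs per task only offers 2 slots, so the check would fail.
print(can_run_barrier_stage(num_workers=4, num_executors=1,
                            executor_cores=4, task_cpus=2))
```

If the check fails, the options named in the error message apply: request more executor resources, or lower the number of workers so that it fits the available slots.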

Also, looking at the Spark executor logs, the JVM heap usage seems fine:

[703.318s][info   ][gc,heap,exit   ] Heap
[703.318s][info   ][gc,heap,exit   ]  PSYoungGen      total 2446848K, used 793613K [0x0000000755580000, 0x0000000800000000, 0x0000000800000000)
[703.318s][info   ][gc,heap,exit   ]   eden space 2097664K, 37% used [0x0000000755580000,0x0000000785c837b0,0x00000007d5600000)
[703.318s][info   ][gc,heap,exit   ]   from space 349184K, 0% used [0x00000007eab00000,0x00000007eab00000,0x0000000800000000)
[703.318s][info   ][gc,heap,exit   ]   to   space 349184K, 0% used [0x00000007d5600000,0x00000007d5600000,0x00000007eab00000)
[703.318s][info   ][gc,heap,exit   ]  ParOldGen       total 5592576K, used 104307K [0x0000000600000000, 0x0000000755580000, 0x0000000755580000)
[703.318s][info   ][gc,heap,exit   ]   object space 5592576K, 1% used [0x0000000600000000,0x00000006065dccb8,0x0000000755580000)
[703.318s][info   ][gc,heap,exit   ]  Metaspace       used 91931K, committed 92992K, reserved 1179648K
[703.318s][info   ][gc,heap,exit   ]   class space    used 12356K, committed 12864K, reserved 1048576K

wbo4958 commented 2 weeks ago

Hi @lsli8888

24/08/28 08:18:25 INFO DAGScheduler: Marking ResultStage 2 (fit at /tmp/spark-5efa145b-ef07-496a-8825-3d2e2f37acb7/train_model.py:430) as failed due to a barrier task failed.
24/08/28 08:18:25 INFO TaskSchedulerImpl: Killing all running tasks in stage 2: Task ResultTask(2, 0) from barrier stage ResultStage 2 (fit at /tmp/spark-5efa145b-ef07-496a-8825-3d2e2f37acb7/train_model.py:430) failed.
24/08/28 08:18:25 INFO DAGScheduler: ResultStage 2 (fit at /tmp/spark-5efa145b-ef07-496a-8825-3d2e2f37acb7/train_model.py:430) failed in 25.776 s due to Stage failed because barrier task ResultTask(2, 0) finished unsuccessfully.
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/spark/core.py", line 1054, in _train_booster
    _rabit_args = _get_rabit_args(context, num_workers)
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/spark/utils.py", line 69, in _get_rabit_args
    env = _start_tracker(context, n_workers)
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/spark/utils.py", line 58, in _start_tracker
    tracker = RabitTracker(n_workers=n_workers, host_ip=host, sortby="task")
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/tracker.py", line 73, in __init__
    _check_call(_LIB.XGTrackerCreate(args, ctypes.byref(handle)))
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/core.py", line 284, in _check_call
    raise XGBoostError(py_str(_LIB.XGBGetLastError()))
xgboost.core.XGBoostError: [08:18:25] /workspace/src/collective/result.cc:78: 
- [socket.h:89|08:18:25]: Failed to bind socket. system error:Cannot assign requested address

It looks like the Rabit tracker was started on the executor side instead of the driver side, judging by this traceback frame: File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/spark/core.py", line 1054, in _train_booster

With the patch applied, the code that starts RabitTracker should be at https://github.com/dmlc/xgboost/pull/10281/files#diff-8900fd245992de81e0a7b8cb7fcd18c37bae4473e8672e9b11fd7010cac358cfR1027-R1035

Could you please double-check that you have really replaced core.py/estimator.py/utils.py under xgboost/spark successfully?
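
One way to check which version of the file is actually installed is to look at where the tracker call lives in the source. This is a minimal sketch using only the standard library; the helper names are hypothetical. It relies on what the tracebacks in this thread show: the unpatched file calls `_get_rabit_args` inside `_train_booster`.

```python
import ast
import importlib
import inspect


def callers_in_source(source: str, callee: str) -> list:
    """Return the sorted names of functions whose body (including nested functions) calls `callee`."""
    callers = set()
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            continue
        for sub in ast.walk(node):
            if isinstance(sub, ast.Call):
                func = sub.func
                # Handle both plain calls (name(...)) and attribute calls (obj.name(...)).
                name = func.id if isinstance(func, ast.Name) else getattr(func, "attr", None)
                if name == callee:
                    callers.add(node.name)
                    break
    return sorted(callers)


def callers_in_module(module_name: str, callee: str):
    """Import a module and return (path of the imported file, functions calling `callee`)."""
    module = importlib.import_module(module_name)
    return module.__file__, callers_in_source(inspect.getsource(module), callee)


# Usage inside the job's Python environment (requires xgboost and pyspark):
#   path, callers = callers_in_module("xgboost.spark.core", "_get_rabit_args")
#   print(path, callers)
# If "_train_booster" is listed, the unpatched core.py is still the one being imported.
```

The printed path also shows which site-packages directory the file was loaded from, which helps when a Docker image contains more than one copy.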

lsli8888 commented 2 weeks ago

Hi @wbo4958. Yep, I definitely messed up my Dockerfile and the three files were not copied into the site-packages directory. I'm sorry about that mistake! However, I have fixed the issue now.

I tried again with a single Spark executor, 16 GB of RAM each for the driver and the executor, 16 cores each for the driver and the executor, and 2 cores per task. The EMR Serverless job failed again, though with different errors this time.
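
For reference, that configuration might map to standard Spark properties roughly as follows. The property names are regular Spark settings, but the mapping, the variable names, and the `--conf` string format are assumptions; the thread does not show how the job was actually submitted.

```python
# Assumed mapping of the described setup to Spark properties.
spark_conf = {
    "spark.executor.instances": "1",   # single executor
    "spark.driver.memory": "16g",
    "spark.executor.memory": "16g",
    "spark.driver.cores": "16",
    "spark.executor.cores": "16",
    "spark.task.cpus": "2",            # 2 cores allocated per task
}


def to_submit_parameters(conf: dict) -> str:
    """Render the properties as a space-separated list of --conf flags."""
    return " ".join(f"--conf {key}={value}" for key, value in conf.items())


submit_parameters = to_submit_parameters(spark_conf)
print(submit_parameters)
```

With these values, and counting CPU cores only, one executor offers 16 // 2 = 8 concurrent task slots.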

Here's a snippet of the standard output from the Spark driver log:

Traceback (most recent call last):
  File "/tmp/spark-2df01013-276b-46b4-ae1b-60ee4cf5eb88/train_model.py", line 511, in <module>
    main()
  File "/home/hadoop/environment/lib64/python3.9/site-packages/my_utils/utils/logger_utils.py", line 43, in wrapper
    return func(*args, **kwargs)
  File "/home/hadoop/environment/lib64/python3.9/site-packages/click/core.py", line 1157, in __call__
    return self.main(*args, **kwargs)
  File "/home/hadoop/environment/lib64/python3.9/site-packages/click/core.py", line 1078, in main
    rv = self.invoke(ctx)
  File "/home/hadoop/environment/lib64/python3.9/site-packages/click/core.py", line 1434, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/hadoop/environment/lib64/python3.9/site-packages/click/core.py", line 783, in invoke
    return __callback(*args, **kwargs)
  File "/tmp/spark-2df01013-276b-46b4-ae1b-60ee4cf5eb88/train_model.py", line 365, in main
    train_model(
  File "/home/hadoop/environment/lib64/python3.9/site-packages/my_utils/utils/logger_utils.py", line 60, in timed
    result: Callable = method(*args, **kw)
  File "/home/hadoop/environment/lib64/python3.9/site-packages/my_utils/utils/logger_utils.py", line 43, in wrapper
    return func(*args, **kwargs)
  File "/tmp/spark-2df01013-276b-46b4-ae1b-60ee4cf5eb88/train_model.py", line 430, in train_model
    xgb_reg_model = xgb_regressor.fit(df_train)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/base.py", line 205, in fit
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/spark/core.py", line 1035, in _fit
    rabit_args = _get_rabit_args(driver_host, num_workers)
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/spark/utils.py", line 68, in _get_rabit_args
    env = _start_tracker(host, n_workers)
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/spark/utils.py", line 57, in _start_tracker
    tracker = RabitTracker(n_workers=n_workers, host_ip=host, sortby="task")
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/tracker.py", line 64, in __init__
    get_family(host_ip)  # use python socket to stop early for invalid address
  File "/home/hadoop/environment/lib64/python3.9/site-packages/xgboost/tracker.py", line 14, in get_family
    return socket.getaddrinfo(addr, None)[0][0]
  File "/usr/lib64/python3.9/socket.py", line 954, in getaddrinfo
    for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
socket.gaierror: [Errno -2] Name or service not known
2024-08-29 01:06:15.809 [INFO] py4j.clientserver: Closing down clientserver connection

Here's a snippet of the standard error from the Spark driver log:

24/08/29 01:06:15 ERROR TransportRequestHandler: Error sending result StreamResponse[streamId=/files/169448f8c7f5d18bef0895058b2ef6b518055c10d02a26e665865d7f4e7cb9c1.zip,byteCount=2263051466,body=FileSegmentManagedBuffer[file=/tmp/spark-2df01013-276b-46b4-ae1b-60ee4cf5eb88/169448f8c7f5d18bef0895058b2ef6b518055c10d02a26e665865d7f4e7cb9c1.zip,offset=0,length=2263051466]] to /[2600:1f16:ab4:a00:af9a:a730:9175:613d]:39094; closing connection
io.netty.channel.StacklessClosedChannelException: null
    at io.netty.channel.AbstractChannel.close(ChannelPromise)(Unknown Source) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
24/08/29 01:06:17 INFO SparkContext: Successfully stopped SparkContext

Here's a snippet of the standard error from the Spark executor log:

24/08/29 01:06:14 INFO Executor: Fetching spark://[2600:1f16:ab4:a00:18cb:4e30:7054:cf7a]:38681/files/169448f8c7f5d18bef0895058b2ef6b518055c10d02a26e665865d7f4e7cb9c1.zip#environment with timestamp 1724893537240
24/08/29 01:06:14 INFO Utils: Fetching spark://[2600:1f16:ab4:a00:18cb:4e30:7054:cf7a]:38681/files/169448f8c7f5d18bef0895058b2ef6b518055c10d02a26e665865d7f4e7cb9c1.zip to /tmp/spark-5e0ba5d2-f802-49e7-ac39-5466390b7564/fetchFileTemp7332881780438855624.tmp
24/08/29 01:06:24 ERROR CoarseGrainedExecutorBackend: Executor self-exiting due to : Unable to create executor due to null
java.nio.channels.ClosedChannelException: null
    at org.apache.spark.network.client.StreamInterceptor.channelInactive(StreamInterceptor.java:62) ~[spark-network-common_2.12-3.5.0-amzn-1.jar:3.5.0-amzn-1]
    at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:223) ~[spark-network-common_2.12-3.5.0-amzn-1.jar:3.5.0-amzn-1]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:301) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
    at java.lang.Thread.run(Thread.java:840) [?:?]
24/08/29 01:06:24 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
24/08/29 01:06:24 INFO CoarseGrainedExecutorBackend: Driver from 2600:1f16:ab4:a00:18cb:4e30:7054:cf7a:38681 disconnected during shutdown
24/08/29 01:06:24 INFO CoarseGrainedExecutorBackend: Driver from [2600:1f16:ab4:a00:18cb:4e30:7054:cf7a]:38681 disconnected during shutdown

wbo4958 commented 2 weeks ago

Hi @trivialfis, I don't know what is going wrong here. Could you help take a look at it?

trivialfis commented 2 weeks ago

socket.gaierror: [Errno -2] Name or service not known

This error is raised by the Python socket module to signal an invalid address. We will have to spin up an EMR cluster to debug its network configuration. Maybe it's a firewall, maybe there are some quirks on EMR, I don't know. It's safe to assume that Spark is returning an address that cannot be used by a normal TCP socket.

I have seen some errors caused by invalid DNS configuration on another platform. Networking can be tricky.
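
To narrow this down without XGBoost in the picture, the same lookup that fails in the traceback can be run on the driver. This is a minimal sketch using only the standard library; the helper names are hypothetical. Stripping square brackets is purely an assumption to test, prompted by the bracketed IPv6 addresses in the logs above; nothing in this thread confirms that brackets are the cause.

```python
import socket
from typing import Optional


def strip_brackets(host: str) -> str:
    """Turn "[::1]" into "::1"; leave any other host string unchanged."""
    if host.startswith("[") and host.endswith("]"):
        return host[1:-1]
    return host


def address_family(host: str) -> Optional[socket.AddressFamily]:
    """Mirror the lookup from the traceback: first address family, or None if resolution fails."""
    try:
        return socket.getaddrinfo(host, None)[0][0]
    except socket.gaierror as err:
        print(f"cannot resolve {host!r}: {err}")
        return None


# Usage on the driver, assuming the host passed to the tracker comes from
# the spark.driver.host property (an assumption, not stated in the thread):
#   host = spark.conf.get("spark.driver.host")
#   print(repr(host), address_family(host), address_family(strip_brackets(host)))
```

Printing the raw host with `repr` shows exactly what string Spark returns, which is the first thing to compare against what a plain socket accepts.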

lsli8888 commented 2 weeks ago

Thanks @trivialfis! FYI, I'm using EMR Serverless rather than the regular EMR. Not sure if that makes too much of a difference, but just wanted to let you know.