apache / celeborn

Apache Celeborn is an elastic and high-performance service for shuffle and spilled data.
https://celeborn.apache.org/
Apache License 2.0

[CELEBORN-1720] Prevent stage re-run if another task attempt is running or successful #2921

Open turboFei opened 1 week ago

turboFei commented 1 week ago

What changes were proposed in this pull request?

Prevent a stage re-run if another attempt of the same task is running.

If a shuffle read task cannot read the shuffle data and another attempt of that task is running or has already succeeded, just throw a CelebornIOException instead of a FetchFailureException.

The application will then not fail until the task reaches maxFailures.
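The decision described above can be sketched roughly as follows. This is only an illustration, not the code in this PR: `FetchFailureDecision`, `Attempt`, `AttemptState`, `StageRerunSignal`, and `exceptionToThrow` are hypothetical names, `java.io.IOException` stands in for CelebornIOException, and `StageRerunSignal` stands in for FetchFailureException.

```java
import java.io.IOException;
import java.util.List;

// Hypothetical sketch: all names here are illustrative, not Celeborn or Spark APIs.
final class FetchFailureDecision {

    /** Simplified state of one task attempt. */
    enum AttemptState { RUNNING, SUCCESSFUL, FAILED, KILLED }

    /** One attempt of a given task (same stage, same partition index). */
    static final class Attempt {
        final long taskId;
        final int attemptNumber;
        final AttemptState state;

        Attempt(long taskId, int attemptNumber, AttemptState state) {
            this.taskId = taskId;
            this.attemptNumber = attemptNumber;
            this.state = state;
        }
    }

    /** Stand-in for the exception that makes the scheduler re-run the parent stage. */
    static final class StageRerunSignal extends RuntimeException {
        StageRerunSignal(Throwable cause) {
            super(cause);
        }
    }

    /** True if an attempt other than the reporting one is running or has succeeded. */
    static boolean anotherAttemptRunningOrSuccessful(long reportingTaskId, List<Attempt> attempts) {
        for (Attempt a : attempts) {
            boolean isOther = a.taskId != reportingTaskId;
            boolean alive = a.state == AttemptState.RUNNING || a.state == AttemptState.SUCCESSFUL;
            if (isOther && alive) {
                return true;
            }
        }
        return false;
    }

    /**
     * Chooses what the failing shuffle read task should throw.
     * A plain I/O error only fails this attempt (counted against maxFailures);
     * the stage re-run signal is used only when no other attempt can still finish the task.
     */
    static Exception exceptionToThrow(long reportingTaskId, List<Attempt> attempts, IOException cause) {
        if (anotherAttemptRunningOrSuccessful(reportingTaskId, attempts)) {
            return cause;
        }
        return new StageRerunSignal(cause);
    }
}
```

With a speculative copy still running, `exceptionToThrow` returns the original I/O error, so only the failing attempt is retried; with no other live attempt it falls back to the stage re-run signal.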


Why are the changes needed?

I hit the issue below because I set the wrong parameter: I should have set spark.celeborn.data.io.connectTime=30s but set spark.celeborn.data.io.connectionTime=30s instead, and disk I/O utilization was high at the time.

  1. Speculation was enabled.
  2. One task failed to fetch shuffle 0 in stage 5.
  3. That triggered a re-run of stage 0 (stage 4).
  4. Stage 5 was then retried, but no task ran in stage 5 (retry 1).
  5. No task ran in stage 5 (retry 1) because the speculative task had already succeeded.

Because a stage re-run is expensive, I wonder whether we should ignore the shuffle fetch failure when another task attempt is running.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

UT for the SparkUtils method only, because it is not feasible to add a UT for speculation.

https://github.com/apache/spark/blob/d5da49d56d7dec5f8a96c5252384d865f7efd4d9/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L236-L244


With a local master, Spark does not start the speculationScheduler.

https://github.com/apache/spark/blob/d5da49d56d7dec5f8a96c5252384d865f7efd4d9/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L322-L346


It is also not allowed to launch a speculative task on the same host.
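To make the two obstacles concrete, here is a deliberately simplified model of the conditions described above. It is an assumption-laden sketch, not Spark's code: `SpeculationModel` and both method names are hypothetical, and the real checks live in the linked TaskSchedulerImpl and TaskSetManager sources.

```java
import java.util.Set;

// Hypothetical model of the two conditions that block a speculation UT in local mode.
final class SpeculationModel {

    /** The speculation check is only scheduled for a non-local master with speculation enabled. */
    static boolean speculationSchedulerStarts(boolean isLocalMaster, boolean speculationEnabled) {
        return !isLocalMaster && speculationEnabled;
    }

    /** A speculative copy is only offered to a host that has no attempt of that task yet. */
    static boolean canLaunchSpeculativeCopy(String offeredHost, Set<String> hostsWithAttempts) {
        return !hostsWithAttempts.contains(offeredHost);
    }
}
```

Under this model, a single-host test run fails both conditions: the scheduler never starts with a local master, and even if it did, the only available host already runs the original attempt.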

turboFei commented 1 week ago

It seems difficult to add a UT. What do you think, @FMX?

turboFei commented 4 days ago

It is too difficult to add the UT.

Gentle ping @mridulm

FMX commented 3 days ago

> It seems difficult to add a UT. What do you think, @FMX?

Hi, I've seen this PR. IMO, you can add a test config that triggers a task hang and a fetch failure in certain map tasks. It may not be too difficult to add UTs.

turboFei commented 3 days ago

> IMO, you can add a test config that triggers a task hang and a fetch failure in certain map tasks. It may not be too difficult to add UTs.

Thanks, I added a UT and tested it locally.

turboFei commented 3 days ago

The UT is invalid; checking.

turboFei commented 2 days ago

https://github.com/apache/spark/blob/d5da49d56d7dec5f8a96c5252384d865f7efd4d9/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L236-L244


With a local master, Spark does not start the speculationScheduler.

turboFei commented 2 days ago

https://github.com/apache/spark/blob/d5da49d56d7dec5f8a96c5252384d865f7efd4d9/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L322-L346


It is also not allowed to launch a speculative task on the same host.

@FMX

I have to give up on the UT for speculation ...

I will only add a UT for SparkUtils.