apache / incubator-uniffle

Uniffle is a high performance, general purpose Remote Shuffle Service.
https://uniffle.apache.org/
Apache License 2.0

Rework stage retry #1840

Open zuston opened 4 months ago

zuston commented 4 months ago

After revisiting the stage retry feature, I found some obvious bugs that make it not ready for production use. This issue is to track the design, existing problems, and solutions.

Background and design

Some background and design are included in the previous design doc and GitHub issues. You can search GitHub to retrieve them and learn more about this feature.

  1. https://github.com/apache/incubator-uniffle/issues/477
  2. https://docs.google.com/document/d/1OGswqDDQ52rpw5Lat-FpEGDfX1T6EFXYLC7L0G5O4WE/edit?usp=sharing
  3. https://github.com/apache/incubator-uniffle/issues/825

The whole stage retry design is based on Spark's FetchFailedException: once the Spark scheduler receives this exception thrown from a task, it will not schedule the remaining tasks in this stage and will trigger a stage resubmit. That means Uniffle uses this exception as the trigger whenever a fetch/write failure is encountered.

Leveraging this mechanism, Uniffle hooks into it to achieve stage retry and improve Spark job stability, especially when a shuffle server shuts down abnormally.

Shuffle task failures can be split into two categories, write failures and read (fetch) failures, which should be examined both separately and together. The description below reflects what I understand to be the correct solution.

All of the design below is based on the consensus that we want to keep Spark's original semantics of shuffleId.

Write failure

Once a task fails on a write failure (which may be a shuffle-server hang, a shutdown, a lost network connection, etc.), the task will ask the shuffle manager whether to throw Spark's FetchFailedException. At that point, the shuffle manager checks whether the number of task failures has reached Spark's max failure threshold. Once this condition is satisfied, the task will throw the exception and fail.

To avoid data duplication, unregistering the map output tracker state related to the shuffleId is necessary.
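As a rough illustration of this flow, here is a minimal Java sketch. The class name `StageRetryDecider`, the method `reportWriteFailure`, and the per-shuffle failure counter are assumptions for illustration, not the actual Uniffle implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical driver-side decider: counts write failures per shuffleId and
// tells the task whether it should throw FetchFailedException to trigger
// a stage resubmit.
public class StageRetryDecider {
  private final int maxTaskFailures; // e.g. the value of spark.task.maxFailures
  private final Map<Integer, Integer> failuresByShuffleId = new ConcurrentHashMap<>();

  public StageRetryDecider(int maxTaskFailures) {
    this.maxTaskFailures = maxTaskFailures;
  }

  /**
   * Called (via RPC in the real system) when a task hits a shuffle write failure.
   * Returns true if the task should throw FetchFailedException.
   */
  public boolean reportWriteFailure(int shuffleId) {
    int failures = failuresByShuffleId.merge(shuffleId, 1, Integer::sum);
    if (failures >= maxTaskFailures) {
      // To avoid duplicated data, the map outputs registered for this shuffleId
      // must also be unregistered on the Spark driver side (omitted here).
      return true;
    }
    return false;
  }
}
```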

Fetch failure

Fetch failure follows similar logic to the write failure described above.

The special point of fetch failure is that it will rerun all the map tasks of the upstream stage, which differs from Spark's vanilla shuffle service, which only re-triggers the maps that executed on the broken servers.
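To make the contrast concrete, here is a tiny illustrative interface; the method names are hypothetical, not Spark's or Uniffle's actual API.

```java
// Contrast between the two recovery scopes described above.
interface MapOutputInvalidator {
  // vanilla external shuffle service: recompute only the maps on the lost host
  void unregisterOutputsOnHost(int shuffleId, String host);

  // Uniffle stage retry: recompute every map task of the upstream stage
  void unregisterAllOutputs(int shuffleId);
}
```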

Write + fetch failure together

Write and fetch failures won't happen at the same time; Spark ensures this, since its data flow is not pipelined.

For the Uniffle shuffle manager, the difficulty with write and fetch failures lies in the retry condition. But I think the shuffle manager only needs to care about the reported task attempt indexes, whatever the failure type.

QA

How to clear previous stage attempt data and reassign

Before answering this question, another question is whether we really need to clear the previous stage data at all. I think this is necessary:

  1. The taskAttemptId in the current blockId layout is not a truly unique task id, which means we can't rely on it to filter out stale data from retried tasks. (Although in the following QA, I will re-introduce a real task attempt id into the blockId layout; see the sketch after this list.)
  2. Too much data will make the client read unnecessary in-memory data; although the localfile data can be filtered out by the index file, this still brings too much cost.
  3. Stale data remaining on the shuffle server wastes disk/memory space, which is unnecessary.
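A rough sketch of point 1, i.e. why the current layout cannot distinguish stage attempts. The bit widths and packing below are purely illustrative, not Uniffle's actual defaults.

```java
// Illustrative blockId packing: sequence number, partition id and
// "taskAttemptId". If the last field is derived from the map index rather
// than a globally unique attempt, two stage attempts of the same map task
// produce identical blockIds, so stale data cannot be filtered out.
public final class BlockIdSketch {
  static final int PARTITION_BITS = 24;     // assumed widths, for illustration only
  static final int TASK_ATTEMPT_BITS = 21;

  static long pack(long sequenceNo, long partitionId, long taskAttemptId) {
    return (sequenceNo << (PARTITION_BITS + TASK_ATTEMPT_BITS))
        | (partitionId << TASK_ATTEMPT_BITS)
        | taskAttemptId;
  }

  public static void main(String[] args) {
    long fromAttempt0 = pack(0, 7, 42);
    long fromAttempt1 = pack(0, 7, 42);
    System.out.println(fromAttempt0 == fromAttempt1); // true -> indistinguishable
  }
}
```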

Based on the above thoughts, let's answer the initial question.

The unregister and reassign will occur on the first request that reaches the retry condition, holding a lock so that other tasks' requests block until it finishes.

Once the reassignment is finished, the shuffle manager will register the next stage attempt's shuffle with the reassigned shuffle servers. At this point, the server will accept the request and then clear out the previous attempt's data, including in memory, on disk, and in HDFS.
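A minimal sketch of this "only the first request does the work" pattern, assuming hypothetical names such as `StageReassignCoordinator`; the real unregister/register calls are omitted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Only the first caller that reaches the retry condition performs
// unregister + reassign; concurrent callers block on the lock and then
// observe that the work has already been done.
public class StageReassignCoordinator {
  private final Map<Integer, Object> locks = new ConcurrentHashMap<>();
  private final Map<Integer, Integer> reassignedAttempt = new ConcurrentHashMap<>();

  public void onRetryConditionReached(int shuffleId, int nextStageAttempt) {
    Object lock = locks.computeIfAbsent(shuffleId, id -> new Object());
    synchronized (lock) {                                // later callers wait here
      Integer done = reassignedAttempt.get(shuffleId);
      if (done != null && done >= nextStageAttempt) {
        return;                                          // someone already reassigned
      }
      unregisterMapOutputs(shuffleId);                   // drop stale map status on the driver
      registerNextAttempt(shuffleId, nextStageAttempt);  // servers purge previous attempt data
      reassignedAttempt.put(shuffleId, nextStageAttempt);
    }
  }

  private void unregisterMapOutputs(int shuffleId) { /* driver-side cleanup, omitted */ }

  private void registerNextAttempt(int shuffleId, int stageAttempt) { /* reassign + register, omitted */ }
}
```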

We hope this deletion is quick, but it probably isn't. From my survey, deleting 200 GB of disk files with 3000 partitions costs 40s, which is not acceptable within a gRPC request.

So a two-phase deletion should be introduced to solve the problems of data visibility and delete speed: it splits the deletion into a synchronous rename phase and an asynchronous delete phase. This relies on rename being fast on local disk and HDFS. If the storage is an object store, the first phase will not use the rename mechanism, or it will store some metadata in memory before renaming; in any case, there are ways to handle the OSS storage type.
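A minimal sketch of the two-phase idea for local-disk-like storage, assuming hypothetical paths and wiring: a cheap synchronous rename makes the stale data invisible immediately, and the expensive recursive delete runs asynchronously.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TwoPhaseDeleter {
  private final ExecutorService deleteExecutor = Executors.newSingleThreadExecutor();

  /** Phase 1: rename is fast, so the old attempt's data disappears from readers at once. */
  public Path hide(Path shuffleDir) throws IOException {
    Path tombstone = shuffleDir.resolveSibling(
        shuffleDir.getFileName() + ".deleting." + System.nanoTime());
    Files.move(shuffleDir, tombstone, StandardCopyOption.ATOMIC_MOVE);
    return tombstone;
  }

  /** Phase 2: the slow recursive delete happens in the background. */
  public void deleteAsync(Path tombstone) {
    deleteExecutor.submit(() -> {
      try (var paths = Files.walk(tombstone)) {
        // delete children before parents
        paths.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
      } catch (IOException ignored) {
        // a real implementation would hand leftovers to a periodic cleaner
      }
    });
  }
}
```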

Also, in the current codebase the failure report and reassignOnStageRetry are split into two RPCs; it's necessary to unify them into one.
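A hypothetical sketch of that unification, with illustrative method and type names rather than the actual Uniffle RPC definitions: a single call both reports the failure and, when the retry condition is met, carries back the new shuffle-server assignment.

```java
import java.util.List;

interface StageRetryRpc {
  // before: two round trips
  boolean reportTaskWriteFailure(int shuffleId, int stageAttempt, long taskAttemptId);
  List<String> reassignOnStageRetry(int shuffleId, int nextStageAttempt);

  // after: one round trip that carries the decision and the reassignment together
  RetryDecision reportTaskFailure(int shuffleId, int stageAttempt, long taskAttemptId);

  record RetryDecision(boolean triggerStageRetry, List<String> reassignedServers) {}
}
```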

How to ensure shuffle data consistency in different stage attempt

Everything looks fine so far, but some corner cases are ignored, especially in highly concurrent jobs.

While previous shuffle data is being deleted, another writer or reader may still be writing or reading the same partition data.

For the writer, it's hard to use a lock to ensure consistency, because write throughput must be preserved. So in this case, we hope that the partially leaked data can be filtered out on the client side by the task attempt id. Based on this, a truly unique task attempt id will be introduced here to solve the problem. It may bring some limitations for huge jobs, but I think this is necessary and worthwhile.
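A minimal sketch of that client-side filtering, assuming illustrative names: blocks whose taskAttemptId is not in the set of successful attempts (e.g. obtained from the driver) are dropped, so partially written data from a concurrent retry writer never reaches the reader.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class StaleBlockFilter {
  /** Simplified stand-in for a shuffle block read back from the server. */
  public record Block(long blockId, long taskAttemptId, byte[] data) {}

  /** Keep only blocks written by task attempts known to have succeeded. */
  public static List<Block> filter(List<Block> blocks, Set<Long> successfulTaskAttemptIds) {
    return blocks.stream()
        .filter(b -> successfulTaskAttemptIds.contains(b.taskAttemptId()))
        .collect(Collectors.toList());
  }
}
```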

For the reader, the premise of correctness is obtaining the real blockId bitmap, so locking is required here, guarded by a stage attempt number check.
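A small sketch of that guard, with hypothetical names: the blockId bitmap is only handed out when the requested stage attempt matches the current one, so a reader can never mix data from two stage attempts.

```java
public class ShuffleResultGuard {
  private int currentStageAttempt = 0;
  private Object blockIdBitmap; // stands in for the real blockId bitmap type

  /** Called after a stage retry has re-registered the shuffle. */
  public synchronized void advanceStageAttempt(int newAttempt, Object newBitmap) {
    currentStageAttempt = newAttempt;
    blockIdBitmap = newBitmap;
  }

  /** Readers must present the stage attempt they belong to. */
  public synchronized Object getBlockIdBitmap(int requestedStageAttempt) {
    if (requestedStageAttempt != currentStageAttempt) {
      throw new IllegalStateException(
          "Stale stage attempt " + requestedStageAttempt + ", current is " + currentStageAttempt);
    }
    return blockIdBitmap;
  }
}
```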

Next step

The current code has some bugs described in the Q&A above, so it's time to create some issues to solve them.

  1. Unify the fetch failure + write failure retry checking into the RssStageResubmitManager; the checking logic may differ, but we can distinguish them and use different strategies.
  2. Merge the two RPCs reportXXFailure and reassignOnStageRetry into reportXXFailure, so that the reassignment is included in one request.
  3. Introduce a truly unique taskAttemptId into Uniffle behind an individual config option so the client can correctly filter out previous stage data.
  4. Ensure getting the shuffle result is safe.
  5. Some bugs.
PengleiShi commented 4 months ago

The stage retry whole design is based on the Spark's fetchFailedException , once spark scheduler accepts this exception thrown from task, it will kill all the running tasks in this stage and trigger the stage resubmit to run.

AFAIK, vanilla Spark will not kill any task if FetchFailed occurred; the new stage attempt will submit the missing tasks which are not finished. Does it have the same behavior with RSS enabled?

zuston commented 4 months ago

The stage retry whole design is based on the Spark's fetchFailedException , once spark scheduler accepts this exception thrown from task, it will kill all the running tasks in this stage and trigger the stage resubmit to run.

AFAIK, vanilla Spark will not kill any task if FetchFailed occurred; the new stage attempt will submit the missing tasks which are not finished. Does it have the same behavior with RSS enabled?

Good to know; I have checked this part and Spark will indeed not kill all running tasks. Let me fix this description.

qqqttt123 commented 4 months ago
  1. Exchange reuse.
  2. Map still runs after the stage finished.
  3. Limit.
  4. Stage fail: the shuffle will consist of multiple attempts.
zuston commented 4 months ago

To ensure data consistency, I will support the original unique task attempt id in the blockId layout for stage retry, to filter out the previous stage data. PTAL @EnricoMi

zuston commented 4 months ago

I can't find any case where the limit operator would treat stage attempt 0 + stage attempt 1 data as the normal data pipeline. Could you give me some background or Spark issues? @jerqi

zuston commented 4 months ago

Could you help review this? @advancedxy I want to go on to finish this feature.

advancedxy commented 4 months ago

Could you help review this? @advancedxy I want to go on to finish this feature.

Thanks for pinging me. I may be able to review your design and PRs this week. By the way, I haven't followed how a shuffle writer failure triggers a stage recompute, so I may be wrong about that.

Some inline comments.

task will ask shuffle manager whether throwing spark's fetchFailedException. At this time, shuffle manager will check whether the task failure reaches the spark max failure threshold. Once this condition is satisfied, this will make task throw and fail.

I don't think this is correct per se? Shuffle write failures should not trigger a FetchFailedException from the executor side, which might mess with Spark's driver logic. The shuffle write stage doesn't involve a shuffle fetch if it doesn't read from another shuffle stage. I think a more appropriate approach would be that the shuffle writer reports its write failures and the shuffle manager decides whether to retry the whole stage or simply relaunch tasks in the same stage with different shuffle-server assignments. If it decides to retry the whole stage, it might need to do something hacky to fail the current shuffle write stage fast and unregister all its map outputs. This logic is similar to how Spark handles its barrier tasks. However, it might be hard to simulate that case from Uniffle's perspective.

Write and fetch failures won't happen at the same time, that's ensured by the spark, that's not pipelined data flow.

They might, as long as there's a stage that reads from Uniffle's shuffle and also writes shuffle data to Uniffle. There's only one instance of exception per task, though. So I think the shuffle manager should be aware of that when receiving shuffle fetch failures and write failures.

So the two phase deletion should be introduced to solve the problem of data visibility and delete speed, that splits the deletion into rename sync and delete async 2 phases.

Hmm, we don't encode the stage attempt number in the output path? If the stage attempt number is encoded, I think we can just leave the shuffle data as it is and put it into an async delete queue if necessary.

EnricoMi commented 4 months ago

I strongly object to moving back to the original unique task attempt id which limits the number of tasks or partitions, which renders Uniffle unusable in our production environment.

I am happy to discuss alternative approaches like adding the stage attempt no, similar to us adding the task attempt no (#1529), to the block id.

EnricoMi commented 4 months ago

Can you elaborate on why the current block id scheme does not work for stage retry given all data of previous stage gets erased?

zuston commented 3 months ago

Can you elaborate on why the current block id scheme does not work for stage retry given all data of previous stage gets erased?

Sorry for the late reply (a pity, I'm busy with my daily work). If the stage is retried, the taskAttemptId in the blockId is not unique across multiple stage attempts. Right?

zuston commented 3 months ago

I strongly object to moving back to the original unique task attempt id which limits the number of tasks or partitions, which renders Uniffle unusable in our production environment.

I am happy to discuss alternative approaches like adding the stage attempt no, similar to us adding the task attempt no (#1529), to the block id.

Yes, this is the important concern for this feature. So I want to explore whether to add the retry attempt number into the memory/file prefix or into an in-memory data structure indicator.

EnricoMi commented 3 months ago

Before retrying a stage, isn't all data deleted of the earlier attempt?