apache / incubator-uniffle

Uniffle is a high performance, general purpose Remote Shuffle Service.
https://uniffle.apache.org/
Apache License 2.0

[FEATURE] [tez] Avoid recompute succeeded task. #1011

Open zhengchenyu opened 1 year ago

zhengchenyu commented 1 year ago

Describe the feature

With ordinary local shuffle, some succeeded tasks may be recomputed. For example, when an NM is unhealthy, a reduce cannot read the shuffle data produced by maps that ran successfully on that NM. With remote shuffle, however, the shuffle data is already stored in the remote shuffle service, so recomputing succeeded tasks that ran on an unhealthy node is unnecessary.

zhengchenyu commented 1 year ago

Analysis of recomputed Tez tasks with uniffle

1 Background

Tez has mechanisms to recompute successful tasks. For example, without a remote shuffle service, if an NM is unhealthy, the data generated by maps that completed on that node cannot be served to the downstream reduces, so those tasks must be recomputed on a healthy NM. This is why Tez provides mechanisms to recompute upstream succeeded tasks.

But in remote shuffle service mode, once a task has successfully sent its data to the remote shuffle service, there is no need to recompute it when the NM becomes unavailable. We therefore have to analyze how tasks in the SUCCEEDED state come to be recomputed.

2 Why succeeded tasks are recomputed

Task recompute mainly refers to a task that is already in the SUCCEEDED state, becomes non-SUCCEEDED for some reason, and is then recomputed once certain conditions are met. We need to identify which logic in Tez triggers task recompute, determine which cases are unnecessary in remote shuffle mode, and prohibit them.

Tez mainly organizes tasks according to DAG->Vertex->Task->TaskImpl. Several situations can cause a task that is already in the SUCCEEDED state to be recomputed:

2.a Too many TaskAttempts failed on one node

reason

If enough TaskAttempts fail on one node (more than tez.am.maxtaskfailures.per.node, default 10), Tez kills all TaskAttempts of all containers on this node and recomputes them if necessary.

action

This logic should be disabled in remote shuffle service mode.

2.b When blacklisting is enabled, all nodes are rechecked and the logic of (2.a) runs
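A minimal sketch of the check in 2.a and the proposed bypass, for illustration only. `NodeFailureTracker` and its methods are hypothetical names, not Tez classes, and the comparison follows the "more than" wording above; the exact comparison inside Tez may differ.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration; not the actual Tez implementation. */
final class NodeFailureTracker {
    // Stands in for tez.am.maxtaskfailures.per.node (default 10 per the text above).
    private final int maxFailuresPerNode;
    // True when shuffle data lives in the remote shuffle service.
    private final boolean remoteShuffle;
    private final Map<String, Integer> failuresByNode = new HashMap<>();

    NodeFailureTracker(int maxFailuresPerNode, boolean remoteShuffle) {
        this.maxFailuresPerNode = maxFailuresPerNode;
        this.remoteShuffle = remoteShuffle;
    }

    /**
     * Records one failed TaskAttempt on the node and returns true when
     * succeeded attempts on that node should be recomputed.
     */
    boolean recordFailure(String nodeId) {
        int count = failuresByNode.merge(nodeId, 1, Integer::sum);
        boolean overThreshold = count > maxFailuresPerNode;
        // In remote shuffle mode the output is already off the node,
        // so crossing the threshold never forces a recompute.
        return overThreshold && !remoteShuffle;
    }
}
```

With a threshold of 2, the third failure on a node returns true for local shuffle and false in remote shuffle mode.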

reason

When the number of blacklisted nodes exceeds 33% of all nodes (tez.am.node-blacklisting.ignore-threshold-node-percent), Tez starts ignoring the blacklist. When the proportion later drops from above 33% back to below 33%, the blacklist is applied again. During this transition all nodes are checked, the logic of (2.a) is applied to each node, and tasks are recomputed if necessary.

action

This type of rerun needs to be disabled in RSS mode.

2.c When the RM reports that a node is unhealthy
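The transition described in 2.b could be sketched roughly as follows. `BlacklistIgnoreState` is a hypothetical name, and the strict "greater than" comparison simply follows the wording above rather than the Tez source.

```java
/** Hypothetical illustration of the ignore-blacklist threshold; not Tez code. */
final class BlacklistIgnoreState {
    // Stands in for tez.am.node-blacklisting.ignore-threshold-node-percent (33 in the text).
    private final int thresholdPercent;
    private boolean ignoring = false;

    BlacklistIgnoreState(int thresholdPercent) {
        this.thresholdPercent = thresholdPercent;
    }

    boolean isIgnoring() {
        return ignoring;
    }

    /**
     * Updates the state from the current node counts. Returns true when the
     * blacklist stops being ignored, which is the moment every node is
     * rechecked with the logic of (2.a).
     */
    boolean update(int blacklistedNodes, int totalNodes) {
        boolean nowIgnoring = totalNodes > 0
            && (long) blacklistedNodes * 100 > (long) totalNodes * thresholdPercent;
        boolean recheckAllNodes = ignoring && !nowIgnoring;
        ignoring = nowIgnoring;
        return recheckAllNodes;
    }
}
```

In RSS mode, the recheck triggered by this transition is the step that should not lead to recomputing succeeded tasks.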

reason

When a node becomes unhealthy and tez.am.node-unhealthy-reschedule-tasks is set to true (the default is false), Tez kills all TaskAttempts of all containers on this node and recomputes them if necessary.

action

In remote shuffle service mode this kind of recompute must be prohibited: just set tez.am.node-unhealthy-reschedule-tasks to false.

2.d Sufficient read-failure notifications from downstream
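As a small illustration of the setting in 2.c, the sketch below checks and forces the property on a plain key/value map. `RssTezConfigCheck` is a hypothetical helper; a real job would set the property through its Tez configuration instead.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper operating on a plain map, not on a real Tez configuration. */
final class RssTezConfigCheck {
    static final String RESCHEDULE_KEY = "tez.am.node-unhealthy-reschedule-tasks";

    /** Returns true if tasks on an unhealthy node would be rescheduled. */
    static boolean reschedulesOnUnhealthyNode(Map<String, String> conf) {
        // An unset key means the default, which the text above gives as false.
        return Boolean.parseBoolean(conf.getOrDefault(RESCHEDULE_KEY, "false"));
    }

    /** Returns a copy of the configuration with rescheduling forced off. */
    static Map<String, String> forRemoteShuffle(Map<String, String> conf) {
        Map<String, String> copy = new HashMap<>(conf);
        copy.put(RESCHEDULE_KEY, "false");
        return copy;
    }
}
```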

reason

When enough downstream tasks report that they failed to read this task's output, the TaskAttempt is killed and recomputed if necessary.

action

In RSS mode, if many downstream read failures are reported, there is probably a problem with the uniffle service or some kind of computation error. In that case, either fail the task directly or recompute all tasks of this vertex.

Chinese document: analysis_1011.md
Detailed analysis: state_detailed.md

jerqi commented 1 year ago

@zhengchenyu First, we can let the job fail. Then we can optimize the logic as in https://github.com/apache/incubator-uniffle/issues/477

zhengchenyu commented 1 year ago

3 Reproduce the problem

3.1 Reproduce problem (a): Too many TaskAttempts failed on the node

Steps to reproduce:

In this example, we found that attempt 0 of Task 19 in the Tokenizer completed first.

[Screenshot 3 1 1]

Then we killed all non-AppMaster containers on this node. After a while, we found that Task 19 had been recomputed.

[Screenshot 3 1 2]

See the following error. Note that once a node has more than tez.am.maxtaskfailures.per.node task failures, it is added to the blacklist.

[Screenshot 3 1 3]

3.2 Reproduce problem (b)

Same as (a), omitted.

3.3 Reproduce problem (c): When the RM reports that a node is unhealthy

Steps to reproduce:

In this example, we can see that Tasks 6, 7, and 8 have completed at this point.

[Screenshot 3 3 1]

Then we decommissioned the NM on which Tasks 6, 7, and 8 had completed successfully. We found that Tasks 6, 7, and 8 were recomputed.

[Screenshot 3 3 2]

[Screenshot 3 3 3]

I also added some logs to verify that succeeded tasks are killed after the node becomes unhealthy; see the patch for details.

3.4 Reproduce problem (d): Sufficient read-failure notifications from downstream

We can reproduce this by making the shuffle server hang. The process is as follows:

In fact, the task is not recomputed as expected. Instead, the reduce reports the following error, which directly causes the task to fail. Because RssShuffleScheduler::copyFailed is not called under Tez RSS, the InputReadErrorEvent is never sent to the AM.

2023-07-20 12:41:12,932 [ERROR] [ShuffleAndMergeRunner {Tokenizer}] |orderedgrouped.RssShuffle|: Tokenizer: RSSShuffleRunner failed with error
org.apache.tez.runtime.library.common.shuffle.orderedgrouped.RssShuffle$RssShuffleError: Error during shuffle
    at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.RssShuffle$RssRunShuffleCallable.callInternal(RssShuffle.java:289)
    at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.RssShuffle$RssRunShuffleCallable.callInternal(RssShuffle.java:281)
    at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
    at org.apache.uniffle.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
    at org.apache.uniffle.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:74)
    at org.apache.uniffle.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.uniffle.common.exception.RssFetchFailedException: Get shuffle result is failed for appId[appattempt_1683514060868_80962_000001], shuffleId[1000001]
    at org.apache.uniffle.client.impl.ShuffleWriteClientImpl.getShuffleResult(ShuffleWriteClientImpl.java:618)
    at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.RssShuffleScheduler.constructRssFetcherForPartition(RssShuffleScheduler.java:1580)
    at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.RssShuffleScheduler.access$1600(RssShuffleScheduler.java:104)
    at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.RssShuffleScheduler$RssShuffleSchedulerCallable.callInternal(RssShuffleScheduler.java:1520)
    at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.RssShuffleScheduler$RssShuffleSchedulerCallable.callInternal(RssShuffleScheduler.java:1440)
    at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
    at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.RssShuffleScheduler.start(RssShuffleScheduler.java:495)
    at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.RssShuffle$RssRunShuffleCallable.callInternal(RssShuffle.java:287)
    ... 8 more

4 Solutions

4.1 Problem (a) Solution

In TerminatedAfterSuccessTransition, we simply return SUCCEEDED and perform no other operation. See the attached patch.

For now, I did this in the Tez source code. Let me find a way to do it in uniffle.

4.2 Problem (b) Solution

Same as problem (a).

4.3 Problem (c) Solution

Set tez.am.node-unhealthy-reschedule-tasks to false (the default is false).

4.4 Problem (d) Solution

Chinese document: reproduce_1011.md
Verify log patch: 0001-add-log.patch
Fix problem (a) and (b) patch: 0001-rss-task.patch

okumin commented 1 year ago

I'm still new to Uniffle and I might be very wrong.

I know we can annotate the reliability of source vertices per edge. One option would be to implement the rss.avoid.recompute.succeeded.task feature on the Tez side, with Uniffle just using the edge type. https://github.com/apache/tez/blob/rel/release-0.10.2/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java#L78-L82

zhengchenyu commented 1 year ago

@okumin

Yes! It would be better for rss.avoid.recompute.succeeded.task to take effect on the Tez side. Why did I implement this on the uniffle side? Because I do not want to change the Tez code, and waiting for a Tez release would be too slow.

In this PR, I use a trick to implement it. You can see: https://github.com/apache/incubator-uniffle/blob/ef7f392eb8f2a918f9305483275e444991759f6d/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java#L520

Even though uniffle just uses the edge type, I reset the dispatcher. All TA_NODE_FAILED events destined for a SUCCEEDED TaskAttempt are ignored.

I tested this feature on a test cluster and in integration tests. It needs more production experience, so it is disabled by default for now. Could you please review and give some suggestions, or share more of your production experience?
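The idea of the dispatcher trick can be sketched in isolation as below. This is not the code in RssDAGAppMaster; `NodeFailedEventFilter`, `Event`, and the two enums are hypothetical stand-ins for the Tez event and state types, and only illustrate dropping TA_NODE_FAILED for attempts that have already succeeded.

```java
import java.util.function.Consumer;

/** Hypothetical stand-in for the dispatcher wrapper; not the Uniffle or Tez classes. */
final class NodeFailedEventFilter {
    enum AttemptState { RUNNING, SUCCEEDED, FAILED, KILLED }

    enum EventType { TA_NODE_FAILED, TA_KILL_REQUEST, TA_DONE }

    /** An event together with the current state of the attempt it targets. */
    static final class Event {
        final EventType type;
        final AttemptState targetState;

        Event(EventType type, AttemptState targetState) {
            this.type = type;
            this.targetState = targetState;
        }
    }

    /** Wraps a handler so that TA_NODE_FAILED events for SUCCEEDED attempts are dropped. */
    static Consumer<Event> wrap(Consumer<Event> delegate) {
        return event -> {
            if (event.type == EventType.TA_NODE_FAILED
                && event.targetState == AttemptState.SUCCEEDED) {
                // The output is already in the remote shuffle service; nothing to redo.
                return;
            }
            delegate.accept(event);
        };
    }
}
```

All other events, including TA_NODE_FAILED for attempts that are still running, pass through to the original handler unchanged.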