What changes were proposed in this pull request?
Celeborn's Flink client supports falling back to the vanilla Flink built-in shuffle implementation.
Why are the changes needed?
At present, when quota is insufficient or workers are unavailable, RemoteShuffleMaster cannot fall back to NettyShuffleMaster, and RemoteShuffleEnvironment cannot fall back to NettyShuffleEnvironment. The Flink client should be able to fall back to the vanilla Flink built-in shuffle implementation when quota is insufficient or workers are unavailable.
Does this PR introduce any user-facing change?
Yes. It introduces the ShuffleFallbackPolicy interface, which determines whether to fall back to the vanilla Flink built-in shuffle implementation:
import org.apache.flink.runtime.shuffle.JobShuffleContext;

import org.apache.celeborn.client.LifecycleManager;
import org.apache.celeborn.common.CelebornConf;

/**
 * The shuffle fallback policy determines whether to fall back to the vanilla Flink built-in
 * shuffle implementation.
 */
public interface ShuffleFallbackPolicy {
  /**
   * Returns whether to fall back to the vanilla Flink built-in shuffle implementation.
   *
   * @param shuffleContext The job shuffle context of Flink.
   * @param celebornConf The configuration of Celeborn.
   * @param lifecycleManager The {@link LifecycleManager} of Celeborn.
   * @return Whether to fall back to the vanilla Flink built-in shuffle implementation.
   */
  boolean needFallback(
      JobShuffleContext shuffleContext,
      CelebornConf celebornConf,
      LifecycleManager lifecycleManager);
}
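To illustrate how the interface might be implemented, the sketch below mirrors ShuffleFallbackPolicy with simplified stand-in types so that it compiles on its own. The real interface receives JobShuffleContext, CelebornConf and LifecycleManager; here they are replaced by a job ID and a hypothetical ClusterStatus record. FallbackPolicy, AlwaysFallbackPolicy, WorkerAndQuotaFallbackPolicy and shouldFallback are illustrative names, not the implementations in this PR, and combining several policies with "any policy wins" is likewise an assumption.

```java
import java.util.List;

public class FallbackPolicyDemo {
    // Hypothetical stand-in for the cluster state a policy might inspect.
    // In the real interface this information would come from CelebornConf and LifecycleManager.
    record ClusterStatus(int availableWorkers, boolean quotaSufficient) {}

    // Simplified mirror of ShuffleFallbackPolicy; the job context is reduced to a job ID.
    interface FallbackPolicy {
        boolean needFallback(String jobId, ClusterStatus status);
    }

    // Hypothetical policy: always use the vanilla Flink built-in shuffle.
    static final class AlwaysFallbackPolicy implements FallbackPolicy {
        @Override
        public boolean needFallback(String jobId, ClusterStatus status) {
            return true;
        }
    }

    // Hypothetical policy: fall back when quota is insufficient or no workers are available.
    static final class WorkerAndQuotaFallbackPolicy implements FallbackPolicy {
        @Override
        public boolean needFallback(String jobId, ClusterStatus status) {
            return !status.quotaSufficient() || status.availableWorkers() == 0;
        }
    }

    // Falls back if any configured policy asks for it (an assumed combination rule).
    static boolean shouldFallback(List<FallbackPolicy> policies, String jobId, ClusterStatus status) {
        return policies.stream().anyMatch(p -> p.needFallback(jobId, status));
    }

    public static void main(String[] args) {
        List<FallbackPolicy> policies = List.of(new WorkerAndQuotaFallbackPolicy());
        System.out.println(shouldFallback(policies, "job-1", new ClusterStatus(3, true)));  // false
        System.out.println(shouldFallback(policies, "job-2", new ClusterStatus(0, true)));  // true
        System.out.println(shouldFallback(policies, "job-3", new ClusterStatus(3, false))); // true
    }
}
```

Keeping each policy a single boolean check makes policies easy to test in isolation, which matches the shape of needFallback in the interface above.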
It also introduces the celeborn.client.flink.shuffle.fallback.policy config to configure the shuffle fallback policy.
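Because needFallback takes the job's JobShuffleContext, the decision is naturally made per job. The following is a hedged sketch of how a shuffle master could route each job either to the Celeborn remote shuffle path or to the Flink built-in Netty path after consulting a policy. RoutingShuffleMaster, ShuffleBackend and the Predicate standing in for the policy are hypothetical names, and the assumption that the decision is fixed once at job registration is part of the sketch, not a description of how RemoteShuffleMaster delegates to NettyShuffleMaster in this PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

public class RoutingShuffleMasterDemo {
    // Hypothetical stand-in for the two shuffle implementations a job can use.
    enum ShuffleBackend { CELEBORN_REMOTE, FLINK_NETTY }

    // Hypothetical master that records, per job, which backend the job was routed to.
    static final class RoutingShuffleMaster {
        private final Predicate<String> needFallback; // stands in for ShuffleFallbackPolicy
        private final Map<String, ShuffleBackend> backendByJob = new HashMap<>();

        RoutingShuffleMaster(Predicate<String> needFallback) {
            this.needFallback = needFallback;
        }

        // Decides once at job registration; later lookups for the job reuse that decision.
        ShuffleBackend registerJob(String jobId) {
            ShuffleBackend backend =
                needFallback.test(jobId) ? ShuffleBackend.FLINK_NETTY : ShuffleBackend.CELEBORN_REMOTE;
            backendByJob.put(jobId, backend);
            return backend;
        }

        ShuffleBackend backendFor(String jobId) {
            ShuffleBackend backend = backendByJob.get(jobId);
            if (backend == null) {
                throw new IllegalStateException("Job not registered: " + jobId);
            }
            return backend;
        }

        void unregisterJob(String jobId) {
            backendByJob.remove(jobId);
        }
    }

    public static void main(String[] args) {
        AtomicInteger availableWorkers = new AtomicInteger(2);
        // Hypothetical policy: fall back when no Celeborn workers are available.
        RoutingShuffleMaster master = new RoutingShuffleMaster(jobId -> availableWorkers.get() == 0);

        System.out.println(master.registerJob("job-a")); // CELEBORN_REMOTE
        availableWorkers.set(0);
        System.out.println(master.registerJob("job-b")); // FLINK_NETTY
        // job-a keeps the backend chosen at registration.
        System.out.println(master.backendFor("job-a"));  // CELEBORN_REMOTE
    }
}
```

Fixing the backend at registration keeps all shuffles of one job on a single implementation; whether the actual patch behaves this way is not stated above.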
How was this patch tested?
RemoteShuffleMasterSuiteJ#testRegisterJobWithForceFallbackPolicy
WordCountTestBase#celeborn flink integration test with fallback - word count