opentargets / orchestration

Open Targets pipeline orchestration layer
Apache License 2.0

perf(efm): enhanced flexibility mode in genetics etl cluster #63

Closed project-defiant closed 3 weeks ago

project-defiant commented 3 weeks ago

Context

Introduction of the new SuSiE credible sets from gwas_catalog (gs://gwas_catalog_sumstats_susie/credible_set_clean) resulted in ColocStep failures. The job on the otg-etl cluster ran for ~4h without finishing (see job).

Most of the error logs show that executors were lost during job execution.

java.io.IOException: Connecting to otg-etl-sw-0qf7.europe-west1-d.c.open-targets-genetics-dev.internal/10.132.0.54:41329 failed in the last 4750 ms, fail this connection directly
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:214)
    at org.apache.spark.network.shuffle.ExternalBlockStoreClient.lambda$fetchBlocks$0(ExternalBlockStoreClient.java:132)
    at org.apache.spark.network.shuffle.RetryingBlockTransferor.transferAllOutstanding(RetryingBlockTransferor.java:173)
    at org.apache.spark.network.shuffle.RetryingBlockTransferor.lambda$initiateRetry$0(RetryingBlockTransferor.java:206)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)

The otg-etl cluster uses the following autoscaling policy:

{
  "id": "otg-etl",
  "name": "projects/open-targets-genetics-dev/regions/europe-west1/autoscalingPolicies/otg-etl",
  "basicAlgorithm": {
    "yarnConfig": {
      "scaleUpFactor": 1,
      "scaleDownFactor": 1,
      "gracefulDecommissionTimeout": "10800s"
    },
    "cooldownPeriod": "120s"
  },
  "workerConfig": {
    "minInstances": 2,
    "maxInstances": 2,
    "weight": 1
  },
  "secondaryWorkerConfig": {
    "maxInstances": 100,
    "weight": 1
  }
}

This policy caps the ratio of secondary to primary workers at 100:2.
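
To make the worker mix concrete, the short Python sketch below reads the limits from the policy above and computes the share of workers that are secondary at full scale. The helper name `preemptible_share` is hypothetical, and the policy is trimmed to the fields it uses.

```python
# Trimmed copy of the otg-etl autoscaling policy shown above.
policy = {
    "workerConfig": {"minInstances": 2, "maxInstances": 2, "weight": 1},
    "secondaryWorkerConfig": {"maxInstances": 100, "weight": 1},
}


def preemptible_share(policy: dict) -> float:
    """Return the fraction of workers that are secondary at maximum scale."""
    primary = policy["workerConfig"]["maxInstances"]
    secondary = policy["secondaryWorkerConfig"]["maxInstances"]
    return secondary / (primary + secondary)


share = preemptible_share(policy)
print(f"secondary workers at full scale: {share:.1%}")
```

At full scale about 98% of the workers are secondary, so if shuffle output is spread evenly across workers, most of it sits on nodes that can be preempted.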

[!CAUTION] The Coloc step requires many intensive shuffle operations, so as the input dataset grows, the cost of shuffling grows with it and the run time increases. This becomes a problem when shuffle partitions are stored on executors that are later lost, as described in the EFM (Enhanced Flexibility Mode) documentation. Losing a worker forces the affected tasks to restart, which lengthens the run time.

Furthermore, this cannot be predicted in advance due to the nature of preemptible (secondary) workers.

To guard against lost shuffle partitions, EFM can be used.

With EFM, the cluster saves the shuffle partitions only on primary workers. This means we have to size the disks of those workers accordingly and change the autoscaling policy, as EFM does not support graceful decommissioning of workers.
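
As a rough illustration of the kind of change described here, the Python sketch below derives an EFM-friendly policy from the otg-etl one. The helper `to_efm_policy`, the `"0s"` timeout and the way the primary group is pinned are assumptions for illustration, not necessarily the settings merged in this PR; the count of 10 primary workers is the figure quoted for otg-efm later in this thread. `dataproc:efm.spark.shuffle=primary-worker` is the cluster property the Dataproc documentation gives for Spark primary-worker shuffle, and it has to be supplied when the cluster is created.

```python
import copy

# Cluster property for Spark primary-worker shuffle (set at cluster creation).
EFM_PROPERTIES = {"dataproc:efm.spark.shuffle": "primary-worker"}

# Relevant part of the current otg-etl policy.
otg_etl_policy = {
    "basicAlgorithm": {
        "yarnConfig": {
            "scaleUpFactor": 1,
            "scaleDownFactor": 1,
            "gracefulDecommissionTimeout": "10800s",
        },
        "cooldownPeriod": "120s",
    },
    "workerConfig": {"minInstances": 2, "maxInstances": 2, "weight": 1},
    "secondaryWorkerConfig": {"maxInstances": 100, "weight": 1},
}


def to_efm_policy(policy: dict, primary_workers: int) -> dict:
    """Return a copy of `policy` adjusted for EFM (hypothetical helper)."""
    efm = copy.deepcopy(policy)
    # Assumption: switch graceful decommissioning off, since EFM does not support it.
    efm["basicAlgorithm"]["yarnConfig"]["gracefulDecommissionTimeout"] = "0s"
    # Pin the primary group so the nodes holding shuffle data are never scaled down.
    efm["workerConfig"]["minInstances"] = primary_workers
    efm["workerConfig"]["maxInstances"] = primary_workers
    return efm


efm_policy = to_efm_policy(otg_etl_policy, primary_workers=10)
```

Only the secondary group is left free to scale in this sketch; the original policy dictionary is not modified.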

Changes

The tweaks above were added to the existing Dataproc cluster setup to accommodate the shuffle operations in the Coloc step:

Additionally

[!NOTE] The Coloc step succeeded in 1h 20min with EFM enabled (see job), although we still experienced executor losses.

project-defiant commented 3 weeks ago

@ireneisdoomed you are right, in EFM mode we will be storing the shuffle partitions on the primary workers only, assuming the documentation is correct :)

The new strategy makes sense to me. I think it is interesting that we only see the issue in eCAVIAR since I consider COLOC to be more optimised (although it is more complex too).

This could have been non-deterministic. eCaviar took ~3h to complete, while coloc, running at the same time, failed to finish even after 4 hours. The initial issue could have been that the fraction of decommissioned executors differed between those running eCaviar and those running coloc. It may be that one of the overlaps succeeded, but the others failed. Either way, we cannot rely so easily on a cluster with preemptible workers, as any long-running step can be affected this way. In my experiments, preemption typically starts to appear after ~30 minutes, so any data that is not cached by that point gets lost and has to be recomputed.

Another option to investigate is whether we could identify the offending shuffle and cache, or even checkpoint, the data before that stage. Then we would do what EFM tries to do, just manually in the code.
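
A minimal PySpark sketch of that manual approach could look like the following. `checkpoint_before_shuffle` and its `checkpoint_dir` argument are hypothetical names; the sketch assumes the checkpoint directory is on durable storage such as a GCS bucket, so the materialised data does not live on executors that can be preempted. `SparkContext.setCheckpointDir` and `DataFrame.checkpoint` are existing PySpark calls.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # imported for type hints only
    from pyspark.sql import DataFrame, SparkSession


def checkpoint_before_shuffle(
    spark: "SparkSession", df: "DataFrame", checkpoint_dir: str
) -> "DataFrame":
    """Materialise `df` to durable storage before a shuffle-heavy stage."""
    # Checkpoint files are written under this directory (e.g. a gs:// path).
    spark.sparkContext.setCheckpointDir(checkpoint_dir)
    # eager=True writes the data immediately and truncates the lineage, so
    # after an executor loss recomputation restarts from the checkpoint
    # rather than from the original inputs.
    return df.checkpoint(eager=True)
```

It would be called on the input of the expensive stage, for example on the overlaps right before colocalisation. A plain `cache()` keeps its blocks on the executors, so it is lost together with them, which is why the sketch uses a checkpoint instead.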

My only comment, correct me if I'm wrong, is that we are going to use the same cluster with the same EFM strategy for all ETL jobs, also for those where tasks don't involve heavy shuffling. Do you think removing graceful decommission from other steps will have a significant impact?

This is open to discussion. I am not sure whether other tasks will benefit at all, since they are small and fast compared to colocalisation, but otherwise we would need two clusters, as EFM can only be set at cluster creation and cannot be updated afterwards.

Graceful decommissioning is there for downscaling the number of workers. As you might guess, the number of primary workers in otg-efm is fixed at 10, so these workers are always available, because we need more space for the shuffle partitions. This implies that we cannot downscale the primary workers, otherwise we would still lose the shuffle partitions stored there. Unfortunately, I am not aware of a way to configure graceful decommissioning separately for primary and secondary workers.