pangeo-forge / pangeo-forge-cloud-federation

Infrastructure for running pangeo-forge across multiple bakeries
Apache License 2.0

Flink tuning #14

Open thodson-usgs opened 9 months ago

thodson-usgs commented 9 months ago

I think we need to tune Flink before this forge can really cook. Opening this issue to start the discussion.

Here are some initial ideas:

  1. Anytime I scale up I start hitting ImagePullBackOff errors. To avoid this, we could clone our own Flink image and have the workers pull from it. I need to investigate the extent to which this can be set up in Terraform. Helm seems a natural place to start: https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/helm/
  2. I believe those ImagePullBackOffs are also causing jobs to fail with org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy. By default, I don't think Flink retries failed tasks at all. Once we configure a restart strategy (a minimal sketch follows this list), I think execution will be much more reliable.
  3. As the Flink configuration becomes more complicated, it might make sense to keep a separate flink-config.yaml that's read in with something like yamldecode(file("flink-config.yaml")). Some of this config may be tuned per runner, which is another reason to separate it from the static Terraform.
  4. I'm by no means an expert, but before diving deeper into Flink, we might revisit whether it's the right technology or whether Spark is a better choice for batch work. @ranchodeluxe has put in a lot of work getting Flink moving but has also expressed some frustration. I'm still holding out hope for Flink, but I think we'll have a better sense after some tuning. (Apparently Spark has already been investigated in pangeo-forge/pangeo-forge-runner#133.)
  5. As discussed below, we can currently enable task restarts by configuring Flink via a recipe's config.py. But before we can enable job manager restarts (called High Availability in Flink), we'll need to add a shared filesystem where Flink can store job metadata.
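
For reference, here's a minimal sketch of item 2 as it could be set from a recipe's config.py. The keys are the same restart-strategy options that reappear in the recommended configuration further down; the values here are just placeholders:

c.FlinkOperatorBakery.flink_configuration = {
    # override Flink's default NoRestartBackoffTimeStrategy so failed tasks are retried
    # https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/execution/task_failure_recovery/
    "restart-strategy": "fixed-delay",
    "restart-strategy.fixed-delay.attempts": "3",
    "restart-strategy.fixed-delay.delay": "10 s",
}
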
cisaacstern commented 9 months ago

This is exciting!

thodson-usgs commented 9 months ago

THIS COMMENT IS SUPERSEDED; IGNORE

Restart strategies help but are by no means a good solution to the ImagePullBackOff errors. We need to clone the Flink image and point the operator to our copy.

NOT SURE ABOUT THE NEXT POINT. NEED TO TEST FURTHER. ...and they might be worse than I first thought. Given 40 task managers, if one fails from ImagePullBackOff, I think all 40 tasks are restarting 😬. Flink has more failure configurations. Hopefully a fix is in there, but it's deeper than I first hoped. I'm currently testing with only

restart-strategy.type: exponential-delay
restart-strategy.exponential-delay.max-backoff: 20 min

...or I might just be confused about how many task managers to expect. This job has 880 tasks and parallelism=40, so maybe it's normal that I'm at taskmanager-324 and counting.

...and apparently, I'm still using SPOT instances!? πŸ™„ ...fml and try again... killing gpcp-from-gcs at 1 hr 30 min in

thodson-usgs commented 9 months ago

THIS COMMENT IS SUPERSEDED, BUT IT DESCRIBES A FAILURE MODE THAT SHOULD BE ADDRESSED

Testing another config. I've left quotes as they appear in config.py, but they'll get parsed out before getting to Flink.

   # HA services https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/
   "high-availability.type": "kubernetes",
   "high-availability.storageDir": "s3://usgs-pforge-us-west-2-flink-cache/recovery", # manually created
   "jobmanager.execution.failover-strategy" : "region",
   "restart-strategy" : "fixed-delay",
   "restart-strategy.fixed-delay.attempts": "5",
   "restart-strategy.fixed-delay.delay": "10 s",

Still several problems here:

  1. storageDir is empty
  2. killed tasks will restart (good), but at some point all the other tasks restart as well (bad).
  3. and if a task is killed, the whole job carries on restarting just hunky-dory but ultimately fails (worse)

Reading up on restart strategies, they do seem to apply at the job level, so if one task fails, the job restarts. Just a hunch, but I believe the failure mode is:

  1. my cluster has a maximum of 50 nodes
  2. my job has parallelism=40
  3. I run a job, a task fails (the other tasks continue)
  4. the restart strategy kicks in and asks for 40 new task managers.
  5. however, the 39 surviving tasks are still soldiering on, so it can only get 50 βˆ’ 39 = 11 new ones.
  6. these 11 will run, but the job also fails because it wasn't able to acquire all 40 task managers.

Thus, we enter a perpetual failure state where jobs keep restarting, running, and ultimately failing because they can't get exactly 40 nodes while the nodes are still being used by zombie jobs.

UPDATE: I might have been incorrect. Flink may be restarting only the tasks with ImagePullBackOff errors, but in this example that is enough to enter the failure mode above.

thodson-usgs commented 9 months ago

Recommended Flink configuration

Currently, this will configure Flink to handle task manager failures, but we will need to make additional changes to the deployment in order to handle job manager failures and avoid the ImagePullBackOff failure mode that can occur as the number of nodes approaches the cluster's max limit. For now, using larger instances with more slots will help keep the cluster small and avoid ImagePullBackOffs.


Config

c.FlinkOperatorBakery.flink_configuration = {
    # https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
   "execution.runtime-mode": "BATCH",
   # recommend setting slots equal to the number of cores; t3.large has 2
   "taskmanager.numberOfTaskSlots": "2", 

   # configure according to your instance; we assume >= t3.large
   # https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup_tm/
   "taskmanager.memory.flink.size": "1536m", 
   "taskmanager.memory.task.off-heap.size": "256m", 
   "taskmanager.memory.jvm-overhead.max": "1024m",

   # BROKEN job restart (HA)
   # https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/
   # broken because the cluster can't write to s3; need to create another store
   "high-availability.type": "kubernetes",
   "high-availability.storageDir": "s3://usgs-pforge-us-west-2-flink-cache/recovery",  # created manually

   # task restart
   # https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/execution/task_failure_recovery/
   "restart-strategy" : "fixed-delay",
   "restart-strategy.fixed-delay.attempts": "3",
   "restart-strategy.fixed-delay.delay": "10 s",

}

Tests

Next step will be to test job manager failures and HA mode.

Test 0: Kill task manager during job 🟒 : Other tasks continue, the failed task restarts, and the job succeeds.

Test 1: Kill jobmanager at startup 🟒 : New job manager starts and spins up task managers.

Test 2: Kill jobmanager during job πŸ”΄ : Job manager restarts, tasks continue but eventually die. The Flink dashboard has no record of the job; it is aware of the zombie tasks but doesn't associate them with its job. πŸ’‘ I believe this is failing because the cluster doesn't have permission to write to storageDir (refer to Flink's HA documentation).

Test 3 (edge case): Kill jobmanager just as tasks complete. 🟑

Test 4 (edge case): Simultaneously kill job and task manager during job. 🟑

If at least Tests 0, 1, and 2 pass, I believe we're safe to run on SPOT.


Test Logs

Test 2 log:

2024-01-27 16:20:07,652 INFO  org.apache.flink.kubernetes.utils.KubernetesUtils            [] - Could not find main container flink-main-container in pod template, using empty one to initialize.
2024-01-27 16:20:08,255 INFO  org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Recovered 5 pods from previous attempts, current attempt id is 2.
2024-01-27 16:20:08,260 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Recovered 5 workers from previous attempt.
2024-01-27 16:20:08,261 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker gh-2dhytest-2dfeedstocks-2dgpcp-2dfrom-a81484-taskmanager-1-1 recovered from previous attempt.
2024-01-27 16:20:08,261 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker gh-2dhytest-2dfeedstocks-2dgpcp-2dfrom-a81484-taskmanager-1-2 recovered from previous attempt.
2024-01-27 16:20:08,262 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker gh-2dhytest-2dfeedstocks-2dgpcp-2dfrom-a81484-taskmanager-1-3 recovered from previous attempt.
2024-01-27 16:20:08,262 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker gh-2dhytest-2dfeedstocks-2dgpcp-2dfrom-a81484-taskmanager-1-4 recovered from previous attempt.
2024-01-27 16:20:08,265 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker gh-2dhytest-2dfeedstocks-2dgpcp-2dfrom-a81484-taskmanager-1-5 recovered from previous attempt.
2024-01-27 16:23:45,565 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Pending recovery taskmanagers 5 -> 4.
2024-01-27 16:23:45,566 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker gh-2dhytest-2dfeedstocks-2dgpcp-2dfrom-a81484-taskmanager-1-4 is terminated. Diagnostics: Pod terminated, container termination statuses: [flink-main-container(exitCode=1, reason=Error, message=null)]
2024-01-27 16:23:45,855 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Pending recovery taskmanagers 4 -> 3.
2024-01-27 16:23:45,856 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker gh-2dhytest-2dfeedstocks-2dgpcp-2dfrom-a81484-taskmanager-1-3 is terminated. Diagnostics: Pod terminated, container termination statuses: [flink-main-container(exitCode=1, reason=Error, message=null)]
2024-01-27 16:23:45,861 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Pending recovery taskmanagers 3 -> 2.
2024-01-27 16:23:45,861 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker gh-2dhytest-2dfeedstocks-2dgpcp-2dfrom-a81484-taskmanager-1-2 is terminated. Diagnostics: Pod terminated, container termination statuses: [flink-main-container(exitCode=1, reason=Error, message=null)]
2024-01-27 16:25:08,280 WARN  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker gh-2dhytest-2dfeedstocks-2dgpcp-2dfrom-a81484-taskmanager-1-1 did not register in PT5M, will stop it and request a new one if needed.
2024-01-27 16:25:08,281 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Stopping worker gh-2dhytest-2dfeedstocks-2dgpcp-2dfrom-a81484-taskmanager-1-1.
2024-01-27 16:25:08,281 INFO  org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Stopping TaskManager pod gh-2dhytest-2dfeedstocks-2dgpcp-2dfrom-a81484-taskmanager-1-1.
2024-01-27 16:25:08,281 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Pending recovery taskmanagers 2 -> 1.
2024-01-27 16:25:08,287 WARN  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker gh-2dhytest-2dfeedstocks-2dgpcp-2dfrom-a81484-taskmanager-1-5 did not register in PT5M, will stop it and request a new one if needed.
2024-01-27 16:25:08,287 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Stopping worker gh-2dhytest-2dfeedstocks-2dgpcp-2dfrom-a81484-taskmanager-1-5.
2024-01-27 16:25:08,287 INFO  org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Stopping TaskManager pod gh-2dhytest-2dfeedstocks-2dgpcp-2dfrom-a81484-taskmanager-1-5.
2024-01-27 16:25:08,287 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Pending recovery taskmanagers 1 -> 0. Resource manager is ready to serve.
ranchodeluxe commented 9 months ago

Nice of you to look into all this stuff (especially the HA things) @thodson-usgs πŸ₯³ Thanks for doing this.

Thinking through some edge cases here.

I often think of HA as being most useful in streaming workflows b/c if something bad goes wrong the job can recover and magically pick up from where the last checkpoint was. But in batch (let's assume you're using SPOT instances for this) I'm not sure how you'd differentiate between a job that failed b/c there's a valid bug and a job that failed b/c the SPOT instance was terminated. In the former scenario wouldn't the job just infinitely restart and fail?

Anytime I scale up I start hitting ImagePullBackOff errors. To avoid this, we could clone our own Flink image and have the workers pull from it.

I haven't run into this yet b/c the rate limit is decent. Is your parallelism >= 100 workers already? AFAIK each EC2 node should be caching images for all the workers on it, so this would be a hard limit to breach unless maybe you're spinning up a worker per EC2 node and there are hundreds of them (though I could be wrong about aspects here). I'm wondering if something else is possibly going on with your network? We should look into this.

thodson-usgs commented 9 months ago

Good point! I won't know until I test this. I think a valid bug would cause a task failure, which is handled by a separate strategy, like retry 3 times. Hopefully, the job won't restart when the task restart strategy fails. (In my testing, a failing task doesn't cause the job manager to fail, so task failures shouldn't cause the job to rerun 🀞 )

As for ImagePullBackOffs, I would typically get several when setting parallelism = 40, but that's using t3.large with one slot. I've had no issues using two slots (half as many image pulls), but eventually someone will run a big job and bump into this again.

ranchodeluxe commented 9 months ago

I think a valid bug would cause a task failure, which is handled by a separate strategy, like retry 3 times

yummy, like that

that's using t3.large with one slot. I've had no issues using two slots (half as many image pulls), but eventually someone will run a big job and bump into this again

yeah, let's look at this b/c we can probably tune it to work differently but I agree that certain patterns will violate it eventually

thodson-usgs commented 9 months ago

ah, might you be referring to the adaptive scheduler?

jobmanager.scheduler: adaptive

Looks like jobmanager.scheduler: default uses an adaptive-batch scheduler. However, adaptive-batch will only manage the parallelism if it is not set (so setting parallelism in our config.py disables it), whereas adaptive will throttle your parallelism if resources aren't available.
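
For example, a minimal (untested) sketch of what that could look like in a recipe's config.py, assuming switching the scheduler is all that's needed:

c.FlinkOperatorBakery.flink_configuration = {
    # let Flink throttle parallelism when resources aren't available
    # instead of failing the job (untested assumption for our batch jobs)
    "jobmanager.scheduler": "adaptive",
}
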

ranchodeluxe commented 9 months ago

ah, might you be referring to the adaptive scheduler?

Interesting, I've never seen that ☝️.

I'm talking about how the kubelet and container runtime work in k8s by default as described here: https://kubernetes.io/docs/concepts/containers/images/#image-pull-policy

I don't think any of the Flink images injected into the manifests use the :latest tag. If they do, then the default would be imagePullPolicy: Always, and that might cause your behaviour. But my assumption is that we're getting imagePullPolicy: IfNotPresent for all images in each deploy, so we should check.
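
If it turns out the policy isn't what we expect, one sketch of forcing it from our side would be Flink's own Kubernetes option (assuming the deployment passes Flink's native k8s options through to the pod specs, which is worth verifying):

c.FlinkOperatorBakery.flink_configuration = {
    # pin the pull policy so each node reuses its cached image
    # (assumes the deployment respects Flink's native k8s options)
    "kubernetes.container.image.pull-policy": "IfNotPresent",
}
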

Assuming we have a decent-sized node, all the above means is that when many pods are scheduled on a node, only the first pod needs to request the image and all the other worker pods get the cached image back from the container runtime (per node). This of course assumes they are all using the same image tag.

In your case, when 40 workers get scheduled, that means we should only really be hitting Docker Hub once per node, for as many nodes as your workers get scheduled on. So that would be a good thing to concretize and get numbers on. Then there is the upper limit to node autoscaling to think about. It's probably something reasonable like 10. So it's an interesting problem that you seem to be running into.
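
To put rough numbers on it, a back-of-the-envelope sketch (the parallelism, slot count, and 50-node cap come from the discussion above; pods per node is an assumption that depends on instance size and pod memory requests):

import math

parallelism = 40            # from the gpcp-from-gcs runs above
slots_per_taskmanager = 2   # "taskmanager.numberOfTaskSlots"
pods_per_node = 3           # assumption for a t3.large-sized node
max_nodes = 50              # the cluster max mentioned earlier in the thread

taskmanager_pods = math.ceil(parallelism / slots_per_taskmanager)        # 20 pods
nodes_used = min(math.ceil(taskmanager_pods / pods_per_node), max_nodes)  # ~7 nodes
# with imagePullPolicy: IfNotPresent, each node pulls the image at most once
image_pulls = nodes_used                                                  # ~7 pulls
print(taskmanager_pods, nodes_used, image_pulls)
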