airflow-helm / charts

The User-Community Airflow Helm Chart is the standard way to deploy Apache Airflow on Kubernetes with Helm. Originally created in 2017, it has since helped thousands of companies create production-ready deployments of Airflow on Kubernetes.
https://github.com/airflow-helm/charts/tree/main/charts/airflow
Apache License 2.0

task-aware celery worker autoscaling (+ `pod-deletion-cost`) #339

Open thesuperzapper opened 3 years ago

thesuperzapper commented 3 years ago

The chart currently supports primitive autoscaling for celery workers, using HorizontalPodAutoscalers with memory metrics. However, this is flawed, because there is not necessarily a link between RAM usage and the number of pending tasks, so your workers might not scale up despite having pending tasks.

We can make a task-aware autoscaler that will scale up the number of celery workers when there are not enough task slots, and scale down when there are too many.
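
As a rough illustration of what "task-aware" sizing could look like, here is a minimal sketch. The function names, the `target_load` parameter, and the assumption that every worker offers the same number of task slots (for example, Airflow's `worker_concurrency` setting) are illustrative only and are not part of the chart.

```python
import math


def load_factor(used_slots: int, replicas: int, slots_per_worker: int) -> float:
    """Fraction of the available task slots that are currently in use."""
    total_slots = replicas * slots_per_worker
    if total_slots == 0:
        # No capacity at all: any demand means we are infinitely overloaded.
        return math.inf if used_slots > 0 else 0.0
    return used_slots / total_slots


def desired_replicas(
    demand_slots: int,      # running + queued celery tasks (assumption)
    slots_per_worker: int,  # task slots offered by each worker (assumption)
    target_load: float,     # e.g. 0.8 means "aim for 80% of slots in use"
    min_replicas: int,
    max_replicas: int,
) -> int:
    """Smallest replica count that keeps the load factor at or below the target."""
    needed = math.ceil(demand_slots / (slots_per_worker * target_load))
    # Clamp to the configured bounds.
    return max(min_replicas, min(max_replicas, needed))
```

For example, with 40 tasks of demand, 16 slots per worker, and a target load of 0.8, this sketch asks for 4 workers, which would then run at a load factor of 0.625.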

In the past, scaling down was dangerous to use with airflow workers, as Kubernetes had no way to influence which Pods were removed, meaning Kubernetes would often remove a busy worker even when other workers were doing nothing.

As of Kubernetes 1.22, there is a beta annotation for Pods managed by ReplicaSets called controller.kubernetes.io/pod-deletion-cost, which tells Kubernetes how "expensive" killing a particular Pod is when decreasing the replicas count.
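
To make the annotation concrete, here is a minimal sketch of a helper that builds a merge-style patch body for it. The helper name is hypothetical. The sketch assumes that the annotation value must be a string holding a 32-bit integer, and that sending `null` for the key in a merge patch removes the annotation. Applying the body (for example with the Kubernetes Python client's `patch_namespaced_pod`) is left out so the block stays self-contained.

```python
from typing import Optional

DELETION_COST_ANNOTATION = "controller.kubernetes.io/pod-deletion-cost"
INT32_MAX = 2**31 - 1


def deletion_cost_patch(cost: Optional[int]) -> dict:
    """Build a patch body that sets (or, with None, removes) the annotation."""
    if cost is not None and not -INT32_MAX <= cost <= INT32_MAX:
        raise ValueError(f"pod-deletion-cost out of range: {cost}")
    # Annotation values are always strings; None asks a merge patch to drop the key.
    value = None if cost is None else str(cost)
    return {"metadata": {"annotations": {DELETION_COST_ANNOTATION: value}}}
```

Pods with a lower cost are preferred for removal when the replica count decreases, and a Pod without the annotation is treated as having a cost of 0.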

NOTE: Previously we considered using KEDA (https://github.com/airflow-helm/charts/issues/103) to manage autoscaling, but this will not work with controller.kubernetes.io/pod-deletion-cost, as the HorizontalPodAutoscaler created by KEDA can not patch the required annotations BEFORE scaling down.


Our Celery Worker Autoscaler can perform the following loop:

  1. Cleanup from any past loops:
    1. Remove any controller.kubernetes.io/pod-deletion-cost annotations
      • NOTE: there will only be dangling annotations if Kubernetes did not remove our "chosen" Pods, or if the autoscaler crashed halfway through a loop
      • NOTE: we need to attempt to prevent multiple instances of our autoscaler running at a time
    2. Send each worker Pod that we removed an annotation from an app.control.add_consumer() command, so it resumes picking up new airflow tasks
  2. Calculate the ideal number of worker replicas for the current task load:
    • if the load factor of workers is above A for B time --> increase replicas to meet the target load factor
    • if the load factor of workers is below X for Y time --> decrease replicas to meet the target load factor
      • NOTE: the load factor is the fraction of available task slots which are consumed
      • NOTE: we should put some limit on the number of scaling decisions per Z seconds (to prevent a yo-yo effect), perhaps with separate limits for down and up to allow faster upscaling
      • NOTE: we should have a "scaling algorithm" config, even if we only start with 1
      • NOTE: we should have minimum and maximum replicas configs
      • NOTE: if using CeleryKubernetesExecutor, we must exclude tasks that are in the AIRFLOW__CELERY_KUBERNETES_EXECUTOR__KUBERNETES_QUEUE
  3. If replicas are going to be decreased by N:
    1. Sort the worker pods by their pod-deletion-cost in ascending order
      • NOTE: the pod-deletion-cost is the number of running tasks, weighted by the total running time of each task (so long-running tasks are not needlessly evicted). Specifically, we want smaller numbers of long-running tasks to be weighted higher than larger numbers of short-running tasks
      • NOTE: add a DAG/Task label which will prevent any worker running it from being killed (or allow a "weighting" per Task)
    2. Annotate the N lowest-cost worker Pods with the controller.kubernetes.io/pod-deletion-cost annotation
      • NOTE: if there are pods in a Pending/Unready state, we can reduce N by this number, as Kubernetes will remove these pods first
    3. Send each worker Pod that was annotated an app.control.cancel_consumer(...) command, so it does not pick up new airflow tasks after being "marked" for deletion
    4. Patch the replicas down by N
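
The scale-down selection in step 3 can be sketched roughly as follows. This is only an illustration: the `Worker` type, the squared-runtime weighting, and the clamp to a 32-bit integer are assumptions chosen so that a few long-running tasks outweigh many short ones, not a settled design.

```python
from dataclasses import dataclass, field
from typing import List

INT32_MAX = 2**31 - 1


@dataclass
class Worker:
    name: str
    ready: bool = True
    # Seconds that each currently running task has been running (assumption).
    task_runtimes: List[float] = field(default_factory=list)


def deletion_cost(task_runtimes: List[float], exponent: float = 2.0) -> int:
    """Cost grows faster than linearly with runtime, so long tasks dominate."""
    cost = sum(seconds ** exponent for seconds in task_runtimes)
    return min(int(cost), INT32_MAX)


def pick_workers_to_remove(workers: List[Worker], n: int) -> List[str]:
    """Names of the ready workers to mark when replicas decrease by n."""
    # Pending/Unready pods are removed by Kubernetes first, so they reduce N.
    not_ready = sum(1 for w in workers if not w.ready)
    remaining = max(n - not_ready, 0)
    ready = sorted(
        (w for w in workers if w.ready),
        key=lambda w: deletion_cost(w.task_runtimes),
    )
    return [w.name for w in ready[:remaining]]
```

The returned names would then be annotated, sent `app.control.cancel_consumer(...)`, and only after that would the replica count be patched down, following the order of the steps above.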

Important changes to make this work:

potiuk commented 2 years ago

Interesting approach 👀

NitinKeshavB commented 1 year ago

Can't wait to see this in action. Please let us know once this is available.

thesuperzapper commented 1 year ago

> Can't wait to see this in action. Please let us know once this is available.

@NitinKeshavB I agree, I am sorry it's taken so long!

I actually have a mostly working prototype, but I have paused work on it until I can get the first release of deployKF (a new open-source ML Platform for Kubernetes, which will include Airflow) out the door.

After that, it is top of my list!

brtkwr commented 11 months ago

Ping :D would this support scale to 0 by any chance?

lexey-e-shelf commented 9 months ago

Hi @thesuperzapper, I'm very interested in this feature as well, and I see that you recently added a new Kubernetes proposal related to controller.kubernetes.io/pod-deletion-cost. I don't fully grasp the details, but will that change this approach as well? Perhaps more pertinently, will the implementation of this approach depend on the implementation of that Kubernetes proposal?

Joffreybvn commented 3 months ago

I prototyped a "KEDA Airflow autoscaler", and it doesn't work as well as I expected.

The autoscaler:

The autoscaler is deployed as a new endpoint in Airflow's API, called every 10 seconds by KEDA (metrics-api).

Problem: Sometimes, a task gets picked up by an empty worker right after the database is queried (before the worker gets a 'TERM' signal), which leads to task eviction. This is especially true with short-running dynamic tasks.
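
One way to narrow this window, in the spirit of the `cancel_consumer` step in the original proposal, is to stop the worker from consuming first and only then re-check that it is still idle. The sketch below is a hedged illustration: the three callables are hypothetical wrappers (for example around `app.control.cancel_consumer`, `app.control.add_consumer`, and a query for the worker's active tasks), and it assumes `stop_consuming` only returns once the worker has actually stopped taking new tasks.

```python
from typing import Callable


def safe_to_remove(
    worker: str,
    stop_consuming: Callable[[str], None],
    resume_consuming: Callable[[str], None],
    active_task_count: Callable[[str], int],
) -> bool:
    """Return True only if the worker is still idle after it stopped consuming."""
    # 1. Stop the worker from picking up new tasks.
    stop_consuming(worker)
    # 2. Re-check AFTER consumption has stopped, so a task that slipped in
    #    between the first idle check and this point is noticed.
    if active_task_count(worker) > 0:
        # 3. The worker became busy: put it back into service and skip it.
        resume_consuming(worker)
        return False
    return True
```

This does not cover tasks that the worker has already prefetched but not yet started, so it reduces the race rather than eliminating it.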


Some thoughts/ learnings:

That said, I'm going to give this flow a try, with the 'simple'/'safe' downscale logic.