LamaAni / KubernetesJobOperator

An airflow operator that executes a task in a kubernetes cluster, given a kubernetes yaml configuration or an image refrence.
57 stars 8 forks source link

Executing each task without restarting pods for every task #42

Closed LamaAni closed 3 years ago

LamaAni commented 3 years ago

ORIGINAL POST FROM: @teaglebuilt

I am looking for a way to execute each task without restarting pods for every task....even if it is a job...do you know how this could be possible?

Originally posted by @teaglebuilt in https://github.com/LamaAni/KubernetesJobOperator/issues/40#issuecomment-845937366

LamaAni commented 3 years ago

There is a configuration which allows airflow to run in celery worker mode. There, when running a task it will run in one of the worker pods. I have a project that deploys airflow that way. Feel free to look at the examples there or checkout the official chart here (I prefer mine :))

Another way to do this would be to write a custom operator that exec a command in a pre-running pod using the kuberentes api (official, or mine). But that would be very problematic, since it breaks the kubernetes separation principle.

Is there a specific reason you would prefer not to restart the pod?

teaglebuilt commented 3 years ago

Is there a specific reason you would prefer not to restart the pod?

Yes, So i am on kubernetes and have different tenant/teams....We are orchestrating long and short processes but not doing any heavy computing....So spinning up pods for every task would probably slow us down.

However, using Celery would be a good approach, but my question is how do I tell the scheduler to delegate tasks to a certain worker.

What i am ultimately trying to do, is make deployments without restarting airflow processes. I originally wanted to use containerization for this, and if i can treat each dag as a container / airflow worker and airflow knows how to use a specific worker for a specific dag, then that would give us the ability to deploy changes to workers without restarting the webserver / scheduler....

LamaAni commented 3 years ago

Hum. I see.

I think that in celery you cannot specify which worker dose which tasks. That is not possible.

Also, I found working with Celery for ~1 year, that its very slow in scheduling tasks. The requirement for synchronization (rabbitmq + DB) usually ends up slowing down the process. When there are many small tasks, I found that this sometimes drives the scheduling to respond on the order of minutes.

My recommendation would be to restart the pod every time. It has a few advantages:

  1. Unlimited scalability. A worker pod(s) (even with auto-scaling) would be less available to scale, and would respond slower.
  2. Restarting pods is fast, very fast. I found that working on starting up the container faster is possible (which I implemented in my open source repo zairflow). The overhead per task is ~1-5 seconds once the image was downloaded once.
  3. Cost. Workers which are not being used are wasting resources. Unless you are running 100% of the time. You can use autoscale (as I did with celery) but that was slow. See above point 1.
  4. Much easier to debug. Pods are standalone and you can use either stack driver or other methodologies to track airflow related issues (package mismatch is usually an issue)

Further, I should note, that it is possible to make deployments without restarting airflow. In general, by creating a DB associated with the deployments that uses a PVC (I use postgres statefull set). This db runs external to the scheduler/webserver/etc. You can also persist the airflow logs using this package.

Feel free to try the example I have here. It is a helm chart, using helmfile to manage the deployemnt. It should provide you with production ready airflow who's db persists through deployments. This package provides,

  1. Airflow deployment
  2. DB deployment
  3. State + Log preservation (using postgres DB)
  4. Ability to sync a github repository (or any git repository for that matter) to the pod. (Use for faster development)
  5. State preserving restarts.

You can also find the stable official chart here, it should do the same.

teaglebuilt commented 3 years ago

Hmm, ok would a job vs pod differ in terms of speed? I am using the local executor now but I need to scale this workload because i will have many clients.

Is a job faster than a pod? Bc a pod is still launched for a job i believe. I guess i will try to test it. But i am curious also about the following....

  1. Using Docker Operator in kubernetes
  2. Use Kubernetes Operator as a sub dag operator to use the pod / task as a dagrun.
  3. Tell it to use the same pod instead of using PodLauncher
  4. Use Celery and each client has its own worker which i found can but done by the following
class CustomOperator(BaseOperator):
  def __init__(self,...):
     self.queue = 'client queue'. <- now on the base operator class in airflow 2.1.0
 command: ["airflow", "celery", "worker", "-q", "client"]

Although, the issue with this is that I would still have to use a PV to mount the client dag to the scheduler I think

teaglebuilt commented 3 years ago

@LamaAni So it is interesting how it is being used here, Kedro KubernetesPodOperator

They are launching it per node, but would you see any issues around telling the whole dag run to execute? I might try just returning a different operator on certain steps to tell it not to spin up a pod for a specific task maybe.

LamaAni commented 3 years ago

Oh, I see the confusion now.

The setup I was talking about is the the one that uses the KubernetesExecutor. Which dose the following,

  1. A task is scheduled.
  2. A new worker (kubernetes pod) is generated (per task)
  3. The task is executed on the worker
  4. The worker is deleted.

Diagram

When there are not tasks running, there is no worker pods running. Note that the KubernetesExecutor differes from the KubernetesPodOperator and the KubernetesJobOperator. The kubernetes executor will run any operator, and dose not have to be an image.

See here for more information about the kubernetes executor: https://airflow.apache.org/docs/apache-airflow/stable/executor/kubernetes.html

LamaAni commented 3 years ago

Issue closed. Stale