apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

executor_config: "limits_memory" option for operators #16563

Open ecerulm opened 3 years ago

ecerulm commented 3 years ago

Description

Currently, if you want to change the Kubernetes resources allocated to an operator, you need to do something like

from airflow.operators.python import PythonOperator
from kubernetes.client import models as k8s

task1 = PythonOperator(
    task_id="id1",
    python_callable=mycallable,
    dag=dag1,
    executor_config={
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",  # the container that runs the task
                        resources=k8s.V1ResourceRequirements(
                            limits={
                                "memory": "6Gi",
                                "cpu": "1",
                            }
                        ),
                    )
                ]
            )
        )
    },
)

which works, but I think it is very verbose for what is (in my opinion) a common case. I would prefer a more direct syntax like

task1 = PythonOperator(
    task_id="id1",
    python_callable=mycallable,
    dag=dag1,
    executor_config={
        "pod_limits_memory": "6Gi",
        "pod_limits_cpu": "1",
    },
)

Alternatively, maybe a documentation change to

would make this (IMHO common) task easier
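
As a rough illustration of the shorthand proposed above, the sketch below separates `pod_limits_*` keys from the rest of an `executor_config` dict. The `pod_limits_` prefix comes from the proposal and the `split_pod_limits` helper is hypothetical; neither is part of Airflow.

```python
from typing import Any, Dict, Tuple

# Hypothetical prefix taken from the proposal; Airflow does not define it.
LIMIT_PREFIX = "pod_limits_"


def split_pod_limits(
    executor_config: Dict[str, Any],
) -> Tuple[Dict[str, str], Dict[str, Any]]:
    """Separate shorthand limit keys from the rest of an executor_config dict.

    Returns (limits, remaining), where limits maps Kubernetes resource names
    (for example "memory" or "cpu") to quantity strings.
    """
    limits: Dict[str, str] = {}
    remaining: Dict[str, Any] = {}
    for key, value in executor_config.items():
        if key.startswith(LIMIT_PREFIX):
            # "pod_limits_memory" -> "memory"
            limits[key[len(LIMIT_PREFIX):]] = str(value)
        else:
            remaining[key] = value
    return limits, remaining


limits, remaining = split_pod_limits(
    {"pod_limits_memory": "6Gi", "pod_limits_cpu": "1"}
)
print(limits)  # {'memory': '6Gi', 'cpu': '1'}
```

The resulting `limits` dict has the same shape as the `limits=` argument passed to `k8s.V1ResourceRequirements` in the verbose example, so a translation layer would only need to wrap it in the pod structure.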

Use case / motivation

I think figuring out executor_config > pod_override and the kubernetes.client.models API just to increase the memory limit for a task was kind of hard. I guess a lot of people would benefit from

Are you willing to submit a PR?

Related Issues

mik-laj commented 3 years ago

What do you think about creating a factory method?

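from kubernetes.client import models as k8s

def limits(cpu, memory):
    # Build an executor_config that sets resource limits on the task container.
    return {
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",
                        resources=k8s.V1ResourceRequirements(
                            limits={
                                "memory": memory,
                                "cpu": cpu,
                            }
                        ),
                    )
                ]
            )
        )
    }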

Usage:

task1 = PythonOperator(
    task_id="id1",
    python_callable=mycallable,
    dag=dag1,
    executor_config=limits('1', '6Gi'),
)

ashb commented 3 years ago

There is already a resources argument to BaseOperator -- if we want some simple way to set those limits we could use that. I think that parameter is currently otherwise unused (it was used for the old Mesos and cgroup executors)
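
A minimal sketch of what such a mapping could look like, assuming the values are given as a number of CPU cores and an amount of RAM in mebibytes. The function name `resources_to_k8s_limits` and its parameters `cpus` and `ram_mib` are hypothetical and do not reflect how Airflow interprets `resources` today.

```python
from typing import Dict, Optional


def resources_to_k8s_limits(
    cpus: Optional[float] = None,
    ram_mib: Optional[int] = None,
) -> Dict[str, str]:
    """Convert simple numeric resource values into Kubernetes quantity strings.

    Assumption: cpus is a number of cores and ram_mib is in mebibytes.
    """
    limits: Dict[str, str] = {}
    if cpus is not None:
        # Express CPU in millicores so fractional cores stay exact ("500m").
        limits["cpu"] = f"{int(round(cpus * 1000))}m"
    if ram_mib is not None:
        limits["memory"] = f"{ram_mib}Mi"
    return limits


print(resources_to_k8s_limits(cpus=1, ram_mib=6144))
# {'cpu': '1000m', 'memory': '6144Mi'}
```

The returned dict could then be passed as the `limits=` value of `k8s.V1ResourceRequirements`, as in the factory method suggested earlier in the thread.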