apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Can NOT expand on KubernetesPodOperator #39880

Closed stephen-lazarionok closed 4 months ago

stephen-lazarionok commented 5 months ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.5.0-python3.10

What happened?

When trying to use dynamic task mapping with KubernetesPodOperator, I get the following error:

Broken DAG: [/opt/airflow/dags/misc/mydag.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/mappedoperator.py", line 149, in __attrs_post_init__
    validate_mapping_kwargs(self.operator_class, "partial", self.kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/mappedoperator.py", line 109, in validate_mapping_kwargs
    raise TypeError(f"{op.__name__}.{func}() got {error}")
TypeError: KubernetesPodOperator.partial() got an unexpected keyword argument 'provide_context'

What you think should happen instead?

The mapped task should be created and the DAG should parse without errors.

How to reproduce

Try to use:

KubernetesPodOperator.partial(
    namespace=NAMESPACE,
    image=f"{REGISTRY_NAME}/{IMAGE_NAME}:{IMAGE_TAK}",
    cmds=["python"],
    arguments=["/src/pipelines/pipeline.py"],
    name="abc",
    task_id="abc",
    get_logs=True,
    is_delete_operator_pod=True,
    dag=dag,
    labels={
        "job_type": JobType.ETL,
        "platform": Platform.ALL_PLATFORMS,
        "app": JOB_NAME,
    },
    container_resources=k8s.V1ResourceRequirements(
        limits={"memory": "256Mi"}, requests={"cpu": "100m", "memory": "256Mi"}
    ),
    image_pull_secrets=[k8s.V1LocalObjectReference(IMAGE_PULL_SECRET_NAME)],
).expand(env_vars=[{"A": "A"}, {"A": "B"}])

Operating System

Linux

Versions of Apache Airflow Providers

apache-airflow-providers-cncf-kubernetes==7.8.0
apache-airflow-providers-amazon==6.2.0
apache-airflow-providers-slack==7.2.0
apache-airflow-providers-postgres==5.4.0

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 5 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

jscheffl commented 5 months ago

I tried copying and pasting the example code you provided into an example DAG, using current main and a developer setup, and could not reproduce the reported problem.

But I am wondering a bit: the term provide_context does not appear in your example. Might the problem be coming from somewhere else in your DAG? As your reported version is a bit old, can you try reproducing on Airflow 2.9.1 with provider package 8.2.0? If the same problem can be reproduced, can you post a full DAG that shows it?
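For reference, here is a minimal self-contained sketch of the same .partial().expand() pattern, with hypothetical placeholder values (dag_id, namespace, image) instead of the reporter's constants. It parses without the TypeError on a recent Airflow with the cncf-kubernetes provider, which is why the snippet itself does not look like the culprit:

from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

with DAG(
    dag_id="kpo_expand_sketch",  # placeholder dag_id
    start_date=datetime(2022, 7, 13),
    schedule=None,
    catchup=False,
) as dag:
    # One mapped task instance per env_vars entry.
    KubernetesPodOperator.partial(
        task_id="mapped_pod",
        name="mapped-pod",
        namespace="default",  # placeholder namespace
        image="python:3.10",  # placeholder image
        cmds=["python", "-c", "print('hello')"],
        get_logs=True,
    ).expand(env_vars=[{"A": "A"}, {"A": "B"}])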

stephen-lazarionok commented 5 months ago

Hi, here is the full DAG code:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from dags.config import (
    IMAGE_PULL_SECRET_NAME,
    MISC_IMAGE_NAME,
    MISC_ROUTINES_MOST_RECENT_TAG,
    NAMESPACE,
    REGISTRY_NAME,
    misc_jobs_args,
)
from dags.constants import JobType, Platform
from dags.utils import failure_callback, get_job_name
from kubernetes.client import models as k8s

K8S_ARGS = {}

JOB_NAME = "MY_JOB_NAME"

with DAG(
    dag_id=JOB_NAME,
    default_args=misc_jobs_args,
    catchup=False,
    max_active_runs=1,
    is_paused_upon_creation=False,
    schedule_interval="0 * * * *",  # Run once an hour at the beginning of the hour
    start_date=datetime(2022, 7, 13, 11, 0),
    dagrun_timeout=timedelta(minutes=10),
    on_failure_callback=failure_callback,
    tags=[
        f"job_type:{JobType.ETL}",
        f"platform:{Platform.ALL_PLATFORMS}",
    ],
) as dag:

    read_groups = KubernetesPodOperator(
        namespace=NAMESPACE,
        image=f"{REGISTRY_NAME}/{MISC_IMAGE_NAME}:{MISC_ROUTINES_MOST_RECENT_TAG}",
        cmds=["python"],
        arguments=["/src/pipelines/read_groups.py"],
        name="read_groups",
        task_id="read_groups",
        get_logs=True,
        is_delete_operator_pod=True,
        do_xcom_push=True,
        dag=dag,
        labels={
            "job_type": JobType.ETL,
            "platform": Platform.ALL_PLATFORMS,
            "app": JOB_NAME,
        },
        container_resources=k8s.V1ResourceRequirements(
            limits={"memory": "256Mi"}, requests={"cpu": "100m", "memory": "256Mi"}
        ),
        image_pull_secrets=[k8s.V1LocalObjectReference(IMAGE_PULL_SECRET_NAME)],
        env_vars=[
            k8s.V1EnvVar(
                name="PLATFORM_NAME",
                value="sv",
            ),
            k8s.V1EnvVar(
                name="ML_MODEL_ENTRY_NAME",
                value="my_model",
            ),
        ],
    )

    KubernetesPodOperator.partial(
        namespace=NAMESPACE,
        image=f"{REGISTRY_NAME}/{MISC_IMAGE_NAME}:{MISC_ROUTINES_MOST_RECENT_TAG}",
        cmds=["python"],
        name="process_group",
        task_id="process_group",
        get_logs=True,
        is_delete_operator_pod=True,
        dag=dag,
        labels={
            "job_type": JobType.ETL,
            "platform": Platform.ALL_PLATFORMS,
            "app": JOB_NAME,
        },
        container_resources=k8s.V1ResourceRequirements(
            limits={"memory": "256Mi"}, requests={"cpu": "100m", "memory": "256Mi"}
        ),
        image_pull_secrets=[k8s.V1LocalObjectReference(IMAGE_PULL_SECRET_NAME)],
        **K8S_ARGS,
    ).expand(
        arguments=[
            ["/src/pipelines/sampleprocess.py"],
            ["/src/pipelines/sampleprocess.py"],
        ]
    )

jscheffl commented 5 months ago

Thanks for posting the full DAG code. Unfortunately I was not able to get it working. Since you have a couple of external imports that do not resolve standalone, I suspect from the posted code that misc_jobs_args is introducing the parameter that causes the error.

Can you substitute all external imports with dummy values and paste a DAG that is free of your custom imports? Otherwise it is not possible to reproduce.
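If that suspicion is correct, the behaviour can be sketched with a self-contained DAG in which a hypothetical shared default_args dict (standing in here for misc_jobs_args; all names and values below are placeholders, not the reporter's actual configuration) carries a leftover provide_context key, an argument that belonged to PythonOperator in Airflow 1.x and no longer exists in Airflow 2. As far as I can tell, a classic operator only picks up default_args keys that appear in its signature, so a task like read_groups parses fine, while .partial() merges the full default_args and validates every keyword, raising the TypeError from the report:

from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

# Hypothetical stand-in for misc_jobs_args with a leftover Airflow 1.x key.
misc_jobs_args = {
    "owner": "data-eng",
    "provide_context": True,  # not a KubernetesPodOperator argument
}

with DAG(
    dag_id="provide_context_repro",  # placeholder dag_id
    default_args=misc_jobs_args,
    start_date=datetime(2022, 7, 13),
    schedule=None,
    catchup=False,
) as dag:
    # Parses fine: default_args keys unknown to the operator are ignored here.
    KubernetesPodOperator(
        task_id="classic_pod",
        name="classic-pod",
        namespace="default",
        image="python:3.10",
        cmds=["python", "-c", "print('ok')"],
    )

    # Fails at DAG parse time with:
    # TypeError: KubernetesPodOperator.partial() got an unexpected keyword argument 'provide_context'
    KubernetesPodOperator.partial(
        task_id="mapped_pod",
        name="mapped-pod",
        namespace="default",
        image="python:3.10",
        cmds=["python"],
    ).expand(arguments=[["/src/pipelines/a.py"], ["/src/pipelines/b.py"]])

If that is what is happening, removing provide_context from the shared default_args, or passing the mapped task a cleaned copy of the dict, should let the .partial().expand() call parse.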

github-actions[bot] commented 5 months ago

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in the next 7 days if no further activity occurs from the issue author.

github-actions[bot] commented 4 months ago

This issue has been closed because it has not received a response from the issue author.