apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

KubernetesPodOperator callback example from the docs doesn't work #39291

Status: Open (opened by owler 4 months ago)

owler commented 4 months ago

What do you see as an issue?

The callback example at https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/operators.html#id13 does not work. The problem may be related to the None values of api_version and kind on the pod; see the log below.

import kubernetes.client as k8s
import kubernetes_asyncio.client as async_k8s

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.callbacks import KubernetesPodOperatorCallback
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator

class MyCallback(KubernetesPodOperatorCallback):
    @staticmethod
    def on_pod_creation(*, pod: k8s.V1Pod, client: k8s.CoreV1Api, mode: str, **kwargs) -> None:
        client.create_namespaced_service(
            namespace=pod.metadata.namespace,
            body=k8s.V1Service(
                metadata=k8s.V1ObjectMeta(
                    name=pod.metadata.name,
                    labels=pod.metadata.labels,
                    owner_references=[
                        k8s.V1OwnerReference(
                            api_version=pod.api_version,
                            kind=pod.kind,
                            name=pod.metadata.name,
                            uid=pod.metadata.uid,
                            controller=True,
                            block_owner_deletion=True,
                        )
                    ],
                ),
                spec=k8s.V1ServiceSpec(
                    selector=pod.metadata.labels,
                    ports=[
                        k8s.V1ServicePort(
                            name="http",
                            port=80,
                            target_port=80,
                        )
                    ],
                ),
            ),
        )

with DAG(
    dag_id='test_dag2',
    schedule="45 * * * *",
    start_date=pendulum.datetime(2024, 3, 26, tz="UTC"),
    catchup=False,
    max_active_runs=3,
    dagrun_timeout=None,
    params={
        "srcPath": "/dimas/test_dataset",
        "partition": "",
        "dstPath": "/shared/dmitry.savenko/kube"
    }
) as dag:

    # Pod whose creation triggers MyCallback.on_pod_creation
    k = KubernetesPodOperator(
        task_id="test_callback",
        image="alpine",
        cmds=["/bin/sh"],
        arguments=["-c", "echo hello world; echo Custom error > /dev/termination-log; exit 1;"],
        name="test-callback",
        callbacks=MyCallback,
    )
    run_this_last = EmptyOperator(
        task_id="run_this_last",
    )

    k >> run_this_last
 File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/models/v1_owner_reference.py", line 97, in api_version
    raise ValueError("Invalid value for `api_version`, must not be `None`")  # noqa: E501

airflow.exceptions.AirflowException: Pod test-callback-lvmxwlac returned a failure.
remote_pod: {'api_version': None,
 'kind': None,
 'metadata': {'annotations': None,
              'creation_timestamp': datetime.datetime(2024, 4, 27, 13, 13, 25, tzinfo=tzlocal()),
              'deletion_grace_period_seconds': None,
              'deletion_timestamp': None,
              'finalizers': None,
              'generate_name': None,
              'generation': None,
              'labels': {'airflow_kpo_in_cluster': 'True',
                         'airflow_version': '2.8.4',
                         'dag_id': 'test_dag2',
                         'kubernetes_pod_operator': 'True',
                         'run_id': 'scheduled__2024-04-27T1145000000-ac155a96d',
                         'task_id': 'test_callback',
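One plausible explanation, not confirmed in this thread: the pod passed to on_pod_creation appears to be the remote pod that the operator looks up with a list call, and the Kubernetes API server does not set kind/apiVersion on the individual items of a list response. That would match the remote_pod dump above. Because core Pods always have apiVersion "v1" and kind "Pod", a possible workaround is to fall back to those constants when building the owner reference. The helper below is a hypothetical, stdlib-only sketch that works on a dict shaped like the dump above (the uid is a placeholder, not a real value):

```python
from typing import Any, Dict


def owner_reference_kwargs(pod: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: build V1OwnerReference keyword arguments for a pod.

    Pods in the core API group are always apiVersion "v1" / kind "Pod", so
    those values are used whenever the pod object leaves them unset (None).
    """
    metadata = pod["metadata"]
    return {
        "api_version": pod.get("api_version") or "v1",
        "kind": pod.get("kind") or "Pod",
        "name": metadata["name"],
        "uid": metadata["uid"],
        "controller": True,
        "block_owner_deletion": True,
    }


# Same shape as the remote_pod dump above: api_version and kind are None.
# The uid is a placeholder for illustration only.
remote_pod = {
    "api_version": None,
    "kind": None,
    "metadata": {"name": "test-callback-lvmxwlac", "uid": "placeholder-uid"},
}

ref = owner_reference_kwargs(remote_pod)
print(ref["api_version"], ref["kind"])  # v1 Pod
```

Inside MyCallback, the equivalent change would be to pass api_version=pod.api_version or "v1" and kind=pod.kind or "Pod" to k8s.V1OwnerReference instead of the raw attributes.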

Solving the problem

No response

Anything else

No response

Are you willing to submit a PR?

Code of Conduct

boring-cyborg[bot] commented 4 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so; there is no need to wait for approval.

RNHTTR commented 4 months ago

This is reproducible via the Astro CLI with the code in the original post.

One odd thing about the docs example is the exit 1 in the container command. I don't think exiting with an error has anything to do with demonstrating the callback, does it?
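For illustration, here is a hypothetical variant of the example's cmds/arguments with the trailing exit 1 (and the termination-log write) dropped; this is a suggestion, not the text of the Airflow docs. Running the same shell command locally shows it exits 0, so with these values the pod would succeed and the example would exercise only the callback:

```python
import subprocess

# Hypothetical cmds/arguments for the KubernetesPodOperator in the docs
# example, without the deliberate failure ("exit 1").
cmds = ["/bin/sh"]
arguments = ["-c", "echo hello world"]

# Run the same command locally to check its exit status and output.
result = subprocess.run(cmds + arguments, capture_output=True, text=True)
print(result.returncode, result.stdout.strip())  # 0 hello world
```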