apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Success callback not being executed in custom KubernetesPodOperator #41661

Open · joselfrias opened this issue 3 weeks ago

joselfrias commented 3 weeks ago

Apache Airflow version

2.9.3

If "Other Airflow 2 version" selected, which one?

No response

What happened?

I've been trying to extend the KubernetesPodOperator so that it runs a success callback whenever the pod completes successfully. The idea is to create a custom operator that executes the assigned callback every time it is used. However, when I run it inside a DAG, the logs show no sign that the callback was executed.

Below is the code of the CustomKubernetesPodOperator I'm trying to build.

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.utils.decorators import apply_defaults

class ExtendedKubernetesPodOperator(KubernetesPodOperator):
    @apply_defaults
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def execute(self, context):
        # Execute the original KubernetesPodOperator logic
        result = super().execute(context)

        # Call the success callback if the pod was successful
        #if self.success_callback and self.pod.status.phase == 'Succeeded':
        default_success_callback(context)

        return result

# Define a default success callback function
def default_success_callback(context):
    print("Pod succeeded!")

class CustomKubernetesPodOperator(ExtendedKubernetesPodOperator):
    @apply_defaults
    def __init__(self, *args, **kwargs):
        if 'on_success_callback' not in kwargs:
            kwargs['on_success_callback'] = default_success_callback
        super().__init__(*args, **kwargs)
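The commented-out condition in `execute()` tests `self.success_callback`, which is not an attribute `BaseOperator` defines; the value injected by `CustomKubernetesPodOperator` is stored as `self.on_success_callback`. Below is a minimal, Airflow-free sketch of a guarded call, assuming the attribute may hold either one callable or a list of callables (newer Airflow 2 releases accept a list as well). `FakeOperator`, `FakeCustomOperator`, and `run_success_callbacks` are hypothetical stand-ins so the snippet runs on its own; it is not Airflow's own callback machinery.

```python
from typing import Any, Callable, Dict, List, Optional, Union

Callback = Callable[[Dict[str, Any]], None]


def default_success_callback(context: Dict[str, Any]) -> None:
    print("Pod succeeded!")


def run_success_callbacks(
    callbacks: Optional[Union[Callback, List[Callback]]],
    context: Dict[str, Any],
) -> int:
    """Call each configured callback with the context; return how many ran."""
    if callbacks is None:
        return 0
    # Normalise a single callable into a one-element list.
    if callable(callbacks):
        callbacks = [callbacks]
    for callback in callbacks:
        callback(context)
    return len(callbacks)


class FakeOperator:
    """Hypothetical stand-in for an operator: it only stores the callback."""

    def __init__(self, on_success_callback=None):
        self.on_success_callback = on_success_callback


class FakeCustomOperator(FakeOperator):
    def __init__(self, **kwargs):
        # Same effect as the `if 'on_success_callback' not in kwargs` check above.
        kwargs.setdefault("on_success_callback", default_success_callback)
        super().__init__(**kwargs)


if __name__ == "__main__":
    op = FakeCustomOperator()
    # Guard on the attribute that actually exists, not `success_callback`.
    run_success_callbacks(op.on_success_callback, context={})
```

`kwargs.setdefault(...)` behaves like the `not in kwargs` check: an explicitly passed callback, even `None`, is left untouched.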

And the DAG that uses this custom operator:

from datetime import datetime
from airflow import DAG
from custom_kubernetes_operator import CustomKubernetesPodOperator

# Define the default_args
default_args = {
    'owner': 'HelloWorldOwner',
    'start_date': datetime(2024, 8, 22),
    'retries': 1,
}

# Define the DAG
dag = DAG(
    'hello_world_kubernetes_pod',
    default_args=default_args,
    description='Hello World DAG',
    schedule_interval=None,
    catchup=False,
)

# Define the task using the custom KubernetesPodOperator
hello_world = CustomKubernetesPodOperator(
    dag=dag,
    image='ubuntu:latest',
    cmds=["sh", "-c"],
    arguments=[
        """
        echo Hello World
        """
    ],
    name='hello-world-pod',
    task_id="hello_world",
    get_logs=True
)

# Set the task in the DAG
hello_world

What you think should happen instead?

For comparison, I created a custom BashOperator following the same pattern as the custom operator above. In that case the callback is executed and its message is printed in the task logs, which is what I expected from the KubernetesPodOperator variant too. The working custom BashOperator is shown below. Am I missing something in the definition of the custom KubernetesPodOperator, or should the two behave the same way?

The custom BashOperator:

from airflow.utils.decorators import apply_defaults
from airflow.operators.bash import BashOperator

class ExtendedBashOperator(BashOperator):
    @apply_defaults
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def execute(self, context):
        result = super().execute(context)

        # Call the success callback after the bash command succeeds
        default_success_callback(context)

        return result

# Define a default success callback function
def default_success_callback(context):
    print("Pod succeeded!")

class CustomBashOperator(ExtendedBashOperator):
    @apply_defaults
    def __init__(self, *args, **kwargs):
        if 'on_success_callback' not in kwargs:
            kwargs['on_success_callback'] = default_success_callback
        super().__init__(*args, **kwargs)

The DAG that uses the custom BashOperator:

from datetime import datetime
from airflow import DAG
from custom_bash_operator import CustomBashOperator

# Define the default_args
default_args = {
    'owner': 'Hello World Owner',
    'start_date': datetime(2024, 8, 22),
    'retries': 1,
}

# Define the DAG
dag = DAG(
    'hello_world_bash',
    default_args=default_args,
    description='Hello World DAG',
    schedule_interval=None,
    catchup=False,
)

# Define the task using the custom BashOperator
hello_world_bash = CustomBashOperator(
    dag=dag,
    bash_command="echo HELLO",
    task_id="hello_world_bash"
)

# Set the task in the DAG
hello_world_bash
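To compare the two variants, here is a toy model of the order in which things happen, assuming (as in Airflow 2.x) that the task runner calls `execute()` first and then, only if it returned without raising, the operator's `on_success_callback`. `ToyOperator` and `toy_run_task` are hypothetical simplifications, not Airflow's actual `TaskInstance` code. The model also makes one consequence of the design explicit: because the custom operators both call `default_success_callback` inside `execute()` and inject it as `on_success_callback`, a fully working operator would emit the message twice.

```python
from typing import Any, Callable, Dict, List, Optional

messages: List[str] = []


def default_success_callback(context: Dict[str, Any]) -> None:
    # Record instead of print so the output can be inspected.
    messages.append("Pod succeeded!")


class ToyOperator:
    """Hypothetical operator with the same two hooks as the custom operators."""

    def __init__(self, on_success_callback: Optional[Callable] = None):
        self.on_success_callback = on_success_callback

    def execute(self, context: Dict[str, Any]) -> str:
        result = "done"  # stands in for super().execute(context)
        default_success_callback(context)  # explicit call, as in the Extended* classes
        return result


def toy_run_task(op: ToyOperator) -> Optional[str]:
    """Simplified lifecycle: execute, then the success callback if no exception."""
    context: Dict[str, Any] = {}
    try:
        result = op.execute(context)
    except Exception:
        return None  # in Airflow, a failure callback would run instead
    if op.on_success_callback is not None:
        op.on_success_callback(context)
    return result


if __name__ == "__main__":
    toy_run_task(ToyOperator(on_success_callback=default_success_callback))
    print(messages)  # one entry from execute(), one from on_success_callback
```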

How to reproduce

  1. Create a custom operator that inherits from KubernetesPodOperator (like the one above).
  2. Create a DAG that uses this custom operator, without passing on_success_callback in the task definition.
  3. Run the DAG.
  4. Observe that the callback is not executed ("Pod succeeded!" does not appear in the task logs).
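For step 4, a small helper can count how many lines of a task log contain the callback's message. This is a sketch assuming the log has been saved to a local text file; `count_marker` and `count_marker_in_file` are hypothetical helpers, not part of Airflow. With the operators above, two hits would suggest that both the explicit call in `execute()` and `on_success_callback` ran, one hit that only one of them did, and zero that neither output reached the log.

```python
from pathlib import Path


def count_marker(log_text: str, marker: str = "Pod succeeded!") -> int:
    """Return how many log lines contain the callback's message."""
    return sum(1 for line in log_text.splitlines() if marker in line)


def count_marker_in_file(path: str, marker: str = "Pod succeeded!") -> int:
    """Read a locally saved task log and count lines containing the marker."""
    text = Path(path).read_text(encoding="utf-8", errors="replace")
    return count_marker(text, marker)
```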

Operating System

Ubuntu 22.04.2 LTS

Versions of Apache Airflow Providers

No response

Deployment

Virtualenv installation

Deployment details

Native installation with pip.

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 3 weeks ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.