apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0
37.04k stars 14.28k forks source link

KubernetesPodOperator does not return XCOM on pod failure #8792

Open jvstein opened 4 years ago

jvstein commented 4 years ago

Apache Airflow version: 1.10.9

Kubernetes version (if you are using kubernetes) (use kubectl version): 1.14.9

Environment:

What happened:

I ran a new task using the KubernetesPodOperator on our k8s cluster. This pod is designed to write to the /airflow/xcom/return.json even in case of failures so we can send a user-friendly error message in a following task. The pod exits with a non-zero exit code, so Airflow appropriately updates the task as failed, but the XCOM values are not available.

What you expected to happen:

I expected XCOM variables to be available even on pod failure. We use this capability in other operators to signal error conditions and messages.

How to reproduce it:

Run a KubernetesPodOperator with a command like this in an alpine image.

/bin/bash -c 'echo "{'success': False}" > /airflow/xcom/return.json; exit 1'

Check the XCOM results, which should include the JSON dictionary.

Anything else we need to know:

boring-cyborg[bot] commented 4 years ago

Thanks for opening your first issue here! Be sure to follow the issue template!

Shivarp1 commented 4 years ago

@jvstein Have you tried it in Airflow 1.10.12 version? ( k8s server version v1.17.4; k8s client version v1.15.3) I am passing to KubernetesPodOperator cmds=["/bin/bash", "-cx"], arguments=['echo \'{"success": True}\' > /airflow/xcom/return.json; echo 0'],

the task returns success..and starts the dependent followup task however it still gets no value from /airflow/xcom/return.json ----- bash task log from the spawned k8spod {{pod_launcher.py:173}} INFO - Event: secondary-bash-62564f5389134383924eae918edb8ef6 had an event of type Succeeded {{pod_launcher.py:287}} INFO - Event with job id secondary-bash-62564f5389134383924eae918edb8ef6 Succeeded {{pod_launcher.py:156}} INFO - b'+ echo \'{"success": True}\'\n' {{pod_launcher.py:156}} INFO - b'/bin/bash: /airflow/xcom/return.json: No such file or directory\n' {{pod_launcher.py:156}} INFO - b'+ echo 0\n' {{pod_launcher.py:156}} INFO - b'0\n' -- followup task code.. _retVal = task_instance.xcom_pull(taskids='Secondary-bash-task') print("Printing Secondary Task Return value. ") print(retVal) --- followup task log.. {{logging_mixin.py:112}} INFO - Printing Secondary Task Return value. {{logging_mixin.py:112}} INFO - None {{python_operator.py:114}} INFO - Done. Returned value was: True

jvstein commented 4 years ago

@Shivarp1 - I have not tested in Airflow 1.10.12. Reading through the relevant section on the 1.10.12 tag, I suspect the same issue exists.

I just noticed that my repro steps had a bug in the command. It should have been exit 1 not echo 1 at the end. I updated the description.

We're currently using 1.10.9, with the following patch.

diff --git c/airflow/contrib/operators/kubernetes_pod_operator.py i/airflow/contrib/operators/kubernetes_pod_operator.py
index f692599d7..f4b970d7e 100644
--- c/airflow/contrib/operators/kubernetes_pod_operator.py
+++ i/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -20,6 +20,7 @@ import warnings

 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
+from airflow.models import XCOM_RETURN_KEY
 from airflow.utils.decorators import apply_defaults
 from airflow.contrib.kubernetes import kube_client, pod_generator, pod_launcher
 from airflow.contrib.kubernetes.pod import Resources
@@ -253,12 +254,13 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
                 if self.is_delete_operator_pod:
                     launcher.delete_pod(pod)

+            if self.do_xcom_push:
+                self.xcom_push(context, XCOM_RETURN_KEY, result)
+
             if final_state != State.SUCCESS:
                 raise AirflowException(
                     'Pod returned a failure: {state}'.format(state=final_state)
                 )
-            if self.do_xcom_push:
-                return result
         except AirflowException as ex:
             raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
Shivarp1 commented 4 years ago

@jvstein one question.. How can we create /airflow/xcom folder when the user for the new worker pod spawned does not have the root access?

when I try this .. cmds=["RUN mkdir -p /airflow", "RUN chmod -R 777 /airflow", "/bin/bash", "-cx"], arguments=['mkdir /airflow/xcom','echo \'{\"success\": True}\' > /airflow/xcom/return.json', 'exit 0'], .. I get this error.. {{pod_launcher.py:156}} INFO - b'container_linux.go:235: starting container process caused "exec: \"RUN mkdir -p /airflow\": stat RUN mkdir -p /airflow: no such file or directory"\n'

Thanks

jvstein commented 4 years ago

@Shivarp1 - You're not allowed to pass RUN statements into the operator. You need to start with a pre-built image where those permissions are present, or just run a pod as root.

Try this task definition:

task = KubernetesPodOperator(
    dag=dag,
    task_id="xcom_test",
    name="test_xcom_failure",
    namespace="default",
    in_cluster=False,
    config_file="/path/to/kube/config",
    is_delete_operator_pod=True,
    image_pull_policy="IfNotPresent",
    image="alpine:3.12",
    do_xcom_push=True,
    retries=0,
    cmds=["/bin/sh", "-c"],
    arguments=["mkdir -p /airflow/xcom; echo '{\"success\": false}' | tee /airflow/xcom/return.json; exit 1"],
)
dstandish commented 2 years ago

@kaxil @jedcunningham is this desired behavior? i.e. to attempt to push xcom even in the case of failure?

SacredSkull commented 2 years ago

The above is still true as of Airflow 2.2.2 - I would love to see this working!

Perhaps as an option do_xcom_push_on_failure?

As a work-around I've had to let the internal script pass and handle/check for the error in a downstream task and mark that as a failure.

dstandish commented 2 years ago

Yeah I think we should go ahead and make this with no option -- just make it push xcom in a finally or something. But have to check on timing. There's a refactor of KPO in progress and may make sense to include the change as part of that.

kaxil commented 2 years ago

Plus one to what Daniel said

eladkal commented 1 year ago

Any operator can be resulted in failure. Currently as far as I can tell we don't push to xcom information about the failure. If we want to do this we should probably make it generic (as much as we can)

Personally I would prefer this push not to be the default behavior.

potiuk commented 1 year ago

Any operator can be resulted in failure. Currently as far as I can tell we don't push to xcom information about the failure. If we want to do this we should probably make it generic (as much as we can). Personally I would prefer this push not to be the default behavior.

TL;DR; I would be on having the push_on_failure as an option - but only for the few "generic" operators we have - KPO/Docker/Bash etc.

I thought a bit on that and this is very much philosophical issue :). Whie I have no "only one good" solution, intuitively I think pushing xcom on failure should be added as option for "generic" operators - like KPO/Docker/Bash/Python, but we shuold not do it for "specific" operator.

Let me explain my line of thoughts - maybe that will lead

Generally speaking default behaviour for "regular" operator is that they push to xcom whatever is returned by execute() (unless do_xcom_push is false). That's the current semantics. And when there is a failure we CANNOT push anything because the execute () method does not return anything, unless we change this semantics.

Also doing so is a bit superfluous if you consider that the author of the operator might choose to do it on their own:

def execute():
    try:
       ....
     except Exception():
          Xcom.push(...)
          raise

And they will be able to put a message if they really want to push something on failure. And it's up to the author of the operator to define the behaviour. As a user, when you get an operator that does specific thing, it is generally "closed" - it does what it tells you, you have not much freedom there as a user, the author already made some decisions for you. Of course as a user you can extend such operator and then you can change the behaviour and add similar try/except wrapper.

The thing with KPO (and few other generic operators like Docker, Bash, Python) is that this is generic operator - and as a user you have more freedom to decide what and where happens - by providing a bash script, image, pod_template etc. - and this is where you also might get to decide what to do in case of the failure. But ... you cannot really (as a user) currently make a decision whether to push it or not now (without extending the operator) - so suddenly the "generic" operators are not as generic any more. Yes you can extend it -but "generic" operator's philsophy is that they shoud not need to be extended, whereas for the "specific" operators, "extension" is the only way of changing the behaoviour that hte author of the operator made decisions on.

I am not super strong on it, but wanted to explain not only what I think but also what led me to thinking this is the best approach. Maybe this will be good for others to think that through.

hussein-awala commented 9 months ago

I'm interested in this proposed feature (btw it is not a bug). I tried to create something generic as @potiuk suggested, but it's too complicated since we don't return any result on failure, the only way to do that is through the exception, which is not a clean way, and implementing it in Airflow core will complicate the support in the providers.

Since it will only be supported by a small set of operators, IMHO we have to handle it on a case-by-case basis. I created #37079 to implement it in KPO and tested it in normal and deferrable mode. I can add some tests to make it ready to merge if you agree to the proposal.

hussein-awala commented 9 months ago

As a workaround, we can use the new callbacks class:

class XComCallbacks(KubernetesPodOperatorCallback):
    @staticmethod
    def on_pod_completion(*, pod: k8s.V1Pod, client: CoreV1Api, mode: str, **kwargs) -> None:
        from airflow.models.xcom import XCom

        def _construct_run_id(run_id: str):
            """re-construct the run_id from the safe label"""
            new_run_id = run_id.split("T")[0]
            rest = run_id.split("T")[1]
            new_run_id += "T"
            new_run_id += rest[:2] + ":" + rest[2:4] + ":" + rest[4:6]
            new_run_id += rest[6:13] + "+" + rest[13:15] + ":" + rest[15:17]
            return new_run_id

        if (pod.status.phase if hasattr(pod, "status") else None) != PodPhase.SUCCEEDED:
            pod_manager = PodManager(kube_client=client)
            pod_manager.await_xcom_sidecar_container_start(pod=pod)
            result = pod_manager.extract_xcom(pod=pod)
            XCom.set(
                key="failure_result",
                value=result,
                task_id=pod.metadata.labels["task_id"],
                dag_id=pod.metadata.labels["dag_id"],
                run_id=_construct_run_id(pod.metadata.labels["run_id"]),
            ) 

And for deferrable mode, we can implement the same logic in on_operator_resuming.

Happy to find a new use case for this feature 😄

marlena-hammond commented 9 months ago

I am interested in this feature. For, my usecase, it is very helpful being able to retry the task/DAG based on the exception that is returned.

ywang271828 commented 3 months ago

Another usage case of pushing results to XCOM at failure is to provide diagnostics that subsequent tasks can analyze and provide a graceful handling.

dstandish commented 3 months ago

I don't see any harm in just always pushing e.g. by using try / finally. there's no harm in pushing the xcom if there is something there. downstream tasks don't need to do anything with it unless they want to. that said, it's also not important to me personally that this is implemented.