apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

CloudRunJobFinishedTrigger does not validate if tasks finished successfully #38028

Open wiktorn opened 8 months ago

wiktorn commented 8 months ago

Apache Airflow Provider(s)

google

Versions of Apache Airflow Providers

apache-airflow-providers-google==10.14.0

Apache Airflow version

airflow==2.6.3

Operating System

composer-2.6.2-airflow-2.6.3

Deployment

Google Cloud Composer

Deployment details

No response

What happened

When using CloudRunExecuteJobOperator(..., deferrable=True), the DAG continues even if the Cloud Run job exits with a non-zero status.

If you look at the CloudRunJobFinishedTrigger.run code, you can see that it only checks whether operation.error.SerializeToString() is None or not, and sets the TriggerEvent values based on that alone.
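To make the gap concrete, here is a simplified, pure-Python model of the decision described above. It is not the provider's code: FakeStatus, FakeOperation and trigger_status are hypothetical stand-ins, and an error counts as "set" when it has a non-zero code or a non-empty message. The point it illustrates is that an operation which finishes without an error field is reported as a success, however the job's tasks ended.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeStatus:
    """Hypothetical stand-in for the operation's error status."""
    code: int = 0
    message: str = ""


@dataclass
class FakeOperation:
    """Hypothetical stand-in for a long-running operation."""
    done: bool
    error: Optional[FakeStatus] = None
    # Illustration only: how many of the job's tasks failed.
    # The decision function below never looks at this field.
    failed_task_count: int = 0


def trigger_status(op: FakeOperation) -> Optional[str]:
    """Model of the described check: only the error field decides the outcome."""
    if not op.done:
        return None  # still running, keep polling
    error_is_set = op.error is not None and (op.error.code != 0 or op.error.message != "")
    return "fail" if error_is_set else "success"


# A task failed, but the operation carries no error, so it is reported as success.
print(trigger_status(FakeOperation(done=True, failed_task_count=1)))  # success
print(trigger_status(FakeOperation(done=True, error=FakeStatus(9, "boom"))))  # fail
```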

If you cross-check that with CloudRunExecuteJobOperator._fail_if_execution_failed, you will notice that many more checks are performed in synchronous mode.
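For comparison, a sketch of the kind of task-level checks a synchronous path can perform. It assumes the execution exposes task_count, succeeded_count and failed_count counters (as the Cloud Run v2 Execution resource does); ExecutionCounts is a hypothetical stand-in for that message and RuntimeError stands in for AirflowException. Treat it as an approximation of what _fail_if_execution_failed checks, not a copy of it.

```python
from dataclasses import dataclass


@dataclass
class ExecutionCounts:
    """Hypothetical stand-in for the task counters on a Cloud Run v2 Execution."""
    task_count: int
    succeeded_count: int
    failed_count: int


def fail_if_execution_failed(execution: ExecutionCounts) -> None:
    """Raise if not every task finished, or if any task failed."""
    finished = execution.succeeded_count + execution.failed_count
    if finished != execution.task_count:
        raise RuntimeError("Not all tasks finished execution")
    if execution.failed_count > 0:
        raise RuntimeError("Some tasks failed execution")


# All three tasks succeeded: no exception.
fail_if_execution_failed(ExecutionCounts(task_count=3, succeeded_count=3, failed_count=0))

# The single task failed (e.g. the container ran exit 1): an exception is raised.
try:
    fail_if_execution_failed(ExecutionCounts(task_count=1, succeeded_count=0, failed_count=1))
except RuntimeError as exc:
    print(exc)  # Some tasks failed execution
```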

What you think should happen instead

The checks that CloudRunExecuteJobOperator._fail_if_execution_failed performs in synchronous mode should also be replicated in CloudRunJobFinishedTrigger, so that a deferrable task fails when the Cloud Run job fails.

How to reproduce

Create a CloudRunExecuteJobOperator task for a container that always fails (for example, one that runs exit 1) and run it in deferrable mode.

Anything else

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 8 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

wiktorn commented 8 months ago

I'd love guidance on how to make the logic in CloudRunExecuteJobOperator._fail_if_execution_failed shared between CloudRunExecuteJobOperator and CloudRunJobFinishedTrigger.
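One possible way to share the logic, offered as a design suggestion rather than something the provider currently does: move the checks into a pure function that returns a failure reason instead of raising. The operator raises on a non-None result, while the trigger puts the same reason into its event payload. execution_failure_reason, operator_check, trigger_payload and the payload keys are hypothetical names, and the counters again assume task_count, succeeded_count and failed_count are available on the execution.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class ExecutionCounts:
    """Hypothetical stand-in for the task counters on a Cloud Run v2 Execution."""
    task_count: int
    succeeded_count: int
    failed_count: int


def execution_failure_reason(execution: ExecutionCounts) -> Optional[str]:
    """Shared, side-effect-free check: return why the execution failed, or None."""
    if execution.succeeded_count + execution.failed_count != execution.task_count:
        return "Not all tasks finished execution"
    if execution.failed_count > 0:
        return "Some tasks failed execution"
    return None


def operator_check(execution: ExecutionCounts) -> None:
    """Synchronous path: raise (RuntimeError stands in for AirflowException)."""
    reason = execution_failure_reason(execution)
    if reason is not None:
        raise RuntimeError(reason)


def trigger_payload(execution: ExecutionCounts, job_name: str) -> Dict[str, Any]:
    """Deferrable path: build the dict a trigger could wrap in a TriggerEvent."""
    reason = execution_failure_reason(execution)
    if reason is None:
        return {"status": "success", "job_name": job_name}
    return {"status": "fail", "job_name": job_name, "message": reason}


print(trigger_payload(ExecutionCounts(1, 0, 1), "my-job"))
# {'status': 'fail', 'job_name': 'my-job', 'message': 'Some tasks failed execution'}
```

Keeping the helper free of Airflow imports also makes it easy to unit-test both paths against the same cases.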

Lee-W commented 6 months ago

> I'd love guidance on how to make the logic in CloudRunExecuteJobOperator._fail_if_execution_failed shared between CloudRunExecuteJobOperator and CloudRunJobFinishedTrigger.

I think we'll need to find a way to get the result asynchronously. The rest of the logic should be the same as in sync mode. I guess the logic will be added somewhere around https://github.com/apache/airflow/blob/e979eccf63bcc3757681a17376d101a62e7d87cf/airflow/providers/google/cloud/triggers/cloud_run.py#L106-L107
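A rough sketch of what getting the result asynchronously could look like: await a fetch coroutine until the operation reports done, sleeping between polls, with an optional timeout. fetch_operation, the dict it returns and poll_until_done are hypothetical; in the provider the fetch would presumably go through the Cloud Run async hook, and which call retrieves the execution's task counters is left open here.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Optional


async def poll_until_done(
    fetch_operation: Callable[[], Awaitable[Dict[str, Any]]],
    polling_period_seconds: float = 10.0,
    timeout: Optional[float] = None,
) -> Dict[str, Any]:
    """Await fetch_operation() repeatedly until it reports done, or time out."""
    waited = 0.0
    while True:
        operation = await fetch_operation()
        if operation.get("done"):
            return operation
        if timeout is not None and waited >= timeout:
            raise TimeoutError("Operation did not finish in time")
        await asyncio.sleep(polling_period_seconds)
        waited += polling_period_seconds


# Usage with a fake operation that finishes on the third poll.
async def main() -> None:
    calls = {"n": 0}

    async def fake_fetch() -> Dict[str, Any]:
        calls["n"] += 1
        return {"done": calls["n"] >= 3, "failed_count": 1}

    result = await poll_until_done(fake_fetch, polling_period_seconds=0.01)
    print(calls["n"], result)  # 3 {'done': True, 'failed_count': 1}


asyncio.run(main())
```

Once the loop returns, the trigger would apply the same checks as the synchronous path and emit a success or failure event accordingly.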