apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0
37.41k stars 14.36k forks source link

Wrong timeout value for ExternalTaskSensor running in deferrable mode #43948

Open kien-truong opened 2 weeks ago

kien-truong commented 2 weeks ago

Apache Airflow version

2.10.3

If "Other Airflow 2 version" selected, which one?

No response

What happened?

The WorkflowTrigger used by ExternalTaskSensor should have a time limit set from timeout attribute instead of execution_timeout

https://github.com/apache/airflow/blob/bb234dc316f8421760255d85a37a10fd508de14f/airflow/sensors/external_task.py#L349

What you think should happen instead?

No response

How to reproduce

  1. Start an ExternalTaskSensor in deferrable mode to monitor a non-running DAG, using default arguments.
  2. The Sensor never timeout because the monitored DAGs do not update, and the timeout value is wrongly set from execution_timeout instead of timeout

Operating System

Linux

Versions of Apache Airflow Providers

No response

Deployment

Google Cloud Composer

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 2 weeks ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

kien-truong commented 2 weeks ago

I can only contribute at weekend, so feel free to pick this up if anyone feels like it.

venkateshwaracholan commented 1 week ago

@kien-truong I would like to work on this.

potiuk commented 1 week ago

Assigned you

karakanb commented 9 hours ago

I looked into this a bit but it seems like there's a fundamental issue here, I'll try to explain below.

The expected behavior would be to have a sensor that can run with retries, in case something fails during the sensor check, e.g. infra issues. The retries are not about the sensor not finding what it was supposed to, e.g. "the task is not there", but to recover from infra failures, e.g. the database being temporarily unavailable. This behavior works as expected with sensors in general.

However, when combining retries on sensors with timeouts, that's where things start getting interesting:

It seems like the user would want the same behavior between deferred and non-deferred versions of the sensor for the timeouts with retries, but I couldn't find a way to solve it without adding a new table to airflow. is the original first start time information saved somewhere?

kien-truong commented 9 hours ago

When the user sets a timeout, the intention is "wait this long from the beginning of the first try

Yeah, even in poke mode sensor does not respect this behavior but use the first poke of the current try as the starting point, which I think is a bug in itself.

The document said,

In case that the mode is poke (see below), both of them _(timeout and executiontimeout | emphasis mine) are equivalent (as the sensor is never rescheduled)

However, this is only correct if the sensor doesn't fail and retry.

karakanb commented 8 hours ago

I think the document is correct with regards to what it says: it says "the execution_timeout and the timeout behave the same for poke", which means that they don't do what we'd expect them to do, and instead just set the timeout for the specific retry attempt instead.

I agree with you that they should behave the same way. I think it'd be a relatively simple fix, but there needs to be a state that stores the attempts.