apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Change signature of `cleanup_stuck_queued_tasks` to return TIs or TI Keys #40490

Open dstandish opened 4 months ago

dstandish commented 4 months ago

Body

This has gotten to an awkward place: we are checking that `repr(ti)` is in the list of "readable versions of ti" that the executor returns.

There's no guarantee that the executor will use `repr`, so why not just use the TI object?
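
To illustrate the fragility, here is a minimal sketch. It assumes a made-up `FakeTaskInstance` class (not Airflow's real `TaskInstance`; its string formats are invented) and two hypothetical executors that differ only in whether they report `repr(ti)` or `str(ti)`. The scheduler-style `repr(ti) in ...` check matches every TI for the first executor and none for the second, even though both report the same tasks.

```python
from dataclasses import dataclass


@dataclass(frozen=True, repr=False)
class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance; formats are made up."""

    dag_id: str
    task_id: str
    run_id: str

    def __repr__(self) -> str:
        return f"<FakeTaskInstance: {self.dag_id}.{self.task_id} {self.run_id}>"

    def __str__(self) -> str:
        # An equally "readable" form; nothing in the interface forces repr().
        return f"{self.dag_id}.{self.task_id}@{self.run_id}"


def executor_reporting_repr(tis):
    """Hypothetical executor that reports cleaned-up TIs via repr()."""
    return [repr(ti) for ti in tis]


def executor_reporting_str(tis):
    """Hypothetical executor that reports the same TIs via str()."""
    return [str(ti) for ti in tis]


def matched_by_repr(stuck_tis, cleanup):
    """Mimic the scheduler's current check: keep TIs whose repr() was returned."""
    cleaned_up = set(cleanup(stuck_tis))
    return [ti for ti in stuck_tis if repr(ti) in cleaned_up]


stuck = [
    FakeTaskInstance("etl", "extract", "manual__1"),
    FakeTaskInstance("etl", "load", "manual__1"),
]
print(len(matched_by_repr(stuck, executor_reporting_repr)))  # 2
print(len(matched_by_repr(stuck, executor_reporting_str)))  # 0
```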

Here's the relevant bit of code https://github.com/apache/airflow/blob/main/airflow/jobs/scheduler_job_runner.py#L1572-L1580

Reproduced here for convenience:

                cleaned_up_task_instances = set(executor.cleanup_stuck_queued_tasks(tis=stuck_tis))
                for ti in stuck_tis:
                    if repr(ti) in cleaned_up_task_instances:
                        self._task_context_logger.warning(
                            "Marking task instance %s stuck in queued as failed. "
                            "If the task instance has available retries, it will be retried.",
                            ti,
                            ti=ti,
                        )

Should be something like this instead:

                for ti in executor.cleanup_stuck_queued_tasks(tis=stuck_tis):
                    self._task_context_logger.warning(
                        "Marking task instance %s stuck in queued as failed. "
                        "If the task instance has available retries, it will be retried.",
                        ti,
                        ti=ti,
                    )

But it will take some effort to figure out how to evolve the executor interface (deprecation warnings, updating providers, etc.).

I suppose this is a cost of having executor be a public interface.

just fyisies @o-nikolas @potiuk @pankajastro @sunank200 @vincbeck

vincbeck commented 4 months ago

What would be the change in the executor interface here?

dstandish commented 4 months ago

`cleanup_stuck_queued_tasks` would return TIs instead of strings.

dstandish commented 4 months ago

It could also probably use a rename.

"cleanup" is a bit vague.

Maybe "fail" is a better name...

But maybe it's not worth the change.

aritra24 commented 4 months ago

Hi @dstandish, I would like to take this up if it's still available. Could you give me some high-level pointers on how to go about it?

dstandish commented 4 months ago

I think the goal is to change the signature of `cleanup_stuck_queued_tasks` so that it is a generator that yields the TIs as you fail them.
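
A minimal sketch of the executor side, assuming hypothetical `FakeTaskInstance` and `SketchExecutor` classes; the `_fail` helper is a placeholder, not a real executor method. `cleanup_stuck_queued_tasks` becomes a generator that fails each TI and then yields the TI object itself.

```python
from collections.abc import Iterable, Iterator
from dataclasses import dataclass


@dataclass
class FakeTaskInstance:
    """Hypothetical stand-in for an Airflow TaskInstance."""

    key: str
    state: str = "queued"


class SketchExecutor:
    """Hypothetical executor; only the shape of the method matters here."""

    def __init__(self) -> None:
        self.failed_keys: list[str] = []

    def _fail(self, ti: FakeTaskInstance) -> None:
        # Placeholder for whatever the real executor does to fail the task.
        ti.state = "failed"
        self.failed_keys.append(ti.key)

    def cleanup_stuck_queued_tasks(
        self, tis: Iterable[FakeTaskInstance]
    ) -> Iterator[FakeTaskInstance]:
        # Fail each stuck TI, then hand the TI object back to the caller.
        for ti in tis:
            self._fail(ti)
            yield ti


executor = SketchExecutor()
stuck = [FakeTaskInstance("a"), FakeTaskInstance("b")]
for ti in executor.cleanup_stuck_queued_tasks(tis=stuck):
    print(f"Marking task instance {ti.key} stuck in queued as failed.")
```

One thing to keep in mind with a generator: nothing is failed until the caller iterates, so the scheduler has to consume it fully (for example with a `for` loop, as in the proposed snippet). A caller that ignores the return value would silently stop failing tasks.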

The only issue is backcompat. If someone is using a new Airflow version with an old version of the executor, we have to handle that.

One option would be to just not emit the task context logger message in this scenario.

To achieve that, we could wrap the `self._task_context_logger.warning` calls in a `try` / `except` or a `with suppress(...)` block, so that if `ti` is not actually a TI, the message is simply not sent.

Actually, you probably don't need to wrap each call. You could just add another `except` block after `except NotImplementedError`.
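
A rough sketch of this first option, assuming hypothetical `FakeTaskInstance`, `FakeTaskContextLogger`, and `fail_stuck_queued` names; the real scheduler method and task context logger are not reproduced here. The fake logger raises `AttributeError` when handed a plain string, and that is the exception the extra `except` block catches. Which exception the real task context logger would raise for a string is an assumption.

```python
import logging
from dataclasses import dataclass

log = logging.getLogger(__name__)


@dataclass
class FakeTaskInstance:
    """Hypothetical stand-in for an Airflow TaskInstance."""

    key: str


class FakeTaskContextLogger:
    """Hypothetical logger that, like a per-task logger, needs a real TI."""

    def __init__(self) -> None:
        self.messages: list[tuple[str, str]] = []

    def warning(self, msg: str, *args, ti) -> None:
        # Reading ti.key raises AttributeError when ti is a plain string.
        self.messages.append((ti.key, msg % args))


def fail_stuck_queued(executor, stuck_tis, task_context_logger) -> None:
    try:
        for ti in executor.cleanup_stuck_queued_tasks(tis=stuck_tis):
            task_context_logger.warning(
                "Marking task instance %s stuck in queued as failed. "
                "If the task instance has available retries, it will be retried.",
                ti,
                ti=ti,
            )
    except NotImplementedError:
        log.debug("Executor does not support cleaning up stuck queued tasks; skipping.")
    except AttributeError:
        # Old-style executor returned strings. It has already failed the tasks,
        # so the only thing skipped is the task context log message.
        log.debug("Executor returned non-TI values; skipping task context logging.")


class NewStyleExecutor:
    def cleanup_stuck_queued_tasks(self, tis):
        yield from tis  # yields TIs (actually failing them is elided here)


class OldStyleExecutor:
    def cleanup_stuck_queued_tasks(self, tis):
        return [repr(ti) for ti in tis]  # old contract: readable strings


stuck = [FakeTaskInstance("a"), FakeTaskInstance("b")]
new_logger, old_logger = FakeTaskContextLogger(), FakeTaskContextLogger()
fail_stuck_queued(NewStyleExecutor(), stuck, new_logger)
fail_stuck_queued(OldStyleExecutor(), stuck, old_logger)
print(len(new_logger.messages), len(old_logger.messages))  # 2 0
```

The broad `except AttributeError` is the weak spot: it would also swallow an `AttributeError` raised from inside a new-style executor's generator, so an `isinstance` check around the logging call may be a safer variant of the same idea.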

Another option would be to add full backcompat logic: either inspect the signature of the function, or look at what is actually returned and, if you're getting strings, look up the TI.
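
A minimal sketch of the "look at what is actually returned" variant (signature inspection is not shown), assuming a hypothetical `normalize_cleanup_result` helper and `FakeTaskInstance` class. Strings are mapped back to TIs by their `repr`. This keeps the old fragility but confines it to legacy executors, and a `DeprecationWarning` nudges them toward the new contract.

```python
import warnings
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeTaskInstance:
    """Hypothetical stand-in for an Airflow TaskInstance."""

    key: str


def normalize_cleanup_result(returned, stuck_tis):
    """Yield TIs whether the executor returned TIs (new) or repr strings (old)."""
    by_repr = {repr(ti): ti for ti in stuck_tis}
    warned = False
    for item in returned:
        if not isinstance(item, str):
            yield item  # new-style executor: already a TI
            continue
        if not warned:
            warnings.warn(
                "cleanup_stuck_queued_tasks returned strings; "
                "returning task instances is expected instead.",
                DeprecationWarning,
                stacklevel=2,
            )
            warned = True
        ti = by_repr.get(item)
        if ti is not None:  # unknown strings are dropped, as before
            yield ti


stuck = [FakeTaskInstance("a"), FakeTaskInstance("b")]
print(list(normalize_cleanup_result(iter(stuck), stuck)))
print(list(normalize_cleanup_result([repr(stuck[1])], stuck)))
```

In the scheduler, the proposed loop would then iterate over `normalize_cleanup_result(executor.cleanup_stuck_queued_tasks(tis=stuck_tis), stuck_tis)` instead of the raw return value.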

I don't have a strong opinion about it.

aritra24 commented 4 months ago

Ack, let me work on it over the next few days.