apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Scheduler unable to process large number of orphan Datasets #41185

Open NBardelot opened 1 month ago

NBardelot commented 1 month ago

Apache Airflow version

2.9.3

If "Other Airflow 2 version" selected, which one?

No response

What happened?

On startup, the scheduler begins marking Datasets as orphans in the method _orphan_unreferenced_datasets of airflow/jobs/scheduler_job_runner.py.

A large number of Datasets (~200K) is queried from the Metadata DB. The query does not seem to filter out Datasets that are already orphans: its HAVING clause selects every Dataset with neither a DAG reference nor a Task reference, which also matches Datasets already flagged as orphans.

As we run on Kubernetes, the Scheduler's livenessProbe fails because the Scheduler tries to orphan everything in a single run and never finishes in time. (Note: AIRFLOW__SCHEDULER__PARSING_CLEANUP_INTERVAL is 60s, but changing the value would not change this behaviour.)

Many "INFO - Orphaning unreferenced dataset" lines are logged in the Scheduler output.

Kubernetes kills the unhealthy pod. The pod restarts, fails again to orphan everything at once, and the cycle repeats indefinitely.

What you think should happen instead?

Several things could be done to improve the situation:

  1. Limit the number of Datasets selected by the query in _orphan_unreferenced_datasets, so that _set_orphaned processes the Datasets to orphan in batches (with a new config such as AIRFLOW__SCHEDULER__UNREFERENCED_DATASETS_BATCH_SIZE, for example)
  2. Do not select already orphaned Datasets, by improving the query in _orphan_unreferenced_datasets
  3. Add an optional TTL to Datasets in the Metadata DB so that old Datasets can be ignored by the query and/or cleaned up without having to orphan them first (once the TTL has passed, whether a Dataset is orphaned no longer matters and the computation can be skipped)
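
Proposals 1 and 2 can be illustrated together with a minimal sketch. It uses the standard-library sqlite3 module and a simplified stand-in schema, not the real Airflow models or the actual scheduler query; the function name orphan_unreferenced_batch and the batch_size parameter are hypothetical. The point is only that a `WHERE is_orphaned = 0` filter combined with a `LIMIT` keeps each pass bounded.

```python
import sqlite3

# Simplified stand-in schema (assumption): loosely modelled on Airflow's
# dataset tables, NOT the real metadata DB definitions.
SCHEMA = """
CREATE TABLE dataset (
    id INTEGER PRIMARY KEY, uri TEXT, is_orphaned INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE dag_schedule_dataset_reference (dataset_id INTEGER, dag_id TEXT);
CREATE TABLE task_outlet_dataset_reference (
    dataset_id INTEGER, dag_id TEXT, task_id TEXT
);
"""

# Select at most `?` datasets that have no DAG or task reference
# and are not already flagged as orphans.
SELECT_BATCH = """
SELECT d.id
FROM dataset AS d
LEFT JOIN dag_schedule_dataset_reference AS s ON s.dataset_id = d.id
LEFT JOIN task_outlet_dataset_reference AS t ON t.dataset_id = d.id
WHERE d.is_orphaned = 0
GROUP BY d.id
HAVING COUNT(s.dag_id) = 0 AND COUNT(t.dag_id) = 0
ORDER BY d.id
LIMIT ?
"""


def orphan_unreferenced_batch(conn, batch_size):
    """Flag up to batch_size unreferenced datasets; return how many were flagged."""
    ids = [row[0] for row in conn.execute(SELECT_BATCH, (batch_size,))]
    conn.executemany(
        "UPDATE dataset SET is_orphaned = 1 WHERE id = ?", [(i,) for i in ids]
    )
    conn.commit()
    return len(ids)


def build_demo_db():
    """Ten datasets: #1 and #2 are referenced, #3 is already an orphan."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    conn.executemany(
        "INSERT INTO dataset (id, uri, is_orphaned) VALUES (?, ?, ?)",
        [(i, f"s3://demo/{i}", 1 if i == 3 else 0) for i in range(1, 11)],
    )
    conn.execute(
        "INSERT INTO dag_schedule_dataset_reference VALUES (1, 'consumer_dag')"
    )
    conn.execute(
        "INSERT INTO task_outlet_dataset_reference VALUES (2, 'producer_dag', 'export')"
    )
    conn.commit()
    return conn


conn = build_demo_db()
batch_sizes = []
# Each iteration stands in for one scheduler pass; the work per pass is bounded.
while (n := orphan_unreferenced_batch(conn, batch_size=3)) > 0:
    batch_sizes.append(n)
print(batch_sizes)  # [3, 3, 1]
```

With the filter in place, a later pass over the same data selects nothing, so already orphaned Datasets are never reprocessed.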

How to reproduce

Create a number of Datasets so large that the Scheduler cannot process them in one round before its livenessProbe times out, so that the probe fails and the pod is considered unhealthy.

Operating System

Kubernetes

Versions of Apache Airflow Providers

apache-airflow-providers-amazon apache-airflow-providers-common-sql apache-airflow-providers-elasticsearch apache-airflow-providers-hashicorp apache-airflow-providers-http apache-airflow-providers-microsoft-winrm apache-airflow-providers-microsoft-azure apache-airflow-providers-opsgenie apache-airflow-providers-postgres apache-airflow-providers-redis apache-airflow-providers-sftp apache-airflow-providers-smtp apache-airflow-providers-ssh

with versions as of Airflow 2.9.3 constraints.

Deployment

Official Apache Airflow Helm Chart

Deployment details

AIRFLOW__SCHEDULER__PARSING_CLEANUP_INTERVAL = 60s (default value)

Anything else?

FYI we are also working on guidelines for our DAG developers, in order to promote best practices. We do not have one for Datasets at this stage, and I think having a large number of Datasets should be avoided; but I'm not yet sure we can shrink the number to a point where the queries will work...

It would be very much appreciated if you could point us to some guidelines. I usually follow the community's and Astronomer's guidelines and best practices, but for Datasets there seem to be few.


NBardelot commented 1 month ago

We've found the root cause of the 200k Datasets.

We have a guideline for developers telling them not to put code in the DAG global scope, but here a developer was computing a date in the global scope, and that date was used in the Jinja template of a task's outlets (so two bad practices colliding). Thus, every time the Dag Processor ran, a new Dataset was created. In addition, each new Dataset created this way turned the previous Dataset into an orphan.
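
The effect can be reproduced without Airflow in a small simulation. This is a sketch under assumptions: the function names, the bucket URI, and the 30-second parse interval are all hypothetical, and a plain set stands in for the Datasets registered in the Metadata DB. It contrasts an outlet URI derived from the clock at parse time with a stable one.

```python
from datetime import datetime, timedelta, timezone


def outlet_uri_parse_time(parse_time):
    # Anti-pattern: the URI embeds a value computed when the DAG file is parsed,
    # so every parse yields a different Dataset URI.
    stamp = parse_time.strftime("%Y%m%dT%H%M%S")
    return f"s3://example-bucket/export/{stamp}/data.csv"


def outlet_uri_stable(parse_time):
    # Stable URI: identical on every parse, so only one Dataset ever exists.
    return "s3://example-bucket/export/data.csv"


def simulate_parses(uri_factory, parses, interval_seconds=30):
    """Return the set of distinct URIs registered over repeated DAG parses."""
    start = datetime(2024, 8, 1, tzinfo=timezone.utc)
    registered = set()
    for i in range(parses):
        parse_time = start + timedelta(seconds=i * interval_seconds)
        registered.add(uri_factory(parse_time))
    return registered


unstable = simulate_parses(outlet_uri_parse_time, parses=5)
stable = simulate_parses(outlet_uri_stable, parses=5)
print(len(unstable), len(stable))  # 5 1
```

In the unstable case, only the most recent URI is still referenced after each parse; all earlier ones become candidates for orphaning, which is how the count can grow without bound.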

From now on, we're going to monitor the dataset.orphaned metric and raise alerts if the number of orphans skyrockets.
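
A minimal sketch of such a check, assuming the dataset.orphaned values have already been collected as a list of successive samples. The function name and both thresholds are hypothetical and would need tuning for a real deployment.

```python
def orphan_alerts(samples, max_orphans=1000, max_growth=100):
    """Return (index, reason) for each sample that breaches a limit.

    "absolute": the orphan count exceeds max_orphans.
    "growth":   the count rose by more than max_growth since the previous sample.
    """
    alerts = []
    previous = None
    for index, value in enumerate(samples):
        if value > max_orphans:
            alerts.append((index, "absolute"))
        elif previous is not None and value - previous > max_growth:
            alerts.append((index, "growth"))
        previous = value
    return alerts


# Hypothetical gauge readings: stable at first, then climbing quickly.
samples = [3, 5, 4, 250, 900, 1500]
alerts = orphan_alerts(samples)
print(alerts)  # [(3, 'growth'), (4, 'growth'), (5, 'absolute')]
```

Alerting on growth as well as on an absolute ceiling would flag a runaway DAG early, before the orphan count is large enough to stall the Scheduler.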

That being said, the proposals in the issue could still be useful to protect the Scheduler against such development errors in one DAG. Moreover, is it really a good idea that the Dag Processor is the one creating Datasets, while the Scheduler is the one computing the orphans? And shouldn't those processes be more isolated?

NBardelot commented 3 weeks ago

See https://github.com/apache/airflow/pull/40806.