mrn-aglic / airflow-data-aware


Issue with using data-aware scheduling for backfilling cases #1

Open · zeddit opened this issue 1 month ago

zeddit commented 1 month ago

Hi, I followed your Medium post 'Problem with using data intervals when backfilling dataset scheduled DAGs' and learned a lot from it. Many thanks for sharing.

I am confused about whether Airflow has solved this problem by now, and how I could use it correctly. My use case is the same as yours: we use data-aware scheduling to break up large pipelines into multiple DAGs, and we also use templated variables like data_interval_start and data_interval_end to decide the data range we want to process.
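To illustrate, our tasks look roughly like this (the DAG id and script name here are just placeholders):

```python
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="producer_dag",  # placeholder
    schedule="@daily",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    catchup=False,
):
    # The data range to process is taken from the run's data interval.
    extract = BashOperator(
        task_id="extract_range",
        bash_command=(
            "python extract.py "
            "--start '{{ data_interval_start }}' "
            "--end '{{ data_interval_end }}'"
        ),
    )
```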

More specifically, if the consumer DAG had the same data_interval_start and data_interval_end as the producer DAG, that would be perfect.

Could you give me some advice on how to achieve that, or have you switched to another solution for managing DAGs at scale? Many thanks.

mrn-aglic commented 1 month ago

Hi @zeddit , sorry for the late response.

Actually, we switched from data-aware scheduling to simply scheduling the pipelines at times by which we know the previous one will have succeeded.
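Roughly like this (the DAG ids and times are made up for illustration):

```python
import pendulum
from airflow import DAG

common = dict(start_date=pendulum.datetime(2024, 1, 1, tz="UTC"), catchup=False)

# Staggered schedules: the downstream DAG runs an hour after the
# upstream one, at a time by which we know the upstream has finished.
upstream = DAG(dag_id="upstream_pipeline", schedule="0 2 * * *", **common)
downstream = DAG(dag_id="downstream_pipeline", schedule="0 3 * * *", **common)
```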

As for the newest Airflow versions, I'm not sure. In Airflow 2.10 (which should be the newest version at the time of writing), they added:

A triggered DAG can fetch information from the dataset that triggered it using the triggering_dataset_events template or parameter.

Check out this link: https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/datasets.html#fetching-information-from-a-triggering-dataset-event
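From the docs, the consumer side would look roughly like this (the dataset URI and DAG id below are placeholders, and I haven't run this against 2.10 myself):

```python
import pendulum
from airflow.datasets import Dataset
from airflow.decorators import dag, task

example_dataset = Dataset("s3://bucket/processed-data")  # placeholder URI

@dag(
    schedule=[example_dataset],
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    catchup=False,
)
def consumer_dag():
    @task
    def read_triggering_events(triggering_dataset_events=None):
        # Maps each triggering dataset to the list of DatasetEvents
        # that queued this run.
        for dataset, events in triggering_dataset_events.items():
            for event in events:
                run = event.source_dag_run  # the producer's DagRun
                print(run.data_interval_start, run.data_interval_end)

    read_triggering_events()

consumer_dag()
```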

However, I'm not sure whether the issue of aggregating the data interval starts and ends still persists. I would need to update the example to run with Airflow 2.10 (I'm not sure when I'll be able to do this).

If you are using a version older than Airflow 2.10, you could try playing with the TriggerDagRunOperator to trigger the next DAG. Of course, you need to be careful about the scenarios in which you want the next DAG to run. E.g., do you really want to trigger the next DAG if you're doing a backfill? You might also need to extend the triggered DAG to check the type of DAG run that triggered it. It all depends on whether all DAG runs are treated equally or not.
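An untested sketch of both ideas (the DAG ids are placeholders): a guard that short-circuits on backfill runs, plus passing the producer's data interval to the triggered DAG through conf:

```python
import pendulum
from airflow import DAG
from airflow.operators.python import ShortCircuitOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.types import DagRunType

def _not_backfill(dag_run=None):
    # Only continue downstream for non-backfill runs.
    return dag_run.run_type != DagRunType.BACKFILL_JOB

with DAG(
    dag_id="producer_dag",  # placeholder
    schedule="@daily",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    catchup=False,
):
    skip_on_backfill = ShortCircuitOperator(
        task_id="skip_on_backfill",
        python_callable=_not_backfill,
    )

    # Forward this run's data interval so the triggered DAG can reuse it.
    trigger_next = TriggerDagRunOperator(
        task_id="trigger_next_dag",
        trigger_dag_id="consumer_dag",  # placeholder
        conf={
            "data_interval_start": "{{ data_interval_start }}",
            "data_interval_end": "{{ data_interval_end }}",
        },
    )

    skip_on_backfill >> trigger_next
```

The triggered DAG would then read the interval from dag_run.conf instead of its own data_interval_start and data_interval_end.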

I'm not sure if I made much sense or overcomplicated the answer.

EDIT: From the Airflow 2.10 link I provided:

Note that this example is using (.values() | first | first) to fetch the first of one dataset given to the DAG, and the first of one DatasetEvent for that dataset.

Which makes me believe that you would need to somehow iterate over the batch of DatasetEvents and have a task instance for each data_interval_start and data_interval_end that came from the previous DAG.
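Something like this with dynamic task mapping (again an untested sketch; the dataset URI is a placeholder):

```python
import pendulum
from airflow.datasets import Dataset
from airflow.decorators import dag, task

example_dataset = Dataset("s3://bucket/processed-data")  # placeholder URI

@dag(
    schedule=[example_dataset],
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    catchup=False,
)
def consumer_fanout_dag():
    @task
    def collect_intervals(triggering_dataset_events=None):
        # One entry per DatasetEvent: the data interval of the producer
        # run that emitted it.
        intervals = []
        for events in triggering_dataset_events.values():
            for event in events:
                run = event.source_dag_run
                intervals.append(
                    {
                        # isoformat keeps the XCom payload JSON-serializable
                        "start": run.data_interval_start.isoformat(),
                        "end": run.data_interval_end.isoformat(),
                    }
                )
        return intervals

    @task
    def process_interval(interval):
        print(f"processing {interval['start']} -> {interval['end']}")

    # Dynamic task mapping: one process_interval instance per interval.
    process_interval.expand(interval=collect_intervals())

consumer_fanout_dag()
```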