apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Datasets: Allow time based force dependency when dataset is not ready. #30974

Open eladkal opened 1 year ago

eladkal commented 1 year ago


The current state: if a DAG is defined with 4 datasets, Airflow waits for all of them to be ready before scheduling the DAG. This works well and serves use cases where all 4 datasets are crucial and must be ready.

The use case we don't currently handle: it is common for datasets not to be equally important. Sometimes the core datasets are ready but some minor ones are not (for example, when one of the datasets is used only for enrichment). In that case the DAG author may want to define a "grace period": how long they are willing to keep waiting before the DAG is scheduled regardless of whether the dataset is ready. With pipelines, "good enough" is sometimes OK. The worst that can happen is that one major pipeline (with many downstream dependent DAGs) is stuck on some minor dataset.

Suggested ideas:

  1. Introduce the ability to skip the dataset dependency check after a grace period has passed.
  2. Add a DatasetSensor? It could be used as a workaround (minor dependencies can be set within the DAG rather than through the dataset feature). I don't like this one so much, but it is an option.
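
The sensor workaround in idea 2 amounts to polling for the minor dataset with a hard deadline and then carrying on either way. A minimal sketch of that loop in plain Python, not an Airflow operator; `wait_for_dataset`, `is_ready`, and the injected `now`/`sleep` callables are hypothetical names used only for illustration:

```python
from datetime import datetime, timedelta
from typing import Callable


def wait_for_dataset(
    is_ready: Callable[[], bool],
    timeout: timedelta,
    poke_interval: timedelta,
    now: Callable[[], datetime],
    sleep: Callable[[timedelta], None],
) -> bool:
    """Poll until is_ready() returns True or the timeout elapses.

    Returns True if the dataset became ready, False if we gave up waiting.
    The clock and sleep functions are injected so the loop can be tested
    without real waiting.
    """
    deadline = now() + timeout
    while True:
        if is_ready():
            return True
        if now() >= deadline:
            return False
        sleep(poke_interval)
```

Inside a DAG, a `False` result would mean "continue without the enrichment data" rather than failing the run.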

(Inspired by https://apache-airflow.slack.com/archives/CCQ7EGB1P/p1682791720916639 )

Committer

jscheffl commented 1 year ago

Have you thought about other options as alternatives?

e.g.:

But maybe it depends on the use case when multiple data sources, as datasets, should be scheduled together. I can imagine only a few, but I am probably not aware of all of them :-) Anyway, I would rate it "cooler" if such dependencies and logic were taken care of by the scheduling logic and did not need special logic in the DAG via a sensor.

dakshin-k commented 1 year ago

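Hi, just to provide a bit more info on one of the use cases that inspired this issue:

  • We have two datasets, say A and B, both of which are provided by third parties (and therefore fail occasionally)
  • A is updated daily, B is updated weekly
  • Dataset C is effectively a join of A and B

    • Whenever A is updated, C needs to be updated
    • We take the newly available A, and the latest available version of B to compute C

Our current plan to handle this is to mark only A as an inlet to C, and handle the dependency on B within the DAG itself. Disadvantage would be that B doesn't show up in the Datasets view at all, which we're not too happy about - A and B are both crucial to the system, it's just that B has a higher TTL.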

potiuk commented 1 year ago

Just a reminder on that one (I personally don't have enough of an opinion to chime in): since this is a change to how the Airflow architecture is designed around datasets and their dependencies, whatever is discussed here should be brought to the devlist as a proposal: https://airflow.apache.org/community/ -> people might not follow this discussion / feature request here, and only a devlist discussion will turn it into an "implementable" change. I guess it's fine to discuss it here to get the idea and concepts hashed out, but whoever would like to implement it should summarise it there, and the actual decision making should happen on the devlist, possibly in the form of an AIP (Airflow Improvement Proposal) if the scope turns out to be big.

For me this is at the border of feature/discussion. Usually a feature is something that adds a small thing that does not change the internals. In this case we are talking about changing how dependencies are resolved by the scheduler, when the scheduler considers them fulfilled, and when they are "eligible" for running, so something rather critical in terms of the "core of Airflow". I could see this one converted to a discussion, but I leave it up to @eladkal to decide.

eladkal commented 1 year ago

> Usually feature is something that adds a small thing that does not change the internals, in this case we are talking about changing the ways how dependencies are resolved by scheduler

We are discussing a possible enhancement. Should the implementation plan require a new entity (as happened with Timetables, for example) or alter the feature beyond its original function, then an AIP is required. I think it's too early to know whether an AIP is required or not.

chuxiangfeng commented 1 year ago

> Hi, just to provide a bit more info on one of the use cases that inspired this issue:
>
>   • We have two datasets, say A and B, both of which are provided by third parties (and therefore fail occasionally)
>   • A is updated daily, B is updated weekly
>   • Dataset C is effectively a join of A and B
>
>     • Whenever A is updated, C needs to be updated
>     • We take the newly available A, and the latest available version of B to compute C
>
> Our current plan to handle this is to mark only A as an inlet to C, and handle the dependency on B within the DAG itself. Disadvantage would be that B doesn't show up in the Datasets view at all, which we're not too happy about - A and B are both crucial to the system, it's just that B has a higher TTL.

Yes, we will have this dilemma and expect to enhance it in later versions with algorithms like Dagster freshness scheduling.

dakshin-k commented 1 year ago

I think it might be nice if we had something similar to the TriggerRule for task instances, on datasets?

Not just an enum though, I was thinking something similar to timetables, where we could provide a few default rules like ALL_DEPENDENCIES_REFRESHED and AT_LEAST_ONE_REFRESHED, and allow users to create custom rules if needed.
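
A minimal sketch of what such pluggable rules could look like, in plain Python. The base class `DatasetTriggerRule`, its `should_trigger` method, and the custom `RequiredRefreshed` rule are hypothetical names for illustration only; none of this is an existing Airflow interface:

```python
from abc import ABC, abstractmethod
from typing import Iterable, Mapping


class DatasetTriggerRule(ABC):
    """Decides whether a DAG run should be created, given which
    upstream datasets have been refreshed since the last run."""

    @abstractmethod
    def should_trigger(self, refreshed: Mapping[str, bool]) -> bool:
        ...


class AllDependenciesRefreshed(DatasetTriggerRule):
    """Default-style rule: every upstream dataset must have been refreshed."""

    def should_trigger(self, refreshed: Mapping[str, bool]) -> bool:
        return all(refreshed.values())


class AtLeastOneRefreshed(DatasetTriggerRule):
    """Trigger as soon as any upstream dataset has been refreshed."""

    def should_trigger(self, refreshed: Mapping[str, bool]) -> bool:
        return any(refreshed.values())


class RequiredRefreshed(DatasetTriggerRule):
    """Example of a user-defined rule: only the listed datasets are mandatory."""

    def __init__(self, required: Iterable[str]) -> None:
        self.required = frozenset(required)

    def should_trigger(self, refreshed: Mapping[str, bool]) -> bool:
        return all(refreshed.get(uri, False) for uri in self.required)
```

With `{"A": True, "B": False}` as input, the first rule holds the run back, the second allows it, and `RequiredRefreshed({"A"})` allows it because only A is mandatory.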

Any thoughts?

cmarteepants commented 7 months ago

> Hi, just to provide a bit more info on one of the use cases that inspired this issue:
>
>   • We have two datasets, say A and B, both of which are provided by third parties (and therefore fail occasionally)
>   • A is updated daily, B is updated weekly
>   • Dataset C is effectively a join of A and B
>
>     • Whenever A is updated, C needs to be updated
>     • We take the newly available A, and the latest available version of B to compute C
>
> Our current plan to handle this is to mark only A as an inlet to C, and handle the dependency on B within the DAG itself. Disadvantage would be that B doesn't show up in the Datasets view at all, which we're not too happy about - A and B are both crucial to the system, it's just that B has a higher TTL.

@dakshin-k I know it's been awhile, but I am curious re your example:

Let's say you had the ability to compute C whenever A or B is updated, allowing you to set A and B as inlets to C - would that work for your use-case? It would allow for C to be computed whenever A is updated using the most recent version of B, but it also would mean that C would be computed an extra time - whenever B is updated.

dakshin-k commented 7 months ago

@cmarteepants Yes that would solve the problem at hand, however it's not ideal as re-computing C multiple times could cause an issue as there's no guarantee that operation is idempotent.

For example, in our case the update to C is a bit like appending the newly computed dataset to the existing one. There's a separate pipeline which reads the last 30 C datasets and performs a monthly aggregation. So we'd now have to make changes to the way C itself works, or perform mitigations like de-duping the outputs.
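
To make the idempotency concern concrete, here is a small sketch contrasting an append-style update with a keyed upsert, which is one form the de-duping mitigation could take. The function names and the `a_version`/`b_version` fields are assumptions for illustration, not part of the actual pipeline:

```python
from typing import Dict, List


def append_result(history: List[dict], result: dict) -> None:
    """Non-idempotent: every run adds a row, even for the same version of A."""
    history.append(result)


def upsert_result(history: Dict[str, dict], result: dict) -> None:
    """Idempotent: a re-run for the same version of A overwrites its earlier row."""
    history[result["a_version"]] = result


# Two runs for the same A: one triggered by A, one re-triggered by a B update.
run_after_a = {"a_version": "2024-01-08", "b_version": "2024-01-01"}
run_after_b = {"a_version": "2024-01-08", "b_version": "2024-01-08"}

appended: List[dict] = []
upserted: Dict[str, dict] = {}
for result in (run_after_a, run_after_b):
    append_result(appended, result)
    upsert_result(upserted, result)
```

After both runs, `appended` holds two rows for the same day, which would skew a "last 30 C datasets" aggregation, while `upserted` holds one. The upsert still costs the extra compute for the second run.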

All of that unnecessarily increases the compute and storage cost for C, not to mention the additional effort needed to switch to Datasets when the old cron scheduling works just fine 😅

cmarteepants commented 7 months ago

Gotcha, makes sense.

Paraphrasing:

DAG C: DAG that contains Task that outputs to dataset C

The crux of the problem is that DAG C cannot leverage dataset events from dataset B without setting dataset B as a scheduling dependency to DAG C. This is due to the strict coupling between data dependencies and scheduling dependencies.
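
The coupling can be illustrated with a small simulation in plain Python. It assumes a simple event log; `Event` and `plan_runs` are hypothetical names, not Airflow objects. A run of DAG C is planned for each event on a scheduling dataset, and each run reads the latest event of a second dataset that is visible at that moment:

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable, List, Optional, Set, Tuple


@dataclass(frozen=True)
class Event:
    uri: str
    timestamp: datetime


def plan_runs(
    events: Iterable[Event], scheduling_uris: Set[str], read_only_uri: str
) -> List[Tuple[Event, Optional[Event]]]:
    """Return (trigger, latest read_only event at trigger time) per planned run."""
    events = list(events)
    runs = []
    for trigger in sorted(events, key=lambda e: e.timestamp):
        if trigger.uri not in scheduling_uris:
            continue
        visible = [
            e for e in events
            if e.uri == read_only_uri and e.timestamp <= trigger.timestamp
        ]
        latest = max(visible, key=lambda e: e.timestamp, default=None)
        runs.append((trigger, latest))
    return runs


log = [
    Event("B", datetime(2024, 1, 1, 0)),
    Event("A", datetime(2024, 1, 1, 6)),
    Event("A", datetime(2024, 1, 2, 6)),
    Event("B", datetime(2024, 1, 8, 0)),
    Event("A", datetime(2024, 1, 8, 6)),
]

decoupled = plan_runs(log, {"A"}, "B")      # B is read but does not schedule
coupled = plan_runs(log, {"A", "B"}, "B")   # B also schedules DAG C
```

In the decoupled plan there is one run per A event, each using the newest B available. In the coupled plan every B event adds a run of its own, which is the extra computation discussed above.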

eladkal commented 5 months ago

I think we are getting a bit too broad here. We should start somewhere...

Let's try to focus on two use cases:


  1. From the point of view of a specific DAG, not all datasets are equally important. We should have a mechanism that says "I'd like to wait for a dataset, but no more than X minutes".

Something like:

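# Proposed interface: `wait_no_longer_than` is not an existing Dataset argument.
from datetime import timedelta

from airflow import DAG
from airflow.datasets import Dataset

with DAG(
    dag_id="multiple_datasets_example",
    schedule=[
        Dataset("s3://dataset/example1.csv"),
        Dataset("s3://dataset/example2.csv"),
        Dataset("s3://dataset/example3.csv", wait_no_longer_than=timedelta(hours=1)),
    ],
    ...,
):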

This means that if there are update events for example1.csv and example2.csv but not for example3.csv, we wait up to 1 hour, after which a DAG run is created even if example3.csv has had no update event.
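
One possible reading of this rule, sketched in plain Python. It assumes the one-hour clock starts when the last dataset without a timeout receives its update event; that starting point is an assumption, as the proposal does not fix it. `timeout_allows_run` is a hypothetical helper, not scheduler code:

```python
from datetime import datetime, timedelta
from typing import Mapping, Optional


def timeout_allows_run(
    now: datetime,
    last_update: Mapping[str, Optional[datetime]],
    timeouts: Mapping[str, timedelta],
) -> bool:
    """Decide whether a DAG run may be created.

    last_update: uri -> time of the update event since the previous run,
                 or None if there has been none.
    timeouts:    uri -> maximum wait, only for datasets that may be skipped.
    """
    required = [uri for uri in last_update if uri not in timeouts]
    # Datasets without a timeout must all have an update event.
    if not required or any(last_update[uri] is None for uri in required):
        return False
    pending = [
        uri for uri in last_update
        if uri in timeouts and last_update[uri] is None
    ]
    if not pending:
        return True  # everything arrived, nothing to wait for
    # Assumption: the wait starts once the last required dataset is ready.
    ready_at = max(last_update[uri] for uri in required)
    return all(now - ready_at >= timeouts[uri] for uri in pending)
```

With example1.csv updated at 00:00 and example2.csv at 00:10, a missing example3.csv blocks the run until 01:10 under this reading.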


  2. Some dependencies are for enrichment. Their freshness is less important, and some datasets are only updated monthly or quarterly. We should be able to specify, per dataset, what level of freshness is considered acceptable. For example:

with DAG(
    dag_id="multiple_datasets_example",
    schedule=[
        Dataset("s3://dataset/example1.csv"),
        Dataset("s3://dataset/example2.csv"),
        # Proposed argument. timedelta has no `months` keyword, so use ~1 month in days.
        Dataset("s3://dataset/example3.csv", freshness=timedelta(days=30)),
    ],
    ...,
):

In this example, as long as example3.csv has an update event from the last month, it is considered valid.
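
A minimal sketch of the freshness check in plain Python. Since `datetime.timedelta` has no `months` argument, "one month" is approximated as 30 days here. `is_fresh` and `inputs_satisfied` are hypothetical helpers, not scheduler code:

```python
from datetime import datetime, timedelta
from typing import Mapping, Optional


def is_fresh(last_update: Optional[datetime], freshness: timedelta, now: datetime) -> bool:
    """True if the dataset has an update event no older than `freshness`."""
    return last_update is not None and now - last_update <= freshness


def inputs_satisfied(
    now: datetime,
    updated_since_last_run: Mapping[str, bool],
    last_update: Mapping[str, Optional[datetime]],
    freshness: Mapping[str, timedelta],
) -> bool:
    """Datasets with a freshness window only need a recent enough event;
    all other datasets need a new update event since the last run."""
    for uri, updated in updated_since_last_run.items():
        if uri in freshness:
            if not is_fresh(last_update.get(uri), freshness[uri], now):
                return False
        elif not updated:
            return False
    return True
```

Under this reading, example3.csv never triggers a run by itself; it only has to be recent enough when the other two datasets are updated.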


The two cases I presented here can also be used together thus giving users more control over advanced datasets scheduling actions.

cmarteepants commented 5 months ago

@uranusjr you're closer to this than I am. What is your opinion?

I think there's a lot of value in supporting these scenarios - but there's something about the approach that is bothering me. Should these rules be defined as Dataset args?

A common pattern that I have seen amongst Astronomer customers is for data producers to define datasets in a consolidated file in order to make them discoverable for data consumers. Data consumers will then import them to use for scheduling purposes.

Intuitively, it's the data consumer that decides how important upstream datasets are to their process and how fresh that data needs to be. Even for the same dataset, the level of tolerance can be different across DAGs. What if we were to do something like this instead?

with DAG(
    dag_id="multiple_datasets_example",
    schedule=[
        Dataset("s3://dataset/example1.csv"),
        Dataset("s3://dataset/example2.csv"),
        wait_no_longer_than(timedelta(hours=1), Dataset("s3://dataset/example3.csv")),
    ],
    ...,
):

My other concern is specific to the second scenario. While some dependencies are for enrichment, for production there will still be some sort of expectation that multiple_datasets_example will run on some sort of cadence. How will someone know that the reason their DAG isn't running is because the 3rd dataset has gone stale? I personally would not be comfortable using a freshness rule unless I could set up a notification to tell me that it's been more than a month since "s3://dataset/example3.csv" was updated. Is that possible with today's functionality?

uranusjr commented 4 months ago

I’ve been thinking about this. Yeah the above wait_no_longer_than interface seems to be the way to go (maybe with a different name, say DatasetTimeout).

I initially considered using a timetable for this, but ultimately this does not work since we may want to configure the “timeout” of each dataset differently, and a timetable would combine very awkwardly since it carries too much other information.

(Side note: I think we will need to clean this up a lot as a part of Airflow 3, likely completely redesign the schedule API including how both timetables and datasets are passed in.)

Other than timedelta, I think maybe a cron schedule might make sense? Or even more sense? I am not exactly sure how we should interpret a timedelta. Say I expect a dataset to fire every day, so say I set timedelta(days=2). Things generally fire at midnight, but one event got delayed a little and fired at 2am. The next one missed. Should the timeout trigger at 2am or midnight? That’s a minor design decision we can figure out later.
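
The two interpretations can be written down as two deadline calculations. This is a sketch in plain Python; a fixed `interval` stands in for a cron expression (an assumption, to keep the example free of third-party parsers), and both function names are hypothetical:

```python
from datetime import datetime, timedelta


def deadline_from_last_event(last_event: datetime, timeout: timedelta) -> datetime:
    """Interpretation 1: the timeout counts from when the last event actually fired."""
    return last_event + timeout


def deadline_from_schedule(
    last_event: datetime, timeout: timedelta, anchor: datetime, interval: timedelta
) -> datetime:
    """Interpretation 2: snap the last event back to the schedule slot it
    belongs to (slots are anchor + k * interval), then add the timeout."""
    slots_elapsed = (last_event - anchor) // interval  # whole slots since anchor
    expected = anchor + slots_elapsed * interval
    return expected + timeout


anchor = datetime(2024, 1, 1, 0, 0)        # events normally fire at midnight
delayed = datetime(2024, 1, 5, 2, 0)       # this one fired at 2am
timeout = timedelta(days=2)

by_event = deadline_from_last_event(delayed, timeout)
by_schedule = deadline_from_schedule(delayed, timeout, anchor, timedelta(days=1))
```

Here `by_event` is 2024-01-07 02:00 and `by_schedule` is 2024-01-07 00:00, which is exactly the 2am-versus-midnight difference in question.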

Adding that flag on Dataset itself feels wrong to me since it’d force everything that depends on a dataset to have the same timeout. It’s not an entirely unreasonable requirement, but is a bit unnecessary to me. The idea of freshness ultimately does not live on the dataset itself IMO.

Another thing we need to consider (when we implement this) is, how should we signal a timeout event? Should it emit a DatasetEvent with a flag (what)? Should it just be implied by the user? Should it be another kind of event?
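
As one way to picture the "event with a flag" option, here is a sketch in plain Python. `DatasetEventRecord`, `EventKind`, and `timed_out_uris` are hypothetical stand-ins for illustration and do not mirror Airflow's actual `DatasetEvent` model:

```python
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Iterable, List


class EventKind(Enum):
    UPDATED = "updated"      # the dataset really was updated
    TIMED_OUT = "timed_out"  # the wait expired without an update


@dataclass(frozen=True)
class DatasetEventRecord:
    uri: str
    timestamp: datetime
    kind: EventKind = EventKind.UPDATED


def timed_out_uris(triggering_events: Iterable[DatasetEventRecord]) -> List[str]:
    """URIs whose wait expired, so a task can tell which inputs may be stale."""
    return sorted(e.uri for e in triggering_events if e.kind is EventKind.TIMED_OUT)
```

A task in the triggered run could call `timed_out_uris` on its triggering events and decide whether to skip enrichment, log a warning, or proceed with the last known version.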

eladkal commented 4 months ago

> Should these rules be defined as Dataset args?

It's a property of a specific DAG that uses the dataset. It's not a property of the dataset itself.

> A common pattern that I have seen amongst Astronomer customers is for data producers to define datasets in a consolidated file in order to make them discoverable for data consumers. Data consumers will then import them to use for scheduling purposes.

I agree this is a common pattern.

> (Side note: I think we will need to clean this up a lot as a part of Airflow 3, likely completely redesign the schedule API including how both timetables and datasets are passed in.)

Possibly but right now we don't know if we are having Airflow 3 and when...

> Adding that flag on Dataset itself feels wrong to me since it’d force everything that depends on a dataset to have the same timeout.

I agree. What I actually meant is:

schedule=[
        Dataset("s3://dataset/example1.csv"),
        Dataset("s3://dataset/example2.csv"),
        wait_no_longer_than(Dataset("s3://dataset/example3.csv"), timedelta(hours=1)),
    ]

From my point of view, wait_no_longer_than is not a property of the dataset but an option for how to treat a dataset when a DAG takes a dependency on it.
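
A minimal sketch of that separation in plain Python, assuming a wrapper object that pairs a shared dataset with a per-DAG timeout. `DatasetRef` is a stand-in for Airflow's `Dataset`, and `DatasetTimeout` and this `wait_no_longer_than` are hypothetical, since no such wrapper exists yet:

```python
from dataclasses import dataclass
from datetime import timedelta


@dataclass(frozen=True)
class DatasetRef:
    """Stand-in for a dataset defined once by the data producer."""
    uri: str


@dataclass(frozen=True)
class DatasetTimeout:
    """Consumer-side option: how long this DAG is willing to wait."""
    dataset: DatasetRef
    timeout: timedelta


def wait_no_longer_than(dataset: DatasetRef, timeout: timedelta) -> DatasetTimeout:
    return DatasetTimeout(dataset, timeout)


# Defined once in the producers' consolidated file.
EXAMPLE3 = DatasetRef("s3://dataset/example3.csv")

# Two consumers import the same dataset with different tolerances.
dag_a_schedule = [
    DatasetRef("s3://dataset/example1.csv"),
    wait_no_longer_than(EXAMPLE3, timedelta(hours=1)),
]
dag_b_schedule = [wait_no_longer_than(EXAMPLE3, timedelta(days=1))]
```

The shared dataset object carries no timeout of its own, so each consuming DAG can choose its tolerance without affecting the others.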