apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Allow DAGs to trigger on Datasets with wildcard/regex #39017

Open w0ut0 opened 2 months ago

w0ut0 commented 2 months ago

Description

I would like to be able to have a DAG trigger on ANY dataset whose URI matches a wildcard or regex pattern.

Use case/motivation

We have two (similar) use cases.

Trigger an ingestion process on file arrival

Files land in blob storage, which triggers events like:

{
"filename": "/raw/my/folder1/2024/04/15/file1.csv",
"event": "fileCreated"
}

and

{
"filename": "/raw/my/folder1/2024/04/15/file1.json",
"event": "fileCreated"
}

and

{
"filename": "/raw/my/folder1/2024/02/10/file1.json",
"event": "fileCreated"
}

These events are converted to Dataset update events, either by calling the Airflow API or by a DAG that polls a queue containing the above events. In the above case, three Datasets would be updated, one per file.
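The conversion step can be sketched as follows. This is a minimal illustration, assuming the `ingestionblob` scheme used elsewhere in this issue and assuming that only `fileCreated` events should produce an update; the function name `event_to_dataset_uri` is hypothetical, and the call that actually sends the update to Airflow is deliberately left out.

```python
import json
from typing import Optional

# Assumed URI scheme, taken from the example pattern in this issue.
SCHEME = "ingestionblob"


def event_to_dataset_uri(raw_event: str) -> Optional[str]:
    """Map one blob-storage event (a JSON string) to a dataset URI.

    Returns None for events that should not produce a dataset update.
    """
    event = json.loads(raw_event)
    if event.get("event") != "fileCreated":
        return None
    # "/raw/my/..." becomes "ingestionblob://raw/my/..."
    return f"{SCHEME}://{event['filename'].lstrip('/')}"
```

Applied to the first event above, this yields `ingestionblob://raw/my/folder1/2024/04/15/file1.csv`.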

Imagine that we have a DAG ingestCSVFromRawRealTime. I would like to be able to trigger that DAG on any dataset matching a wildcard pattern such as ingestionblob://raw/my/folder1/{today.year}/{today.month}/{today.day}/*.csv.

The same applies to DAGs such as ingestJsonFromRawRealTime or ingestBackfilledCsvFromRaw.
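The requested wildcard behaviour can be illustrated with Python's standard `fnmatch` module. This is only a sketch of the matching semantics, not an Airflow feature: the helper names `todays_pattern` and `matching_uris` are hypothetical, and it assumes month and day are zero-padded so the pattern lines up with paths like `.../2024/04/15/...`.

```python
from datetime import date
from fnmatch import fnmatchcase


def todays_pattern(today: date) -> str:
    # Zero-padded month/day so the pattern matches paths such as .../2024/04/15/...
    return (
        "ingestionblob://raw/my/folder1/"
        f"{today.year}/{today.month:02d}/{today.day:02d}/*.csv"
    )


def matching_uris(uris: list[str], pattern: str) -> list[str]:
    # fnmatchcase is case-sensitive; note that '*' here also matches '/'.
    return [uri for uri in uris if fnmatchcase(uri, pattern)]
```

Because `fnmatch` lets `*` cross `/`, a pattern ending in `*.csv` would also match CSV files in deeper subfolders; a stricter implementation would need to match path segment by segment.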

Ideally, we could define capture groups in the regex of the Dataset trigger and pass the captured values to the DAG as parameters.
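A sketch of how capture groups could become DAG parameters, using named groups from Python's `re` module. Airflow does not offer this today (that is what this issue asks for); the pattern `TRIGGER` and the function `params_for` are hypothetical, and the resulting dict is simply what a triggered run might receive as its params.

```python
import re
from typing import Optional

# Hypothetical trigger pattern: each named group would become a DAG parameter.
TRIGGER = re.compile(
    r"ingestionblob://raw/my/(?P<folder>[^/]+)/"
    r"(?P<year>\d{4})/(?P<month>\d{2})/(?P<day>\d{2})/(?P<name>[^/]+)\.csv"
)


def params_for(uri: str) -> Optional[dict]:
    """Return the captured groups as a params dict, or None if the URI does not match."""
    m = TRIGGER.fullmatch(uri)
    return m.groupdict() if m else None
```

Using `fullmatch` rather than `search` ensures the whole URI has to fit the pattern, so `file1.json` is not picked up by a CSV trigger.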

Monitor database updates

Our (Databricks) DWH, like almost all other DBMSs, has a three-level namespace: database.schema.table. We will create a mechanism to emit a dataset event whenever a table is updated.

Sometimes we want to be notified of updates to a whole database, a whole schema, or a single table. In some of these cases, wildcard triggers would be beneficial.
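For three-part table names, matching each part separately keeps a wildcard from spilling across the `.` separators. The sketch below assumes datasets are identified by plain `database.schema.table` strings; `matches_table` and the names `main`, `sales`, and `finance` are hypothetical.

```python
from fnmatch import fnmatchcase


def matches_table(pattern: str, table: str) -> bool:
    """Match a database.schema.table name part by part, so '*' never spans a '.'."""
    pat_parts = pattern.split(".")
    name_parts = table.split(".")
    if len(pat_parts) != 3 or len(name_parts) != 3:
        raise ValueError("expected database.schema.table")
    return all(fnmatchcase(n, p) for p, n in zip(pat_parts, name_parts))
```

With this, `main.*.*` watches a whole database, `main.sales.*` a single schema, and `main.sales.orders` a single table.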


lotrias17 commented 2 months ago

Hello, could a colleague (@TiDeane) and I be assigned to this issue? We are working on a university project and would be grateful for the chance to implement this feature. Thank you in advance.

lotrias17 commented 2 months ago

Also, we would be very grateful for any guidance anyone can offer.

kovla commented 2 months ago

Seconding the original proposal.

Lee-W commented 2 months ago

Hi @lotrias17, I just assigned this one to you. I cannot assign it to @TiDeane yet; once @TiDeane leaves a comment, I should be able to assign them as well.

Lee-W commented 2 months ago

@sunank200 Since you have submitted a number of dataset-related PRs recently, I think you might be interested in joining the discussion 🙂

TiDeane commented 2 months ago

Hello, just commenting so you can assign me as well @Lee-W. I'm going to be working on this with @lotrias17.

Lee-W commented 2 months ago

Just assigned @TiDeane. Thanks!

lotrias17 commented 1 month ago

We are running into problems with Session. How can we obtain a session in a function inside the DAG class (in models/dag.py) so that we can query the database for the datasets related to the DAG? Thank you in advance.

Lee-W commented 1 month ago

Is @provide_session what you're looking for?

TiDeane commented 4 weeks ago

We haven't been able to make much progress on this feature, so unfortunately we're going to drop it so that others with more experience can pick it up. We're very sorry, and we wish future contributors the best.