apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0
36.16k stars 14.04k forks source link

Support setting inlets/outlets during task execution #34206

Open tatiana opened 1 year ago

tatiana commented 1 year ago

Description

It would be great if users could set datasets during task execution, similar to:

from datetime import datetime
from airflow import DAG, Dataset
from airflow.models.baseoperator import BaseOperator
from airflow.utils.context import Context

class CustomOperator(BaseOperator):

    def execute(self, context: Context):
        new_outlets = [Dataset("something", is_dynamic=True)]
        self.add_outlets(new_outlets)

with DAG("dag_outlet", start_date=datetime(2023, 4, 20)) as dag:
    do_something = CustomOperator(task_id="do_something")
    do something

It would be very good if the Airflow UI displayed this information and the inlets/outlets in the Datasets view without being deleted during DAG parsing.

Use case/motivation

As of Airflow 2.7, users are expected to set inlets and outlets during DAG parsing time. If Datasets are set during task execution, they get overridden by the scheduler when the DAGs are parsed again. This was a design decision to avoid poor Dataset URI designs based on execution time, for instance.

There are several circumstances when users could benefit from being able to set inlets/outlets during task execution. Users could want to set inlets/outlets depending on the objects in an S3 folder or SQL table. This feature could be particularly useful for dynamic task mapping. A concrete example of this use case was discussed in this Airflow library to run dbt: https://github.com/astronomer/astronomer-cosmos/issues/305 and follow up issue https://github.com/astronomer/astronomer-cosmos/issues/522.

Below is an attempt to set inlets and outlets during task execution (called from the operator execute method). Things are set momentarily until the next scheduler DAG parsing.

    def register_dataset(self, new_inlets: list[Dataset], new_outlets: list[Dataset]) -> None:
        with create_session() as session:
            self.outlets.extend(new_outlets)
            self.inlets.extend(new_inlets)
            for task in self.dag.tasks:
                if task.task_id == self.task_id:
                    task.outlets.extend(new_outlets)
                    task.inlets.extend(new_inlets)
            DAG.bulk_write_to_db([self.dag], session=session)
            session.commit()

Related issues

No response

Are you willing to submit a PR?

Code of Conduct

ashb commented 1 year ago

Yeah, having some way of doing this would be useful.

I think we'll need two things here:

  1. A "nice" way of adding a dataset from inside task execution (such that it makes it to the DB)
  2. A way of marking those Datasets as "execution_time"/dynamic such that the dataset orphan cleanup code doesn't remove them again.

We probably to document this clearly that the datasets should be "complete" -- i.e. whatever outlets/inlets you set at execution time will be taken as the total set, and also document it clearly that the datasets for this Task will be over-written each time a tasks runs. I.e. this does not allow the datasets to "change" from execution to execution.

jscheffl commented 11 months ago

I was thinking about this a bit. I like more "dynamics" in data sets but was thinking about the use case.

Let's assume the dynamic comes from an S3 bucket where you want to have the dataset being "flexible" based on the object dropped in the bucket. Dynamically defining the outlet would give the burden hat you also need to create a consuming DAG dynamically or you need to glob a dataset pattern. But this also would generate a huge list of datasets over time which might be temporary.

How about using additional "meta data" like information and a rather high level dataset? For example the dataset itself just points to the S3 bucket and the outlet is triggered to this but can leave some meta data with the event about which file (or whatsoever details) are triggering in the bucket. Think about a JSON structure describing for example { "filename": "my_test_record.csv" }. Such (dict?) kind of meta data could be used as dag_run.conf for the DAGs triggered by the Dataset. This would use existing mecahnisms and no need to dynamically register DAGs for (might be flexible but not known at point of authoring) datasets.

mpgreg commented 9 months ago

This is something that will be necessary for MLops use cases.

@jscheffl if I'm understanding your suggestion correctly couldn't we just use extras to store dynamic content?

I tried this and for some reason my downstream DAGs' tasks cannot read extras from triggering_dataset_events. I can see the triggering event but extra is empty.

[2023-12-03, 12:20:40 UTC] {logging_mixin.py:154} INFO - defaultdict(<class 'list'>, {'USAGE_FEATURES': [DatasetEvent(id=2, dataset_id=1, extra={}, source_task_id='build_features', source_dag_id='feature_engineering', source_run_id='dataset_triggered__2023-12-03T12:20:31.045106+00:00', source_map_index=-1)]})

even though the task in upstream DAG clearly put out the correct pipeline_outlets

['airflow.datasets.Dataset@version=1(uri=USAGE_FEATURES,extra={\'cutoff_date\': "\'2022-11-15\'", \'cost_categories\': [\'compute\', \'storage\']})']

Does anyone know if it is supposed to work this way?

tokoko commented 9 months ago

@mpgreg triggering_dataset_events returns DatasetEvent objects, not Dataset objects. they both have a field named extra, but those two are unrelated to one another. Currently there's no way to set extra field for a dataset event. see #35297

jaklan commented 2 weeks ago

Are there any updates on that? I was just reading docs for Dataset Aliases, but they don't seem to resolve the issue.

Our use-case is - we are triggering a dbt Cloud job using DbtCloudRunJobOperator. dbt Cloud jobs are aimed to be triggered for e.g. all the resources in the project or for specific resource tags, but not for single tables (such workflow is not recommended by dbt Labs team, although it was our initial idea). In other words - one job is triggering a build of multiple tables.

We would like to allow people maintaining downstream DAGs to utilise data-driven scheduling in a way that they only specify tables which they actually need as dependencies. It has two implications:

  1. the dbt Cloud task has to have all tables specified as its outlets - that's the most problematic part, because each time you run that job - the list of executed tables can be different (e.g. there was a new table added with the specific tag). If we have dynamic outlets - we would be able to pull metadata from dbt Cloud, retrieve info about executed tables and define them as outlets (either by extending the operator or having a separate task for that)
  2. we should be able to control triggering Dataset events - if dbt Cloud job fails, it could mean that e.g. 9 tables were executed successfully, and only the 10th table failed. In such scenario we would like to trigger events for these 9 models, although task itself failed

Reading the docs I saw that part:

The downstream DAG is not triggered if no datasets are associated to the alias for a particular given task run.

So correct me if I'm wrong - but the second point seems to be already possible in Airflow 2.10, right? I'm just not sure if events are triggered if task fails after calling outlet_events["<outlet>"].add(), but even if not - we could just handle that in a separate task responsible for parsing dbt Cloud job metadata and triggering events for datasets related to successful tables (so that one will succeed even if dbt Cloud task fails). However, the first point is still the main blocker.

I would be also grateful for any suggestions how we can tackle the above problem until dynamic outlets are possible.