apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
Apache License 2.0

DAGs are able to see historical dataset events when created new #39456

Open tosheer opened 2 months ago

tosheer commented 2 months ago

Apache Airflow version


If "Other Airflow 2 version" selected, which one?

No response

What happened?

If a new dataset-triggered DAG is created for an already existing dataset (one that already has dataset events), the DAG sees all dataset events, starting from the very first event for that dataset.

What you think should happen instead?

If a new dataset-triggered DAG is created for an already existing dataset (one that already has dataset events), the DAG should see only the dataset events recorded from the time the DAG was added onward.
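The expected behavior can be sketched in plain Python as a filter on event timestamps. This is only an illustration of the rule described above, not Airflow's scheduler logic: the `Event` class, the `events_visible_to_new_dag` helper, and all timestamps are hypothetical and invented for the example.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class Event:
    """Hypothetical stand-in for a dataset event; not Airflow's DatasetEvent model."""

    id: int
    timestamp: datetime


def events_visible_to_new_dag(events, dag_added_at):
    """Keep only events recorded at or after the moment the consumer DAG was added."""
    return [e for e in events if e.timestamp >= dag_added_at]


utc = timezone.utc
# Invented history: three events already exist for the dataset.
history = [
    Event(1, datetime(2024, 5, 8, 0, 0, tzinfo=utc)),
    Event(2, datetime(2024, 5, 9, 18, 18, tzinfo=utc)),
    Event(3, datetime(2024, 5, 9, 18, 23, tzinfo=utc)),
]
# Assume the consumer DAG was added between events 2 and 3.
dag_added_at = datetime(2024, 5, 9, 18, 20, tzinfo=utc)

visible = events_visible_to_new_dag(history, dag_added_at)
print([e.id for e in visible])  # [3]
```

Under this rule the first run of the new DAG would receive only event 3, whereas the behavior reported here corresponds to receiving events 1 through 3.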

How to reproduce

import pendulum

from airflow.datasets import Dataset
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

# [START dataset_def]
tosheer_dag1_dataset = Dataset("s3://tosheer-dag1/output_1.txt", extra={"partition": "bye"})

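with DAG(
    dag_id="tosheer_dataset_produces_1",  # matches source_dag_id in the task log below
    catchup=False,  # assumed; not captured in this copy of the report
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule="@daily",  # assumed; the log shows one scheduled__ run
    tags=["produces", "dataset-scheduled"],
) as dag1: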
    # [START task_outlet]
    BashOperator(outlets=[tosheer_dag1_dataset], task_id="producing_task_1", bash_command="sleep 5")
    # [END task_outlet]

After this DAG has run and produced dataset events, add a dataset-triggered consumer DAG:

# [START dag_dep]
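with DAG(
    dag_id="tosheer_dataset_consumes_1",  # assumed name; the original dag_id was not captured
    catchup=False,  # assumed
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=[tosheer_dag1_dataset],  # dataset-triggered, as described in the report
    tags=["consumes", "dataset-scheduled"],
) as dag3:
    # [END dag_dep]
    BashOperator(
        task_id="consuming_1",  # assumed name; the original task_id was not captured
        bash_command='echo "ti_key={{ triggering_dataset_events }}"',
    )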

Operating System

Debian GNU/Linux 12 (bookworm)

Versions of Apache Airflow Providers

No response



Deployment details

Plain vanilla airflow.

Anything else?

This issue is related to an earlier issue, https://github.com/apache/airflow/issues/38826. The fix for that issue only covered the cases where a DAG is disabled and re-enabled, or deleted and recreated.


RNHTTR commented 2 months ago

I'm not able to reproduce. I think the context object in the post-execute function is incorrect: context["dataset_events"]["test-cluster/test-schema/test-table"] should be (I think) context["triggering_dataset_events"]["test-cluster/test-schema/test-table"]. Even with that updated, it's coming back as an empty list.
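For reference, the Jinja template in the reproduction renders `triggering_dataset_events`, and a Python callable would read the same mapping from the task context under that key. The sketch below uses a plain dict as a stand-in for the context so it runs without Airflow. The `count_triggering_events` helper and the fake events are hypothetical, and the assumption is that the real value maps each dataset URI to a list of events.

```python
from collections import defaultdict


def count_triggering_events(context):
    """Return {dataset_uri: number_of_events} from a task-context-like mapping."""
    events_by_uri = context.get("triggering_dataset_events") or {}
    return {uri: len(events) for uri, events in events_by_uri.items()}


# Stand-in context: a plain dict, with strings in place of DatasetEvent objects.
fake_events = defaultdict(list)
fake_events["s3://tosheer-dag1/output_1.txt"].extend(["event-1", "event-2"])
fake_context = {"triggering_dataset_events": fake_events}

print(count_triggering_events(fake_context))
print(count_triggering_events({}))
```

A helper like this returns an empty dict when the key is missing or empty, which makes it easy to tell "no triggering events" apart from a misspelled context key raising a KeyError.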

tosheer commented 2 months ago
[Attached screenshots: "Screenshot 2024-05-09 at 11 54 49 PM", "Screenshot 2024-05-09 at 11 53 15 PM"]

@RNHTTR I have updated the DAG definition, since I was reproducing the issue locally with a simpler DAG definition. Can you please try again and see if you can reproduce the issue? For more details, see the attached images.

tosheer commented 2 months ago


[2024-05-09, 18:24:03 UTC] {local_task_job_runner.py:120} ▶ Pre task execution logs
[2024-05-09, 18:24:04 UTC] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2024-05-09, 18:24:04 UTC] {subprocess.py:75} INFO - Running command: ['/usr/bin/bash', '-c', 'echo "ti_key=defaultdict(<class \'list\'>, {\'s3://tosheer-dag1/output_1.txt\': [DatasetEvent(id=1, dataset_id=11, extra={}, source_task_id=\'producing_task_1\', source_dag_id=\'tosheer_dataset_produces_1\', source_run_id=\'scheduled__2024-05-08T00:00:00+00:00\', source_map_index=-1), DatasetEvent(id=2, dataset_id=11, extra={}, source_task_id=\'producing_task_1\', source_dag_id=\'tosheer_dataset_produces_1\', source_run_id=\'manual__2024-05-09T18:18:06.483814+00:00\', source_map_index=-1), DatasetEvent(id=3, dataset_id=11, extra={}, source_task_id=\'producing_task_1\', source_dag_id=\'tosheer_dataset_produces_1\', source_run_id=\'manual__2024-05-09T18:18:27.256324+00:00\', source_map_index=-1), DatasetEvent(id=4, dataset_id=11, extra={}, source_task_id=\'producing_task_1\', source_dag_id=\'tosheer_dataset_produces_1\', source_run_id=\'manual__2024-05-09T18:23:54.937647+00:00\', source_map_index=-1)]})"']
[2024-05-09, 18:24:04 UTC] {subprocess.py:86} INFO - Output:
[2024-05-09, 18:24:04 UTC] {subprocess.py:93} INFO - ti_key=defaultdict(<class 'list'>, {'s3://tosheer-dag1/output_1.txt': [DatasetEvent(id=1, dataset_id=11, extra={}, source_task_id='producing_task_1', source_dag_id='tosheer_dataset_produces_1', source_run_id='scheduled__2024-05-08T00:00:00+00:00', source_map_index=-1), DatasetEvent(id=2, dataset_id=11, extra={}, source_task_id='producing_task_1', source_dag_id='tosheer_dataset_produces_1', source_run_id='manual__2024-05-09T18:18:06.483814+00:00', source_map_index=-1), DatasetEvent(id=3, dataset_id=11, extra={}, source_task_id='producing_task_1', source_dag_id='tosheer_dataset_produces_1', source_run_id='manual__2024-05-09T18:18:27.256324+00:00', source_map_index=-1), DatasetEvent(id=4, dataset_id=11, extra={}, source_task_id='producing_task_1', source_dag_id='tosheer_dataset_produces_1', source_run_id='manual__2024-05-09T18:23:54.937647+00:00', source_map_index=-1)]})
[2024-05-09, 18:24:04 UTC] {subprocess.py:97} INFO - Command exited with return code 0
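
To make the rendered output easier to read, a small helper can pull the event ids and source run ids out of the string. This is a minimal sketch for inspecting the log text only. The `list_events` function and its regular expression are hypothetical and assume the `DatasetEvent(...)` repr format shown above; the sample string is a shortened copy containing the first and last of the four events.

```python
import re

# Shortened copy of the rendered output from the log (events 1 and 4 only).
rendered = (
    "ti_key=defaultdict(<class 'list'>, {'s3://tosheer-dag1/output_1.txt': ["
    "DatasetEvent(id=1, dataset_id=11, extra={}, source_task_id='producing_task_1', "
    "source_dag_id='tosheer_dataset_produces_1', "
    "source_run_id='scheduled__2024-05-08T00:00:00+00:00', source_map_index=-1), "
    "DatasetEvent(id=4, dataset_id=11, extra={}, source_task_id='producing_task_1', "
    "source_dag_id='tosheer_dataset_produces_1', "
    "source_run_id='manual__2024-05-09T18:23:54.937647+00:00', source_map_index=-1)]})"
)

# Capture the event id and the run id that produced the event.
EVENT_RE = re.compile(r"DatasetEvent\(id=(\d+),.*?source_run_id='([^']+)'")


def list_events(text):
    """Return (event_id, source_run_id) pairs found in a rendered events string."""
    return [(int(event_id), run_id) for event_id, run_id in EVENT_RE.findall(text)]


for event_id, run_id in list_events(rendered):
    print(event_id, run_id)
```

Applied to the full log line, this would list all four events, including the scheduled run from 2024-05-08, which is the historical event the report is about.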