apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Removing DAG dataset dependency when it is already ready results in SQLAlchemy cascading delete error #27509

Closed dhatch-niv closed 2 years ago

dhatch-niv commented 2 years ago

Apache Airflow version

2.4.2

What happened

I have a DAG that is triggered by three datasets. When I remove one or more of these datasets, the web server fails to update the DAG, and airflow dags reserialize fails with an AssertionError within SQLAlchemy. Full stack trace below:

docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 75, in wrapper
docker-airflow-scheduler-1  |     return func(*args, session=session, **kwargs)
docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/dag_processing/processor.py", line 781, in process_file
docker-airflow-scheduler-1  |     dagbag.sync_to_db(processor_subdir=self._dag_directory, session=session)
docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
docker-airflow-scheduler-1  |     return func(*args, **kwargs)
docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagbag.py", line 644, in sync_to_db
docker-airflow-scheduler-1  |     for attempt in run_with_db_retries(logger=self.log):
docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/tenacity/__init__.py", line 382, in __iter__
docker-airflow-scheduler-1  |     do = self.iter(retry_state=retry_state)
docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/tenacity/__init__.py", line 349, in iter
docker-airflow-scheduler-1  |     return fut.result()
docker-airflow-scheduler-1  |   File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 439, in result
docker-airflow-scheduler-1  |     return self.__get_result()
docker-airflow-scheduler-1  |   File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
docker-airflow-scheduler-1  |     raise self._exception
docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagbag.py", line 658, in sync_to_db
docker-airflow-scheduler-1  |     DAG.bulk_write_to_db(
docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
docker-airflow-scheduler-1  |     return func(*args, **kwargs)
docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dag.py", line 2781, in bulk_write_to_db
docker-airflow-scheduler-1  |     session.flush()
docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3345, in flush
docker-airflow-scheduler-1  |     self._flush(objects)
docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3485, in _flush
docker-airflow-scheduler-1  |     transaction.rollback(_capture_exception=True)
docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
docker-airflow-scheduler-1  |     compat.raise_(
docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
docker-airflow-scheduler-1  |     raise exception
docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3445, in _flush
docker-airflow-scheduler-1  |     flush_context.execute()
docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
docker-airflow-scheduler-1  |     rec.execute(self)
docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/unitofwork.py", line 577, in execute
docker-airflow-scheduler-1  |     self.dependency_processor.process_deletes(uow, states)
docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/dependency.py", line 552, in process_deletes
docker-airflow-scheduler-1  |     self._synchronize(
docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/dependency.py", line 610, in _synchronize
docker-airflow-scheduler-1  |     sync.clear(dest, self.mapper, self.prop.synchronize_pairs)
docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/sync.py", line 86, in clear
docker-airflow-scheduler-1  |     raise AssertionError(
docker-airflow-scheduler-1  | AssertionError: Dependency rule tried to blank-out primary key column 'dataset_dag_run_queue.dataset_id' on instance '<DatasetDagRunQueue at 0xffff5d213d00>'

What you think should happen instead

Currently the DAG does not load properly in the UI, and no error is displayed. Instead, the removed datasets should be dropped as dependencies and the DAG should be updated with the new dataset dependencies.

How to reproduce

Initial DAG:

import pendulum

from airflow.datasets import Dataset
from airflow.decorators import dag, task


def foo():
    pass


@dag(
    dag_id="test",
    start_date=pendulum.datetime(2022, 1, 1),
    catchup=False,
    schedule=[
        Dataset('test/1'),
        Dataset('test/2'),
        Dataset('test/3'),
    ]
)
def test_dag():
    @task
    def test_task():
        foo()

    test_task()


test_dag()

At least one of the datasets must be in the 'ready' state. The dataset_dag_run_queue table will then look something like this:

airflow=# SELECT * FROM dataset_dag_run_queue ;
 dataset_id | target_dag_id |          created_at
------------+---------------+-------------------------------
         16 | test          | 2022-11-02 19:47:53.938748+00
(1 row)

Then, update the DAG with new datasets:

import pendulum

from airflow.datasets import Dataset
from airflow.decorators import dag, task


def foo():
    pass


@dag(
    dag_id="test",
    start_date=pendulum.datetime(2022, 1, 1),
    catchup=False,
    schedule=[
        Dataset('test/new/1'),      # <--- updated
        Dataset('test/new/2'),
        Dataset('test/new/3'),
    ]
)
def test_dag():
    @task
    def test_task():
        foo()

    test_task()


test_dag()

Now you will observe the error in the web server logs or when running airflow dags reserialize.

I suspect this issue is related to the handling of cascading deletes on the dataset_id foreign key of the run queue table. Dataset id 16 is one of the datasets that was renamed.
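
To illustrate the suspected mechanism, here is a minimal, self-contained SQLAlchemy sketch using hypothetical models (not Airflow's actual ones): when a parent row is deleted and the child's foreign key is part of the child's primary key, a relationship with no delete cascade makes the ORM try to NULL that column, which raises exactly this assertion.

# Hypothetical models, not Airflow's: a minimal reproduction of the
# "Dependency rule tried to blank-out primary key column" assertion.
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


class ParentDataset(Base):
    __tablename__ = "dataset"

    id = Column(Integer, primary_key=True)
    uri = Column(String, unique=True)

    # No delete cascade configured: deleting a dataset makes the ORM try to
    # de-associate queue rows by setting dataset_id to NULL, but dataset_id
    # is part of the child's primary key, so the flush raises AssertionError.
    queue_records = relationship("QueueEntry")


class QueueEntry(Base):
    __tablename__ = "dataset_dag_run_queue"

    dataset_id = Column(Integer, ForeignKey("dataset.id"), primary_key=True)
    target_dag_id = Column(String, primary_key=True)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    ds = ParentDataset(id=16, uri="test/1")
    session.add(ds)
    session.add(QueueEntry(dataset_id=16, target_dag_id="test"))
    session.commit()

    session.delete(ds)
    # AssertionError: Dependency rule tried to blank-out primary key column
    # 'dataset_dag_run_queue.dataset_id' on instance '<QueueEntry at 0x...>'
    session.flush()

In plain SQLAlchemy the usual ways to avoid this are cascade="all, delete-orphan" on the relationship, or passive_deletes=True together with a database-level ON DELETE CASCADE; whether either corresponds to the fix applied in Airflow is not asserted here.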

Operating System

docker image - apache/airflow:2.4.2-python3.9

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==6.0.0
apache-airflow-providers-celery==3.0.0
apache-airflow-providers-cncf-kubernetes==4.4.0
apache-airflow-providers-common-sql==1.2.0
apache-airflow-providers-docker==3.2.0
apache-airflow-providers-elasticsearch==4.2.1
apache-airflow-providers-ftp==3.1.0
apache-airflow-providers-google==8.4.0
apache-airflow-providers-grpc==3.0.0
apache-airflow-providers-hashicorp==3.1.0
apache-airflow-providers-http==4.0.0
apache-airflow-providers-imap==3.0.0
apache-airflow-providers-microsoft-azure==4.3.0
apache-airflow-providers-mysql==3.2.1
apache-airflow-providers-odbc==3.1.2
apache-airflow-providers-postgres==5.2.2
apache-airflow-providers-redis==3.0.0
apache-airflow-providers-sendgrid==3.0.0
apache-airflow-providers-sftp==4.1.0
apache-airflow-providers-slack==6.0.0
apache-airflow-providers-sqlite==3.2.1
apache-airflow-providers-ssh==3.2.0

Deployment

Docker-Compose

Deployment details

Running using docker-compose locally.

Anything else

To trigger this problem, the dataset to be removed must be in the "ready" state, so that there is an entry in dataset_dag_run_queue.
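
For reference, a producer DAG along these lines (hypothetical, not part of the original report) can be used to put Dataset('test/1') into the "ready" state before editing the consumer DAG above; running its task once emits a dataset event and creates the dataset_dag_run_queue row:

# Hypothetical producer DAG, assuming Airflow 2.4+: completing the "produce"
# task emits a dataset event for Dataset('test/1'), marking it ready and
# queueing a run for the consumer DAG "test".
import pendulum

from airflow.datasets import Dataset
from airflow.decorators import dag, task


@dag(
    dag_id="test_producer",
    start_date=pendulum.datetime(2022, 1, 1),
    catchup=False,
    schedule=None,
)
def test_producer_dag():
    @task(outlets=[Dataset('test/1')])
    def produce():
        pass

    produce()


test_producer_dag()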

boring-cyborg[bot] commented 2 years ago

Thanks for opening your first issue here! Be sure to follow the issue template!

ephraimbuddy commented 2 years ago

Reproduced with:

def test_dataset_cascade_deletion(self, dag_maker, session):
    from airflow.datasets import Dataset
    from airflow.decorators import task

    # Producer DAG: its task has the dataset as an outlet.
    with dag_maker(schedule=None, serialized=True) as dag1:

        @task(outlets=Dataset('test/1'))
        def test_task1():
            print(1)

        test_task1()

    dr1 = dag_maker.create_dagrun()
    test_task1 = dag1.get_task('test_task1')

    # Consumer DAG scheduled on the same dataset.
    with dag_maker(dag_id='testdag', schedule=[Dataset('test/1')], serialized=True) as dag2:

        @task
        def test_task2():
            print(1)

        test_task2()

    # Run the producer task so the dataset becomes "ready" and a
    # dataset_dag_run_queue row is created for 'testdag'.
    ti = dr1.get_task_instance(task_id='test_task1')
    ti.run()

    # Change the dataset the consumer DAG is scheduled on.
    with dag_maker(dag_id='testdag', schedule=[Dataset('test2/1')], serialized=True) as dag2:

        @task
        def test_task2():
            print(1)

        test_task2()

I'm on it.

dhatch-niv commented 2 years ago

Thanks for the quick fix on this @uranusjr 🔥