apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

[Bug] Run `airflow dags test` with `DatasetAlias`/`Dataset` raises `sqlalchemy.orm.exc.FlushError` #42495

Open tatiana opened 1 month ago

tatiana commented 1 month ago

Apache Airflow version

2.10.0, 2.10.1, 2.10.2

If "Other Airflow 2 version" selected, which one?

No response

What happened?

Given the DAG:

from datetime import datetime

from airflow import DAG, Dataset
from airflow.datasets import DatasetAlias
from airflow.models.baseoperator import BaseOperator
from airflow.utils.context import Context

ALIAS_NAME = "some-alias"

class CustomOperator(BaseOperator):

    def __init__(self, *args, **kwargs):
        kwargs["outlets"] = [DatasetAlias(name=ALIAS_NAME)]
        super().__init__(*args, **kwargs)

    def execute(self, context: Context):
        new_outlets = [Dataset("something")]
        for outlet in new_outlets:
            context["outlet_events"][ALIAS_NAME].add(outlet)

with DAG("dataset_alias_dag", start_date=datetime(2023, 4, 20)) as dag:
    do_something = CustomOperator(task_id="do_something")
    do_something
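For context on what `execute()` is doing above: `context["outlet_events"][<alias name>]` behaves roughly like a per-alias collection that accumulates datasets to attach to the alias when the task finishes. A simplified pure-Python stand-in for that mapping semantics (an illustration only, not Airflow's actual `OutletEventAccessors` implementation):

```python
from collections import defaultdict

class _AliasEvents(list):
    # Toy stand-in mirroring the .add(...) call used in execute() above.
    def add(self, dataset):
        self.append(dataset)

# Stand-in for context["outlet_events"]: alias name -> accumulated datasets.
outlet_events = defaultdict(_AliasEvents)
outlet_events["some-alias"].add("something")
print(outlet_events["some-alias"])  # ['something']
```

At the end of a real task run, Airflow reads these accumulated events back and registers the corresponding dataset changes in the metadata database, which is where the flush below fails.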

When I try to run:

airflow dags test dataset_alias_dag  `date -Iseconds`

I get the error:

[2024-09-26T11:46:37.012+0300] {dag.py:3060} ERROR - Task failed; ti=<TaskInstance: dataset_alias_dag.do_something manual__2024-09-26T11:46:32+03:00 [success]>
Traceback (most recent call last):
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/airflow/models/dag.py", line 3053, in test
    _run_task(
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/airflow/models/dag.py", line 4357, in _run_task
    ti._run_raw_task(session=session, raise_on_defer=inline_trigger, mark_success=mark_success)
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/airflow/utils/session.py", line 94, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2995, in _run_raw_task
    return _run_raw_task(
           ^^^^^^^^^^^^^^
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 363, in _run_raw_task
    ti._register_dataset_changes(events=context["outlet_events"], session=session)
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 3058, in _register_dataset_changes
    dataset_manager.register_dataset_change(
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/airflow/api_internal/internal_api_call.py", line 139, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/airflow/utils/session.py", line 94, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/airflow/datasets/manager.py", line 145, in register_dataset_change
    session.flush()
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
    self._flush(objects)
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
    with util.safe_reraise():
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
    flush_context.execute()
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/sqlalchemy/orm/unitofwork.py", line 579, in execute
    self.dependency_processor.process_saves(uow, states)
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/sqlalchemy/orm/dependency.py", line 1136, in process_saves
    if not self._synchronize(
           ^^^^^^^^^^^^^^^^^^
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/sqlalchemy/orm/dependency.py", line 1252, in _synchronize
    self._verify_canload(child)
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/sqlalchemy/orm/dependency.py", line 257, in _verify_canload
    raise exc.FlushError(
sqlalchemy.orm.exc.FlushError: Can't flush None value found in collection DatasetModel.aliases

This DAG successfully executes when not being triggered via the dags test command.

What you think should happen instead?

I should be able to run dags test for this DAG without seeing this error message.

How to reproduce

Already described.

Operating System

Any

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

It does not happen in a deployment (tested on Astronomer, where it worked fine). The issue only happens when running the airflow dags test command locally.

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

uranusjr commented 1 month ago

Note that this only happens if the DAG is not already parsed by the DAG processor. Once parsed, the DatasetAlias will have an entry in the database, and the DAG will run as expected.
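Based on the note above, a possible local workaround (an assumption inferred from that note, not verified across all 2.10.x releases) is to have the DAG serialized into the metadata database first, so the DatasetAlias row exists before the test run:

```shell
# Assumed workaround: write the DAG (and its DatasetAlias) to the
# metadata DB first, then run the test.
airflow dags reserialize
airflow dags test dataset_alias_dag "$(date -Iseconds)"
```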

Running this with a plain Dataset under the same condition also produces an unexpected result:

with DAG("dataset_dag", ...):
    BashOperator(task_id="b", bash_command=":", outlets=[Dataset("some_dataset")])

The task will succeed, but no event is emitted.

$ airflow dags test dataset_dag
...
[2024-09-26T09:50:38.635+0000] {manager.py:96} WARNING - DatasetModel Dataset(uri='some_dataset', extra=None) not found
...

Ultimately this is because airflow dags test is essentially out-of-band execution that does not go through the normal DAG parsing and scheduling. Dataset event emission is implemented entirely around the database, and thus does not work well in this mode.
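The behaviour described above can be sketched in plain Python (a simplified illustration of the lookup-then-warn pattern, not the real airflow/datasets/manager.py code): the manager resolves the dataset's database row by URI, and since an unparsed DAG never created that row, no event is recorded.

```python
import logging

log = logging.getLogger("manager-sketch")

def register_dataset_change(dataset_rows, uri, events):
    """Simplified sketch: emit a dataset event only if a DB row exists."""
    model = dataset_rows.get(uri)  # stands in for a SELECT by URI
    if model is None:
        log.warning("DatasetModel Dataset(uri=%r, extra=None) not found", uri)
        return None  # task still succeeds, but no DatasetEvent is created
    event = {"dataset_id": model["id"], "uri": uri}
    events.append(event)
    return event

events = []
# Unparsed-DAG database: no row for "some_dataset" -> warning, no event.
register_dataset_change({}, "some_dataset", events)
assert events == []
# Parsed database: the row exists -> an event is recorded.
register_dataset_change({"some_dataset": {"id": 1}}, "some_dataset", events)
assert len(events) == 1
```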

I think we should make a decision about how this entire feature should be designed going forward. Since the situation is somewhat similar to airflow dags backfill, maybe we should make this go through the scheduler as well? (Basically, make the CLI behave like triggering a manual run in the web UI.) This is a relatively large undertaking, though. What should we do for 2.x?

uranusjr commented 1 month ago

I thought about this a bit and arrived at the conclusion that test should simply act entirely differently from other forms of execution: Airflow already provides a way to run a DAG outside of its schedule with manual runs (via the web UI or airflow dags trigger), and test (either from the CLI as in this issue, or dag.test() in a Python file) should do something different to be meaningful.

Judging by the word test, I’m thinking this operation should further differ from trigger in that the result should be completely ephemeral and idempotent (as long as the tasks themselves are). This means that the test run should leave no trace in the database after execution, including run records, task logs, etc. (I believe this is already almost the case.) Equally, outlets defined in tasks should not leave a DatasetEvent entry in the database, and therefore should not trigger downstream DAGs.

Does this sound reasonable?

tatiana commented 1 month ago

@uranusjr It seems sensible for the dags test not to leave traces in the database after it is run. Another option could be to give users a configuration option and allow them to run a cleanup task as they please.

That said, I believe it is essential that people can use this command to validate somehow:

These two areas are particularly important when we take into account dynamic DAG generation tools.

Since the command is erroring, I understand that we cannot currently do this.

Currently, users of dags test can already:

If we want datasets/dataset aliases/assets to be first-class citizens in Airflow, we must also have a way of testing them efficiently.

tatiana commented 1 month ago

Agreed on the next steps with @uranusjr:

As a mid- to long-term task, we must rethink how we want dags test to work. @ashb suggested that the command write to a "test" database. Before we close this ticket, we should capture a proposed plan for this longer-term work as a GitHub ticket.