apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0
36.28k stars 14.08k forks source link

DetachedInstanceError when using Variable in custom timetable plugin #41523

Open yi-cheng-chen-taiwan opened 4 weeks ago

yi-cheng-chen-taiwan commented 4 weeks ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.9.0

What happened?

I want to build a custom timetable plugin that can get holidays from meta database variables. The data can fetch from variables. But there is an exception that throw from dag.py and the dag register failure.

Traceback (most recent call last): File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/processor.py", line 182, in _run_file_processor _handle_dag_file_processing() File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/processor.py", line 163, in _handle_dag_file_processing result: tuple[int, int] = dag_file_processor.process_file( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 79, in wrapper return func(*args, session=session, kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/processor.py", line 859, in process_file serialize_errors = DagFileProcessor.save_dag_to_db( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/api_internal/internal_api_call.py", line 115, in wrapper return func(*args, *kwargs) ^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 79, in wrapper return func(args, session=session, kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/processor.py", line 895, in save_dag_to_db import_errors = DagBag._sync_to_db(dags=dags, processor_subdir=dag_directory, session=session) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 76, in wrapper return func(*args, *kwargs) ^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/dagbag.py", line 659, in _sync_to_db for attempt in run_with_db_retries(logger=log): File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/init.py", line 347, in iter do = self.iter(retry_state=retry_state) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/init.py", line 314, in iter return fut.result() ^^^^^^^^^^^^ File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 449, in result return self.get_result() ^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 401, in get_result raise self._exception File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/dagbag.py", line 675, in _sync_to_db DAG.bulk_write_to_db(dags.values(), processor_subdir=processor_subdir, session=session) File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 76, in wrapper return func(args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/dag.py", line 3191, in bulk_write_to_db orm_dag_links = orm_dag.dag_ownerlinks or [] ^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/attributes.py", line 487, in get return self.impl.get(state, dict) ^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/attributes.py", line 959, in get value = self._fire_loader_callables(state, key, passive) File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/attributes.py", line 995, in _fire_loadercallables return self.callable(state, passive) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/strategies.py", line 863, in _load_for_state raise orm_exc.DetachedInstanceError( sqlalchemy.orm.exc.DetachedInstanceError: Parent instance <DagModel at 0x7f579f911010> is not bound to a Session; lazy load operation of attribute 'dag_owner_links' cannot proceed (Background on this error at: https://sqlalche.me/e/14/bhk3)

Test Dag code

with DAG(
    dag_id="test",
    description = 'Testing',
    start_date = datetime.datetime(2024, 8, 1, 0, 0),
    schedule=CustomTimetable(),
    catchup=False
) as dag:
    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end")

start >> end

Plugin code

class CustomTimetable(Timetable):
    def test_variable(self):
        holiday = Variable.get("HolidaysFor2024")
        print("Test: " + holiday)

it still have the same error If i use the session directly.

class CustomTimetable(Timetable):
    def test_variable(self):
        with create_session() as session:
            holiday = session.scalar(select(Variable).where(Variable.key == "HolidaysFor2024").limit(1))
            print("Test: " + holiday.get_val())

What you think should happen instead?

No response

How to reproduce

class CustomTimetable(Timetable):
    def test_variable(self):
        holiday = Variable.get("HolidaysFor2024")
        print("Test: " + holiday)

    def infer_manual_data_interval(
            self,
            run_after: DateTime
    ) -> DataInterval:
        return DataInterval(start=run_after, end=run_after)

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:

        self.test_variable()

        now = DateTime.utcnow().replace(tzinfo=UTC)
        return DagRunInfo.interval(start=now, end=now)

class CustomTimetablePlugin(AirflowPlugin):
    name = "custom_timetable_plugin"
    timetables = [CustomTimetable]

or

class CustomTimetable(Timetable):
    def test_variable(self):
        with create_session() as session:
            holiday = session.scalar(select(Variable).where(Variable.key == "HolidaysFor2024").limit(1))
            print("Test: " + holiday.get_val())

   ....

Operating System

linux

Versions of Apache Airflow Providers

No response

Deployment

Other Docker-based deployment

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 4 weeks ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.