apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0
37.01k stars, 14.27k forks

CLI failures "_TimetableNotRegistered" on the newly added example DAG(s) #42133

Closed (zachliu closed 1 month ago)

zachliu commented 1 month ago

Apache Airflow version

2.10.0

If "Other Airflow 2 version" selected, which one?

2.10.1

What happened?

This PR does make sure the plugin is loaded when load_examples = True, and the example DAG itself runs fine: no "Broken DAG" import errors or anything. In the UI, Admin -> Plugins shows the 3 plugins from example_dags/plugins.

However, both the sync-perm CLI and the plugins CLI disagree with that. Executing airflow sync-perm --include-dags -v always errors out on this example DAG:

{serialized_dag.py:200} DEBUG - Deserializing DAG: example_workday_timetable
{plugins_manager.py:351} DEBUG - Plugins are already loaded. Skipping.
{plugins_manager.py:484} DEBUG - Initialize extra timetables plugins
{cli_action_loggers.py:98} DEBUG - Calling callbacks: []
Traceback (most recent call last):
  File "/usr/local/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
             ^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/__main__.py", line 62, in main
    args.func(args)
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/utils/cli.py", line 115, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/fab/auth_manager/cli_commands/sync_perm_command.py", line 39, in sync_perm
    appbuilder.sm.create_dag_specific_permissions()
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/fab/auth_manager/security_manager/override.py", line 1072, in create_dag_specific_permissions
    dagbag.collect_dags_from_db()
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/dagbag.py", line 628, in collect_dags_from_db
    self.dags = SerializedDagModel.read_all_dags()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", line 97, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/serialized_dag.py", line 201, in read_all_dags
    dag = row.dag
          ^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/serialized_dag.py", line 235, in dag
    return SerializedDAG.from_dict(data)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/serialization/serialized_objects.py", line 1776, in from_dict
    return cls.deserialize_dag(serialized_obj["dag"])
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/serialization/serialized_objects.py", line 1704, in deserialize_dag
    v = decode_timetable(v)
        ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/serialization/serialized_objects.py", line 329, in decode_timetable
    raise _TimetableNotRegistered(importable_string)
airflow.serialization.serialized_objects._TimetableNotRegistered: Timetable class 'airflow.example_dags.plugins.workday.AfterWorkdayTimetable' is not registered or you have a top level database access that disrupted the session. Please check the airflow best practices documentation.

And when executing airflow plugins -o table -v, the 3 plugins from example_dags/plugins are missing from the output table.
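For anyone reading the traceback: it ends in decode_timetable, which raises when the timetable's import path is not among the timetables that plugins registered in the current process. Below is a toy model of that lookup, not Airflow's actual code. The registry dict, the simplified serialized row, and the class and function bodies are all hypothetical stand-ins. It only illustrates how the same stored DAG can deserialize in one process and fail in another.

```python
# Toy model only: every name below is a hypothetical stand-in, not Airflow's API.


class TimetableNotRegistered(ValueError):
    """Stand-in for Airflow's private _TimetableNotRegistered error."""


class AfterWorkdayTimetable:
    """Placeholder for the example timetable; the real class has scheduling logic."""


def decode_timetable(encoded, registry):
    """Look up the serialized import path in this process's plugin registry."""
    path = encoded["type"]
    timetable_class = registry.get(path)
    if timetable_class is None:
        raise TimetableNotRegistered(f"Timetable class {path!r} is not registered")
    return timetable_class()


PATH = "airflow.example_dags.plugins.workday.AfterWorkdayTimetable"
serialized = {"type": PATH}  # what the database row stores, heavily simplified

# A process that loaded the example plugins: the lookup succeeds.
with_plugin = {PATH: AfterWorkdayTimetable}
timetable = decode_timetable(serialized, with_plugin)

# A process that did not load them: same row, empty registry, lookup fails.
try:
    decode_timetable(serialized, {})
    failed = False
except TimetableNotRegistered:
    failed = True
```

In this model the stored row is identical in both cases; only the per-process registry differs, which is consistent with the error appearing in the CLI but not in the UI.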

When I set load_examples = False, the sync-perm and plugins commands work fine without any errors.

The only way I can run both this example DAG and the CLI commands is:

  1. set load_examples = False
  2. copy the source code airflow/example_dags/plugins/workday.py to my own airflow/plugins (in my airflow.cfg, I have plugins_folder = /usr/local/airflow/plugins)
  3. re-create the example as one of my own DAGs, but import from my own plugins: from workday import AfterWorkdayTimetable

What you think should happen instead?

Airflow CLI sync-perm should be able to process this example DAG.

How to reproduce

  1. Load the example DAGs by setting load_examples = True
  2. Run airflow sync-perm --include-dags -v

Operating System

Debian GNU/Linux 10 (buster)

Versions of Apache Airflow Providers

No response

Deployment

Other Docker-based deployment

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

nikhilkarve commented 1 month ago

Can I pick this up? This would be my first issue

potiuk commented 1 month ago

Assigned.

zachliu commented 1 month ago

I want to help fix this because I need both the DAG examples and the airflow sync-perm --include-dags -v command. Can someone point me in the right direction?

It looks like Airflow CLI commands such as sync-perm evaluate the plugins differently? Or is it simply because this import https://github.com/apache/airflow/blob/ab3429c3189ceb244eb3d78062159859dbe611ce/airflow/example_dags/example_workday_timetable.py#L21 is in the wrong place?

zachliu commented 1 month ago

oh :hankey::hankey::hankey: the example dags are loaded everywhere else (webserver, scheduler, triggerer) EXCEPT my worker nodes :sweat_smile:
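In case it helps someone else hitting the same mismatch, here is a minimal sketch for checking the effective load_examples value on each node. It assumes the usual precedence, where the AIRFLOW__CORE__LOAD_EXAMPLES environment variable overrides [core] load_examples in airflow.cfg, and it assumes Airflow's shipped default of True when neither is set. The function name and parameters are hypothetical. The parsing is a simplification of Airflow's own boolean handling: anything other than t/true/1 is treated as False here.

```python
import configparser
import os

TRUE_VALUES = {"t", "true", "1"}


def load_examples_enabled(cfg_text, environ=None, default=True):
    """Return the effective load_examples setting for one node.

    cfg_text is the content of that node's airflow.cfg. The environment
    variable, when present, takes precedence over the file.
    """
    environ = os.environ if environ is None else environ
    raw = environ.get("AIRFLOW__CORE__LOAD_EXAMPLES")
    if raw is None:
        # interpolation=None because airflow.cfg values may contain bare '%'.
        parser = configparser.ConfigParser(interpolation=None)
        parser.read_string(cfg_text)
        raw = parser.get("core", "load_examples", fallback=None)
    if raw is None:
        return default  # assumed default when the option is not set anywhere
    return raw.strip().lower() in TRUE_VALUES
```

Running this against the airflow.cfg of the webserver, scheduler, triggerer, and workers should return the same value everywhere. The built-in airflow config get-value core load_examples command reports the same setting directly on a node where Airflow is installed.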