apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

celery worker can't read dags from db #15687

Closed. luoyuliuyin closed this issue 10 months ago

luoyuliuyin commented 3 years ago

Apache Airflow version: 2.0.1

Kubernetes version (if you are using kubernetes) (use kubectl version):

Environment:

My environment configuration:

- `executor=CeleryExecutor`
- `broker_url=amqp://user:password@xxx:5672`
- `result_backend=db+mysql://user:password@xxx:3306/airflow`
- `sql_alchemy_conn=mysql://user:password@xxx:3306/airflow`
- `donot_pickle=False`
- `store_dag_code=True`
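
For reference, a hedged sketch of the same settings expressed as `AIRFLOW__<SECTION>__<KEY>` environment variables; the section placement (`[core]` vs `[celery]`) is assumed from the Airflow 2.0 configuration reference, and the values are copied from the list above:

```python
# Hedged sketch: the reporter's settings as Airflow environment variables,
# e.g. to pass into a docker-compose or Kubernetes environment block.
# Section names ([core] vs [celery]) are assumed from the Airflow 2.0 docs.
environment = {
    "AIRFLOW__CORE__EXECUTOR": "CeleryExecutor",
    "AIRFLOW__CELERY__BROKER_URL": "amqp://user:password@xxx:5672",
    "AIRFLOW__CELERY__RESULT_BACKEND": "db+mysql://user:password@xxx:3306/airflow",
    "AIRFLOW__CORE__SQL_ALCHEMY_CONN": "mysql://user:password@xxx:3306/airflow",
    "AIRFLOW__CORE__DONOT_PICKLE": "False",
    "AIRFLOW__CORE__STORE_DAG_CODE": "True",
}
```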

What happened: the celery worker can't read DAGs from the DB

What you expected to happen: the celery worker reads DAGs from the DB, not from local files

How to reproduce it:

  1. I created a DAG named `a_weixiang_test` in the `dags_folder` directory (the DAG file exists only on the scheduler; it was not synced to the webserver or the worker). A minimal sketch of such a DAG is shown after these steps.

  2. Then I accessed the DAG through the UI and could view it. This means the webserver can load the DAG from the DB.

  3. When I triggered a new DAG run via the UI, I got the following error message: `Failed to execute task dag_id could not be found: a_weixiang_test. Either the dag did not exist or it failed to parse.`
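
For illustration, a minimal sketch of what a DAG like `a_weixiang_test` could look like; this is a hypothetical reconstruction (the original file was only shown in a screenshot), and the operator and task name here are assumptions:

```python
# Hypothetical reconstruction of a minimal a_weixiang_test DAG file in dags_folder.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="a_weixiang_test",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,  # triggered manually from the UI in step 3
    catchup=False,
) as dag:
    hello = BashOperator(task_id="hello", bash_command="echo hello")
```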

The worker should read DAGs from the DB, not from the local file system.

Anything else we need to know:

I noticed that the data in the database does not meet expectations: all pickle-related fields of the `dag` table are empty (these fields are `last_pickled`, `last_expired`, and `pickle_id`), but the `dag_pickle` table does have records. Does this matter?
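
In case it helps others reproduce this observation, here is a hedged sketch of how those columns could be checked with Airflow's own ORM models; `a_weixiang_test` comes from the steps above, everything else is just an inspection query:

```python
# Hedged sketch: inspect the pickle-related columns of the `dag` table
# and the row count of the `dag_pickle` table via Airflow's ORM models.
from airflow.models import DagModel, DagPickle
from airflow.utils.session import create_session

with create_session() as session:
    dm = session.query(DagModel).filter(DagModel.dag_id == "a_weixiang_test").one_or_none()
    if dm:
        # Reported as all empty in this issue.
        print(dm.last_pickled, dm.last_expired, dm.pickle_id)
    # Reported as non-empty (the dag_pickle table does have records).
    print(session.query(DagPickle).count())
```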

PxTruy commented 3 years ago

Any update on it? I have the same issue.

microhuang commented 3 years ago

me too, +1.

airflow==2.1.2 and airflow==2.1.3

AlessioC31 commented 3 years ago

Any news? I have the same problem. Here the docs say: "The worker needs to have access to its DAGS_FOLDER, and you need to synchronize the filesystems by your own means. A common setup would be to store your DAGS_FOLDER in a Git repository and sync it across machines using Chef, Puppet, Ansible, or whatever you use to configure machines in your environment. If all your boxes have a common mount point, having your pipelines files shared there should work as well."

So I guess this is intended?

leonsmith commented 3 years ago

So after tweaking a couple of things from my work-in-progress merge request (to link the pickle to the DAG), this still throws an error when the task is picked up by the worker:


```
ModuleNotFoundError: No module named 'unusual_prefix_d83562dd8d4afa96cb7d254b46b193a7106a58ef_test_logging'
  File "airflow/executors/celery_executor.py", line 121, in _execute_in_fork
    args.func(args)
  File "airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "airflow/cli/commands/task_command.py", line 274, in task_run
    dag = get_dag_by_pickle(args.pickle)
  File "airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "airflow/utils/cli.py", line 221, in get_dag_by_pickle
    dag_pickle = session.query(DagPickle).filter(DagPickle.id == pickle_id).first()
  File "sqlalchemy/orm/query.py", line 3429, in first
    ret = list(self[0:1])
  File "sqlalchemy/orm/query.py", line 3203, in __getitem__
    return list(res)
  File "sqlalchemy/orm/loading.py", line 100, in instances
    cursor.close()
  File "sqlalchemy/util/langhelpers.py", line 68, in __exit__
    compat.raise_(
  File "sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "sqlalchemy/orm/loading.py", line 80, in instances
    rows = [proc(row) for row in fetch]
  File "sqlalchemy/orm/loading.py", line 80, in <listcomp>
    rows = [proc(row) for row in fetch]
  File "sqlalchemy/orm/loading.py", line 579, in _instance
    _populate_full(
  File "sqlalchemy/orm/loading.py", line 725, in _populate_full
    dict_[key] = getter(row)
  File "sqlalchemy/sql/sqltypes.py", line 1723, in process
    return loads(value)
  File "dill/_dill.py", line 327, in loads
    return load(file, ignore, **kwds)
  File "dill/_dill.py", line 313, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "dill/_dill.py", line 525, in load
    obj = StockUnpickler.load(self)
  File "dill/_dill.py", line 515, in find_class
    return StockUnpickler.find_class(self, module, name)
```

potiuk commented 3 years ago

This issue is a misunderstanding of how Airflow works (currently): workers ALWAYS read DAGs from the DAGS_FOLDER. There is currently no way to make them read the DAGs from the DB. The worker MUST have the DAGs folder mounted locally; there is currently no other way.

The DAGs stored in the DB are only used for display in the webserver, not for executing the DAGs from the DB. It is impossible to read them from the DB for many reasons, for example because the DB does not contain all the other files those DAGs import.

Getting rid of the requirement that workers have access to the DAGS_FOLDER is not even planned yet (though there are AIPs, like the Remote DAG Fetcher, which are still in draft and try to address this feature, but it is not currently a priority to implement it).

https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-5+Remote+DAG+Fetcher

ashb commented 3 years ago

@potiuk That is only true by default. There is a setting called donot_pickle, which has a default value of True; if it is set to False, the DAG should be pickled into the database and --pickle-id passed to the worker, meaning that it doesn't read the DAG from disk:

https://github.com/apache/airflow/blob/e7925d8255e836abd8912783322d61b3a9ff657a/airflow/cli/commands/task_command.py#L272-L276
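
For readers following the link, a paraphrased sketch of the branch those lines implement (not the verbatim source; `get_dag` and `get_dag_by_pickle` are the helpers visible in the traceback above, and the exact control flow may differ between versions):

```python
# Paraphrased sketch of the --pickle-id dispatch in task_command.py (not verbatim).
from airflow.utils.cli import get_dag, get_dag_by_pickle


def _resolve_dag(args):
    if args.pickle:
        # The scheduler pickled the DAG (donot_pickle=False) and passed --pickle-id,
        # so the worker loads it from the dag_pickle table instead of parsing files.
        return get_dag_by_pickle(args.pickle)
    # Default path: parse the DAG file from the worker's local DAGS_FOLDER.
    return get_dag(args.subdir, args.dag_id)
```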

potiuk commented 3 years ago

Ah, TIL. I was not aware at all that we can have a worker without DAG files mounted locally.

ashb commented 3 years ago

It's not well used, and because it uses stock Pickle it probably doesn't work all that well (even bugs like this aside) :D

kaxil commented 3 years ago

FWIW -- We should remove this too in favour of DAG Versioning + Remote DAG Fetcher work sometime early next year.

potiuk commented 3 years ago

Agree with @kaxil

joshzana commented 3 years ago

I just spent time digging through this, came up with the same local fix as in #18584, and then hit the same ModuleNotFoundError. Can the immediate action here be to remove the pickle information from the docs? That would save time for other users in the meantime.

See https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#scheduler https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html?highlight=pickle#donot-pickle

For context, we run a high number of tasks in parallel, and our DAGs are generated dynamically based on queries to our own Postgres DB. We have observed an increase in load on our DBs when many tasks start up in parallel, which we have root-caused to the way task workers fill the DagBag on launch. I was hoping to take advantage of the DAG pickling support to reduce that load. I hope the future feature will satisfy this use case.
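
To illustrate the use case, here is a hedged sketch of the dynamic-DAG pattern described above; the `pipelines` table, connection string, and DAG/task names are hypothetical, not our actual code:

```python
# Hedged sketch: DAGs generated from a Postgres query. Every process that fills
# its DagBag (scheduler, webserver, and each Celery worker task) re-runs this query,
# which is where the extra DB load comes from.
from datetime import datetime

import psycopg2  # assumed driver

from airflow import DAG
from airflow.operators.bash import BashOperator

conn = psycopg2.connect("dbname=app host=xxx user=user password=password")
with conn.cursor() as cur:
    cur.execute("SELECT name FROM pipelines")  # hypothetical source of DAG definitions
    pipeline_names = [row[0] for row in cur.fetchall()]
conn.close()

for name in pipeline_names:
    with DAG(
        dag_id=f"generated_{name}",
        start_date=datetime(2021, 1, 1),
        schedule_interval="@daily",
        catchup=False,
    ) as dag:
        BashOperator(task_id="run", bash_command=f"echo {name}")
    # Expose each generated DAG at module level so the DagBag loader picks it up.
    globals()[dag.dag_id] = dag
```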

derkuci commented 2 years ago

Same here: dynamic DAGs (relatively slow to generate). I was hoping to use the "dag pickle" feature but found #18584 and this issue.

github-actions[bot] commented 11 months ago

This issue has been automatically marked as stale because it has been open for 365 days without any activity. There have been several Airflow releases since the last activity on this issue. Please recheck the report against the latest Airflow version and let us know if the issue is still reproducible. The issue will be closed in the next 30 days if there is no further activity from the issue author.

github-actions[bot] commented 10 months ago

This issue has been closed because it has not received a response from the issue author.