apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Can't pickle <class 'celery.utils.log.ProcessAwareLogger'> #859

Closed JordyMoos closed 4 years ago

JordyMoos commented 8 years ago

Hi,

My remote worker seems to fail after a while with this message: ERROR - (pickle.PicklingError) Can't pickle <class 'celery.utils.log.ProcessAwareLogger'>: it's not found as celery.utils.log.ProcessAwareLogger

It seems that it is trying to pickle the logger, but I have no clue why. I have a two-server test setup: one server runs the Airflow webserver and scheduler, and the other server runs the worker. The DAGs are only located on the webserver/scheduler server, and with pickling they should go to the worker. I use Celery as the executor.

When I added the tutorial (from the official docs) to the dags folder, everything seemed to work fine at first (I saw about 24 successful tasks), but after a little while nothing seemed to happen anymore. If I check Flower, it tells me that tasks fail with this message: AirflowException: Celery command failed

If I then check the worker's log, I get the "Can't pickle the logger" error.

Hopefully someone knows what is going wrong.

Cheers

iivvaall commented 8 years ago

We also faced this issue. The error appears on the scheduler's stderr:

ERROR:airflow.jobs.SchedulerJob:This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (pickle.PicklingError) Can't pickle <class 'celery.utils.log.ProcessAwareLogger'>: it's not found as celery.utils.log.ProcessAwareLogger [SQL: u'INSERT INTO dag_pickle (pickle, created_dttm, pickle_hash) VALUES (%(pickle)s, now(), %(pickle_hash)s) RETURNING dag_pickle.id'] [parameters: [{'pickle_hash': -1909267389068563253, 'pickle': <DAG: check_users>}]]
Traceback (most recent call last):
  File "/home/ivanov/src/airflow/airflow/jobs.py", line 624, in _execute
    dag = dagbag.get_dag(dag.dag_id)
  File "/home/ivanov/src/airflow/airflow/models.py", line 155, in get_dag
    orm_dag = DagModel.get_current(dag_id)
  File "/home/ivanov/src/airflow/airflow/models.py", line 1901, in get_current
    obj = session.query(cls).filter(cls.dag_id == dag_id).first()
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2634, in first
    ret = list(self[0:1])
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2457, in __getitem__
    return list(res)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2736, in __iter__
    return self._execute_and_instances(context)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2749, in _execute_and_instances
    close_with_result=True)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2740, in _connection_from_session
    **kw)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 905, in connection
    execution_options=execution_options)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 910, in _connection_for_bind
    engine, execution_options)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 313, in _connection_for_bind
    self._assert_active()
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 214, in _assert_active
    % self._rollback_exception
InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (pickle.PicklingError) Can't pickle <class 'celery.utils.log.ProcessAwareLogger'>: it's not found as celery.utils.log.ProcessAwareLogger [SQL: u'INSERT INTO dag_pickle (pickle, created_dttm, pickle_hash) VALUES (%(pickle)s, now(), %(pickle_hash)s) RETURNING dag_pickle.id'] [parameters: [{'pickle_hash': -1909267389068563253, 'pickle': <DAG: check_users>}]]
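The InvalidRequestError above is generic SQLAlchemy behavior rather than anything Airflow-specific: once an exception during flush is swallowed without a rollback, every further use of the session raises until Session.rollback() is called. A minimal sketch of that failure mode (the Item model below is hypothetical, not Airflow code):

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.exc import IntegrityError, InvalidRequestError
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Item(Base):  # hypothetical model, standing in for dag_pickle
    __tablename__ = "item"
    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = Session(engine)

session.add(Item(name=None))  # violates NOT NULL, so the flush will fail
try:
    session.commit()
except IntegrityError:
    pass  # error swallowed without rollback, as the scheduler effectively did

try:
    session.query(Item).count()  # any further use of the session now raises
    stuck = False
except InvalidRequestError:
    stuck = True
print("session unusable:", stuck)

session.rollback()  # an explicit rollback makes the session usable again
print(session.query(Item).count())
```

This is why the comment below notes that Airflow should have called rollback explicitly: without it, the scheduler's DB session stays poisoned for all subsequent queries.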

ProcessAwareLogger is defined inside a function in celery.utils.log, so it cannot be located by name and recreated during unpickling. Airflow does not handle this error correctly (rollback should be called explicitly), and the scheduler's DB session becomes unusable.
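The underlying limitation is general to pickle: a class created inside a function body has no importable module-level name, so pickle cannot serialize it by reference. A minimal sketch (the local class below is a stand-in, not Celery's actual code; the exact exception type varies by Python version, hence the broad except):

```python
import pickle

def make_logger_class():
    # Stand-in for how celery.utils.log builds ProcessAwareLogger at
    # runtime: the class only exists inside this function's scope, so
    # there is no "celery.utils.log.ProcessAwareLogger" attribute for
    # pickle to look up when serializing by reference.
    class ProcessAwareLogger:
        pass
    return ProcessAwareLogger

cls = make_logger_class()
try:
    pickle.dumps(cls)
    pickled_ok = True
except (pickle.PicklingError, AttributeError) as exc:
    # Python 2 raised PicklingError ("it's not found as ..."), matching
    # the log above; Python 3 reports a similar "local object" error.
    pickled_ok = False
    print(exc)
```

A DAG that (directly or transitively) holds a reference to such a logger instance fails the same way, since pickling the instance requires pickling its class.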

Disabling pickling just works.
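For reference, the workaround corresponds to a switch in airflow.cfg (a sketch; check the configuration reference for your Airflow version, and note that without pickling each worker needs its own copy of the DAG files):

```ini
[core]
# Don't ship DAGs to workers as pickles via the metadata DB; workers
# must then have the DAG files in their own dags_folder (e.g. synced
# from the scheduler host).
donot_pickle = True
```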

bolkedebruin commented 8 years ago

Can you please create a JIRA for this?