apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

"'module' object is not callable" error when starting celery worker #708

Closed turtlemonvh closed 8 years ago

turtlemonvh commented 8 years ago

This is the stack trace I am getting:

[2015-12-01 20:41:56,616: ERROR/Worker-1] Process Worker-1
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/billiard/process.py", line 292, in _bootstrap
    self.run()
  File "/usr/local/lib/python2.7/dist-packages/billiard/pool.py", line 292, in run
    self.after_fork()
  File "/usr/local/lib/python2.7/dist-packages/billiard/pool.py", line 395, in after_fork
    self.initializer(*self.initargs)
  File "/usr/local/lib/python2.7/dist-packages/celery/concurrency/prefork.py", line 81, in process_initializer
    app=app)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 161, in build_tracer
    backend = task.backend
  File "/usr/local/lib/python2.7/dist-packages/celery/app/task.py", line 924, in backend
    return self.app.backend
  File "/usr/local/lib/python2.7/dist-packages/kombu/utils/__init__.py", line 322, in __get__
    value = obj.__dict__[self.__name__] = self.__get(obj)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/base.py", line 626, in backend
    return self._get_backend()
  File "/usr/local/lib/python2.7/dist-packages/celery/app/base.py", line 445, in _get_backend
    return backend(app=self, url=url)
TypeError: 'module' object is not callable
 * Running on http://0.0.0.0:8001/ (Press CTRL+C to quit)
[2015-12-01 20:41:57,696: ERROR/MainProcess] Unrecoverable error: TypeError("'module' object is not callable",)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/worker/__init__.py", line 206, in start
    self.blueprint.start(self)
  File "/usr/local/lib/python2.7/dist-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "/usr/local/lib/python2.7/dist-packages/celery/bootsteps.py", line 374, in start
    return self.obj.start()
  File "/usr/local/lib/python2.7/dist-packages/celery/worker/consumer.py", line 278, in start
    blueprint.start(self)
  File "/usr/local/lib/python2.7/dist-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "/usr/local/lib/python2.7/dist-packages/celery/worker/consumer.py", line 595, in start
    c.update_strategies()
  File "/usr/local/lib/python2.7/dist-packages/celery/worker/consumer.py", line 439, in update_strategies
    app=self.app)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 161, in build_tracer
    backend = task.backend
  File "/usr/local/lib/python2.7/dist-packages/celery/app/task.py", line 924, in backend
    return self.app.backend
  File "/usr/local/lib/python2.7/dist-packages/kombu/utils/__init__.py", line 322, in __get__
    value = obj.__dict__[self.__name__] = self.__get(obj)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/base.py", line 626, in backend
    return self._get_backend()
  File "/usr/local/lib/python2.7/dist-packages/celery/app/base.py", line 445, in _get_backend
    return backend(app=self, url=url)
TypeError: 'module' object is not callable

 -------------- celery@6b67180cee15 v3.1.19 (Cipater)
---- **** ----- 
--- * ***  * -- Linux-3.13.0-24-generic-x86_64-with-Ubuntu-14.04-trusty
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         airflow.executors.celery_executor.CeleryExecutor:0x7fac9a154490
- ** ---------- .> transport:   amqp://username:**@rabbitmq:5672/celery
- ** ---------- .> results:     psycopg2://username:password@postgres:5432/default
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- 
--- ***** ----- [queues]
 -------------- .> default          exchange=celery(direct) key=celery

I've tried running this a variety of ways, but none of these changes have made a difference.

Running a task locally (like airflow run -l dag_id task_id 2015-11-30) works fine, but running the same task over celery (like airflow run dag_id task_id 2015-11-30) fails with the same error.

The configuration I am using is below, with a few passwords changed:

[core]
# The home folder for airflow, default is ~/airflow
airflow_home = /home/airflow

# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository
dags_folder = /home/airflow/dags

# The folder where airflow should store its log files
base_log_folder = /home/airflow/logs

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
executor = CeleryExecutor

# The SQLAlchemy connection string to the metadata database.
# SQLAlchemy supports many different database engines; more information
# is available on their website:
# http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html#module-sqlalchemy.dialects.postgresql.psycopg2
sql_alchemy_conn = postgresql+psycopg2://username:password@postgres:5432/default

# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32

# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16

# The maximum number of active DAG runs per DAG
max_active_runs_per_dag = 16

# Whether to load the examples that ship with Airflow. It's good to
# get started, but you probably want to set this to False in a production
# environment
load_examples = False

# Where your Airflow plugins are stored
plugins_folder = /home/airflow/plugins

# Secret key to save connection passwords in the db
fernet_key = SECRET_KEY

[webserver]
# The base url of your website as airflow cannot guess what domain or
# cname you are using. This is used in automated emails that
# airflow sends to point links to the right web server
base_url = http://localhost:8080
web_server_host = 0.0.0.0
web_server_port = 8080

# Secret key used to run your flask app
secret_key = SECRET

# Expose the configuration file in the web server
expose_config = true

# Set to true to turn on authentication : http://pythonhosted.org/airflow/installation.html#web-authentication
authenticate = False

# Filter the list of dags by owner name (requires authentication to be enabled)
filter_by_owner = False

[smtp]
# If you want airflow to send emails on retries or failures using
# the airflow.utils.send_email function, you have to configure an SMTP
# server here
smtp_host = localhost
smtp_starttls = True
smtp_user = airflow
smtp_port = 25
smtp_password = airflow
smtp_mail_from = analytics-airflow@company.com

[celery]
# The app name that will be used by celery
celery_app_name = airflow.executors.celery_executor.CeleryExecutor

# The concurrency that will be used when starting workers with the
# "airflow worker" command. This defines the number of task instances that
# a worker will take, so size up your workers based on the resources on
# your worker box and the nature of your tasks
celeryd_concurrency = 2

# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
# a sqlalchemy database. Refer to the Celery documentation for more
# information.
broker_url = amqp://username:password@rabbitmq:5672/celery

# Another key Celery setting: the result backend URL
celery_result_backend = psycopg2://username:password@postgres:5432/default

# Celery Flower is a sweet UI for Celery. Airflow has a shortcut to start
# it `airflow flower`. This defines the port that Celery Flower runs on
flower_port = 8090

# Default queue that tasks get assigned to and that workers listen on.
default_queue = default

[scheduler]
# Task instances listen for external kill signal (when you clear tasks
# from the CLI or the UI), this defines the frequency at which they should
# listen (in seconds).
job_heartbeat_sec = 5

# The scheduler constantly tries to trigger new tasks (look at the
# scheduler section in the docs for more information). This defines
# how often the scheduler should run (in seconds).
scheduler_heartbeat_sec = 5
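
The worker failure turns out to be reproducible outside of Airflow by forcing Celery to resolve the configured result backend. A minimal sketch (a hypothetical probe app, not part of Airflow; assumes celery 3.1 and psycopg2 are installed, as in the version list below):

from celery import Celery

# Hypothetical probe: build a throwaway app with the celery_result_backend
# value from the config above and force backend resolution. No worker or
# broker is needed to trigger the error.
app = Celery("probe",
             backend="psycopg2://username:password@postgres:5432/default")
app.backend  # raises TypeError: 'module' object is not callable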

These are the exact versions of the libraries I am using with Python 2.7.6:

Babel==2.1.1
Flask==0.10.1
Flask-Admin==1.2.0
Flask-Cache==0.13.1
Flask-Login==0.2.11
Flask-WTF==0.12
Jinja2==2.8
Mako==1.0.3
Markdown==2.6.5
MarkupSafe==0.23
Pygments==2.0.2
SQLAlchemy==1.0.9
WTForms==2.0.2
Werkzeug==0.11.2
airflow==1.6.1
alembic==0.8.3
amqp==1.4.7
anyjson==0.3.3
argparse==1.2.1
backports-abc==0.4
backports.ssl-match-hostname==3.4.0.2
billiard==3.3.0.21
celery==3.1.19
certifi==2015.11.20.1
cffi==1.3.1
chardet==2.0.1
chartkick==0.4.2
colorama==0.2.5
croniter==0.3.10
cryptography==1.1.1
dill==0.2.4
elasticsearch==2.1.0
enum34==1.1.1
flower==0.8.3
future==0.15.2
futures==3.0.3
gunicorn==19.4.1
html5lib==0.999
idna==2.0
ipaddress==1.0.15
itsdangerous==0.24
kombu==3.0.29
meld3==0.6.10
numpy==1.10.1
pandas==0.17.1
psycopg2==2.6.1
pyasn1==0.1.9
pycparser==2.14
python-dateutil==2.4.2
python-editor==0.4
pytz==2015.7
requests==2.8.1
setproctitle==1.1.9
singledispatch==3.4.0.3
six==1.5.2
supervisor==3.0b2
thrift==0.9.3
tornado==4.3
urllib3==1.12
wheel==0.24.0
wsgiref==0.1.2

Thanks.

turtlemonvh commented 8 years ago

OK - figured it out. This one has to win the award for one of the most misleading error messages of all time.

The problem was this setting:

# Was
celery_result_backend = psycopg2://username:password@postgres:5432/default
# Should be
celery_result_backend = db+postgresql+psycopg2://username:password@postgres:5432/default

I originally had the same string I was using for my sql_alchemy_conn setting:

celery_result_backend = postgresql+psycopg2://username:password@postgres:5432/default

But that one failed in a different (more obvious) way.

Boo non-standard connection urls.
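
What actually happens (a simplified sketch of Celery 3.1's URL-to-backend resolution, not the real code): the scheme in front of any "+" is looked up in an alias table, and an unknown scheme that happens to be an importable module name, like psycopg2, comes back as a bare module that Celery then calls like a backend class:

import importlib

# Simplified stand-in for Celery's backend alias table (the real one
# has more entries).
BACKEND_ALIASES = {"db": ("celery.backends.database", "DatabaseBackend")}

def get_backend_by_url(url):
    scheme = url.partition("://")[0]   # "psycopg2", or "db+postgresql+psycopg2"
    name = scheme.partition("+")[0]    # alias lookup key: "psycopg2" or "db"
    if name in BACKEND_ALIASES:
        module, attr = BACKEND_ALIASES[name]
        return getattr(importlib.import_module(module), attr)
    # Unknown scheme: the lookup falls back to importing the name itself,
    # returning the *module* object rather than a backend class.
    return importlib.import_module(name)

backend = get_backend_by_url("psycopg2://username:password@postgres:5432/default")
backend(app=None, url="...")  # TypeError: 'module' object is not callable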

turtlemonvh commented 8 years ago

For those who come in the future, here are the relevant docs: http://celery.readthedocs.org/en/latest/configuration.html#database-url-examples
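
A quick way to confirm that the fixed URL resolves to the right class (a sketch; assumes celery and SQLAlchemy are installed, and resolving the backend should not open a database connection by itself):

from celery import Celery

# "db+" selects Celery's SQLAlchemy database backend; the remainder is an
# ordinary SQLAlchemy URL, so the "+psycopg2" driver part is optional.
app = Celery("probe",
             backend="db+postgresql+psycopg2://username:password@postgres:5432/default")
print(type(app.backend))  # <class 'celery.backends.database.DatabaseBackend'>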

turtlemonvh commented 8 years ago

Here's a feature request to celery to make the error messages more helpful: https://github.com/celery/celery/issues/2945