celery / kombu

Messaging library for Python.
http://kombu.readthedocs.org/
BSD 3-Clause "New" or "Revised" License

Unable to start celery worker when BROKER is set to SqlAlchemy with Oracle DB #1221

Open ricco386 opened 4 years ago

ricco386 commented 4 years ago

I have created a simple Celery script to test the Oracle DB as a Broker.

tasks.py

from celery import Celery

app = Celery('tasks')
app.config_from_object('celeryconfig')

@app.task
def add(x, y):
    return x + y

with configuration:

BROKER_HOST = 'sqla+oracle+cx_oracle://user:password@dbhost:1521/schema'

I start the worker:

celery -A tasks worker --loglevel=debug

I can see that Celery has connected, and I also see the newly created tables: KOMBU_MESSAGE, KOMBU_QUEUE

But the worker ends with an exception:

sqlalchemy.exc.DatabaseError: (cx_Oracle.DatabaseError) ORA-02014: cannot select FOR UPDATE from view with DISTINCT, GROUP BY, etc.
[SQL: SELECT kombu_message_timestamp, kombu_message_id, kombu_message_visible, kombu_message_payload, kombu_message_version, kombu_message_queue_id 
FROM (SELECT kombu_message.timestamp AS kombu_message_timestamp, kombu_message.id AS kombu_message_id, kombu_message.visible AS kombu_message_visible, kombu_message.payload AS kombu_message_payload, kombu_message.version AS kombu_message_version, kombu_message.queue_id AS kombu_message_queue_id

Here is the log from terminal:

[2020-07-02 16:07:32,657: DEBUG/MainProcess] | Worker: Starting Pool
[2020-07-02 16:07:32,679: DEBUG/MainProcess] ^-- substep ok
[2020-07-02 16:07:32,680: DEBUG/MainProcess] | Worker: Starting Consumer
[2020-07-02 16:07:32,680: DEBUG/MainProcess] | Consumer: Starting Connection
[2020-07-02 16:07:32,742: INFO/MainProcess] Connected to :********
[2020-07-02 16:07:32,742: DEBUG/MainProcess] ^-- substep ok
[2020-07-02 16:07:32,742: DEBUG/MainProcess] | Consumer: Starting Events
[2020-07-02 16:07:32,759: DEBUG/MainProcess] ^-- substep ok
[2020-07-02 16:07:32,759: DEBUG/MainProcess] | Consumer: Starting Tasks
[2020-07-02 16:07:33,269: DEBUG/MainProcess] ^-- substep ok
[2020-07-02 16:07:33,269: DEBUG/MainProcess] | Consumer: Starting Heart
[2020-07-02 16:07:33,270: DEBUG/MainProcess] ^-- substep ok
[2020-07-02 16:07:33,270: DEBUG/MainProcess] | Consumer: Starting event loop
[2020-07-02 16:07:33,270: INFO/MainProcess] celery@notebook ready.
[2020-07-02 16:07:33,270: DEBUG/MainProcess] basic.qos: prefetch_count->16
[2020-07-02 16:07:33,285: CRITICAL/MainProcess] Unrecoverable error: DatabaseError('(cx_Oracle.DatabaseError) ORA-02014: cannot select FOR UPDATE from view with DISTINCT, GROUP BY, etc.')
Traceback (most recent call last):
  File "/home/richard/Projects/simple-backed/envs3/lib64/python3.8/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
    self.dialect.do_execute(
  File "/home/richard/Projects/simple-backed/envs3/lib64/python3.8/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
    cursor.execute(statement, parameters)
cx_Oracle.DatabaseError: ORA-02014: cannot select FOR UPDATE from view with DISTINCT, GROUP BY, etc.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/richard/Projects/simple-backed/envs3/lib64/python3.8/site-packages/celery/worker/worker.py", line 208, in start
    self.blueprint.start(self)
  File "/home/richard/Projects/simple-backed/envs3/lib64/python3.8/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/home/richard/Projects/simple-backed/envs3/lib64/python3.8/site-packages/celery/bootsteps.py", line 369, in start
    return self.obj.start()
  File "/home/richard/Projects/simple-backed/envs3/lib64/python3.8/site-packages/celery/worker/consumer/consumer.py", line 318, in start
    blueprint.start(self)
  File "/home/richard/Projects/simple-backed/envs3/lib64/python3.8/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/home/richard/Projects/simple-backed/envs3/lib64/python3.8/site-packages/celery/worker/consumer/consumer.py", line 599, in start
    c.loop(*c.loop_args())
  File "/home/richard/Projects/simple-backed/envs3/lib64/python3.8/site-packages/celery/worker/loops.py", line 113, in synloop
    connection.drain_events(timeout=2.0)
  File "/home/richard/Projects/simple-backed/envs3/lib64/python3.8/site-packages/kombu/connection.py", line 324, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
  File "/home/richard/Projects/simple-backed/envs3/lib64/python3.8/site-packages/kombu/transport/virtual/base.py", line 963, in drain_events
    get(self._deliver, timeout=timeout)
  File "/home/richard/Projects/simple-backed/envs3/lib64/python3.8/site-packages/kombu/utils/scheduling.py", line 56, in get
    return self.fun(resource, callback, **kwargs)
  File "/home/richard/Projects/simple-backed/envs3/lib64/python3.8/site-packages/kombu/transport/virtual/base.py", line 1001, in _drain_channel
    return channel.drain_events(callback=callback, timeout=timeout)
  File "/home/richard/Projects/simple-backed/envs3/lib64/python3.8/site-packages/kombu/transport/virtual/base.py", line 745, in drain_events
    return self._poll(self.cycle, callback, timeout=timeout)
  File "/home/richard/Projects/simple-backed/envs3/lib64/python3.8/site-packages/kombu/transport/virtual/base.py", line 402, in _poll
    return cycle.get(callback)
  File "/home/richard/Projects/simple-backed/envs3/lib64/python3.8/site-packages/kombu/utils/scheduling.py", line 56, in get
    return self.fun(resource, callback, **kwargs)
  File "/home/richard/Projects/simple-backed/envs3/lib64/python3.8/site-packages/kombu/transport/virtual/base.py", line 405, in _get_and_deliver
    message = self._get(queue)
  File "/home/richard/Projects/simple-backed/envs3/lib64/python3.8/site-packages/kombu/transport/sqlalchemy/__init__.py", line 117, in _get
    msg = self.session.query(self.message_cls) \
  File "/home/richard/Projects/simple-backed/envs3/lib64/python3.8/site-packages/sqlalchemy/orm/query.py", line 3397, in first
    ret = list(self[0:1])
  File "/home/richard/Projects/simple-backed/envs3/lib64/python3.8/site-packages/sqlalchemy/orm/query.py", line 3171, in __getitem__
    return list(res)
  File "/home/richard/Projects/simple-backed/envs3/lib64/python3.8/site-packages/sqlalchemy/orm/query.py", line 3503, in __iter__
    return self._execute_and_instances(context)
  File "/home/richard/Projects/simple-backed/envs3/lib64/python3.8/site-packages/sqlalchemy/orm/query.py", line 3528, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/home/richard/Projects/simple-backed/envs3/lib64/python3.8/site-packages/sqlalchemy/engine/base.py", line 1014, in execute
    return meth(self, multiparams, params)
  File "/home/richard/Projects/simple-backed/envs3/lib64/python3.8/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/richard/Projects/simple-backed/envs3/lib64/python3.8/site-packages/sqlalchemy/engine/base.py", line 1127, in _execute_clauseelement
    ret = self._execute_context(
  File "/home/richard/Projects/simple-backed/envs3/lib64/python3.8/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
    self._handle_dbapi_exception(
  File "/home/richard/Projects/simple-backed/envs3/lib64/python3.8/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
    util.raise_(
  File "/home/richard/Projects/simple-backed/envs3/lib64/python3.8/site-packages/sqlalchemy/util/compat.py", line 178, in raise_
    raise exception
  File "/home/richard/Projects/simple-backed/envs3/lib64/python3.8/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
    self.dialect.do_execute(
  File "/home/richard/Projects/simple-backed/envs3/lib64/python3.8/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.DatabaseError: (cx_Oracle.DatabaseError) ORA-02014: cannot select FOR UPDATE from view with DISTINCT, GROUP BY, etc.
[SQL: SELECT kombu_message_timestamp, kombu_message_id, kombu_message_visible, kombu_message_payload, kombu_message_version, kombu_message_queue_id 
FROM (SELECT kombu_message.timestamp AS kombu_message_timestamp, kombu_message.id AS kombu_message_id, kombu_message.visible AS kombu_message_visible, kombu_message.payload AS kombu_message_payload, kombu_message.version AS kombu_message_version, kombu_message.queue_id AS kombu_message_queue_id 
FROM kombu_message 
WHERE kombu_message.queue_id = :queue_id_1 AND kombu_message.visible != 0 ORDER BY kombu_message.timestamp, kombu_message.id) 
WHERE ROWNUM <= :param_1 FOR UPDATE]
[parameters: {'queue_id_1': 2, 'param_1': 1}]
(Background on this error at: http://sqlalche.me/e/13/4xp6)
[2020-07-02 16:07:33,291: DEBUG/MainProcess] | Worker: Closing Pool...
[2020-07-02 16:07:33,480: DEBUG/MainProcess] | Worker: Closing Consumer...
[2020-07-02 16:07:33,480: DEBUG/MainProcess] | Worker: Stopping Consumer...

List of installed libs with versions:

pip freeze
amqp==2.6.0
billiard==3.6.3.0
celery==4.4.6
cx-Oracle==8.0.0
future==0.18.2
kombu==4.6.11
psycopg2==2.8.5
pytz==2020.1
SQLAlchemy==1.3.18
vine==1.3.0

If I change the configuration to Postgres:

BROKER_HOST = 'sqla+postgres://user:password@dbhost:5432/schema'

the worker seems to run fine.

thedrow commented 3 years ago

Apparently we don't support Oracle DB as a message broker. Since Celery 4.x we don't support using a database as a broker anyway, and the code is not maintained.

Feel free to send a PR if you really really need a database as a message broker. It's usually not a good idea.

auvipy commented 3 years ago

So far PostgreSQL is supported with SQLAlchemy. We have never tested Oracle or had any report about it. If you are eager to collaborate, we might be able to help with some feedback.
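
For anyone picking up the suggested PR: the statement in the traceback combines ORDER BY, a ROWNUM limit, and FOR UPDATE in a single query, and that combination is what Oracle rejects with ORA-02014. One possible direction is to pick the candidate row with a plain SELECT and then claim it with a conditional UPDATE, so no row-limited FOR UPDATE is needed. The sketch below illustrates the idea with the standard-library sqlite3 module as a stand-in database. It is not kombu's code and is untested against Oracle. Table and column names mirror the SQL in the error above; `claim_next_message` and `make_demo_db` are hypothetical names introduced for illustration.

```python
import sqlite3


def claim_next_message(conn, queue_id, max_attempts=5):
    """Return the payload of the oldest visible message, or None.

    Step 1 picks a candidate with a plain SELECT (no FOR UPDATE).
    Step 2 claims it with an UPDATE guarded by `visible != 0`; if another
    consumer claimed the row first, rowcount is 0 and we retry.
    """
    for _ in range(max_attempts):
        row = conn.execute(
            "SELECT id, payload FROM kombu_message "
            "WHERE queue_id = ? AND visible != 0 "
            "ORDER BY timestamp, id LIMIT 1",
            (queue_id,),
        ).fetchone()
        if row is None:
            return None  # nothing visible in this queue
        msg_id, payload = row
        cur = conn.execute(
            "UPDATE kombu_message SET visible = 0 "
            "WHERE id = ? AND visible != 0",
            (msg_id,),
        )
        conn.commit()
        if cur.rowcount == 1:
            return payload  # this consumer won the race for the row
    return None


def make_demo_db():
    """Build an in-memory table shaped like the one in the error (assumed schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE kombu_message ("
        "id INTEGER PRIMARY KEY, visible INTEGER, timestamp TEXT, "
        "payload TEXT, version INTEGER, queue_id INTEGER)"
    )
    conn.executemany(
        "INSERT INTO kombu_message "
        "(visible, timestamp, payload, version, queue_id) VALUES (?, ?, ?, ?, ?)",
        [
            (1, "2020-07-02 16:07:31", "first", 1, 2),
            (1, "2020-07-02 16:07:32", "second", 1, 2),
        ],
    )
    conn.commit()
    return conn


if __name__ == "__main__":
    db = make_demo_db()
    print(claim_next_message(db, 2))
    print(claim_next_message(db, 2))
    print(claim_next_message(db, 2))
```

The guarded UPDATE trades the pessimistic row lock for an optimistic retry. Whether that fits kombu's SQLAlchemy transport, and how the row limit should be expressed on Oracle, would need to be checked against a real Oracle instance.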