asmodehn / celeros

Celery ROS python interface

Celery workers receive multiple tasks even though prefetch is set to 1 #5

Closed: heuristicus closed this issue 8 years ago

heuristicus commented 9 years ago

When multiple tasks are sent to the robots, the workers do not ignore tasks after the first one they receive; instead, each prefetches an additional task, which we do not want.

The behaviour currently observed is that one worker (simulated robot) will accept a task, and then, when another task is sent, accept that as well. Once all workers have two tasks, they no longer accept any more. Once a task is completed, any tasks that were waiting in the queue are accepted.

heuristicus commented 9 years ago

The fact that 2 tasks are received when we have CELERYD_PREFETCH_MULTIPLIER=1 and CELERYD_CONCURRENCY=1 might be coming from this line.
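For context, a minimal sketch of the relevant settings (assuming the pre-4.0 uppercase names used here); the worker derives its broker prefetch count as the product of these two values, so 1 * 1 should mean one reserved message at a time:

```python
# celeryconfig.py (sketch, assuming Celery 3.x style settings).
# The worker derives its broker prefetch count as
# CELERYD_PREFETCH_MULTIPLIER * CELERYD_CONCURRENCY.
CELERYD_CONCURRENCY = 1          # a single worker process
CELERYD_PREFETCH_MULTIPLIER = 1  # reserve one message per process
```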

heuristicus commented 9 years ago

Looks like the QoS object might be the problem, since it determines whether the task should be accepted (?). It has something to do with the prefetch count, at least.

heuristicus commented 9 years ago

The qos object has a can_consume function which determines whether a message can be received from the incoming queue (or wherever it comes from).

Not quite sure how the prefetch_count variable in the object works, but it looks like it is incremented and decremented when a message is received. If the prefetch count is zero, then the function will return true, because having a prefetch count of zero should result in the consumer consuming however many messages come in.

Alternatively, the prefetch count can be nonzero, but the sizes of the delivered and dirty queues may result in the second part of the condition being fulfilled. It seems as though once a message is accepted by a worker, its hash is added to the dirty queue - if there is only a single message in the delivered queue and prefetch is 1, then we will also return true from the can_consume function.
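For reference, the check described above looks roughly like this (a paraphrase of the can_consume logic in kombu's virtual-transport QoS; treat the exact attribute names as assumptions rather than the literal source):

```python
def can_consume(self):
    # A prefetch_count of zero disables the limit entirely, so any
    # number of incoming messages may be consumed.
    pcount = self.prefetch_count
    # Otherwise, allow another message while the number of delivered
    # but not-yet-acknowledged messages (delivered minus dirty) is
    # still below the prefetch limit.
    return not pcount or len(self._delivered) - len(self._dirty) < pcount
```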

In a way, this is expected behaviour, since one of the tasks has already been started, and we want to prefetch a single task. As such, we expect to have one task running and one task waiting to be run. In actuality, we want zero prefetched tasks, just a single task that starts running as soon as it is accepted (?).

heuristicus commented 9 years ago

http://stackoverflow.com/questions/16040039/understanding-celery-task-prefetching

heuristicus commented 9 years ago

The following can be used to ensure that a task is only executed once.

In `celeryconfig.py`:

```python
# If a task is not acknowledged within 30 seconds by the worker that
# received it, make it visible again so another worker can pick it up.
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 30}
```

In the delivery feedback init of `tasks.py`:

```python
from celery.task.control import revoke

# Revoke the task once it has been started - this should mean that
# it will not be executed by any other worker.
revoke(task.request.id, terminate=False)
```
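Note that visibility_timeout is only honoured by transports that emulate acknowledgements (for example the Redis and SQS brokers), and revoking the task only helps if every other worker learns of the revocation before the timeout expires, so this appears to be more of a workaround than a fix.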
heuristicus commented 9 years ago

Some more digging into where exactly this is set.

The value read is triggered by the [_getopt](https://github.com/celery/celery/blob/320ae3c971738f53022bb2b44e5cb6c0afa94e84/celery/worker/__init__.py#L394) function of the worker.__init__ file.

The _getopt function then calls [find_value_for_key](https://github.com/celery/celery/blob/320ae3c971738f53022bb2b44e5cb6c0afa94e84/celery/app/utils.py#L110) in app.utils. This function fetches a default value from app.defaults.

Looks like this does not look into the config file for settings?
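A simplified paraphrase of that lookup chain (hypothetical signatures; the real Celery 3.x code differs in detail):

```python
# Hypothetical sketch of the lookup chain described above, not the
# actual Celery source.
def _getopt(self, key, *alt_keys):
    # The worker tries each candidate key against the app settings;
    # find_value_for_key (app.utils) falls back to app.defaults when
    # the key is not set explicitly.
    for k in (key,) + alt_keys:
        try:
            return self.app.conf.find_value_for_key(k, namespace='celeryd')
        except KeyError:
            pass
    raise KeyError(key)
```

If the fallback to app.defaults wins even when celeryconfig.py sets a value, that would explain why the configured prefetch appears to be ignored.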

asmodehn commented 8 years ago

Did we try this: https://celery.readthedocs.org/en/latest/userguide/optimizing.html#prefork-pool-prefetch-settings

asmodehn commented 8 years ago

So the solution is to set up ACKS_LATE. More details here: https://github.com/celery/celery/issues/2788
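A minimal sketch of the resulting configuration, assuming the pre-4.0 uppercase setting names (CELERY_ACKS_LATE is the old name for task_acks_late):

```python
# celeryconfig.py (sketch): acknowledge a message only after the task
# finishes, so a worker holds at most the one message it is executing.
CELERY_ACKS_LATE = True
CELERYD_PREFETCH_MULTIPLIER = 1
```

This matches the combination discussed in the prefork-pool-prefetch-settings guide linked above.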