cenkalti / kuyruk

⚙️ Simple task queue for Python
https://kuyruk.readthedocs.org/
MIT License

Reject/Fail delays are not respected #50

Closed: frol closed this issue 7 years ago.

frol commented 7 years ago

@cenkalti We have just realized that the implementation for #49 is buggy.

Here is a simple reproduction:

import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(threadName)s %(message)s')

from kuyruk import Kuyruk

kuyruk = Kuyruk()

@kuyruk.task(retry=5, fail_delay=10000, reject_delay=10000)
def echo(message):
    logging.info("ECHO: %s", message)
    if message == 'raise an exception':
        raise Exception()

Here is how I run it:

>>> import tasks
>>> tasks.echo('raise an exception')

And here is the output. Notice that all the retries happen immediately, with no delay, and the traceback is printed only at the very end, after the retries are exhausted:

2017-03-13 15:03:02,510 MainThread Start from server, version: 0.9, properties: {'platform': 'Erlang/OTP', 'product': 'RabbitMQ', 'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'cluster_name': 'rabbit@211f2b25e5f9', 'copyright': 'Copyright (C) 2007-2016 Pivotal Software, Inc.', 'capabilities': {'consumer_cancel_notify': True, 'direct_reply_to': True, 'publisher_confirms': True, 'consumer_priorities': True, 'basic.nack': True, 'connection.blocked': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'exchange_exchange_bindings': True}, 'version': '3.6.6'}, mechanisms: ['PLAIN', 'AMQPLAIN'], locales: ['en_US']
2017-03-13 15:03:02,511 MainThread Open OK!
2017-03-13 15:03:02,511 MainThread Connected to RabbitMQ
2017-03-13 15:03:02,511 MainThread using channel_id: 1
2017-03-13 15:03:02,512 MainThread Channel open
2017-03-13 15:03:02,513 MainThread Opened new channel
2017-03-13 15:03:02,513 MainThread queue_declare: kuyruk
2017-03-13 15:03:02,514 MainThread basic_consume: kuyruk
2017-03-13 15:03:02,514 MainThread Consumer started

2017-03-13 15:03:10,875 MainThread Processing task: {'sender_cmd': '/mnt/storage/miniconda3/bin/ipython', 'id': '6903605007ed11e7a58928b2bd03b9d9', 'args': ['raise an exception'], 'module': 'tasks', 'sender_timestamp': '2017-03-13T13:03:10', 'function': 'echo', 'sender_pid': 26218, 'sender_hostname': 'redatopa', 'kwargs': {}}
2017-03-13 15:03:10,875 MainThread Importing module: tasks
2017-03-13 15:03:10,875 MainThread Applying <Task of 'tasks:echo'>, args=('raise an exception',), kwargs={}
2017-03-13 15:03:10,876 MainThread ECHO: raise an exception
2017-03-13 15:03:10,876 MainThread ECHO: raise an exception
2017-03-13 15:03:10,876 MainThread ECHO: raise an exception
2017-03-13 15:03:10,876 MainThread ECHO: raise an exception
2017-03-13 15:03:10,876 MainThread ECHO: raise an exception
2017-03-13 15:03:10,876 MainThread ECHO: raise an exception
2017-03-13 15:03:10,877 MainThread tasks:echo finished in 0 seconds.
2017-03-13 15:03:10,877 MainThread Task raised an exception
2017-03-13 15:03:10,877 MainThread Traceback (most recent call last):
  File "/mnt/storage/miniconda3/lib/python3.5/site-packages/kuyruk/worker.py", line 217, in _process_task
    task, args, kwargs)
  File "/mnt/storage/miniconda3/lib/python3.5/site-packages/kuyruk/worker.py", line 266, in _run_task
    return self._apply_task(task, args, kwargs)
  File "/mnt/storage/miniconda3/lib/python3.5/site-packages/kuyruk/worker.py", line 288, in _apply_task
    return task.apply(*args, **kwargs)
  File "/mnt/storage/miniconda3/lib/python3.5/site-packages/kuyruk/task.py", line 166, in apply
    return self.f(*args, **kwargs)
  File "/mnt/storage/experiments/tasks.py", line 13, in echo
    raise Exception()
Exception

2017-03-13 15:03:10,878 MainThread Task is processed

/cc @khorolets
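The log shows six ECHO lines (the initial call plus the 5 retries from retry=5), all stamped 15:03:10,876, followed by a single traceback. That is consistent with the retries running back to back in a loop inside the worker process, with nothing waiting between attempts. The following is a hypothetical illustration of that pattern, not Kuyruk's actual code: apply_with_retry and its delay parameter are assumptions made for this sketch. With delay=0 it reproduces the behaviour in the log; a positive delay gives the spacing between attempts that the report expected.

```python
import time


def apply_with_retry(func, args=(), kwargs=None, retry=0, delay=0.0):
    """Hypothetical helper (not part of Kuyruk): call func up to retry + 1
    times, sleeping `delay` seconds between attempts, and re-raise the
    last exception once all attempts have failed."""
    kwargs = kwargs or {}
    for attempt in range(retry + 1):
        try:
            return func(*args, **kwargs)
        except Exception:
            if attempt == retry:
                raise  # retries exhausted: propagate the final error
            # delay=0 gives the back-to-back retries seen in the log above
            time.sleep(delay)
```

Note that sleeping inside the worker blocks it for the whole delay, which is one reason a broker-side mechanism (rejecting and redelivering the message) may be preferable for long delays.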

frol commented 7 years ago

Reading the implementation, I realized that it works, just not the way I expected :)

I assumed these things:

frol commented 7 years ago

Is there a way to implement rejection/task stealing instead of discarding failing tasks? In that case, the retry logic would be applied with the fail_delay delay.

I am not an expert in RabbitMQ, but I think there is no way to implement a global countdown for the retry counter. However, our use case means we never want to lose even a single task, so we are OK with having infinite retries.

frol commented 7 years ago

@cenkalti Any thoughts on this matter?

cenkalti commented 7 years ago

Hi @frol. Thanks for the detailed report. I haven't had a chance to look at this issue yet. I will do so as soon as I find some time. Sorry for the delay.

cenkalti commented 7 years ago

Hi @frol. Sorry for the loooong delay. I would like to clarify the retry, fail_delay and reject_delay parameters first:

I guess a global retry counter could be implemented in RabbitMQ with AMQP transactions, but that would make the implementation more complex. I would rather not do this unless it is necessary.

In Kuyruk, failed tasks are not requeued. If you don't want to lose any task, you can use a wrapper function that catches exceptions and raises Reject.
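A minimal sketch of such a wrapper, assuming that raising Reject from a task makes the worker requeue the message instead of discarding it. To keep the example self-contained, Reject is defined locally as a stand-in; in a real task you would use Kuyruk's own Reject exception class instead. The decorator name reject_on_error is hypothetical.

```python
import functools
import logging


class Reject(Exception):
    """Stand-in for Kuyruk's Reject exception (assumption: real code should
    use the library's own class, not this local definition)."""


def reject_on_error(func):
    """Hypothetical decorator: convert any exception raised by func into
    Reject so the message is requeued rather than dropped."""
    @functools.wraps(func)  # keep __name__/__module__ so task lookup still works
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Reject:
            raise  # already a rejection; pass it through unchanged
        except Exception as exc:
            logging.exception("Task failed; rejecting so it is requeued")
            raise Reject() from exc
    return wrapper


@reject_on_error
def echo(message):
    logging.info("ECHO: %s", message)
    if message == 'raise an exception':
        raise Exception()
    return message
```

In the reproduction above, the idea would be to stack this under @kuyruk.task(...) so the task decorator is outermost; that ordering is an assumption worth checking against the Kuyruk documentation. Also keep in mind that a message which fails on every delivery will be requeued forever.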

Please let me know if you have other questions.

frol commented 7 years ago

Hmm, fail_delay was actually useful, and replacing it with a hardcoded 0 doesn't seem to bring any benefit. In fact, we used fail_delay=sys.maxsize simply to lock the task until we restart the server (which we will do anyway when we release a fix for whatever caused the crash).

cenkalti commented 7 years ago

The idea is interesting. However, I don't recommend doing this, because the task still consumes memory on the RabbitMQ server. If the worker does not get restarted, such tasks will accumulate indefinitely.