Flared / dramatiq-abort

Dramatiq extension to abort message
http://flared.github.io/dramatiq-abort
GNU Lesser General Public License v3.0
40 stars 15 forks source link

Tasks are re-scheduled #30

Open jonashaag opened 2 weeks ago

jonashaag commented 2 weeks ago
@actor()
def good():
    print("good start")
    time.sleep(1)
    print("good end")
    return "good"

If you abort the task between good start and good end, it is aborted but re-scheduled (retried?). Then, it is skipped on retry, so the result will be None. Is this expected behavior?

[Thread-3] [dramatiq.worker.ConsumerThread(default)] [DEBUG] Pushing message 'de88ce39-124b-4c92-92fa-e605deb64749' onto work queue.
[Thread-5] [dramatiq.worker.WorkerThread] [DEBUG] Received message good() with id 'de88ce39-124b-4c92-92fa-e605deb64749'.
[Thread-5] [services.task_group.good] [DEBUG] Received args=() kwargs={}.

good start

[Thread-2 (_watcher)] [dramatiq_abort.middleware.Abortable] [INFO] Aborting task. Raising exception in worker thread <_WorkerThread(Thread-5, started daemon 6176419840)>.
[Thread-5] [services.task_group.good] [DEBUG] Completed after 1003.33ms.
[Thread-5] [dramatiq.worker.WorkerThread] [ERROR] Failed to process message good() with unhandled exception.
Traceback (most recent call last):
  File ".../dramatiq/worker.py", line 487, in process_message
    res = actor(*message.args, **message.kwargs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".../dramatiq/actor.py", line 185, in __call__
    return self.fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "...", line 113, in good
    time.sleep(1)
dramatiq_abort.abort_manager.Abort
[Thread-5] [dramatiq.middleware.retries.Retries] [INFO] Retrying message 'de88ce39-124b-4c92-92fa-e605deb64749' in 9678 milliseconds.
[Thread-5] [dramatiq.broker.RedisBroker] [DEBUG] Enqueueing message 'de88ce39-124b-4c92-92fa-e605deb64749' on queue 'default.DQ'.
[Thread-5] [dramatiq.worker.ConsumerThread(default)] [DEBUG] Acknowledging message 'de88ce39-124b-4c92-92fa-e605deb64749'.
[Thread-4] [dramatiq.worker.ConsumerThread(default.DQ)] [DEBUG] Pushing message 'de88ce39-124b-4c92-92fa-e605deb64749' onto delay queue.
[Thread-4] [dramatiq.broker.RedisBroker] [DEBUG] Enqueueing message 'de88ce39-124b-4c92-92fa-e605deb64749' on queue 'default'.
[Thread-4] [dramatiq.worker.ConsumerThread(default.DQ)] [DEBUG] Acknowledging message 'de88ce39-124b-4c92-92fa-e605deb64749'.
[Thread-3] [dramatiq.worker.ConsumerThread(default)] [DEBUG] Pushing message 'de88ce39-124b-4c92-92fa-e605deb64749' onto work queue.
[Thread-5] [dramatiq.worker.WorkerThread] [DEBUG] Received message good() with id 'de88ce39-124b-4c92-92fa-e605deb64749'.
[Thread-5] [dramatiq.worker.WorkerThread] [WARNING] Message good() was skipped.
[Thread-5] [dramatiq.worker.ConsumerThread(default)] [DEBUG] Acknowledging message 'de88ce39-124b-4c92-92fa-e605deb64749'.