NicolasLM / spinach

Modern Redis task queue for Python 3
https://spinach.readthedocs.io
BSD 2-Clause "Simplified" License

Idempotent tasks are not retried when worker catastrophically dies #5

Status: Closed (0xDEC0DE closed 3 years ago)

0xDEC0DE commented 4 years ago

Environment

Python 3.8.3
gunicorn version 20.0.4
spinach 0.0.11

However, it is possible to demonstrate the issue with a "toy" problem:

Setup

docker run --publish 6379:6379 --rm redis:latest

Run

import time

from spinach import Engine, RedisBroker

spin = Engine(RedisBroker())


# max_retries > 0 marks the task as idempotent, i.e. safe to retry
@spin.task(name='compute', max_retries=3)
def compute(a, b):
    time.sleep(30)  # long enough to kill the process mid-execution
    print(f'Computed {a} + {b} = {a + b}')


spin.schedule('compute', 2, 2)
spin.start_workers()

Then, before the job can return, kill the Python process and restart it. After restarting, the worker sits idle and never picks up the enqueued job. Inspecting the Redis queue shows the job effectively orphaned: it claims to be in a RUNNING state, has no TTL, and there is no obvious way to transition it to the new broker.

What is the recommended means for canceling/re-enqueuing a running job within Spinach when the worker gets a signal?

juledwar commented 4 years ago

The issue title might be a bit misleading. The bug appears to be an orphaned task if the process that's running it unexpectedly and catastrophically dies.

NicolasLM commented 4 years ago

You are right: when an idempotent task (max_retries > 0) vanishes in the middle of its execution, it is currently lost. The expected behavior is that a new broker would notice the task got lost and reschedule it.

It's a problem I started tackling but never got around to finishing. Currently, each instance of Spinach gets a unique ID when started, and when a job is taken from the queue, it is associated with this unique ID (see _running-jobs-on-broker-{uuid} in Redis). The missing part is a heartbeat that would detect that a broker is dead and reschedule its running jobs.
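To make the bookkeeping described above concrete, here is a minimal in-memory sketch. It is not Spinach's code: `ToyBroker`, `queue`, and `running` are hypothetical stand-ins for the real broker and its Redis structures, and only the `_running-jobs-on-broker-{uuid}` key pattern comes from the comment above.

```python
import uuid
from collections import defaultdict, deque

# Hypothetical in-memory stand-ins for the Redis structures.
queue = deque()               # pending job IDs
running = defaultdict(set)    # key name -> IDs of jobs being executed


class ToyBroker:
    """Illustrative broker; not the real spinach.RedisBroker."""

    def __init__(self):
        # Each instance gets a unique ID when started.
        self.broker_id = uuid.uuid4()
        self.running_key = f'_running-jobs-on-broker-{self.broker_id}'

    def take_job(self):
        """Pop a pending job and record it as running on this broker."""
        if not queue:
            return None
        job_id = queue.popleft()
        running[self.running_key].add(job_id)
        return job_id

    def finish_job(self, job_id):
        """Forget a job once it has completed normally."""
        running[self.running_key].discard(job_id)


queue.append('compute-1')
old_broker = ToyBroker()
job = old_broker.take_job()

# Simulate a crash: finish_job() is never called for old_broker.
new_broker = ToyBroker()
orphaned = running[old_broker.running_key]
next_job = new_broker.take_job()
```

In this sketch the restarted broker gets a fresh ID and sees an empty queue, while the job stays recorded under the dead broker's key. That is the orphaned state reported in this issue, and it is why something has to notice that the old ID is no longer alive.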

NicolasLM commented 3 years ago

I finished implementing this feature. Basically workers send a keepalive to Redis at least every minute. If a worker does not send a keepalive for 30 minutes, another worker will mark it as dead and reschedule its retryable jobs.
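The keepalive logic described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the actual implementation: `Registry` and its methods are hypothetical names, state is kept in plain dictionaries instead of Redis, and jobs that are not retryable are simply dropped here because the comment does not say what happens to them.

```python
import time

BROKER_DEAD_THRESHOLD_SECONDS = 30 * 60  # 30 minutes, as stated above


class Registry:
    """Hypothetical shared state that would live in Redis."""

    def __init__(self):
        self.last_seen = {}  # broker_id -> timestamp of last keepalive
        self.running = {}    # broker_id -> list of (job_id, max_retries)
        self.queue = []      # job IDs waiting to be picked up

    def keepalive(self, broker_id, now=None):
        """Record that a broker is alive (sent at least every minute)."""
        self.last_seen[broker_id] = time.time() if now is None else now

    def start_job(self, broker_id, job_id, max_retries):
        """Associate a running job with the broker executing it."""
        self.running.setdefault(broker_id, []).append((job_id, max_retries))

    def reap_dead_brokers(self, now=None,
                          threshold=BROKER_DEAD_THRESHOLD_SECONDS):
        """Re-enqueue retryable jobs of brokers silent for too long."""
        now = time.time() if now is None else now
        rescheduled = []
        for broker_id, seen in list(self.last_seen.items()):
            if now - seen < threshold:
                continue  # broker sent a keepalive recently enough
            for job_id, max_retries in self.running.pop(broker_id, []):
                if max_retries > 0:  # only idempotent jobs are retried
                    self.queue.append(job_id)
                    rescheduled.append(job_id)
            del self.last_seen[broker_id]
        return rescheduled


registry = Registry()
registry.keepalive('broker-a', now=0)
registry.start_job('broker-a', 'compute', max_retries=3)
registry.start_job('broker-a', 'send-email', max_retries=0)
registry.keepalive('broker-b', now=0)

# broker-a dies; broker-b keeps reporting and runs the check at t=1800s.
registry.keepalive('broker-b', now=1800)
rescheduled = registry.reap_dead_brokers(now=1800)
```

Timestamps are passed in explicitly so the example runs without waiting. In a real deployment every broker would read the clock itself, so clock skew between hosts is one more reason to keep the threshold well above the keepalive interval.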

30 minutes may be too long in practice, but is configurable and a hook exists if someone wants to try plugging a more robust system to detect a dead worker (something like Consul for instance).

juledwar commented 3 years ago

Oh nice work! I'll test this out and let you know how I get on.

juledwar commented 3 years ago

I think we need to be able to tweak that 30 minute value. Is it easy to make it a task parameter?

NicolasLM commented 3 years ago

A task parameter is not possible because the threshold doesn't depend on the task, but it's easy to change it on the broker:

class BrokerWithShortThreshold(RedisBroker):
    broker_dead_threshold_seconds = 600

Alternatively:

broker = RedisBroker()
broker.broker_dead_threshold_seconds = 600

0xDEC0DE commented 3 years ago

Confirmed working in a dev environment over here. Good work.