python-arq / arq

Fast job queuing and RPC in python with asyncio and redis.
https://arq-docs.helpmanual.io/
MIT License
2.17k stars 174 forks source link

How to retry a timed out job? #401

Open ross-nordstrom opened 1 year ago

ross-nordstrom commented 1 year ago

Context

I'm trying to add some fail-safes around a resource-intensive job with a lot of external dependencies, so it can sometimes hang or OOM. It usually works on the next retry.

Issue

I'd like to set a job-specific timeout and have it retry after a TimeoutError, but I can't figure out how to do that. The TimeoutError seems to be terminal and I can't get it to retry... any advice on how to make this work?

See related issue: #402

Reproduction

  1. run the worker python script.py worker
  2. queue jobs by running python script.py client
  3. Watch the job run, timeout, and NOT retry in the worker
import asyncio
import random

import arq.worker
from arq import create_pool
from arq.connections import RedisSettings
from arq.typing import WorkerSettingsBase
from future.moves import sys

from settings import redis_settings

async def __flaky_job(_ctx):
    latency = random.uniform(1.0, 60.0)
    print(f"Starting with that will take {latency:.2f}s to run (and is allowed to run up to 3s)...")
    await asyncio.sleep(latency)
    print("Done!")

flaky_job_with_timeout = arq.worker.func(__flaky_job, name='flaky_job', timeout=3)

class WorkerSettings(WorkerSettingsBase):
    redis_settings = redis_settings
    functions = [flaky_job_with_timeout]

def worker():
    print('Starting worker')
    arq.run_worker(WorkerSettings)

async def client():
    print('Running client')
    redis = await create_pool(RedisSettings())
    await redis.enqueue_job('flaky_job')
    print('Enqueued job')

if __name__ == '__main__':
    match sys.argv[1]:
        case 'client':
            asyncio.run(client())
        case 'worker':
            worker()
JonasKs commented 1 year ago

There's many settings to the worker class, and you can specify max retry on specific jobs. Please see the documentation. I also suggest reading this part: https://arq-docs.helpmanual.io/#retrying-jobs-and-cancellation

ross-nordstrom commented 1 year ago

Right, I've been using Retry and job aborting successfully, but am struggling with timeouts-vs-retries.

Is it a correct expectation that a timed-out job (job runs longer than timeout) will be automatically retried? If not, is there a way to do this?

I tried catching the error, but it doesn't propagate up to where I invoke run_worker:

try:
    arq.run_worker(WorkerSettings)
except Exception as e:
    # Never hit on TimeoutError
    logging.exception('worker error')

By the way, this is what the worker logs when you run my reproduction steps:

Starting worker
Starting with that will take 30.57s to run (and is allowed to run up to 3s)...
  3.00s ! 0de71325829747abbbd608ec97fc1f4c:flaky_job failed, TimeoutError: