python-arq / arq

Fast job queuing and RPC in python with asyncio and redis.
https://arq-docs.helpmanual.io/
MIT License

Scheduling repeated unique jobs #457

Open davidhuser opened 5 months ago

davidhuser commented 5 months ago

Thanks for the great library. Is it possible to schedule repeated unique jobs? My goal is to enqueue a job, ensure only one instance runs at a time, and re-enqueue it immediately after it finishes, possibly with a configurable interval.

I noticed that RQ offers rq-scheduler for this purpose.

Self-enqueuing, as mentioned in https://github.com/samuelcolvin/arq/issues/432, might not be ideal.

Currently, I'm using a cron job with the second parameter set to every 10 seconds. However, the job duration varies:

from arq import cron
from arq.connections import RedisSettings
from arq.cron import CronJob

async def repeated_async_job(ctx):
    # get_session / do_db_stuff are app-specific helpers
    async with get_session(ctx) as db:
        do_db_stuff(db)
    return 'success!'

class WorkerSettings:
    cron_jobs: list[CronJob] = [
        cron(
            repeated_async_job,
            second=set(range(0, 59, 10)),
            unique=True,
            run_at_startup=False,
            timeout=60,
            max_tries=1,
            keep_result=0
        )
    ]
    redis_settings = RedisSettings()

Would it be better to handle scheduling with a different tool, like APScheduler's AsyncIOScheduler? If so, how would the scheduler know when the job is finished, and would it matter if it runs with multiple workers (e.g. Gunicorn)?
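For context, roughly what I had in mind with APScheduler (an untested sketch, assuming APScheduler 3.x): max_instances=1 is what would prevent overlapping runs, but each Gunicorn worker would still run its own scheduler, so this alone doesn't solve the multi-worker case.

import asyncio

from apscheduler.schedulers.asyncio import AsyncIOScheduler

async def repeated_async_job():
    ...  # do db stuff

async def main():
    scheduler = AsyncIOScheduler()
    # max_instances=1: never run two instances of the job concurrently
    # coalesce=True: if runs pile up while one is still going, collapse them into one
    scheduler.add_job(repeated_async_job, 'interval', seconds=10,
                      max_instances=1, coalesce=True)
    scheduler.start()
    await asyncio.Event().wait()  # keep the loop alive

if __name__ == '__main__':
    asyncio.run(main())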

epicwhale commented 5 months ago

I loosely recollect having to do this in the past... wouldn't defining a job_id for the cron(..) enforce uniqueness at run time? and to re-enqueue it immediately after it finishes, I believe the repeated_async_job(..) wrapper could enqueue it with the same job_id before it returns?
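Something along these lines (rough, untested sketch from memory; the function name doubles as the fixed job_id here):

async def repeated_async_job(ctx):
    ...  # do the actual work

    # re-enqueue with the same fixed job_id just before returning
    await ctx['redis'].enqueue_job('repeated_async_job', _job_id='repeated_async_job')
    return 'success!'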

davidhuser commented 5 months ago

I'd prefer not to mix cron jobs with regular functions, even though cron jobs are converted to functions internally. The example below is func-only.

Regarding your job_id suggestion: the first job can/should have a job_id, but subsequent jobs cannot reuse that job_id, because a job cannot re-enqueue itself under its own job_id while it is still running (i.e. not yet finished).

I tried to work around this by querying the number of queued jobs for this function; if it's exactly 1 (the current job), re-enqueue the next one:


import asyncio

from arq import create_pool
from arq.connections import RedisSettings

async def repeated_async_job(ctx):
    # do stuff
    await asyncio.sleep(3)

    # re-enqueue if there is exactly one job running (this job)
    redis = ctx['redis']
    queued_jobs = await redis.queued_jobs()
    queued_jobs_len = len([job for job in queued_jobs if job.function == 'repeated_async_job'])
    if queued_jobs_len == 0:
        print("ERROR: should not happen")
    elif queued_jobs_len == 1:
        # the current job so we can enqueue the next, but without a job_id
        await redis.enqueue_job('repeated_async_job', _job_try=1)
    else:
        print("ERROR: too many jobs")

async def main():
    redis = await create_pool(RedisSettings())
    # startup job with unique ID
    await redis.enqueue_job('repeated_async_job', _job_id='app.main.startup', _job_try=1)

class WorkerSettings:
    functions = [repeated_async_job]

if __name__ == '__main__':
    asyncio.run(main())

It seems to work, but I'm not sure if there is a better/native way to do this.

I was also wondering what _job_try really does. The docs say:

enqueue_job _job_try – useful when re-enqueueing jobs within a job

but how is it useful?

epicwhale commented 5 months ago

Ah, now I remember that's where I got stuck: how do you re-enqueue a job_id when one is already running with the same id (or when its result is saved but not explicitly retrieved/deleted)?

Regarding _job_try: my understanding is that _job_try explicitly sets which 'retry attempt' number the enqueued job should be treated as. That value is accessible as ctx['job_try'] inside the job and is used by arq wherever job_try is referenced: https://github.com/search?q=repo%3Asamuelcolvin%2Farq+job_try&type=code. To the best of my knowledge, it doesn't help with the _job_id situation here.
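i.e. something like this (from memory, untested):

from arq import create_pool
from arq.connections import RedisSettings

async def repeated_async_job(ctx):
    # whatever was passed as _job_try shows up here; arq also compares it against max_tries
    print('attempt number:', ctx['job_try'])

async def enqueue():
    redis = await create_pool(RedisSettings())
    # treat this enqueue as attempt 3
    await redis.enqueue_job('repeated_async_job', _job_try=3)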

If you find a better solution, keen to learn too!

davidhuser commented 5 months ago

Since a job can be in more states than just queued, I'm now using this check before enqueuing:

import asyncio

from arq import create_pool
from arq.connections import RedisSettings
from arq.jobs import JobStatus

async def repeated_async_job(ctx):
    # do stuff
    await asyncio.sleep(3)

    # Check if any job with the same function is deferred, queued, or in progress
    pool = ctx['redis']
    all_jobs = await pool.all_job_results()
    in_progress_jobs = [
        job for job in all_jobs
        if job.status in {JobStatus.deferred, JobStatus.queued, JobStatus.in_progress}
        and job.function == 'repeated_async_job'
    ]

    if in_progress_jobs:
        return 'done'

    await pool.enqueue_job('repeated_async_job')
    return 'done'

async def main():
    redis = await create_pool(RedisSettings())
    await redis.enqueue_job('repeated_async_job')

class WorkerSettings:
    functions = [repeated_async_job]

if __name__ == '__main__':
    asyncio.run(main())

It does not account for params (i.e. the same job with different parameters), but for this job I don't need that.

epicwhale commented 5 months ago

@davidhuser are you facing the issue I've filed here by any chance, or do you know how to solve it? https://github.com/samuelcolvin/arq/issues/459. An in-progress key is created for 60 seconds, even for a cron job that I want to run every 5 or 10 seconds.