python-arq / arq

Fast job queuing and RPC in python with asyncio and redis.
https://arq-docs.helpmanual.io/

Grab Job in Queue and Cancel it #449

Closed · waseemhnyc closed 4 months ago

waseemhnyc commented 4 months ago

So I see how you can cancel/abort a job like this: https://arq-docs.helpmanual.io/#retrying-jobs-and-cancellation

import asyncio
from arq import create_pool
from arq.connections import RedisSettings

async def do_stuff(ctx):
    print('doing stuff...')
    await asyncio.sleep(10)
    return 'stuff done'

async def main():
    redis = await create_pool(RedisSettings())
    job = await redis.enqueue_job('do_stuff')
    await asyncio.sleep(1)
    await job.abort()

class WorkerSettings:
    functions = [do_stuff]
    allow_abort_jobs = True

if __name__ == '__main__':
    asyncio.run(main())

In this example you hold on to the Job object returned when the job is added to the queue, and then call .abort() on it.

How could someone grab items already in the queue and cancel/abort them? I guess my question is how to get the Job object for a job that's already queued. I see there is arq.jobs.Job and it has a redis param, but I'm not sure if there's an easier way to set this up.

class Job:
    """
    Holds data a reference to a job.
    """

    __slots__ = 'job_id', '_redis', '_queue_name', '_deserializer'

    def __init__(
        self,
        job_id: str,
        redis: 'Redis[bytes]',
        ....

Also I'm trying to get used to this typed Python world - what would 'Redis[bytes]' even look like?
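As far as I understand, 'Redis[bytes]' is just a string annotation (a forward reference) describing an async redis-py client whose responses are raw bytes - what you get with decode_responses=False. The ArqRedis pool returned by create_pool() subclasses that client, so the pool itself is what you pass as redis. A minimal sketch of how the types line up, assuming a recent arq (>= 0.23, where redis-py is used):

from arq.connections import ArqRedis, RedisSettings, create_pool

async def get_pool() -> ArqRedis:
    # create_pool() returns an ArqRedis instance; ArqRedis subclasses the
    # redis-py asyncio client, so it is accepted wherever 'Redis[bytes]'
    # (a client returning raw bytes) is expected.
    return await create_pool(RedisSettings())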

Appreciate any help in advance.

Use Case

If a user wants to cancel a long-running task and re-trigger another. Or maybe I want to cancel an old task because the user triggered an update.

waseemhnyc commented 4 months ago

Never mind, it's just the Redis pool. I see it here: https://arq-docs.helpmanual.io/#job-uniqueness

job5 = Job(job_id='my_job', redis=redis)
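For the use case above (cancelling a stale job when the user triggers an update), one way to put this together is to remember the job_id of whatever enqueue_job() returned and rebuild a Job from it later. A rough sketch along those lines, not taken from the arq docs - update_report, retrigger and the in-memory id store are made-up names, and it assumes a worker with allow_abort_jobs = True is running (otherwise .abort() without a timeout can wait indefinitely):

import asyncio

from arq import create_pool
from arq.connections import RedisSettings
from arq.jobs import Job

# purely illustrative storage for "the last job enqueued per user";
# in a real app this would live in your database or in Redis
current_job_ids = {}

async def update_report(ctx, user_id):
    # stand-in for a long-running task
    await asyncio.sleep(60)
    return f'report for {user_id}'

async def retrigger(user_id):
    redis = await create_pool(RedisSettings())

    # abort the previous run, if we know about one, by rebuilding the Job
    # from its id and the pool - the same Job(job_id=..., redis=...) construction
    # as above
    old_id = current_job_ids.get(user_id)
    if old_id is not None:
        await Job(job_id=old_id, redis=redis).abort()

    # enqueue a fresh run and remember its id for next time
    job = await redis.enqueue_job('update_report', user_id)
    if job is not None:
        current_job_ids[user_id] = job.job_id

class WorkerSettings:
    functions = [update_report]
    allow_abort_jobs = True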