python-arq / arq

Fast job queuing and RPC in python with asyncio and redis.
https://arq-docs.helpmanual.io/
MIT License

Cannot enqueue job with a _job_id again, despite execution of the job being fully completed? #432

Open epicwhale opened 8 months ago

epicwhale commented 8 months ago

The docs for _job_id state: "It guarantees that a job with a particular ID cannot be enqueued again until its execution has finished."

But I get a None response from enqueue_job() for the same job_id even after execution of the job is complete. (The output of the job execution is visible both in the worker output and in a result key in Redis; if I delete that Redis key, I am able to enqueue_job again.)

Is this working as intended?

To reproduce:

demo.py

import asyncio
from arq import create_pool
from arq.connections import RedisSettings

async def my_task(ctx):
    return "HELLO-2"

async def main():
    redis = await create_pool(RedisSettings())
    result = await redis.enqueue_job('my_task', _job_id='foobar')
    print(result)

class WorkerSettings:
    functions = [my_task]

if __name__ == '__main__':
    asyncio.run(main())

Console:

(.conda) (base) ➜  arq_try python demo.py
<arq job foobar>
(.conda) (base) ➜  arq_try python demo.py
None                                         <--- despite job being completed
(.conda) (base) ➜  arq_try python demo.py
None                                         <--- despite job being completed

Worker output:

(.conda) (base) ➜  arq_try arq demo.WorkerSettings
16:12:32: Starting worker for 1 functions: my_task
16:12:32: redis_version=7.2.4 mem_usage=1.56M clients_connected=3 db_keys=16
16:12:37:   0.31s → foobar:my_task()
16:12:37:   0.00s ← foobar:my_task ● 'HELLO-2' <--- job complete 
joshwilson-dbx commented 8 months ago

Seems like this is working as intended: arq/connections.py#L148-L150

I remember getting caught by this, too. I was able to work around it by having it not store results at all, but I don't know if that's an option in your case. Seems like you'll need to clear the previous run's results before enqueueing again.
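For reference, a rough sketch of the "clear the previous run's result first" approach. The enqueue_fresh helper is made up for illustration, and it assumes the default result key prefix exposed as arq.constants.result_key_prefix ('arq:result:'); deleting that key throws away the old result and is not atomic with the enqueue.

import asyncio

from arq import create_pool
from arq.connections import RedisSettings
from arq.constants import result_key_prefix  # 'arq:result:' by default

async def enqueue_fresh(redis, function_name, job_id):
    # Drop any stored result from a previous run so the job id can be reused.
    await redis.delete(result_key_prefix + job_id)
    return await redis.enqueue_job(function_name, _job_id=job_id)

async def main():
    redis = await create_pool(RedisSettings())
    job = await enqueue_fresh(redis, 'my_task', 'foobar')
    print(job)

if __name__ == '__main__':
    asyncio.run(main())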

epicwhale commented 8 months ago

@joshwilson-dbx I ended up with the same workaround: configuring the job to not save any results at all.
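For anyone else hitting this, a minimal sketch of that workaround. It assumes keep_result=0 disables result storage (per the arq worker settings docs) and uses the per-function func() wrapper from arq.worker; the global keep_result worker setting should work the same way.

from arq.worker import func

async def my_task(ctx):
    return "HELLO-2"

class WorkerSettings:
    # keep_result=0 means no result key is written to Redis, so the same
    # _job_id can be enqueued again as soon as the previous run finishes.
    functions = [func(my_task, keep_result=0)]
    # Alternatively, disable result storage for all functions:
    # keep_result = 0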

I guess the issue here, then, is that the docs need to be rephrased from "It guarantees that a job with a particular ID cannot be enqueued again until its execution has finished." to something like "It guarantees that a job with a particular ID cannot be enqueued again while it is still running or until its result has been cleared."

Soures888 commented 8 months ago

I encountered the same issue in #430. So yes, it can work without saving the results

epicwhale commented 8 months ago

Looks like there's a pull request for this doc update that is waiting to be published. Could someone help publish it?

https://github.com/samuelcolvin/arq/commit/e0cd916988ebed6d01c26a4d3e9128aa2bf22a7d

epicwhale commented 7 months ago

I encountered a bug when trying to use keep_result=0 in this scenario.

If the job has max_tries=1 set and it is retried, the result ends up getting saved in Redis. This means the job won't get enqueued again as long as that result is not cleared :-/
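A minimal sketch of that scenario, in case it helps reproduce it (the function name and the retry trigger are made up for illustration):

from arq import Retry
from arq.worker import func

async def flaky_task(ctx):
    # Force a retry on the first attempt. With max_tries=1 the retry is
    # rejected, and per the report above a failure result still gets written
    # to Redis even though keep_result=0 is set.
    if ctx['job_try'] == 1:
        raise Retry(defer=5)
    return "done"

class WorkerSettings:
    functions = [func(flaky_task, keep_result=0, max_tries=1)]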