gerazenobi opened this issue 1 year ago
I'll have to look into this further tomorrow, as I have never had the need to cancel jobs myself.
> Running `job.abort()` with or without a timeout seems to have no effect on either `queued` or `in_progress` jobs.

This seems like a bug.
Hey, apologies for the lack of response. I haven't forgotten, I just haven't found the time. I've got Friday off, and I'll use that day for open source/personal projects and get back to you then.
@JonasKs no problem! And thanks ❤️
Hi again @gerazenobi! First, in case someone else reads this issue at a later point: the `abort()` function requires the `allow_abort_jobs` flag to be set on the worker:

> `:param allow_abort_jobs:` whether to abort jobs on a call to `:func:arq.jobs.Job.abort`

The docs also state this here. I can see you've done this!
The next thing I did was to check out your implementation. Knowing that this library has some quirkiness around queue names, I tried removing `queue_name` from your implementation, and `abort` seems to work fine. I started working on a queue-name fix here, which I just committed (a very old branch though; it doesn't work, and I don't remember why). The issue is somewhat explained in #346.

So, short workaround: use the default queue.

Best workaround: attempt to fix queue names. I won't have time to do this at the moment - I'm flooded with work at work, hence I took time off to answer my 50 GitHub notifications. PRs very welcome.
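For illustration, a minimal sketch of the short workaround (a hypothetical client-side script; `compute` is the job function from the snippets below, a worker is assumed to be running, and everything stays on the default queue - no `_queue_name` on enqueue, no `queue_name` in `WorkerSettings`):

```python
import asyncio

from arq import create_pool
from arq.connections import RedisSettings

async def main():
    redis = await create_pool(RedisSettings())
    # No _queue_name: the job lands on the default queue the worker polls.
    job = await redis.enqueue_job("compute")
    if job:
        # Requires allow_abort_jobs=True in WorkerSettings; with no
        # timeout, abort() waits until the worker confirms cancellation.
        aborted = await job.abort()
        print(f"aborted: {aborted}")

if __name__ == "__main__":
    asyncio.run(main())
```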
Hi @JonasKs!

Thanks so much for taking a look at this and providing a workaround. I will test on my side and get back to you.
@JonasKs I tried again with the code provided previously and it didn't work for me, even though I removed the queue name and also the custom IDs in order to have the bare minimum:
```python
import asyncio

from arq import create_pool
from arq.connections import RedisSettings

# the job
async def compute(ctx):
    await asyncio.sleep(30)
    return ctx["job_id"]

# enqueuing jobs
async def main():
    redis = await create_pool(RedisSettings())
    for job_index in range(1, 50):
        job = await redis.enqueue_job("compute")
        if job:
            print(job.job_id)

class WorkerSettings:
    functions = [compute]
    keep_result = 3600 * 24
    allow_abort_jobs = True
```
How I am aborting:

```python
import asyncio

from arq import create_pool
from arq.connections import RedisSettings
from arq.jobs import Job

JOB_ID = "d85e5e5b62074cf59aea481783e8200a"

async def main():
    redis = await create_pool(RedisSettings())
    job = Job(JOB_ID, redis)
    job_info = await job.info()
    job_status = await job.status()
    print(f"job info: {job_info}")
    print(f"status: {job_status}")
    print(f"result_info: {job_info}")  # note: prints job.info() again, not await job.result_info()
    try:
        await job.abort(timeout=0)
    except Exception as e:
        print("job abort raised exception", e)

if __name__ == "__main__":
    asyncio.run(main())
```
I enqueued the jobs and ran the abort script above multiple times: it always prints `status: queued`.
Using your example from the first post results in this output:
```
queued jobs 10
job info: JobDef(function='compute', args=(), kwargs={}, job_try=None, enqueue_time=datetime.datetime(2023, 5, 25, 7, 14, 55, 350000, tzinfo=datetime.timezone.utc), score=1684998895350)
status: JobStatus.queued
result_info: None
exception raised when aborting job
exception when aborting: <class 'TimeoutError'>
```
Removing queue names:
```python
import asyncio

from arq import create_pool
from arq.connections import RedisSettings

async def compute(ctx):
    print(f'compute called {ctx["job_id"]}')
    await asyncio.sleep(3600)
    return ctx["job_id"]

async def main():
    redis = await create_pool(RedisSettings())
    for job_index in range(1, 11):
        job = await redis.enqueue_job("compute", _job_id=f"my_id_{job_index}")
        if job:
            print(job.job_id)

class WorkerSettings:
    functions = [compute]
    max_jobs = 3
    keep_result = 3600 * 24
    allow_abort_jobs = True
    health_check_interval = 5

if __name__ == "__main__":
    asyncio.run(main())
```
```python
import asyncio

from arq import create_pool
from arq.connections import RedisSettings
from arq.jobs import Job

JOB_ID = "my_id_10"

async def main():
    redis = await create_pool(RedisSettings())
    jobs = await redis.queued_jobs()
    print(f"queued jobs {len(jobs)}")
    job = Job(JOB_ID, redis)
    print(f"job info: {await job.info()}")
    job_status = await job.status()
    print(f"status: {job_status}")
    print(f"result_info: {await job.result_info()}")
    try:
        aborted = await job.abort(timeout=1)
        print(f"aborted: {aborted}")
    except Exception as e:
        print("exception raised when aborting job")
        print(f"exception when aborting: {type(e)}")

if __name__ == "__main__":
    asyncio.run(main())
```
Results in this output:
```
queued jobs 0
job info: JobResult(function='compute', args=(), kwargs={}, job_try=1, enqueue_time=datetime.datetime(2023, 5, 25, 7, 14, 55, 350000, tzinfo=datetime.timezone.utc), score=None, success=False, result=CancelledError(), start_time=datetime.datetime(2023, 5, 25, 7, 29, 55, 591000, tzinfo=datetime.timezone.utc), finish_time=datetime.datetime(2023, 5, 25, 7, 29, 55, 594000, tzinfo=datetime.timezone.utc), queue_name='example_queue', job_id=None)
status: JobStatus.complete
result_info: JobResult(function='compute', args=(), kwargs={}, job_try=1, enqueue_time=datetime.datetime(2023, 5, 25, 7, 14, 55, 350000, tzinfo=datetime.timezone.utc), score=None, success=False, result=CancelledError(), start_time=datetime.datetime(2023, 5, 25, 7, 29, 55, 591000, tzinfo=datetime.timezone.utc), finish_time=datetime.datetime(2023, 5, 25, 7, 29, 55, 594000, tzinfo=datetime.timezone.utc), queue_name='example_queue', job_id=None)
aborted: True
```
Aha, I see, with the new example there's something iffy going on.
With the job:
```python
import asyncio

async def compute(ctx):
    print(f'starting job {ctx}')
    await asyncio.sleep(30)
    print('slept')
    return ctx["job_id"]
```
Then finding the log from the worker:
```
starting job {'redis': ArqRedis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>, 'job_id': 'afc5d7518819482eaaedffb0a82372f8', 'job_try': 1, 'enqueue_time': datetime.datetime(2023, 5, 25, 8, 17, 15, 800000, tzinfo=datetime.timezone.utc), 'score': 1685002635800}
```
Then aborting the job:
```
job info: JobDef(function='compute', args=(), kwargs={}, job_try=None, enqueue_time=datetime.datetime(2023, 5, 25, 8, 17, 15, 815000, tzinfo=datetime.timezone.utc), score=1685002635815)
status: JobStatus.in_progress
result_info: JobDef(function='compute', args=(), kwargs={}, job_try=None, enqueue_time=datetime.datetime(2023, 5, 25, 8, 17, 15, 815000, tzinfo=datetime.timezone.utc), score=1685002635815)
job abort raised exception
```
Then waiting for the job to complete:

```
job info: JobResult(function='compute', args=(), kwargs={}, job_try=1, enqueue_time=datetime.datetime(2023, 5, 25, 8, 17, 15, 815000, tzinfo=datetime.timezone.utc), score=None, success=True, result='f859323bff434573850b10b5363d015d', start_time=datetime.datetime(2023, 5, 25, 8, 18, 16, 173000, tzinfo=datetime.timezone.utc), finish_time=datetime.datetime(2023, 5, 25, 8, 18, 46, 177000, tzinfo=datetime.timezone.utc), queue_name='arq:queue', job_id=None)
status: JobStatus.complete
result_info: JobResult(function='compute', args=(), kwargs={}, job_try=1, enqueue_time=datetime.datetime(2023, 5, 25, 8, 17, 15, 815000, tzinfo=datetime.timezone.utc), score=None, success=True, result='f859323bff434573850b10b5363d015d', start_time=datetime.datetime(2023, 5, 25, 8, 18, 16, 173000, tzinfo=datetime.timezone.utc), finish_time=datetime.datetime(2023, 5, 25, 8, 18, 46, 177000, tzinfo=datetime.timezone.utc), queue_name='arq:queue', job_id=None)
```
I agree, this seems like a bug. I'll try to look into it this weekend.
I managed to close the issue instead of commenting, sorry - reopened.
Thanks @JonasKs.
Not related to this issue, but I thought I would share for the sake of knowledge sharing in case someone else arrives here: at the moment, to work around the fact that we can't abort (and hence can't reuse custom IDs), I was using the `queued_jobs` method and then querying the jobs' kwargs in `JobDef` to manage/track which jobs are in the queue:
```python
async def queued_jobs(self, *, queue_name: str = default_queue_name) -> List[JobDef]:
    """
    Get information about queued, mostly useful when testing.
    """
    jobs = await self.zrange(queue_name, withscores=True, start=0, end=-1)
    return await asyncio.gather(*[self._get_job_def(job_id, int(score)) for job_id, score in jobs])
```
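As a usage illustration, a minimal sketch of that tracking pattern (`is_job_queued` and `target_kwargs` are hypothetical names of mine; `compute` is the job function from the examples above):

```python
from arq.connections import ArqRedis

async def is_job_queued(redis: ArqRedis, target_kwargs: dict) -> bool:
    # queued_jobs() returns one JobDef per queued job; match on the
    # function name and kwargs to see whether an equivalent job is
    # already waiting in the queue.
    for job_def in await redis.queued_jobs():
        if job_def.function == "compute" and job_def.kwargs == target_kwargs:
            return True
    return False
```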
However, this method will quickly exhaust the available Redis connections (it doesn't scale) when dealing with concurrent requests/jobs, as it launches a concurrent `_get_job_def` for each job in the queue:
```python
async def _get_job_def(self, job_id: bytes, score: int) -> JobDef:
    key = job_key_prefix + job_id.decode()
    v = await self.get(key)  # <=== new connection
    # ...
```
I was starting to see `redis.exceptions.ConnectionError: max number of clients reached`.

All that to say: `queued_jobs` should be used with care, and perhaps only for testing.
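A lighter-weight alternative, if you only need to know whether one specific job is still queued, might be to check the queue's sorted set directly - a sketch assuming the job ID is known and the default queue name from `arq.constants` (`is_queued` is a hypothetical helper of mine):

```python
from arq.connections import ArqRedis
from arq.constants import default_queue_name

async def is_queued(redis: ArqRedis, job_id: str) -> bool:
    # The queue is a Redis sorted set whose members are job IDs, as the
    # queued_jobs source above shows; one ZSCORE call avoids doing a GET
    # per queued job.
    return await redis.zscore(default_queue_name, job_id) is not None
```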
Hi there!

I am starting to use Arq and it looks wonderful. Thank you so much for this much-needed framework!
I was having the following 2 issues:

1. `job.abort()` is not working.
2. Enqueuing a job with the same ID as a `complete` one returns `None`, so I can't overwrite it.

Here are the details:

**Context**

Whatever the status of an existing job (`queued`, `in_progress` or `complete`), I need to overwrite it with a fresher one:

- In the `queued` or `in_progress` scenario: I understand I'd need to abort the existing job (as I don't want to wait for its completion) and enqueue the new one with fresher arguments.
- In the `complete` scenario: overwrite it, i.e. enqueue the new job with fresher arguments.

The issues I was having then:
A) Running `job.abort()` with or without a timeout seems to have no effect on either `queued` or `in_progress` jobs. FWIW, I have observed one thing though: if I abort with a `0` timeout, then only when the worker dequeues that particular job does it decide not to run it:

```
10:56:34: 131.05s ⊘ my_id_10:compute aborted before start
```

Question: is there anything I am missing regarding aborting jobs? Any idea why it isn't working for me?
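Based on that observation, for now I treat aborting a queued job as fire-and-forget - a sketch on my side (`request_abort` is my own helper name, and it assumes the abort request is still recorded even when `abort()` times out, which is what the log line above suggests):

```python
import asyncio

from arq.jobs import Job

async def request_abort(job: Job) -> None:
    # Ask arq to abort the job without waiting for confirmation; for a
    # queued job no result is available yet, so a zero timeout raises
    # asyncio.TimeoutError even though the worker will still skip the
    # job when it eventually dequeues it.
    try:
        await job.abort(timeout=0)
    except asyncio.TimeoutError:
        pass
```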
B) If I enqueue a job with the same ID as a `complete` one, it will not be enqueued; instead `enqueue_job` returns `None`, as per job uniqueness. Question: how do I delete the job then, so that I am able to enqueue the fresher one?
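One idea I've been considering (an assumption on my part - I haven't confirmed that uniqueness is enforced only through the stored result): delete the stale result key before re-enqueuing, using the key prefix from `arq.constants` (`forget_result` is a hypothetical helper name):

```python
from arq.connections import ArqRedis
from arq.constants import result_key_prefix

async def forget_result(redis: ArqRedis, job_id: str) -> None:
    # Remove the stored result for a completed job so that a new job
    # with the same ID is no longer rejected as a duplicate.
    await redis.delete(result_key_prefix + job_id)
```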
**Worker settings and job generation**

```python
import asyncio

from arq import create_pool
from arq.connections import RedisSettings

async def compute(ctx):
    print(f'compute called {ctx["job_id"]}')
    await asyncio.sleep(3600)
    return ctx["job_id"]

async def main():
    redis = await create_pool(RedisSettings())
    for job_index in range(1, 11):
        job = await redis.enqueue_job("compute", _job_id=f"my_id_{job_index}", _queue_name="example_queue")
        if job:
            print(job.job_id)

class WorkerSettings:
    functions = [compute]
    max_jobs = 3
    keep_result = 3600 * 24
    queue_name = "example_queue"
    allow_abort_jobs = True
    health_check_interval = 5

if __name__ == "__main__":
    asyncio.run(main())
```
**Code trying to abort job**

```python
import asyncio

from arq import create_pool
from arq.connections import RedisSettings
from arq.jobs import Job

JOB_ID = "my_id_10"

async def main(queue_name):
    redis = await create_pool(RedisSettings())
    jobs = await redis.queued_jobs(queue_name=queue_name)
    print(f"queued jobs {len(jobs)}")
    job = Job(JOB_ID, redis, _queue_name=queue_name)
    print(f"job info: {await job.info()}")
    job_status = await job.status()
    print(f"status: {job_status}")
    print(f"result_info: {await job.result_info()}")
    try:
        aborted = await job.abort(timeout=1)
        print(f"aborted: {aborted}")
    except Exception as e:
        print("exception raised when aborting job")
        print(f"exception when aborting: {type(e)}")

if __name__ == "__main__":
    asyncio.run(main("example_queue"))
```

Any help would be much appreciated!