python-arq / arq

Fast job queuing and RPC in python with asyncio and redis.
https://arq-docs.helpmanual.io/
MIT License

Synchronous Jobs are not getting cancelled after timeout has been reached #395

Open larsclaussen opened 1 year ago

larsclaussen commented 1 year ago

I'm not sure this can be fixed; a warning in the docs may be the least that can be done. As far as I understand, there is no reliable way to cancel a concurrent.futures.Future once it has started running. I assume arq just wraps the job coroutine (and so the future returned by loop.run_in_executor) in an asyncio.wait_for call? When the timeout fires, the awaiting task is cancelled and the job is reported as failed, but the underlying function is not stopped and keeps running in the executor.
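The behaviour described above can be shown without arq or Redis. The following is a minimal sketch, assuming a plain ThreadPoolExecutor stands in for the worker's executor; the names slow_sync_job, run_with_timeout and demo_timeout_does_not_stop_job are made up for illustration and are not part of arq.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def slow_sync_job(finished: threading.Event) -> None:
    # Hypothetical blocking job: it knows nothing about the timeout.
    time.sleep(0.3)
    finished.set()


async def run_with_timeout() -> tuple[bool, bool]:
    finished = threading.Event()
    loop = asyncio.get_running_loop()
    timed_out = False
    with ThreadPoolExecutor(max_workers=1) as pool:
        try:
            # Same pattern as in the report: wait_for around run_in_executor.
            await asyncio.wait_for(
                loop.run_in_executor(pool, slow_sync_job, finished),
                timeout=0.1,
            )
        except asyncio.TimeoutError:
            timed_out = True
        # Leaving the "with" block waits for the worker thread to finish,
        # which is only needed here so the result can be inspected.
    return timed_out, finished.is_set()


def demo_timeout_does_not_stop_job() -> tuple[bool, bool]:
    # First value: did the await time out? Second: did the job still finish?
    return asyncio.run(run_with_timeout())
```

Both values come back true: the caller sees a timeout, yet the blocking function still runs to completion in the background.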

To reproduce

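import asyncio
import functools
import time
from concurrent import futures

from arq import func
from arq.connections import RedisSettings

# `settings` is the project's own configuration object; its import is not shown.

def _some_other():
    print("some other func")
    time.sleep(2)
    print("some other func DONE")

def _timeout_check():
    # Blocks for about 3 seconds in total (1s here plus 2s in _some_other).
    print("timeout check")
    time.sleep(1)
    _some_other()
    print("timeout check DONE")

async def timeout_check(ctx):
    tc_sync = functools.partial(_timeout_check)
    loop = asyncio.get_running_loop()
    loop.set_debug(True)
    # Run the blocking function in the process pool created at startup.
    return await loop.run_in_executor(ctx["pool"], tc_sync)

async def startup(ctx):
    ctx["pool"] = futures.ProcessPoolExecutor()

class WorkerSettings:
    # Register the job with a 3 second timeout.
    job_function = func(timeout_check, name="timeout_check", timeout=3)
    functions = [job_function]
    redis_settings = RedisSettings(
        host=settings.GPS_REDIS_HOST,
        port=settings.GPS_REDIS_PORT,
        database=settings.GPS_REDIS_DB,
    )
    on_startup = startup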

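This produces the following output. Note that "some other func DONE" and "timeout check DONE" are still printed after the TimeoutError: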

2023-05-05 07:23:36.291 | DEBUG    | report_worker.main:timeout_check:50 - async wrapper called
timeout check
some other func
07:23:39:   3.00s ! 8a354ca8e6524328bf0f1b16857e1f3b:timeout_check failed, TimeoutError: 
Traceback (most recent call last):
  File "/app/report_worker/main.py", line 54, in timeout_check
    return await loop.run_in_executor(ctx["pool"], tc_sync)
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/asyncio/tasks.py", line 456, in wait_for
    return fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/basicuser/.local/lib/python3.10/site-packages/arq/worker.py", line 574, in run_job
    result = await asyncio.wait_for(task, timeout_s)
  File "/usr/local/lib/python3.10/asyncio/tasks.py", line 458, in wait_for
    raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError
some other func DONE
timeout check DONE

rra commented 5 months ago

I just ran into this as well. A warning in the documentation would be much appreciated. Even more appreciated would be some way to cancel sync jobs, but I think this would require running them in a separate process with everything that entails, and not even ProcessPoolExecutor seems to support killing the worker processes. It looks like a hard problem.
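One way to picture the "separate process" idea is a sketch in which each blocking job gets its own child process that the parent can kill when the timeout expires. This is only an illustration under assumptions, not something arq provides: run_killable and demo_kill_on_timeout are hypothetical names, and the job is passed as a string of Python source purely to keep the example self-contained.

```python
import asyncio
import sys


async def run_killable(code: str, timeout: float) -> int:
    """Run `code` in a fresh Python process and kill that process on timeout."""
    proc = await asyncio.create_subprocess_exec(sys.executable, "-c", code)
    try:
        # Returns the child's exit code if it finishes in time.
        return await asyncio.wait_for(proc.wait(), timeout)
    except asyncio.TimeoutError:
        proc.kill()        # a dedicated child process can actually be killed
        await proc.wait()  # reap the child so it does not linger as a zombie
        raise


def demo_kill_on_timeout() -> tuple[bool, int]:
    async def main() -> tuple[bool, int]:
        # A job that finishes well within its timeout.
        fast_exit_code = await run_killable("pass", timeout=10)
        try:
            # A job that would block for a minute is killed after 0.5s.
            await run_killable("import time; time.sleep(60)", timeout=0.5)
        except asyncio.TimeoutError:
            return True, fast_exit_code
        return False, fast_exit_code

    return asyncio.run(main())
```

The cost is the one mentioned above: arguments and results have to cross a process boundary, and a killed job gets no chance to clean up after itself.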