procrastinate-org / procrastinate

PostgreSQL-based Task Queue for Python
https://procrastinate.readthedocs.io/
MIT License

`multiprocessing` timeout terminating Procrastinate worker #1062

Closed by scttnlsn 2 months ago

scttnlsn commented 2 months ago

I have a use case that involves a multiprocessing.Pool inside a single Procrastinate task. When a multiprocessing timeout happens, I believe a signal is sent to the child process to terminate it, and I suspect this is interfering with Procrastinate's own signal handling.
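To make that suspicion concrete, here is a minimal sketch of one way a signal delivered only to a forked child can become visible in the parent. It assumes the parent's signal handling relies on a wakeup file descriptor (`signal.set_wakeup_fd`, which asyncio uses internally); whether Procrastinate's worker works this way is an assumption, not something confirmed in this thread. The function name `demo_inherited_wakeup_fd` is hypothetical, and the sketch is POSIX-only because it uses `os.fork`.

```python
import os
import signal
import socket


def demo_inherited_wakeup_fd():
    """Return the signal number the parent reads after only the child is signalled."""
    rsock, wsock = socket.socketpair()
    rsock.settimeout(5)
    wsock.setblocking(False)  # the wakeup fd must be non-blocking

    # Install a Python-level handler and a wakeup fd in the parent,
    # roughly what an event loop does when it registers signal handlers.
    old_handler = signal.signal(signal.SIGTERM, lambda signum, frame: None)
    old_fd = signal.set_wakeup_fd(wsock.fileno())
    try:
        pid = os.fork()
        if pid == 0:
            # Child: the handler and the wakeup fd are inherited through fork.
            try:
                # Signal only the child; the inherited handler writes the
                # signal number to the socket shared with the parent.
                os.kill(os.getpid(), signal.SIGTERM)
            finally:
                os._exit(0)
        os.waitpid(pid, 0)
        # Parent: it was never signalled, yet a byte is waiting on its socket.
        return rsock.recv(1)[0]
    finally:
        signal.set_wakeup_fd(old_fd)
        signal.signal(signal.SIGTERM, old_handler)
        rsock.close()
        wsock.close()
```

If the parent were an event loop watching that socket, it would likely treat the byte as its own SIGTERM, which would be consistent with the "Stop requested" log line appearing about 3 seconds into the job.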

Here's a minimal reproduction:

import multiprocessing
import time

# Assumes `app` is an already-configured procrastinate.App instance.
@app.task(name="test_multiprocessing")
def test_multiprocessing():
    with multiprocessing.Pool(maxtasksperchild=1) as pool:
        future = pool.apply_async(time.sleep, (10,))
        result = future.get(timeout=3)
        print("done")

In the Procrastinate worker logs I see:

{"created": 1715182331.4175937, "asctime": "2024-05-08 15:32:11,417", "levelname": "INFO", "name": "procrastinate.worker", "message": "Starting worker on all queues", "action": "start_worker", "worker": {"name": "worker", "id": null, "queues": null}, "queues": null}
{"created": 1715182335.6220078, "asctime": "2024-05-08 15:32:15,622", "levelname": "INFO", "name": "procrastinate.worker", "message": "Starting job test_multiprocessing[299]()", "action": "start_job", "worker": {"name": "worker", "id": 0, "queues": null}, "job": {"id": 299, "status": "doing", "queue": "default", "lock": null, "queueing_lock": null, "task_name": "test_multiprocessing", "task_kwargs": {}, "scheduled_at": null, "attempts": 0, "call_string": "test_multiprocessing[299]()"}, "start_timestamp": 1715182335.621973, "duration": 1.4781951904296875e-05}
{"created": 1715182338.7085516, "asctime": "2024-05-08 15:32:18,708", "levelname": "INFO", "name": "procrastinate.worker", "message": "Stop requested", "action": "stopping_worker", "worker": {"name": "worker", "id": null, "queues": null}, "start_timestamp": 1715182335.621973, "duration": 3.086524486541748}
{"created": 1715182338.7090163, "asctime": "2024-05-08 15:32:18,709", "levelname": "INFO", "name": "procrastinate.worker", "message": "Waiting for job to finish: worker 0: test_multiprocessing[299]() (started 3.087 s ago)", "action": "ending_job", "worker": {"name": "worker", "id": 0, "queues": null}, "job": {"id": 299, "status": "doing", "queue": "default", "lock": null, "queueing_lock": null, "task_name": "test_multiprocessing", "task_kwargs": {}, "scheduled_at": null, "attempts": 0, "call_string": "test_multiprocessing[299]()"}, "start_timestamp": 1715182335.621973, "duration": 3.087028741836548}
{"created": 1715182345.7092435, "asctime": "2024-05-08 15:32:25,709", "levelname": "ERROR", "name": "procrastinate.worker", "message": "Job test_multiprocessing[299]() ended with status: Error, lasted 10.087 s", "exc_info": "[FULL TRACEBACK OMITTED] multiprocessing.context.TimeoutError", "action": "job_error", "worker": {"name": "worker", "id": 0, "queues": null}, "job": {"id": 299, "status": "doing", "queue": "default", "lock": null, "queueing_lock": null, "task_name": "test_multiprocessing", "task_kwargs": {}, "scheduled_at": null, "attempts": 0, "call_string": "test_multiprocessing[299]()"}, "start_timestamp": 1715182335.621973, "duration": 10.087138175964355, "end_timestamp": 1715182345.7091112, "result": null}
{"created": 1715182345.7346044, "asctime": "2024-05-08 15:32:25,734", "levelname": "INFO", "name": "procrastinate.worker", "message": "Stopped worker on all queues", "action": "stop_worker", "worker": {"name": "worker", "id": null, "queues": null}, "start_timestamp": 1715182335.621973, "duration": 10.087138175964355, "end_timestamp": 1715182345.7091112, "result": null, "queues": null}

And then the worker process terminates.

Is there an obvious mistake I'm making? I know this is an unusual use case, but I have a bunch of parallel computation to do, and so far it's been easier to handle all of that inside a single Procrastinate task (spreading it across multiple Procrastinate tasks and then collecting the results would require some architectural changes, which I may do in the future).

I haven't dug into the nitty-gritty of the signals yet, but I wanted to open this issue early in case you had any ideas about what specifically might be going on. If this turns out to be an actual bug, I would be happy to attempt a fix with your guidance.

scttnlsn commented 2 months ago

FWIW, I can change this to multiprocessing.get_context("spawn").Pool() and the Procrastinate worker does not die on a multiprocessing timeout. I pay a performance penalty, though, since I now need to copy a bunch of data into each child process.

scttnlsn commented 2 months ago

Another workaround is to call the following from the multiprocessing child task:

import multiprocessing
import time
import signal

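def sleep(sec):
    # Stop writing received signals to the wakeup fd inherited from the parent.
    signal.set_wakeup_fd(-1)
    # Restore the default handlers in this child process.
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    signal.signal(signal.SIGINT, signal.SIG_DFL)
    time.sleep(sec)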

@app.task(name="test_multiprocessing")
def test_multiprocessing():
    with multiprocessing.Pool(maxtasksperchild=1) as pool:
        future = pool.apply_async(sleep, (10,))
        result = future.get(timeout=3)
        print("done")

That seems to decouple the signal handling between the parent (the Procrastinate worker) and the children (the processes in the multiprocessing.Pool). I don't completely understand the ramifications of this yet, though, so I'm not sure it's a proper solution.

EDIT: Never mind - this is still terminating the parent process.
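One possible reason the workaround still fails, not confirmed anywhere in this thread: the handlers are only reset inside `sleep`, so pool workers that never pick up a task keep whatever they inherited, and leaving the `with` block calls `pool.terminate()`, which signals every worker. A minimal sketch that moves the reset into a Pool `initializer`, so it runs in every child as it starts, might look like the following. The names `reset_signal_handling` and `run_with_timeout` are hypothetical, the fork start method is assumed (POSIX only), and this has not been tested against a Procrastinate worker.

```python
import multiprocessing
import signal
import time


def reset_signal_handling():
    """Pool initializer: runs once in every worker process, including idle ones."""
    # Stop writing received signal numbers to the wakeup fd inherited via fork.
    signal.set_wakeup_fd(-1)
    # Restore the default handlers so SIGTERM and SIGINT simply end the child.
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    signal.signal(signal.SIGINT, signal.SIG_DFL)


def run_with_timeout(func, args, timeout):
    """Run func(*args) in a forked pool worker, raising TimeoutError on timeout."""
    ctx = multiprocessing.get_context("fork")  # assumption: fork is available
    with ctx.Pool(
        processes=2,
        maxtasksperchild=1,
        initializer=reset_signal_handling,
    ) as pool:
        future = pool.apply_async(func, args)
        # Raises multiprocessing.TimeoutError if the result is not ready in time;
        # leaving the with block then terminates all workers.
        return future.get(timeout=timeout)
```

Inside a task this would be called as `run_with_timeout(time.sleep, (10,), 3)`. There is still a short window between the fork and the initializer running in which a child has the inherited handlers, so this narrows the problem rather than provably removing it.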

Anyway, I don't think this is anything related to Procrastinate so I'm going to close the issue.