PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Task hangs due to multiprocessing.Pool.join() #13584

Open rob-safi opened 1 month ago

rob-safi commented 1 month ago

### Bug summary

I have seen several people report similar issues, but none of their solutions apply to my case. When running the snippet below, the code hangs. A modified version of this code runs in the v7labs darwin API, and we need it.

Any ideas on when this type of hanging will be solved? (or maybe it already is and I didn't find it :) )

### Reproduction

```python
from prefect import flow, task
import time

import multiprocessing as mp

def _f(i):
    print(f"a-{i:02d}")
    time.sleep(0.1)

@task(name="small_task", log_prints=True)
def small_task(worker_count: int = 4, count: int = 10):
    responses = []
    with mp.Pool(worker_count) as pool:
        for f in range(count):
            responses.append(pool.apply_async(_f, args=(f,)))
        print(f"start: pool.close()")
        pool.close()
        print(f"end: pool.close()")
        print(f"start: pool.join()")
        pool.join()
        print(f"end: pool.join()")
    return responses

@flow(name="test")
def flow_test(worker_count: int = 4, count: int = 10):
    return small_task(worker_count=worker_count, count=count)

if __name__ == "__main__":
    flow_test(worker_count=4, count=10)
```

### Error

```python3
python prefect_test_small.py 
13:21:45.307 | INFO    | prefect.engine - Created flow run 'eggplant-heron' for flow 'test'
13:21:45.309 | INFO    | Flow run 'eggplant-heron' - View at https://app.prefect.cloud/account/1f336383-7b06-49d3-972f-4da33f5b093b/workspace/ab9810d2-863b-49cd-9759-0b08e3a764e5/flow-runs/flow-run/944038e5-1c47-4991-b73a-b563d46c6ca5
13:21:46.024 | INFO    | Flow run 'eggplant-heron' - Created task run 'small_task-0' for task 'small_task'
13:21:46.024 | INFO    | Flow run 'eggplant-heron' - Executing 'small_task-0' immediately...
13:21:46.493 | INFO    | Task run 'small_task-0' - start: pool.close()
13:21:46.495 | INFO    | Task run 'small_task-0' - end: pool.close()
13:21:46.496 | INFO    | Task run 'small_task-0' - start: pool.join()
13:21:46.495 | INFO    | Task run 'small_task-0' - a-02
13:21:46.495 | INFO    | Task run 'small_task-0' - a-03
13:21:46.495 | INFO    | Task run 'small_task-0' - a-01
13:21:46.495 | INFO    | Task run 'small_task-0' - a-00
13:21:46.597 | INFO    | Task run 'small_task-0' - a-05
13:21:46.597 | INFO    | Task run 'small_task-0' - a-06
13:21:46.597 | INFO    | Task run 'small_task-0' - a-04
13:21:46.597 | INFO    | Task run 'small_task-0' - a-07
13:21:46.698 | INFO    | Task run 'small_task-0' - a-08
13:21:46.698 | INFO    | Task run 'small_task-0' - a-09
^C13:22:05.215 | ERROR   | Task run 'small_task-0' - Crash detected! Execution was cancelled by the runtime environment.
13:22:05.382 | ERROR   | Flow run 'eggplant-heron' - Crash detected! Execution was aborted by an interrupt signal.
Traceback (most recent call last):
  File "/home/robin/safi/remote-inspection/prefect_test_small.py", line 29, in <module>
    flow_test(worker_count=4, count=10)
  File "/home/robin/safi/remote-inspection/.venv/lib/python3.10/site-packages/prefect/flows.py", line 1229, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/robin/safi/remote-inspection/.venv/lib/python3.10/site-packages/prefect/engine.py", line 293, in enter_flow_run_engine_from_flow_call
    retval = from_sync.wait_for_call_in_loop_thread(
  File "/home/robin/safi/remote-inspection/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 218, in wait_for_call_in_loop_thread
    return call.result()
  File "/home/robin/safi/remote-inspection/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
  File "/home/robin/safi/remote-inspection/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/robin/safi/remote-inspection/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
  File "/home/robin/safi/remote-inspection/.venv/lib/python3.10/site-packages/prefect/client/utilities.py", line 100, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/robin/safi/remote-inspection/.venv/lib/python3.10/site-packages/prefect/engine.py", line 385, in create_then_begin_flow_run
    state = await begin_flow_run(
  File "/home/robin/safi/remote-inspection/.venv/lib/python3.10/site-packages/prefect/engine.py", line 551, in begin_flow_run
    terminal_or_paused_state = await orchestrate_flow_run(
  File "/home/robin/safi/remote-inspection/.venv/lib/python3.10/site-packages/prefect/engine.py", line 877, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/home/robin/safi/remote-inspection/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
  File "/home/robin/safi/remote-inspection/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/home/robin/safi/remote-inspection/prefect_test_small.py", line 26, in flow_test
    return small_task(worker_count=worker_count, count=count)
  File "/home/robin/safi/remote-inspection/.venv/lib/python3.10/site-packages/prefect/tasks.py", line 689, in __call__
    return enter_task_run_engine(
  File "/home/robin/safi/remote-inspection/.venv/lib/python3.10/site-packages/prefect/engine.py", line 1421, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
  File "/home/robin/safi/remote-inspection/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 217, in wait_for_call_in_loop_thread
    waiter.wait()
  File "/home/robin/safi/remote-inspection/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/waiters.py", line 173, in wait
    self._handle_waiting_callbacks()
  File "/home/robin/safi/remote-inspection/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/waiters.py", line 140, in _handle_waiting_callbacks
    callback: Call = self._queue.get()
  File "/usr/lib/python3.10/queue.py", line 171, in get
    self.not_empty.wait()
  File "/usr/lib/python3.10/threading.py", line 320, in wait
    waiter.acquire()
KeyboardInterrupt
```

### Versions

```Text
prefect version
Version:             2.19.2
API version:         0.8.4
Python version:      3.10.14
Git commit:          e42a3971
Built:               Thu, May 23, 2024 2:45 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         cloud
```

### Additional context

The error comes from using the `Darwin`/`v7` library (specifically when pulling datasets remotely), so we cannot change the source code...

[Error line](https://github.com/v7labs/darwin-py/blob/f8296729e145d6c2b43f42d685af4f4d6f168f61/darwin/dataset/utils.py#L282)

rob-safi commented 1 month ago

Hi, any update on this? It's definitely a blocker for us using Prefect...

jakekaplan commented 2 weeks ago

Hi @rob-safi thanks for filing this, apologies for the delayed response! Unfortunately I am not able to reproduce this issue with your example. I ran it a number of times and wasn't able to reproduce the hanging behavior:

```
(demo-flows310) ➜  demo-flows git:(main) ✗ python my_flow.py
09:51:42.659 | INFO    | prefect.engine - Created flow run 'platinum-curassow' for flow 'test'
09:51:42.662 | INFO    | Flow run 'platinum-curassow' - View at https://app.prefect.cloud/account/3cf6b38f-5244-474a-9554-302144506e43/workspace/ce8b1412-01b7-4700-a508-8dbd1f43f623/flow-runs/flow-run/d922ab9b-29d5-42f8-bd0e-e84f2936fc80
09:51:43.028 | INFO    | Flow run 'platinum-curassow' - Created task run 'small_task-0' for task 'small_task'
09:51:43.030 | INFO    | Flow run 'platinum-curassow' - Executing 'small_task-0' immediately...
09:51:43.288 | INFO    | Task run 'small_task-0' - start: pool.close()
09:51:43.289 | INFO    | Task run 'small_task-0' - end: pool.close()
09:51:43.290 | INFO    | Task run 'small_task-0' - start: pool.join()
a-00
a-01
a-02
a-03
a-04
a-05
a-06
a-07
a-08
a-09
09:51:45.044 | INFO    | Task run 'small_task-0' - end: pool.join()
09:51:45.170 | INFO    | Task run 'small_task-0' - Finished in state Completed()
09:51:45.298 | INFO    | Flow run 'platinum-curassow' - Finished in state Completed()
```

Does this crash every time for you? Unfortunately, there's not much we can do here without a working MRE.
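
If the hang only shows up on some machines, one way to gather more detail is to have Python dump the stack of every thread once the run has been stuck for a while, instead of relying on the single `KeyboardInterrupt` traceback. The sketch below uses only the standard library `faulthandler` module. The helper name `dump_stacks_if_stuck` and the timeout are hypothetical, not part of Prefect, and the dump covers only the parent process, not the pool workers, so it may narrow down where `pool.join()` is waiting but is not guaranteed to explain the hang.

```python
import faulthandler
import sys
from contextlib import contextmanager


@contextmanager
def dump_stacks_if_stuck(timeout_s, file=None):
    """Hypothetical helper: if the wrapped block is still running after
    timeout_s seconds, write the traceback of every Python thread in this
    process to `file` (defaults to sys.stderr). The block keeps running."""
    target = sys.stderr if file is None else file
    # faulthandler writes straight to the file descriptor, so `target`
    # must be a real file object that supports fileno().
    faulthandler.dump_traceback_later(
        timeout_s, repeat=False, file=target, exit=False
    )
    try:
        yield
    finally:
        # Cancel the pending dump if the block finished in time.
        faulthandler.cancel_dump_traceback_later()


# Possible use with the reproduction above (flow_test is the flow from
# the report; 30 seconds is an arbitrary assumption):
#
#     with dump_stacks_if_stuck(30):
#         flow_test(worker_count=4, count=10)
```

Attaching the resulting dump to the issue, along with the `multiprocessing` start method in use (`multiprocessing.get_start_method()`), would likely make the difference between the two environments easier to spot.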