PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Workflow hangs when tasks or sub-flows using ProcessPool with start method `fork` #9229

Open tyong920 opened 1 year ago

tyong920 commented 1 year ago

Bug summary

On Linux, fork is the default multiprocessing start method (it isn’t available on Windows), while Windows and macOS default to spawn.

When a user-defined task or sub-flow called from the entry-point flow uses a process pool, Prefect may hang forever.
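
To see which case applies on a given machine, the standard library can report the start method directly. This is a minimal sketch using only multiprocessing and platform; the helper name describe_start_methods is made up for illustration.

```python
import multiprocessing as mp
import platform


def describe_start_methods() -> dict:
    """Report the platform, its default start method, and all supported ones."""
    return {
        "platform": platform.system(),
        # Calling get_start_method() without allow_none=True fixes the default
        # for the rest of the process if no method was chosen yet.
        "default": mp.get_start_method(),
        "available": mp.get_all_start_methods(),
    }


if __name__ == "__main__":
    print(describe_start_methods())
```

If the default is reported as fork, the hang described in this issue is expected to be reproducible on that machine.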

Reproduction

"""
Version 0: hangs on Linux but works on macOS.
"""
from concurrent import futures

from prefect import flow, task

def say_hi(name: str):
    print(f"Hi {name}")

# @flow # Same issue here
@task
def func():
    with futures.ProcessPoolExecutor(2) as executor:
        executor.map(say_hi, ("Prefect", "Orion"))

@flow
def this_works():
    func.fn() # Call function directly

@flow
def this_hangs():
    func() # Call task or sub-flow

if __name__ == "__main__":
    this_hangs() # ... ON Linux, e.g., Ubuntu
    # this_works() # ... ON Mac, e.g., MacOS Ventura

"""
Version 1: works on both Linux and macOS. Sets the start method to spawn.
"""
from concurrent import futures
from multiprocessing import get_context

from prefect import flow, task

def say_hi(name: str):
    print(f"Hi {name}")

# @flow # Same issue here
@task
def func():
    context = get_context("spawn")
    with futures.ProcessPoolExecutor(2, mp_context=context) as executor:
        executor.map(say_hi, ("Prefect", "Orion"))

@flow
def this_works():
    func.fn() # Call function directly

@flow
def this_hangs():
    func() # Call task or sub-flow

if __name__ == "__main__":
    this_hangs() # ... [WORKS Now] ON Linux, e.g., Ubuntu
    # this_works() # ... ON Mac, e.g., MacOS Ventura

Error

12:03:13.761 | INFO    | prefect.engine - Created flow run 'shapeless-yak' for flow 'this-hangs'
12:03:13.821 | INFO    | Flow run 'shapeless-yak' - Created task run 'func-0' for task 'func'
12:03:13.822 | INFO    | Flow run 'shapeless-yak' - Executing 'func-0' immediately...
Hi Prefect
Hi Orion
^C12:03:21.965 | ERROR   | Task run 'func-0' - Crash detected! Execution was cancelled by the runtime environment.
12:03:21.979 | ERROR   | Flow run 'shapeless-yak' - Crash detected! Execution was aborted by an interrupt signal.
12:03:21.992 | ERROR   | prefect._internal.concurrency.services - Service 'APILogWorker' failed.
Traceback (most recent call last):
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/services.py", line 114, in _run
    await self._main_loop()
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/services.py", line 254, in _main_loop
    item = await self._queue.get()
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/queues.py", line 158, in get
    await getter
asyncio.exceptions.CancelledError
Traceback (most recent call last):
  File "/private/tmp/prefect-issue/flow_processpool.py", line 30, in <module>
    this_hangs() # ... ON Linux, e.g., Ubuntu
    ^^^^^^^^^^^^
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/flows.py", line 468, in __call__
    return enter_flow_run_engine_from_flow_call(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/engine.py", line 184, in enter_flow_run_engine_from_flow_call
    retval = from_sync.wait_for_call_in_loop_thread(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 137, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/client/utilities.py", line 40, in with_injected_client
    return await fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/engine.py", line 251, in create_then_begin_flow_run
    state = await begin_flow_run(
            ^^^^^^^^^^^^^^^^^^^^^
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/engine.py", line 388, in begin_flow_run
    terminal_or_paused_state = await orchestrate_flow_run(
                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/engine.py", line 669, in orchestrate_flow_run
    result = await flow_call.aresult()
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/private/tmp/prefect-issue/flow_processpool.py", line 26, in this_hangs
    func() # Call task or sub-flow
    ^^^^^^
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/tasks.py", line 485, in __call__
    return enter_task_run_engine(
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/engine.py", line 972, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 136, in wait_for_call_in_loop_thread
    waiter.wait()
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/waiters.py", line 124, in wait
    self._handle_waiting_callbacks(ctx)
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/waiters.py", line 88, in _handle_waiting_callbacks
    callback: Call = self._queue.get()
                     ^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/queue.py", line 171, in get
    self.not_empty.wait()
  File "/opt/homebrew/Cellar/python@3.11/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/threading.py", line 320, in wait
    waiter.acquire()
KeyboardInterrupt

Versions

Version:             2.10.4
API version:         0.8.4
Python version:      3.10.9
Git commit:          b6d0433a
Built:               Thu, Apr 13, 2023 5:34 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         ephemeral
Server:
  Database:          sqlite
  SQLite version:    3.41.2

rmorshea commented 1 year ago

I have experienced this as well.

sync_compatible seems to be at fault in my case. I can be reasonably certain of this because forcing the offending sync_compatible-decorated function to run async via asyncio.run works around the issue:

import asyncio

# "prefect.something" stands in for whichever module holds the offending function
from prefect.something import offending_function

def call_offending_function_sync():
    async def wrapper():
        await offending_function()
    asyncio.run(wrapper())

It's rather surprising to me that the sync compatibility decorator doesn't take a similar approach with asyncio.run. Perhaps there should be some heuristic that attempts to do so before submitting a task to the global thread loop.

rmorshea commented 1 year ago

@zanieb here's the shortest example I've been able to come up with:

import multiprocessing as mp
from prefect_aws import S3Bucket

S3Bucket.load("bucket-name")

if __name__ == "__main__":
    proc = mp.get_context("fork").Process(target=lambda: None, daemon=True)
    proc.start()
    proc.join()

rmorshea commented 1 year ago

@zanieb if you comment out this line it does not hang:

https://github.com/PrefectHQ/prefect/blob/c924f42433448c9421cdfd16cd0965113df68081/src/prefect/blocks/core.py#L639

rmorshea commented 1 year ago

My earlier statement about this being related to sync_compatible seems to be wrong. This still hangs:

import asyncio
import multiprocessing as mp
from prefect_aws import S3Bucket

async def func():
    await S3Bucket.load("global-storage")

asyncio.run(func())

if __name__ == "__main__":
    proc = mp.get_context("fork").Process(target=lambda: None, daemon=True)
    proc.start()
    proc.join()

rmorshea commented 1 year ago

Going one layer deeper, removing usage of this lock also prevents the hang:

https://github.com/PrefectHQ/prefect/blob/c924f42433448c9421cdfd16cd0965113df68081/src/prefect/_internal/concurrency/services.py#L100

This seems related to the problem described in this article: the lock is likely being copied in an acquired state, so the subprocess can never acquire it.

rmorshea commented 1 year ago

The solution seems to be simply not to use forked subprocesses. According to the article:

  • Starting in Python 3.12, you will get a DeprecationWarning indicating that “fork” will stop being the default in 3.14.
  • In Python 3.14, the default will be changed to either “spawn” or “forkserver” (a mostly safer alternative to “fork”).
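
One way to avoid fork without changing the process-wide default is to pass an explicit context to the pool, as Version 1 of the reproduction does with spawn. The following is a sketch only: pick_start_method and greet_all are made-up helper names, and preferring forkserver over spawn is an assumption, not a recommendation from Prefect.

```python
import multiprocessing as mp
from concurrent import futures


def pick_start_method() -> str:
    """Prefer forkserver where the platform offers it, otherwise use spawn."""
    return "forkserver" if "forkserver" in mp.get_all_start_methods() else "spawn"


def say_hi(name: str) -> str:
    return f"Hi {name}"


def greet_all(names: tuple) -> list:
    """Run say_hi in workers that are not direct forks of the calling process."""
    context = mp.get_context(pick_start_method())
    with futures.ProcessPoolExecutor(2, mp_context=context) as executor:
        return list(executor.map(say_hi, names))
```

Call greet_all(("Prefect", "Orion")) from under an if __name__ == "__main__": guard, because spawn and forkserver workers re-import the main module. Neither method copies the caller's threads or held locks into the worker, which is the property the fork start method lacks.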