PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Multiprocessing in Prefect tasks causes deadlock if workload cannot be pickled #12083

Open j-tr opened 8 months ago

j-tr commented 8 months ago

Bug summary

Follow-up issue for #10794.

Running workloads in Prefect tasks that use multiprocessing and require objects that cannot be pickled causes deadlock.

According to the Python docs, running multiprocessing with the default "fork" start method from a multithreaded process is problematic and can lead to a deadlock (showcased in test configuration "fork_task").
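As a generic illustration of why forking from a multithreaded process is risky (this is a sketch of the general mechanism, not of Prefect's internals, and every name in it is hypothetical): if another thread holds a lock at the moment of the fork, the child inherits the lock in its locked state but not the thread that would release it. The timeout below stands in for what would otherwise be a hang. It assumes a POSIX platform where the "fork" start method is available.

```python
import multiprocessing as mp
import sys
import threading

lock = threading.Lock()


def child():
    # The forked child inherits the lock in its locked state, but not the
    # thread that would release it. Without the timeout this would hang.
    acquired = lock.acquire(timeout=1)
    sys.exit(0 if acquired else 1)


def fork_while_lock_held():
    """Fork while another thread holds `lock`; return True if the child got it."""
    ctx = mp.get_context("fork")  # POSIX only
    held = threading.Event()
    release = threading.Event()

    def holder():
        with lock:
            held.set()
            release.wait()

    thread = threading.Thread(target=holder)
    thread.start()
    held.wait()  # make sure the lock is held at fork time
    proc = ctx.Process(target=child)
    proc.start()
    proc.join()
    release.set()  # let the holder thread finish in the parent
    thread.join()
    return proc.exitcode == 0


if __name__ == "__main__":
    print("child acquired lock:", fork_while_lock_held())
```

Whether the deadlock in the "fork_task" configuration comes from exactly this kind of inherited lock is not established here; the sketch only shows the class of problem the Python docs warn about.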

Setting the start method to "spawn" as suggested in https://github.com/PrefectHQ/prefect/issues/10794#issuecomment-1832620128 requires that the workload can be pickled. Since MyClass from the MRE holds an open file object, it cannot be pickled, and execution fails with a "cannot pickle '_io.TextIOWrapper' object" error (showcased in test configuration "spawn_task").
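The pickling failure can be reproduced without Prefect or multiprocessing at all. The sketch below uses two hypothetical classes: `HoldsFile` mirrors MyClass from the MRE, and `ReopensFile` is one possible (assumed, not recommended by the thread) way to make such an object picklable by dropping the handle on pickling and reopening it on unpickling. That only helps when reopening the file is acceptable for the workload.

```python
import os
import pickle
import tempfile


class HoldsFile:
    """Like MyClass in the MRE: keeps an open, writable file handle."""

    def __init__(self, path):
        self.path = path
        self.file_obj = open(path, "w")


class ReopensFile(HoldsFile):
    """Hypothetical variant: drop the handle when pickled, reopen on unpickle."""

    def __getstate__(self):
        state = self.__dict__.copy()
        del state["file_obj"]  # the handle itself cannot be pickled
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        # Append mode so that reopening does not truncate existing content.
        self.file_obj = open(self.path, "a")


def try_pickle(obj):
    """Return "ok" if obj pickles, else the TypeError message."""
    try:
        pickle.dumps(obj)
        return "ok"
    except TypeError as exc:
        return str(exc)


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "test.txt")
    print(try_pickle(HoldsFile(path)))
    print(try_pickle(ReopensFile(path)))
```

Under "spawn", `Process(target=self.do_things)` has to pickle the bound method and therefore the whole instance, which is why the first class fails in the "spawn_task" configuration.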

Using Prefect's cloudpickle_wrapped_call can solve this problem in certain situations (e.g. if the file is only open for reading). However, there are still cases where the workload is not even cloudpickle serializable (showcased in test configuration "spawn_task_cloudpickle").

Reproduction

from multiprocessing import Process, set_start_method
from prefect import flow, task
from prefect.utilities.callables import cloudpickle_wrapped_call

# running workload in flow, default start_method
# --> works !
fork_no_task = {
    "run_in_task": False,
    "start_method_type": "fork",
    "cloudpickle_wrapper": False,
}

# running workload in task, default start_method
# --> deadlock!
fork_task = {
    "run_in_task": True,
    "start_method_type": "fork",
    "cloudpickle_wrapper": False,
}

# running workload in task, start_method "spawn"
# --> cannot pickle '_io.TextIOWrapper' object because of open file
spawn_task = {
    "run_in_task": True,
    "start_method_type": "spawn",
    "cloudpickle_wrapper": False,
}

# running workload in task, start_method "spawn", and cloudpickle wrapper
# --> Cannot pickle files that are not opened for reading: w
# would work if file is opened for reading
spawn_task_cloudpickle = {
    "run_in_task": True,
    "start_method_type": "spawn",
    "cloudpickle_wrapper": True,
}

# change this assignment to run a different test config
TEST_CONFIG = spawn_task_cloudpickle

class MyClass:
    def __init__(self):
        self.file_obj = open("test.txt", "w")

    def do_things(self):
        print("Doing things")

    def do_things_in_multiple_processes(self):
        if TEST_CONFIG["cloudpickle_wrapper"]:
            Process(target=cloudpickle_wrapped_call(self.do_things)).start()
        else:
            Process(target=self.do_things).start()

@task
def my_task():
    MyClass().do_things_in_multiple_processes()

@flow()
def my_flow() -> None:
    if TEST_CONFIG["run_in_task"]:
        my_task()
    else:
        MyClass().do_things_in_multiple_processes()

if __name__ == "__main__":
    set_start_method(TEST_CONFIG["start_method_type"])
    my_flow()

Versions

Version:             2.16.0
API version:         0.8.4
Python version:      3.10.12
Git commit:          17f42e9d
Built:               Thu, Feb 22, 2024 3:45 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         ephemeral
Server:
  Database:          sqlite
  SQLite version:    3.37.2

serinamarie commented 5 months ago

Hi @j-tr, would you be able to share what you're seeing?

I get a flow in a failed state and a pickling error:

❯ python multi_flow.py
09:17:25.674 | INFO    | prefect.engine - Created flow run 'spiritual-marmot' for flow 'my-flow'
09:17:25.675 | INFO    | Flow run 'spiritual-marmot' - View at https://app.prefect.cloud/account/abc/workspace/xyz/flow-runs/flow-run/123
09:17:25.990 | INFO    | Flow run 'spiritual-marmot' - Created task run 'my_task-0' for task 'my_task'
09:17:25.990 | INFO    | Flow run 'spiritual-marmot' - Executing 'my_task-0' immediately...
09:17:26.735 | ERROR   | Task run 'my_task-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/engine.py", line 2107, in orchestrate_task_run
    result = await call.aresult()
             ^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 326, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 351, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/multi_flow.py", line 57, in my_task
    MyClass().do_things_in_multiple_processes()
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/multi_flow.py", line 51, in do_things_in_multiple_processes
    Process(target=cloudpickle_wrapped_call(self.do_things)).start()
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/utilities/callables.py", line 206, in cloudpickle_wrapped_call
    payload = cloudpickle.dumps((__fn, args, kwargs))
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/cloudpickle/cloudpickle.py", line 1479, in dumps
    cp.dump(obj)
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/cloudpickle/cloudpickle.py", line 1245, in dump
    return super().dump(obj)
           ^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/cloudpickle/cloudpickle.py", line 931, in _file_reduce
    raise pickle.PicklingError(
_pickle.PicklingError: Cannot pickle files that are not opened for reading: w
09:17:26.882 | ERROR   | Task run 'my_task-0' - Finished in state Failed('Task run encountered an exception PicklingError: Cannot pickle files that are not opened for reading: w')
09:17:26.889 | ERROR   | Flow run 'spiritual-marmot' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/engine.py", line 863, in orchestrate_flow_run
    result = await flow_call.aresult()
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 326, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 351, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/multi_flow.py", line 63, in my_flow
    my_task()
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/tasks.py", line 571, in __call__
    return enter_task_run_engine(
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/engine.py", line 1400, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 317, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 178, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/.pyenv/versions/3.11.1/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 388, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/engine.py", line 1568, in get_task_call_return_value
    return await future._result()
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/engine.py", line 2107, in orchestrate_task_run
    result = await call.aresult()
             ^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 326, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 351, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/multi_flow.py", line 57, in my_task
    MyClass().do_things_in_multiple_processes()
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/multi_flow.py", line 51, in do_things_in_multiple_processes
    Process(target=cloudpickle_wrapped_call(self.do_things)).start()
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/utilities/callables.py", line 206, in cloudpickle_wrapped_call
    payload = cloudpickle.dumps((__fn, args, kwargs))
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/cloudpickle/cloudpickle.py", line 1479, in dumps
    cp.dump(obj)
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/cloudpickle/cloudpickle.py", line 1245, in dump
    return super().dump(obj)
           ^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/cloudpickle/cloudpickle.py", line 931, in _file_reduce
    raise pickle.PicklingError(
_pickle.PicklingError: Cannot pickle files that are not opened for reading: w
09:17:27.021 | ERROR   | Flow run 'spiritual-marmot' - Finished in state Failed('Flow run encountered an exception. PicklingError: Cannot pickle files that are not opened for reading: w')
Traceback (most recent call last):
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/multi_flow.py", line 70, in <module>
    my_flow()
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/flows.py", line 1130, in __call__
    return enter_flow_run_engine_from_flow_call(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/engine.py", line 295, in enter_flow_run_engine_from_flow_call
    retval = from_sync.wait_for_call_in_loop_thread(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 317, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 178, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/.pyenv/versions/3.11.1/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 388, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
    return await fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/engine.py", line 398, in create_then_begin_flow_run
    return await state.result(fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/engine.py", line 863, in orchestrate_flow_run
    result = await flow_call.aresult()
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 326, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 351, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/multi_flow.py", line 63, in my_flow
    my_task()
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/tasks.py", line 571, in __call__
    return enter_task_run_engine(
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/engine.py", line 1400, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 317, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 178, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/.pyenv/versions/3.11.1/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 388, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/engine.py", line 1568, in get_task_call_return_value
    return await future._result()
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/engine.py", line 2107, in orchestrate_task_run
    result = await call.aresult()
             ^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 326, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 351, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/multi_flow.py", line 57, in my_task
    MyClass().do_things_in_multiple_processes()
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/multi_flow.py", line 51, in do_things_in_multiple_processes
    Process(target=cloudpickle_wrapped_call(self.do_things)).start()
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/prefect/utilities/callables.py", line 206, in cloudpickle_wrapped_call
    payload = cloudpickle.dumps((__fn, args, kwargs))
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/cloudpickle/cloudpickle.py", line 1479, in dumps
    cp.dump(obj)
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/cloudpickle/cloudpickle.py", line 1245, in dump
    return super().dump(obj)
           ^^^^^^^^^^^^^^^^^
  File "/Users/bean/code-oss/project-demo/hello-projects/flows/new-venv/lib/python3.11/site-packages/cloudpickle/cloudpickle.py", line 931, in _file_reduce
    raise pickle.PicklingError(
_pickle.PicklingError: Cannot pickle files that are not opened for reading: w

j-tr commented 5 months ago

Hey @serinamarie, thanks for looking into this. I think you are getting the same outcome as I am. Please note that the reproduction I posted showcases one of four cases, depending on the value of TEST_CONFIG.

Only TEST_CONFIG = fork_no_task works as expected.

This is to show that the recommended approaches to multiprocessing in tasks do not work as expected if the workload cannot be pickled (every bullet point from https://github.com/PrefectHQ/prefect/issues/10794#issuecomment-1962121937 corresponds to one of the TEST_CONFIGs).

aaazzam commented 3 weeks ago

@desertaxle do you know if this is resolved in a 3.X world?

desertaxle commented 3 weeks ago

The fork_task scenario in the MRE still deadlocks in 3.x. The task successfully completes before the deadlock, so I suspect something in the engine is causing the process to deadlock.