PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

import of module '__prefect_loader__' failed when 'multiprocessing.Pool.apply_async' is used within the flow via remote deployments #9329

Open rsampaths16 opened 1 year ago

rsampaths16 commented 1 year ago


Bug summary

Invoked directly

If the flow is invoked directly via python example.py, it runs successfully without any issues.

Invoked via agent through a deployment

If a deployment is created and its run is executed by an agent, the issue occurs: the function passed to the pool cannot be pickled because the module __prefect_loader__ cannot be imported.

Reproduction

from prefect import flow
from multiprocessing import Pool
import time

@flow
def foo(a):
    p = Pool(4)
    for i in range(a):
        p.apply_async(bar, (i+1,), callback=ok, error_callback=error)
    p.close()
    p.join()

def ok(x):
    print(f"I am ok {x}")

def error(x):
    print(f"I am error {x}")

def bar(b):
    print('hi')
    time.sleep(b)
    print('bye')

if __name__ == '__main__':
    foo(5)

Error

I am error Can't pickle <function bar at 0x7f5c39a97680>: import of module '__prefect_loader__' failed
I am error Can't pickle <function bar at 0x7f5c39a97680>: import of module '__prefect_loader__' failed
I am error Can't pickle <function bar at 0x7f5c39a97680>: import of module '__prefect_loader__' failed
I am error Can't pickle <function bar at 0x7f5c39a97680>: import of module '__prefect_loader__' failed
I am error Can't pickle <function bar at 0x7f5c39a97680>: import of module '__prefect_loader__' failed
I am error Can't pickle <function bar at 0x7f5c39a97680>: import of module '__prefect_loader__' failed
I am error Can't pickle <function bar at 0x7f5c39a97680>: import of module '__prefect_loader__' failed
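The error can likely be reproduced without Prefect at all. The sketch below is a stand-alone approximation that assumes the loader behaves as described later in this thread: the script is executed as a module named __prefect_loader__, which is then removed from sys.modules. The helper load_script_then_forget and the script contents are hypothetical, not Prefect's code. pickle serializes a plain function by reference (module name plus qualified name) and imports that module to verify the reference; once the name is gone from sys.modules, and no file by that name exists on sys.path, that import fails.

```python
import importlib.util
import pickle
import sys
import tempfile
from pathlib import Path

# Hypothetical stand-in for the flow script's worker function.
SCRIPT_SOURCE = "def bar(b):\n    return b * 2\n"


def load_script_then_forget(path: Path, name: str = "__prefect_loader__"):
    """Execute `path` as a module called `name`, then drop it from sys.modules.

    Illustrative imitation of the pattern discussed in this thread, not Prefect's code.
    """
    spec = importlib.util.spec_from_file_location(name, str(path))
    module = importlib.util.module_from_spec(spec)
    sys.modules[name] = module
    spec.loader.exec_module(module)
    # The module object stays alive, but it can no longer be imported by name.
    sys.modules.pop(name)
    return module


def try_pickle(fn) -> str:
    """Return "ok" if `fn` pickles, otherwise a short description of the error."""
    try:
        pickle.dumps(fn)
        return "ok"
    except pickle.PicklingError as exc:
        return f"PicklingError: {exc}"


with tempfile.TemporaryDirectory() as tmp:
    script = Path(tmp) / "example.py"
    script.write_text(SCRIPT_SOURCE)
    module = load_script_then_forget(script)

print(module.bar.__module__)   # the module name pickle will try to import
print(module.bar(3))           # the function itself still works in this process
print(try_pickle(module.bar))  # expected to report a PicklingError
```

Under these assumptions, bar still runs normally in the parent process; only serializing it by reference fails, which matches the error above. The exact message wording may differ between Python versions.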

Versions

Prefect versions: 2.8.5, 2.10.5

Python version: 3.7.9

Additional context

###
### A complete description of a Prefect Deployment for flow 'foo'
###
name: example
description: null
version: 6c8636a7bb4d72afb9b92c3025675f05
# The work queue that will handle this deployment's runs
work_queue_name: test
work_pool_name: default-agent-pool
tags: []
parameters: {}
schedule: null
is_schedule_active: true
infra_overrides: {}

###
### DO NOT EDIT BELOW THIS LINE
###
flow_name: foo
manifest_path: null
infrastructure:
  type: process
  env: {}
  labels: {}
  name: null
  command: null
  stream_output: true
  working_dir: null
  _block_document_id: 762ff9aa-b9e3-4e34-b047-7a26018db371
  _block_document_name: anonymous-617e9166-a2a4-48cc-94b5-4c36a7fe7276
  _is_anonymous: true
  block_type_slug: process
  _block_type_slug: process
storage: null
path: /home/ubuntu/test
entrypoint: example.py:foo
parameter_openapi_schema:
  title: Parameters
  type: object
  properties:
    a:
      title: a
      position: 0
  required:
  - a
  definitions: null
timestamp: '2023-04-20T07:01:21.085073+00:00'
zanieb commented 1 year ago

Thanks for the report! I'm not sure how best to resolve this; it's going to take some investigation — contributions welcome!

rsampaths16 commented 1 year ago

Hey @madkinsz, the main reason it fails is that the __prefect_loader__ module isn't available in sys.modules, so pickle cannot locate the function bar.

Since prefect.engine is invoked as the entry point from within a subprocess, I don't think we need to remove __prefect_loader__ from sys.modules after loading the script.

https://github.com/PrefectHQ/prefect/blob/70ca8f1e7ba19fc0bda1fc0bde0b02572964be30/src/prefect/engine.py#L2203-L2210

https://github.com/PrefectHQ/prefect/blob/70ca8f1e7ba19fc0bda1fc0bde0b02572964be30/src/prefect/utilities/importtools.py#L166-L168

The simple fix would be to just remove sys.modules.pop("__prefect_loader__") and let the module stay in sys.modules.

This will not interfere with other flows, since each one is invoked within its own subprocess.
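A minimal sketch of the proposed change, assuming, as described in the comment above, that the script is loaded as a module named __prefect_loader__: if the module stays registered in sys.modules, pickle can resolve bar by reference, and a round trip returns the very same function object. The helper load_script_and_keep is hypothetical. One caveat worth checking: this only helps if the worker process can also find the module. With the fork start method (historically the default on Linux) the child inherits the parent's sys.modules; with spawn (the default on macOS and Windows) the child has to import __prefect_loader__ by name while unpickling the task, which would presumably still fail.

```python
import importlib.util
import pickle
import sys
import tempfile
from pathlib import Path

# Hypothetical stand-in for the flow script's worker function.
SCRIPT_SOURCE = "def bar(b):\n    return b * 2\n"


def load_script_and_keep(path: Path, name: str = "__prefect_loader__"):
    """Execute `path` as module `name` and leave it registered in sys.modules.

    Sketch of the change proposed in this thread (no sys.modules.pop(...)).
    """
    spec = importlib.util.spec_from_file_location(name, str(path))
    module = importlib.util.module_from_spec(spec)
    sys.modules[name] = module  # kept, so pickle can resolve "<name>.bar"
    spec.loader.exec_module(module)
    return module


with tempfile.TemporaryDirectory() as tmp:
    script = Path(tmp) / "example.py"
    script.write_text(SCRIPT_SOURCE)
    module = load_script_and_keep(script)

payload = pickle.dumps(module.bar)  # serialized by reference, not by value
restored = pickle.loads(payload)    # looked up again via sys.modules
print(restored is module.bar, restored(3))  # True 6
```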

rsampaths16 commented 1 year ago

Judging from the usages of this function, it is invoked either as an app command or in a subprocess (via prefect.engine).

rsampaths16 commented 1 year ago

Let me know if you have any concerns about this, @madkinsz.

zanieb commented 1 year ago

@rsampaths16 I'd be curious to see what happens when that line is removed. I don't remember the implications of it and it seems like there may be difficulty retrieving the pickled objects in some cases still.

rsampaths16 commented 1 year ago

@madkinsz, removing that line keeps the loaded module in sys.modules (i.e., the function can be pickled). If we are worried about one flow's module overriding another's, we can use a mangled name instead, such as __prefect_loader_uuid4__ or __prefect_loader_path__, with no . in the name.
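The mangled-name idea could look roughly like the sketch below. It is not an agreed design: the helpers unique_loader_name and load_script_with_unique_name, and the naming scheme itself, are hypothetical. Here the suffix is derived from a hash of the resolved script path; a uuid.uuid4().hex suffix would work the same way within one process. The name must not contain a ., because the import machinery that pickle relies on treats dots as package separators.

```python
import hashlib
import importlib.util
import pickle
import sys
import tempfile
from pathlib import Path


def unique_loader_name(path: Path) -> str:
    """Hypothetical naming scheme: one dot-free module name per script path."""
    digest = hashlib.sha1(str(path.resolve()).encode()).hexdigest()[:12]
    return f"__prefect_loader_{digest}__"


def load_script_with_unique_name(path: Path):
    """Execute `path` under a per-script name and keep it in sys.modules."""
    name = unique_loader_name(path)
    spec = importlib.util.spec_from_file_location(name, str(path))
    module = importlib.util.module_from_spec(spec)
    sys.modules[name] = module
    spec.loader.exec_module(module)
    return module


with tempfile.TemporaryDirectory() as tmp:
    first = Path(tmp) / "flow_a.py"
    second = Path(tmp) / "flow_b.py"
    # Two scripts that both define a function called bar.
    first.write_text("def bar(b):\n    return b + 1\n")
    second.write_text("def bar(b):\n    return b - 1\n")
    mod_a = load_script_with_unique_name(first)
    mod_b = load_script_with_unique_name(second)

# Distinct module names, so neither registration overwrites the other.
print(mod_a.__name__ != mod_b.__name__)  # True
# Each bar round-trips through pickle to its own implementation.
print(pickle.loads(pickle.dumps(mod_a.bar))(10))  # 11
print(pickle.loads(pickle.dumps(mod_b.bar))(10))  # 9
```

A path-derived name has the small advantage of being deterministic for a given script, while a random suffix guarantees uniqueness even if the same file is loaded twice.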

ori-scala commented 9 months ago

Hi @rsampaths16 @zanieb, I've encountered the same issue, and removing that line solves it. Any chance you can make this small patch and save the day?

Update: my bad, it just raises a different issue.

Bottom line: we need a fix for the above issue, as we can't run starmap in a flow.

secrettoad commented 9 months ago

FYI, this is also affecting torch.save when it is run from within a Prefect flow that executes the torch code remotely via Dask.

EmilRex commented 1 month ago

Another example that reproduces this (I believe):

import multiprocessing as mp
from prefect import flow

def _task(i):
    pass

@flow
def main():
    with mp.Pool(5) as pool:
        pool.map(_task, range(5))

if __name__ == "__main__":
    main.serve()

Running a deployment results in:

Your flow 'main' is being served and polling for scheduled runs!

To trigger a run for this flow, use the following command:

        $ prefect deployment run 'main/main'

You can also run your flow via the Prefect UI: http://127.0.0.1:4200/deployments/deployment/b8c1cba6-ccdc-494d-9b47-109ac2dd6fc5

14:29:50.799 | INFO    | prefect.flow_runs.runner - Runner 'main' submitting flow run 'e2b6cbac-c516-47fa-ae78-90638cf3af6a'
14:29:50.836 | INFO    | prefect.flow_runs.runner - Opening process...
14:29:50.846 | INFO    | prefect.flow_runs.runner - Completed submission of flow run 'e2b6cbac-c516-47fa-ae78-90638cf3af6a'
14:29:51.507 | INFO    | Flow run 'puzzling-aardwark' - Downloading flow code from storage at '.'
14:29:51.573 | ERROR   | Flow run 'puzzling-aardwark' - Encountered exception during execution: PicklingError("Can't pickle <function _task at 0x106dea980>: import of module '__prefect_loader__' failed")
Traceback (most recent call last):
  File "/Users/emilchristensen/github/scratch/multiprocessing-imports/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 655, in run_context
    yield self
  File "/Users/emilchristensen/github/scratch/multiprocessing-imports/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 699, in run_flow_sync
    engine.call_flow_fn()
  File "/Users/emilchristensen/github/scratch/multiprocessing-imports/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 678, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/emilchristensen/github/scratch/multiprocessing-imports/.venv/lib/python3.12/site-packages/prefect/utilities/callables.py", line 206, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/emilchristensen/github/scratch/multiprocessing-imports/mre.py", line 11, in main
    pool.map(_task, range(5))
  File "/opt/homebrew/Cellar/python@3.12/3.12.7_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/pool.py", line 367, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.7_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/pool.py", line 774, in get
    raise self._value
  File "/opt/homebrew/Cellar/python@3.12/3.12.7_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/pool.py", line 540, in _handle_tasks
    put(task)
  File "/opt/homebrew/Cellar/python@3.12/3.12.7_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.7_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function _task at 0x106dea980>: import of module '__prefect_loader__' failed
14:29:51.594 | ERROR   | Flow run 'puzzling-aardwark' - Finished in state Failed("Flow run encountered an exception: PicklingError: Can't pickle <function _task at 0x106dea980>: import of module '__prefect_loader__' failed")
14:29:51.595 | ERROR   | prefect.engine - Engine execution of flow run 'e2b6cbac-c516-47fa-ae78-90638cf3af6a' exited with unexpected exception
Traceback (most recent call last):
  File "/Users/emilchristensen/github/scratch/multiprocessing-imports/.venv/lib/python3.12/site-packages/prefect/engine.py", line 42, in <module>
    run_flow(flow, flow_run=flow_run)
  File "/Users/emilchristensen/github/scratch/multiprocessing-imports/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 821, in run_flow
    return run_flow_sync(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/emilchristensen/github/scratch/multiprocessing-imports/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 701, in run_flow_sync
    return engine.state if return_type == "state" else engine.result()
                                                       ^^^^^^^^^^^^^^^
  File "/Users/emilchristensen/github/scratch/multiprocessing-imports/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 255, in result
    raise self._raised
  File "/Users/emilchristensen/github/scratch/multiprocessing-imports/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 655, in run_context
    yield self
  File "/Users/emilchristensen/github/scratch/multiprocessing-imports/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 699, in run_flow_sync
    engine.call_flow_fn()
  File "/Users/emilchristensen/github/scratch/multiprocessing-imports/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 678, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/emilchristensen/github/scratch/multiprocessing-imports/.venv/lib/python3.12/site-packages/prefect/utilities/callables.py", line 206, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/emilchristensen/github/scratch/multiprocessing-imports/mre.py", line 11, in main
    pool.map(_task, range(5))
  File "/opt/homebrew/Cellar/python@3.12/3.12.7_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/pool.py", line 367, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.7_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/pool.py", line 774, in get
    raise self._value
  File "/opt/homebrew/Cellar/python@3.12/3.12.7_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/pool.py", line 540, in _handle_tasks
    put(task)
  File "/opt/homebrew/Cellar/python@3.12/3.12.7_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.7_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function _task at 0x106dea980>: import of module '__prefect_loader__' failed
14:29:51.715 | ERROR   | prefect.flow_runs.runner - Process for flow run 'puzzling-aardwark' exited with status code: 1

Output of prefect version:

Version:             3.0.10
API version:         0.8.4
Python version:      3.12.7
Git commit:          3aa2d893
Built:               Tue, Oct 15, 2024 1:31 PM
OS/Arch:             darwin/arm64
Profile:             local
Server type:         server
Pydantic version:    2.9.2
kujenga commented 1 day ago

I ran into a version of this issue and was able to work around it by switching from script/file path-based entrypoints to module-based entrypoints, e.g. moving from src/module/submodule.py:my_flow to module.submodule:my_flow.

That seems to avoid this code path that introduces __prefect_loader__: https://github.com/PrefectHQ/prefect/blob/62b100162bed359b4eede533d302125ca4c85a3f/src/prefect/utilities/importtools.py#L153-L160

Instead going through this one that imports the module directly: https://github.com/PrefectHQ/prefect/blob/62b100162bed359b4eede533d302125ca4c85a3f/src/prefect/utilities/importtools.py#L177-L189
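Why the module-based entrypoint helps can be illustrated with plain importlib: a module imported by its dotted name is registered in sys.modules under that name, so the functions it defines carry a real __module__ that pickle can import. With sys.path set up accordingly, a worker process could import it too. The package layout mypkg/flows.py below is hypothetical, and this only shows the import mechanics, not how Prefect itself resolves entrypoints.

```python
import importlib
import pickle
import sys
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    # Hypothetical layout: <root>/mypkg/__init__.py and <root>/mypkg/flows.py
    package = root / "mypkg"
    package.mkdir()
    (package / "__init__.py").write_text("")
    (package / "flows.py").write_text("def bar(b):\n    return b * 2\n")

    # Make the package importable, roughly what installing the project would do.
    sys.path.insert(0, str(root))
    try:
        importlib.invalidate_caches()
        # Comparable to an entrypoint written as "mypkg.flows:bar".
        flows = importlib.import_module("mypkg.flows")
    finally:
        sys.path.remove(str(root))

print(flows.bar.__module__)  # mypkg.flows

# The module stays in sys.modules under its real name, so pickling by reference works.
restored = pickle.loads(pickle.dumps(flows.bar))
print(restored is flows.bar, restored(4))  # True 8
```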

Would be great if the scripted approach worked as well though!