PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

No module named '__prefect_loader__' when using load_flow_from_script as flow to build deployment #6762

Open YasmineHamdy opened 1 year ago

YasmineHamdy commented 1 year ago

Bug summary

I created a dynamic flow and tried to use load_flow_from_script to build the deployment, and it raises the error No module named '__prefect_loader__'.

Reproduction

deployment = await Deployment.build_from_flow(
    flow=load_flow_from_script(path=self.file_path, flow_name=self.name),
    name="deployment",
    work_queue_name="demo",
    tags=["test"],
)
await deployment.apply()

Error

No module named '__prefect_loader__'  

Versions


Version:             2.1.1
API version:         0.8.0
Python version:      3.10.4
Git commit:          dc2ba222
Built:               Thu, Aug 18, 2022 10:18 AM
OS/Arch:             darwin/x86_64
Profile:             default
Server type:         ephemeral
Server:
  Database:          sqlite
  SQLite version:    3.39.2

Additional context

No response

bunchesofdonald commented 1 year ago

Hi @YasmineHamdy! Could you give some more context around your code example? For example what is self here? Is the flow in the same file as the deployment?

YasmineHamdy commented 1 year ago

Hi @bunchesofdonald thanks for your response,

My use case is that I have a flow template which I use to generate flows under a specific folder, and then I deploy the generated flow.

For example, this is the base class I use to create the flow:

class BaseFlow:
    name: str = None
    config = {}
    dir_name = ''
    file_path = ''

    def __init__(self, name, config):
        self.config = config
        self.name = f"pipeline_{name}"
        self.dir_name = f"./adapters/orchestrationAdapter/perfect/compiled/pipeline_{self.name}_flows/"
        self.file_path = f"{self.dir_name}{self.name}.py"

    def get_flow(self):
        return load_flow_from_script(path=self.file_path, flow_name=self.name)

    async def deploy(self):
        deployment = await Deployment.build_from_flow(
            flow=self.get_flow(),
            name=f"deployment",
            work_queue_name="demo",
            tags=[f"test"],
        )
        await deployment.apply()

    def compile(self):
        try:
            if not os.path.exists(self.dir_name):
                os.makedirs(self.dir_name)

            shutil.copyfile(
                f"./adapters/orchestrationAdapter/perfect/utils/flow_template.py", self.file_path)

            with fileinput.FileInput(self.file_path, inplace=True) as file:
                for line in file:
                    line = line.replace("configToReplace",
                                        f"'{json.dumps(self.config)}'")
                    line = line.replace(
                        "nameToReplace",  f"'{self.name}'")
                    print(line, end="")

        except OSError as e:
            if e.errno != errno.EEXIST:
                raise NotImplementedError

Then, in a different file, I get the config and name from an API request and create the flow as the API response, as follows:

flow = BaseFlow(
    name=name,
    config=config,
)
flow.compile()
await flow.deploy()

vkrot-innio commented 1 year ago

Same issue here. Simple reproduction scenario:

cat <<< '
from prefect import flow, task

@task
def print_hello():
    print("hello")

@flow(name="orion")
def f():
    print_hello()

if __name__ == "__main__":
    f()
' > /tmp/orion_flow.py

cat <<< '
from prefect import Flow
import prefect
from prefect.deployments import Deployment
from prefect.infrastructure import KubernetesJob

flow = prefect.utilities.importtools.import_object("/tmp/orion_flow.py:f")
if isinstance(flow, Flow):
    print(f"Found flow {flow.name}")
else:
    raise RuntimeError(
        f"Found object of unexpected type {type(flow).__name__!r}. Expected 'Flow'."
    )

deployment = Deployment.build_from_flow(
    name=flow.name,
    flow=flow,
    work_queue_name="default",
    skip_upload=True,
    infrastructure=KubernetesJob(
        image="myrepo/my-image:2-python3.10",
    )
)
print(deployment.apply())

' > /tmp/orion_deployment.py

python3 /tmp/orion_deployment.py

Does Deployment actually need the flow if it skips the upload anyway? The source of the flow is already in the Docker image.

zanieb commented 1 year ago

@YasmineHamdy can you please share the full traceback for the error? It'd be nice if you updated the first post as well.

@vkrot-innio thanks for the example. We need the flow to generate a schema for the parameters it accepts.

The issue here is that import_object sets the object module which the deployment then fails to load. There are a few ways we can mitigate this. However, this specific issue should be resolved by https://github.com/PrefectHQ/prefect/pull/6720 in the next release.
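To illustrate why a deployment needs the flow object at all, here is a rough sketch of deriving a parameter description from a function signature. This is not Prefect's actual schema generation; `describe_parameters` and `example_flow` are hypothetical names used only for illustration, and the output format is an assumption.

```python
import inspect
from typing import Any, Callable, Dict


def describe_parameters(fn: Callable[..., Any]) -> Dict[str, Dict[str, Any]]:
    """Build a simple description of the parameters `fn` accepts (illustrative only)."""
    described: Dict[str, Dict[str, Any]] = {}
    for name, param in inspect.signature(fn).parameters.items():
        # A parameter without a default must be supplied by the caller.
        entry: Dict[str, Any] = {"required": param.default is inspect.Parameter.empty}
        if param.annotation is not inspect.Parameter.empty:
            entry["annotation"] = getattr(param.annotation, "__name__", str(param.annotation))
        if not entry["required"]:
            entry["default"] = param.default
        described[name] = entry
    return described


# Hypothetical stand-in for a flow function.
def example_flow(table: str, limit: int = 10):
    return table, limit


print(describe_parameters(example_flow))
```

The point is only that this information comes from the function object itself, so it cannot be produced without loading the flow.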

skohlleffel commented 1 year ago

I'm running into the same issue.

Prefect version

2.6.3

Failing Scenario

When I use load_flow_from_script with a provided path, the deployment fails.

deployment.py

from prefect.flows import load_flow_from_script
from prefect.deployments import Deployment
from prefect.infrastructure import KubernetesJob

def deploy_flow():

    # absolute path to flow file
    flow = load_flow_from_script(path="/workspaces/nimbus/orchestration/flows/test.py")

    Deployment.build_from_flow(
        apply=True,
        flow=flow,
        name="test-deployment",
        work_queue_name="test",
        infrastructure=KubernetesJob(
            job={
                "apiVersion": "batch/v1",
                "kind": "Job",
                "metadata": {
                    "labels": {}
                },
                "spec": {
                    "template": {
                        "spec": {
                            "parallelism": 1,
                            "completions": 1,
                            "restartPolicy": "Never",
                            "containers": [
                                {
                                    "name": "prefect-job",
                                    "envFrom": [{"secretRef": {"name": "doppler-secrets"}}],
                                    "env": []
                                }
                            ]
                        }
                    }
                },
            },
            # clean up jobs after 5 minutes
            finished_job_ttl=300
        )
    )

if __name__ == "__main__":    
    deploy_flow()

Failing Traceback

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/local/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/workspaces/nimbus/orchestration/flows/deployment.py", line 43, in <module>
    deploy_flow()
  File "/workspaces/nimbus/orchestration/flows/deployment.py", line 8, in deploy_flow
    Deployment.build_from_flow(
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 201, in coroutine_wrapper
    return run_async_in_new_loop(async_fn, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 152, in run_async_in_new_loop
    return anyio.run(partial(__fn, *args, **kwargs))
  File "/usr/local/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/local/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/usr/local/lib/python3.9/site-packages/prefect/deployments.py", line 685, in build_from_flow
    module = importlib.import_module(mod_name)
  File "/usr/local/lib/python3.9/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1030, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1007, in _find_and_load
  File "<frozen importlib._bootstrap>", line 984, in _find_and_load_unlocked
ModuleNotFoundError: No module named '__prefect_loader__'

Successful Scenario

When I import the flow function and supply it directly to the deployment, it succeeds.

deployment.py

from prefect.deployments import Deployment
from prefect.infrastructure import KubernetesJob
from orchestration.flows.test import api_flow

def deploy_flow():
    Deployment.build_from_flow(
        apply=True,
        # supply flow function directly
        flow=api_flow,
        name="test-deployment",
        work_queue_name="test",
        infrastructure=KubernetesJob(
            job={
                "apiVersion": "batch/v1",
                "kind": "Job",
                "metadata": {
                    "labels": {}
                },
                "spec": {
                    "template": {
                        "spec": {
                            "parallelism": 1,
                            "completions": 1,
                            "restartPolicy": "Never",
                            "containers": [
                                {
                                    "name": "prefect-job",
                                    "envFrom": [{"secretRef": {"name": "doppler-secrets"}}],
                                    "env": []
                                }
                            ]
                        }
                    }
                },
            },
            # clean up jobs after 5 minutes
            finished_job_ttl=300
        )
    )

if __name__ == "__main__":
    deploy_flow()

skohlleffel commented 1 year ago

When I use load_flow_from_script the flow.__module__ = __prefect_loader__. However, when I import the flow function directly, the flow.__module__ = orchestration.flows.test, which is the actual module name.

It looks like load_flow_from_script uses https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/importtools.py#L120. That's what sets the module name to __prefect_loader__, and it is also what causes the deployment issue in https://github.com/PrefectHQ/prefect/blob/main/src/prefect/deployments.py#L689.

Is it expected behavior that load_flow_from_script would set the flow.__module__ to __prefect_loader__?
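The behavior described above can be reproduced with the standard library alone. The sketch below is not Prefect's loader; it assumes the loader does something roughly equivalent, and `__demo_loader__`, `load_object_from_script`, and `can_import` are hypothetical names standing in for the real ones. It executes a script under a placeholder module name and then shows that the name stored in `__module__` cannot be imported.

```python
import importlib
import importlib.util
import tempfile
from pathlib import Path

# Hypothetical placeholder name, standing in for '__prefect_loader__'.
PLACEHOLDER = "__demo_loader__"


def load_object_from_script(path: str, object_name: str):
    """Execute the script at `path` under a placeholder module name and return one object."""
    spec = importlib.util.spec_from_file_location(PLACEHOLDER, path)
    module = importlib.util.module_from_spec(spec)
    # The module is executed but never registered in sys.modules.
    spec.loader.exec_module(module)
    return getattr(module, object_name)


def can_import(module_name: str) -> bool:
    """Return True if `module_name` can be imported by name."""
    try:
        importlib.import_module(module_name)
    except ModuleNotFoundError:
        return False
    return True


with tempfile.TemporaryDirectory() as tmp:
    script = Path(tmp) / "my_flow.py"
    script.write_text("def f():\n    return 'hello'\n")
    loaded = load_object_from_script(str(script), "f")

# The function remembers the placeholder name, not a real module path.
print(loaded.__module__)
# Importing by that name fails, which mirrors the error in the tracebacks above.
print(can_import(loaded.__module__))
```

Any code that later calls `importlib.import_module(obj.__module__)` on such an object fails the same way, which is consistent with the traceback ending in `build_from_flow`.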

jonscyr commented 1 year ago

Running into a similar issue.

flows = load_flows_from_script(path=str(path))

for flow in flows:
        deployment = await Deployment.build_from_flow(
            name="deployment2",
            flow=flow,
            storage = await S3.load(name=settings.storage_block_name, client=o_client), # load a pre-defined block
            infrastructure=KubernetesJob(
                image = args.image,
                namespace=settings.namespace,
                service_account_name=settings.service_account,
                image_pull_policy="Always",
                finished_job_ttl=1000,
                job_watch_timeout_seconds=100
            )
        )
        deployment_id = await deployment.apply()

  File "/Users/jonscyriac/work/o4s/prefect-flows/./scripts/register.py", line 280, in <module>
    asyncio.run(main())
  File "/Users/jonscyriac/.pyenv/versions/3.9.6/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/Users/jonscyriac/.pyenv/versions/3.9.6/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/Users/jonscyriac/work/o4s/prefect-flows/./scripts/register.py", line 211, in main
    deployment = await Deployment.build_from_flow(
  File "/Users/jonscyriac/.pyenv/versions/3.9.6/envs/orion/lib/python3.9/site-packages/prefect/deployments.py", line 696, in build_from_flow
    module = importlib.import_module(mod_name)
  File "/Users/jonscyriac/.pyenv/versions/3.9.6/lib/python3.9/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1030, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1007, in _find_and_load
  File "<frozen importlib._bootstrap>", line 984, in _find_and_load_unlocked
ModuleNotFoundError: No module named '__prefect_loader__'

The deployment works if I import my flow directly in my registration script, but that doesn't work for my pipeline.

Is there any workaround for this?

skohlleffel commented 1 year ago

@jonscyr The flow.__module__ value set from load_flow_from_script() is __prefect_loader__. The flow.__module__ value set when importing the flow function directly is the actual python module. I'm not sure why that is; I assume it's a bug though since it doesn't work for us when trying to create a deployment from the flow object when using load_flow_from_script. We resolve the issue by explicitly setting the flow.__module__ value after loading the flow object from a script. It may not be optimal, but it has worked for us with no issues.

Here's a minimal example:

orchestration/create_flow_deployments.py

from os import path

from prefect.deployments import Deployment
from prefect.flows import load_flow_from_script

full_path = "root_dir/orchestration/flow_dir/flow.py"
flow = load_flow_from_script(full_path) # Load flow from separate file

# get the flow file's relative path - needed when setting 'flow.__module__'
# in this example, it will be `orchestration/flow_dir/flow.py`
relative_path = path.relpath(full_path)

# set flow.__module__ using the flow file's relative path - replaces '__prefect_loader__'
# in this example, it will be `orchestration.flow_dir.flow`
flow.__module__ = path.splitext(relative_path)[0].replace(path.sep, ".")

# build deployment from the flow object with the updated 'flow.__module__' value
Deployment.build_from_flow(flow=flow, ...)

jonscyr commented 1 year ago

@skohlleffel this works! thank you!

zanieb commented 1 year ago

When we load a flow from a script, we need to set the module for some reason. I don't remember why, but we can investigate further. When that flow is then used to build a deployment, the module can't be imported, so the deployment can't determine the path of your flow and errors. The path can be reconstructed manually, though. We did not intend load_flow_from_script to be used this way, but I'm glad you've found a workaround!
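For anyone reconstructing the module name by hand, a small helper along these lines may be less fragile than chained string replacements. This is only a sketch: `module_name_from_path` is a hypothetical name, and it assumes the script lives under the project root and that the resulting dotted name is importable from where the deployment script runs.

```python
from pathlib import Path


def module_name_from_path(script_path: str, project_root: str = ".") -> str:
    """Convert a script path under `project_root` into a dotted module name."""
    root = Path(project_root).resolve()
    # relative_to raises ValueError if the script is not under the root.
    relative = Path(script_path).resolve().relative_to(root)
    # Drop the file suffix and join the path components with dots.
    return ".".join(relative.with_suffix("").parts)


print(module_name_from_path("orchestration/flow_dir/flow.py"))
```

The result could then be assigned to `flow.__module__` before building the deployment, as in the workaround earlier in the thread.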