PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

No module named '__prefect_loader__' #5984

Open tekumara opened 2 years ago

tekumara commented 2 years ago

Description

The flow run errors with:

Flow could not be retrieved from deployment.
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 216, in retrieve_flow_then_begin_flow_run
    flow = await load_flow_from_deployment(deployment, client=client)
  File "/usr/local/lib/python3.9/site-packages/prefect/client.py", line 107, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/deployments.py", line 276, in load_flow_from_deployment
    flow = await maybe_flow.unpackage()
  File "/usr/local/lib/python3.9/site-packages/prefect/client.py", line 107, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/packaging/orion.py", line 24, in unpackage
    return self.serializer.loads(serialized_flow.encode())
  File "/usr/local/lib/python3.9/site-packages/prefect/packaging/serializers.py", line 190, in loads
    return from_qualified_name(blob.decode())
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/importtools.py", line 58, in from_qualified_name
    module = importlib.import_module(mod_name)
  File "/usr/local/lib/python3.9/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1030, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1007, in _find_and_load
  File "<frozen importlib._bootstrap>", line 984, in _find_and_load_unlocked
ModuleNotFoundError: No module named '__prefect_loader__'

Reproduction / Example

flows/kubes_flow.py:

from prefect import flow, get_run_logger
from prefect.deployments import Deployment
from prefect.flow_runners import KubernetesFlowRunner
from prefect.packaging.orion import OrionPackager
from prefect.packaging.serializers import ImportSerializer

@flow
def kubes_flow() -> None:
    # shown in kubectl logs but not prefect ui
    print("Hello from Kubernetes!")
    # shown in prefect ui
    logger = get_run_logger()
    logger.info("Hello Prefect UI from Kubernetes!")

# use the default OrionPackager to store the flow's import path only
# since the flow is already stored inside the docker image
Deployment(
    name="kubes-deployment-orion-packager-import",
    flow=kubes_flow,
    flow_runner=KubernetesFlowRunner(
        image="orion-registry:5000/flow:latest",
    ),
    packager=OrionPackager(serializer=ImportSerializer()),
)

Here's the block for my flow:

curl -s "http://localhost:4200/api/block_documents/a123815c-7a51-4446-8d56-10dc9fd14047" | jq .
{
  "id": "a123815c-7a51-4446-8d56-10dc9fd14047",
  "created": "2022-07-09T11:19:50.140724+00:00",
  "updated": "2022-07-09T11:19:50.140803+00:00",
  "name": "anonymous:48388c349c907ec7959e911b9d42eebd",
  "data": {
    "value": {
      "flow": "__prefect_loader__.kubes_flow"
    }
  },
  "block_schema_id": "02afbc00-fc1e-4dd5-8d42-57b165376620",
  "block_schema": {
    "id": "02afbc00-fc1e-4dd5-8d42-57b165376620",
    "created": "2022-07-09T06:38:11.900676+00:00",
    "updated": "2022-07-09T06:41:41.126000+00:00",
    "checksum": "sha256:767ab2520040f319ca8d60e137cf23f9698fe51deb30b2b2f5848d0944a336d7",
    "fields": {
      "title": "JSON",
      "description": "A block that represents JSON",
      "type": "object",
      "properties": {
        "value": {
          "title": "Value",
          "description": "A JSON-compatible value"
        }
      },
      "required": [
        "value"
      ],
      "block_type_name": "JSON",
      "secret_fields": [],
      "block_schema_references": {}
    },
    "block_type_id": "4dfbd6a2-ba1b-4b44-bfb3-c2732f9fe5dd",
    "block_type": {
      "id": "4dfbd6a2-ba1b-4b44-bfb3-c2732f9fe5dd",
      "created": "2022-07-09T06:38:11.722069+00:00",
      "updated": "2022-07-09T06:38:11.722297+00:00",
      "name": "JSON",
      "logo_url": null,
      "documentation_url": null,
      "description": null,
      "code_example": null,
      "is_protected": false
    },
    "capabilities": []
  },
  "block_type_id": "4dfbd6a2-ba1b-4b44-bfb3-c2732f9fe5dd",
  "block_type": {
    "id": "4dfbd6a2-ba1b-4b44-bfb3-c2732f9fe5dd",
    "created": "2022-07-09T06:38:11.722069+00:00",
    "updated": "2022-07-09T06:38:11.722297+00:00",
    "name": "JSON",
    "logo_url": null,
    "documentation_url": null,
    "description": null,
    "code_example": null,
    "is_protected": false
  },
  "block_document_references": {},
  "is_anonymous": true
}

It looks like __prefect_loader__.kubes_flow is being stored as the flow's import path rather than flows.kubes_flow.
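
A minimal sketch of why the stored value cannot be loaded, assuming the import serializer resolves a dotted "module.attribute" string the way the traceback suggests (import the module with importlib.import_module, then look up the attribute). resolve_qualified_name is a hypothetical stand-in, not Prefect's actual from_qualified_name:

```python
import importlib


def resolve_qualified_name(name: str):
    """Hypothetical stand-in for an import-path resolver.

    Splits "package.module.attr" into a module path and an attribute name,
    imports the module, and returns the attribute.
    """
    mod_name, _, attr_name = name.rpartition(".")
    module = importlib.import_module(mod_name)
    return getattr(module, attr_name)


# A qualified name whose module is importable resolves fine.
dumps = resolve_qualified_name("json.dumps")

# The value stored in the block document: "__prefect_loader__" is not a module
# that exists on disk or in sys.modules, so the import step fails.
try:
    resolve_qualified_name("__prefect_loader__.kubes_flow")
except ModuleNotFoundError as exc:
    print(exc)  # No module named '__prefect_loader__'
```

Because the module part of the stored path only existed while the script was being loaded, the lookup fails in the flow run even though the flow code is present in the image.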

prefect 2.0b8

zanieb commented 2 years ago

When the flow script is run directly, we can't infer the flow's module name, because it's replaced with __main__ if you call python <your_script.py>, or with __prefect_loader__ if we run the script for you. If you move your deployment into a separate file and import the flow, we will be able to use that module as the flow's import path:

from my_flows import kubes_flow

Deployment(
    flow=kubes_flow,
)
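
To illustrate the explanation above, the sketch below executes the same file twice: once under a synthetic module name (via a hypothetical load_script_as helper standing in for however Prefect runs the script) and once through a normal import. It assumes the import path is derived from the function's __module__ attribute, which this thread does not confirm; a plain function stands in for the @flow object.

```python
import importlib
import importlib.util
import pathlib
import sys
import tempfile

# Hypothetical flow script; a plain function stands in for the @flow object.
SOURCE = "def kubes_flow():\n    return 'hello'\n"


def load_script_as(path: pathlib.Path, module_name: str):
    """Hypothetical helper: execute a script file under an arbitrary module name."""
    spec = importlib.util.spec_from_file_location(module_name, path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # note: the module is not added to sys.modules
    return module


with tempfile.TemporaryDirectory() as tmp:
    script = pathlib.Path(tmp) / "kubes_flow.py"
    script.write_text(SOURCE)

    # Executed under a synthetic name, the function records that name.
    loaded = load_script_as(script, "__prefect_loader__")
    print(loaded.kubes_flow.__module__)  # __prefect_loader__

    # Imported as a regular module, it records the real, importable name.
    sys.path.insert(0, tmp)
    importlib.invalidate_caches()
    try:
        imported = importlib.import_module("kubes_flow")
        print(imported.kubes_flow.__module__)  # kubes_flow
    finally:
        sys.path.remove(tmp)
        sys.modules.pop("kubes_flow", None)
```

Running python kubes_flow.py would likewise record __main__. Only the second case yields a name another process could import, which matches the suggested workaround of importing the flow from its own module.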

I'll try to find a way to determine the module path when the flow and the deployment are in the same file.

Also note that if your module is a relative import rather than an installed module, this will fail when using the CLI. I'll investigate a fix for that, but in the meantime you can call Deployment(...).create() directly and run the script with Python to get past it.

tekumara commented 2 years ago

Thanks @madkinsz, I'll try that. FYI, I'm creating the deployment via the CLI, e.g. prefect deployment create flows/kubes_flow.py.

tekumara commented 2 years ago

Yep splitting out the deployment into its own file works! 🥳

NValsted commented 2 years ago

I get a similar error when using DaskTaskRunner and calling a function defined in the same file as the task and flow.

Error

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/distributed/worker.py", line 2157, in execute
    function, args, kwargs = await self._maybe_deserialize_task(ts)
  File "/usr/local/lib/python3.9/site-packages/distributed/worker.py", line 2127, in _maybe_deserialize_task
    function, args, kwargs = _deserialize(*ts.run_spec)
  File "/usr/local/lib/python3.9/site-packages/distributed/worker.py", line 2755, in _deserialize
    kwargs = pickle.loads(kwargs)
  File "/usr/local/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 73, in loads
    return pickle.loads(x)
ModuleNotFoundError: No module named '__prefect_loader__'

Minimal reproducible example

I have the following two files.

flow.py:

from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

def inner():
    return 0

@task
def bar():
    inner()

@flow(task_runner=DaskTaskRunner())
def foo():
    future = bar()
    future.wait()

spec.py:

from prefect.deployments import Deployment
from flow import foo

spec = Deployment(
    flow=foo,
    tags=["test"],
)
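
A possible explanation for the Dask error, sketched with the standard library pickle under the assumption that inner is serialized by reference (module name plus qualified name), as pickle does for plain functions. Dask's real serialization path (which may use cloudpickle) is not reproduced here; the types.ModuleType object only simulates the script having been loaded as __prefect_loader__ in the flow process and nowhere else:

```python
import pickle
import sys
import types

# Simulate the flow process: the script was loaded as "__prefect_loader__",
# so functions defined in it record that name in __module__.
loader_module = types.ModuleType("__prefect_loader__")
exec("def inner():\n    return 0\n", loader_module.__dict__)
sys.modules["__prefect_loader__"] = loader_module

# pickle stores a plain function by reference: its module name and qualname.
payload = pickle.dumps(loader_module.inner)

# Simulate a worker process, which never loaded the flow script.
del sys.modules["__prefect_loader__"]

try:
    pickle.loads(payload)
except ModuleNotFoundError as exc:
    print(exc)  # No module named '__prefect_loader__'
```

Under this assumption, defining inner in a separately importable module that is also available on the workers would give pickle a name the workers can resolve.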

zanieb commented 1 year ago

Related to #6629

zanieb commented 1 year ago

See also https://github.com/PrefectHQ/prefect/issues/6762

jrbourbeau commented 7 months ago

FWIW, I ran into a similar error when running a computation on a Dask cluster from inside a running Prefect flow. Here's a description of the particular issue I hit, along with a change to distributed that avoids it: https://github.com/dask/distributed/pull/8502. Just posting here for visibility.