PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

prefect-aws 0.5.3 cannot import 'Integration' from 'prefect.client.schemas.objects' #16188

Open jggatter opened 1 day ago

jggatter commented 1 day ago

Bug summary

A user from my organization reported a crash this morning. It appears that Prefect is unable to pull our code from S3 due to an ImportError.

CloudWatch Logs:

14:51:06.345 | ERROR   | prefect.engine - Engine execution of flow run '05bff49b-d128-4f92-a94e-d48b4fd0fe52' exited with unexpected exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/prefect/deployments/steps/core.py", line 161, in run_steps
    step_output = await run_step(step, upstream_outputs)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/deployments/steps/core.py", line 131, in run_step
    step_func = _get_function_for_step(fqn, requires=keywords.get("requires"))
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/deployments/steps/core.py", line 98, in _get_function_for_step
    step_func = import_object(fully_qualified_name)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/utilities/importtools.py", line 216, in import_object
    module = load_module(module_name)
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/utilities/importtools.py", line 187, in load_module
    return importlib.import_module(module_name)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/importlib/__init__.py", line 90, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<frozen importlib._bootstrap>", line 1387, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1360, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1310, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
  File "<frozen importlib._bootstrap>", line 1387, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1360, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1310, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
  File "<frozen importlib._bootstrap>", line 1387, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1360, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1331, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 935, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 995, in exec_module
  File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
  File "/usr/local/lib/python3.12/site-packages/prefect_aws/__init__.py", line 7, in <module>
    from .workers import ECSWorker
  File "/usr/local/lib/python3.12/site-packages/prefect_aws/workers/__init__.py", line 1, in <module>
    from .ecs_worker import ECSWorker
  File "/usr/local/lib/python3.12/site-packages/prefect_aws/workers/ecs_worker.py", line 71, in <module>
    from prefect.workers.base import (
  File "/usr/local/lib/python3.12/site-packages/prefect/workers/__init__.py", line 1, in <module>
    from .process import ProcessWorker
  File "/usr/local/lib/python3.12/site-packages/prefect/workers/process.py", line 56, in <module>
    from prefect.workers.base import (
  File "/usr/local/lib/python3.12/site-packages/prefect/workers/base.py", line 24, in <module>
    from prefect.client.schemas.objects import (
ImportError: cannot import name 'Integration' from 'prefect.client.schemas.objects' (/usr/local/lib/python3.12/site-packages/prefect/client/schemas/objects.py)
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/prefect/engine.py", line 37, in <module>
    flow_run, flow = load_flow_and_flow_run(flow_run_id=flow_run_id)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/flow_engine.py", line 104, in load_flow_and_flow_run
    ## TODO: add error handling to update state and log tracebacks
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 243, in run_coro_as_sync
    return call.result()
           ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 312, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 182, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 383, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 225, in coroutine_wrapper
    return await task
           ^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/client/utilities.py", line 75, in wrapper
    return await func(client, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/flows.py", line 1941, in load_flow_from_flow_run
    output = await run_steps(deployment.pull_steps)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/deployments/steps/core.py", line 189, in run_steps
    raise StepExecutionError(f"Encountered error while running {fqn}") from exc
prefect.deployments.steps.core.StepExecutionError: Encountered error while running prefect_aws.deployments.steps.pull_from_s3
 > Running pull_from_s3 step...
Unable to load step function: prefect_aws.deployments.steps.pull_from_s3. Attempting install of prefect-aws>=0.3.4.

Yesterday, before the prefect-aws 0.5.3 release, things appeared to be working for this developer. Given the ImportError and recent prefect-aws release, I assume that the release has some incompatibilities with the latest Prefect version, 3.1.5.

Dockerfile:

FROM prefecthq/prefect:3.0-python3.12

WORKDIR /app

COPY requirements.txt .

RUN pip install -U pip && pip install --no-cache-dir -r requirements.txt

requirements.txt:

pandas
psycopg2-binary
requests
prefect>=3.0

Our stopgap solution, which seems to have worked, was:

1) Upgrading the image to prefecthq/prefect:3.1-python3.12 (just some housekeeping, perhaps not the solution)
2) Pinning prefect == 3.1.4 and prefect-aws == 0.5.2 in requirements.txt

Unfortunately I do not have time to confirm my suspicions by looking through the Prefect codebase or trying to reproduce with different versions. Please let me know whether any further information is needed.

Version info

Collecting prefect-aws>=0.3.4
  Downloading prefect_aws-0.5.3-py3-none-any.whl.metadata (2.7 kB)
Collecting boto3>=1.24.53 (from prefect-aws>=0.3.4)
  Downloading boto3-1.35.73-py3-none-any.whl.metadata (6.7 kB)
Collecting botocore>=1.27.53 (from prefect-aws>=0.3.4)
  Downloading botocore-1.35.73-py3-none-any.whl.metadata (5.7 kB)
Collecting mypy_boto3_s3>=1.24.94 (from prefect-aws>=0.3.4)
  Downloading mypy_boto3_s3-1.35.72-py3-none-any.whl.metadata (19 kB)
Collecting mypy_boto3_secretsmanager>=1.26.49 (from prefect-aws>=0.3.4)
  Downloading mypy_boto3_secretsmanager-1.35.0-py3-none-any.whl.metadata (13 kB)
Collecting prefect>=3.1.3 (from prefect-aws>=0.3.4)
  Downloading prefect-3.1.5-py3-none-any.whl.metadata (11 kB)

...

Installing collected packages: zipp, wrapt, tenacity, pyparsing, mypy_boto3_secretsmanager, mypy_boto3_s3, jmespath, importlib-metadata, deprecated, botocore, s3transfer, opentelemetry-api, prefect, boto3, prefect-aws
  Attempting uninstall: prefect
    Found existing installation: prefect 3.0.11
    Uninstalling prefect-3.0.11:
      Successfully uninstalled prefect-3.0.11
Successfully installed boto3-1.35.73 botocore-1.35.73 deprecated-1.2.15 importlib-metadata-8.5.0 jmespath-1.0.1 mypy_boto3_s3-1.35.72 mypy_boto3_secretsmanager-1.35.0 opentelemetry-api-1.28.2 prefect-3.1.5 prefect-aws-0.5.3 pyparsing-3.2.0 s3transfer-0.10.4 tenacity-9.0.0 wrapt-1.17.0 zipp-3.21.0

Additional context

I was using the prefecthq/prefect:3.0-python3.12 image on the ECS Fargate runtime.

zzstoatzz commented 1 day ago

hi @jggatter - thanks for the issue!

what appears to be happening here is that since you've selected prefecthq/prefect:3.0-python3.12, you're getting the last patch release of 3.0.x, which does not have this Integration schema.

so you should be able to update your base image to just prefecthq/prefect:3-python3.12 or prefecthq/prefect:3.1-python3.12 to end up with prefect==3.1.5

» docker run --rm -it prefecthq/prefect:3.0-python3.12 /bin/bash
root@98896b712d36:/opt/prefect# prefect version
Version:             3.0.11
API version:         0.8.4
Python version:      3.12.7
Git commit:          a17ccfcf
Built:               Thu, Oct 24, 2024 5:36 PM
OS/Arch:             linux/aarch64
Profile:             ephemeral
Server type:         ephemeral
Pydantic version:    2.9.2
Server:
  Database:          sqlite
  SQLite version:    3.40.1

root@98896b712d36:/opt/prefect# python -c "from prefect.client.schemas.objects import Integration"
Traceback (most recent call last):
  File "<string>", line 1, in <module>
ImportError: cannot import name 'Integration' from 'prefect.client.schemas.objects' 
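
The one-off check above can also be wrapped in a small helper, for example to run at container start-up before any deployment steps execute. This is only a sketch: `can_import_name` is a hypothetical helper, not part of Prefect, and it uses `hasattr` on the imported module as an approximation of what `from module import name` does.

```python
import importlib


def can_import_name(module_name: str, attr: str) -> bool:
    """Return True if `attr` is available on the module `module_name`.

    Hypothetical helper for illustration; approximates
    `from module_name import attr` without raising.
    """
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        # The module itself is missing or fails to import.
        return False
    return hasattr(module, attr)


if __name__ == "__main__":
    # The session above suggests this reports False on prefect 3.0.11.
    ok = can_import_name("prefect.client.schemas.objects", "Integration")
    print(f"Integration importable: {ok}")
```
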

rereading your issue, I'm not yet convinced I have the story perfectly straight, will keep digging

in particular, it's surprising to me to see

Attempting uninstall: prefect
  Found existing installation: prefect 3.0.11
  Uninstalling prefect-3.0.11:
    Successfully uninstalled prefect-3.0.11

...
prefect-3.1.5 prefect-aws-0.5.3

as this combination should work fine. @jggatter can you confirm where these logs came from? I assume the ECS flow run container, but just double checking

jggatter commented 1 day ago

Thanks for the quick reply!

Yep, we run containers on ECS Fargate via our push pool.

Yeah, I was also confused to see the prefect-aws installation picking up prefect >= 3.1.3. Perhaps our prefect YAML deployment requiring prefect-aws >= 0.3.4 for the code pulling step causes this reinstallation.

Even then, it appears the setup steps continue with the original prefect version and do not use the newly installed one? I'm not sure whether subprocesses or different Python runtimes are used in the deployment steps.
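
One way to test that suspicion is to compare the version of a package already loaded in the running process with the version currently installed on disk. The sketch below is untested against Prefect itself: `version_skew` is a hypothetical helper, and it assumes the package exposes a `__version__` attribute whose string matches its distribution metadata.

```python
import importlib.metadata
import sys
from typing import Callable, Optional, Tuple


def version_skew(
    package: str,
    installed_version: Callable[[str], str] = importlib.metadata.version,
) -> Optional[Tuple[str, str]]:
    """Return (loaded, installed) if the in-memory module is stale, else None.

    Hypothetical helper: a mismatch would suggest the package was upgraded
    on disk after this process had already imported the old version.
    """
    module = sys.modules.get(package)
    if module is None:
        return None  # not imported yet, so nothing can be stale
    loaded = getattr(module, "__version__", None)
    if loaded is None:
        return None  # assumption violated: no __version__ to compare
    try:
        installed = installed_version(package)
    except importlib.metadata.PackageNotFoundError:
        return None
    return (loaded, installed) if loaded != installed else None
```

If the theory holds, calling `version_skew("prefect")` inside the flow run process after the step's automatic install would be expected to return something like `("3.0.11", "3.1.5")`; a `None` result would point elsewhere.
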

In any case, I think it's likely just a problem with our setup then. It's better practice for us to keep up with the latest Docker images or, I suppose, keep pins in our Prefect YAML files that don't allow installation of newer, possibly incompatible versions, e.g. prefect-aws 0.5.3.
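
As a rough guard along those lines, a small check could flag requirement lines that are not exact pins. This is a simplified sketch only: `unpinned` is a hypothetical helper, and it handles plain `name==version` lines rather than the full requirement specifier syntax (no environment markers, URLs, or multi-clause specifiers).

```python
import re
from typing import Iterable, List

# Matches "name==version" or "name[extra]==version" with optional spaces
# around "==". Wildcard versions such as "3.*" are not treated as exact pins.
_EXACT_PIN = re.compile(r"^[A-Za-z0-9._-]+(\[[^\]]*\])?\s*==\s*[^\s*]+$")


def unpinned(lines: Iterable[str]) -> List[str]:
    """Return the requirement lines that are not pinned to an exact version."""
    result = []
    for raw in lines:
        line = raw.split("#", 1)[0].strip()  # drop comments and whitespace
        if not line or line.startswith("-"):
            continue  # skip blanks and pip options such as "-r other.txt"
        if not _EXACT_PIN.match(line):
            result.append(line)
    return result
```

Run against the original requirements.txt from this issue, it would flag all four lines; with `prefect == 3.1.4` and `prefect-aws == 0.5.2` it flags nothing.
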