PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

`sync_compatible` async/sync context detection error #6677

Closed gbmarc1 closed 2 weeks ago

gbmarc1 commented 1 year ago

First check

Bug summary

I'm using pytest-asyncio to run my tests, and Azure.load("name") raises an error. The sync_compatible decorator decides that I am not in an async context, even though I am. If I call async_fn(*args, **kwargs) directly in the wrapper, the coroutine is created correctly and works.

Reproduction

import os

import pytest
import pytest_asyncio
from prefect.client import get_client
from prefect.filesystems import Azure
from prefect.orion.schemas.core import BlockDocument
from prefect.testing.utilities import prefect_test_harness

@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
    with prefect_test_harness():
        yield

@pytest_asyncio.fixture
async def prefect_client(prefect_test_fixture):
    async with get_client() as client:
        yield client

@pytest_asyncio.fixture
async def azure_blob(prefect_client):
    name = "azureblobblock"
    azure_block_type = list(
        filter(
            lambda b: b.block_type.slug == "azure",
            (await prefect_client.read_block_schemas()),
        )
    )[0]
    azure_block = BlockDocument(
        name=name,
        data={
            "bucket_path": "prefect",
            "azure_storage_account_name": os.environ["ACCOUNT_NAME"],
            "azure_storage_account_key": os.environ["ACCOUNT_KEY"],
        },
        block_schema_id=azure_block_type.id,
        block_type_id=azure_block_type.block_type_id,
    )
    azure_block = await prefect_client.create_block_document(azure_block)

    yield await Azure.load(name)

    await prefect_client.delete_block_document(azure_block.id)

@pytest.mark.asyncio
async def test_azure_blob(azure_blob):

    content = b"ok"
    await azure_blob.write_path("yo.txt", content)
    assert await azure_blob.read_path("yo.txt") == content

Error


    @wraps(async_fn)
    def wrapper(*args, **kwargs):
        if in_async_main_thread():
            caller_frame = sys._getframe(1)
            caller_module = caller_frame.f_globals.get("__name__", "unknown")
            caller_async = caller_frame.f_code.co_flags & inspect.CO_COROUTINE
            if caller_async or any(
                # Add exceptions for the internals anyio/asyncio which can run
                # coroutines from synchronous functions
                caller_module.startswith(f"{module}.")
                for module in ["asyncio", "anyio"]
            ):
                # In the main async context; return the coro for them to await
                return async_fn(*args, **kwargs)
            else:
                # In the main thread but call was made from a sync method
>               raise RuntimeError(
                    "A 'sync_compatible' method was called from a context that was "
                    "previously async but is now sync. The sync call must be changed "
                    "to run in a worker thread to support sending the coroutine for "
                    f"{async_fn.__name__!r} to the main thread."
                )
E               RuntimeError: A 'sync_compatible' method was called from a context that was previously async but is now sync. The sync call must be changed to run in a worker thread to support sending the coroutine for 'load' to the main thread.
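The wrapper decides whether its caller is async by testing the caller frame's co_flags for inspect.CO_COROUTINE. One plausible cause, offered as a hypothesis rather than a confirmed diagnosis, is that the azure_blob fixture contains a yield. That makes it an async generator function, and its code object carries CO_ASYNC_GENERATOR instead of CO_COROUTINE. The standalone sketch below mimics only that flag check. It uses a hypothetical caller_is_async helper and does not import Prefect.

```python
import asyncio
import inspect
import sys


def caller_is_async():
    """Hypothetical helper mimicking the wrapper's check: look at the frame
    that called us and test its code flags for CO_COROUTINE."""
    caller_frame = sys._getframe(1)
    return bool(caller_frame.f_code.co_flags & inspect.CO_COROUTINE)


async def from_coroutine():
    # A plain `async def` body: its code object has CO_COROUTINE set.
    return caller_is_async()


async def from_async_generator():
    # An `async def` containing `yield` (like a pytest-asyncio fixture)
    # is an async generator: CO_ASYNC_GENERATOR is set, CO_COROUTINE is not.
    yield caller_is_async()


async def main():
    coro_flag = await from_coroutine()
    agen_flag = [flag async for flag in from_async_generator()][0]
    return coro_flag, agen_flag


print(asyncio.run(main()))  # (True, False)
```

If this hypothesis is right, an `await` on a sync_compatible method made directly inside any async generator would be classified as a sync call, while the same call from a plain coroutine would pass.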

Versions


Version:             2.3.0
API version:         0.8.0
Python version:      3.10.0
Git commit:          8d9316c0
Built:               Tue, Aug 30, 2022 5:30 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         ephemeral
Server:
Database:          sqlite
SQLite version:    3.38.5

Additional context

No response

gbmarc1 commented 1 year ago

To work around the issue I do the following hack:

    block_document = await prefect_client.read_block_document_by_name(
        name=azure_block.name, block_type_slug=azure_block.block_type.slug
    )
    yield Azure._from_block_document(block_document)
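If the async-generator hypothesis holds, another workaround would be to keep the fixture's yield and move the `await Azure.load(name)` call into a small helper coroutine. The frame that calls load would then be a coroutine frame. This has not been tested against Prefect and is an assumption. The sketch below checks only the frame-flag logic and uses a hypothetical caller_is_async helper in place of Azure.load.

```python
import asyncio
import inspect
import sys


def caller_is_async():
    # Hypothetical stand-in for the wrapper's CO_COROUTINE check on its caller.
    return bool(sys._getframe(1).f_code.co_flags & inspect.CO_COROUTINE)


async def fixture_direct():
    # Mirrors `yield await Azure.load(name)`: the caller frame is an async generator.
    yield caller_is_async()


async def fixture_via_helper():
    async def load_block():
        # In a real fixture this would be `return await Azure.load(name)`.
        return caller_is_async()

    # The check now sees load_block's coroutine frame instead of the generator's.
    yield await load_block()


async def first(agen):
    # Pull the single value a fixture-style async generator yields.
    return await agen.__anext__()


print(asyncio.run(first(fixture_direct())))      # False
print(asyncio.run(first(fixture_via_helper())))  # True
```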

zanieb commented 1 year ago

Thanks for the report! This is definitely incorrect behavior. I believe this is because caller_async does not have handling for wrapped functions, but I'm not sure. We'll need to investigate.

guigautier commented 1 year ago

I ran into a similar issue when deploying to a Prefect server.

from prefect.blocks.core import Block
from pydantic import SecretStr

class SFTPCredentials(Block):
    host: str
    username: str
    password: SecretStr
    port: int

def update_block():
    sftp_credentials = SFTPCredentials(
        host="localhost",
        username="demo",
        password=SecretStr("pass"),
        port=4422,
    )
    sftp_credentials.save(name="sftp-credentials", overwrite=True)

When I run prefect deployment apply myflow.yaml with the update_block() function, I get:

RuntimeError: A 'sync_compatible' method was called from a context that was previously async but is now sync. The sync call must be changed to run in a worker thread to support sending the coroutine for 'save' to the main thread.
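Here save is called from a plain def, so the caller check fails whenever an event loop is already running in the main thread. The sketch below assumes, without verifying it here, that the deployment CLI executes your file while its own loop is running. It reproduces that condition with a hypothetical in_async_main_thread stand-in, which is not Prefect's implementation.

```python
import asyncio
import threading


def in_async_main_thread():
    # Hypothetical stand-in: are we on the main thread with a loop running?
    if threading.current_thread() is not threading.main_thread():
        return False
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


def update_block_like():
    # A plain sync function, like update_block() above.
    return in_async_main_thread()


async def tool_running_user_code():
    # Hypothetical async tool that calls user code synchronously.
    return update_block_like()


print(update_block_like())                    # False: no loop, a sync call is fine
print(asyncio.run(tool_running_user_code()))  # True: sync caller, but a loop is running
```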

zhiyang112 commented 1 year ago

I get the same error, RuntimeError: A 'sync_compatible' method was called from a context that was previously async but is now sync. The sync call must be changed to run in a worker thread to support sending the coroutine for 'load' to the main thread.

It happens when I try to load a GCS block:

from prefect.filesystems import GCS

gcs_block = GCS.load("test-block")

samanax commented 1 year ago

Should this code work?

from prefect import task, flow
from prefect.client import get_client
from prefect.filesystems import Azure
import asyncio

async def client_setup():
    async with get_client() as client:
        azure_block = Azure.load("stoprefectdev")
        block_document = await client.read_block_document_by_name(
        name=azure_block.name, block_type_slug=azure_block.block_type.slug
    )
    yield Azure._from_block_document(block_document)

asyncio.run(client_setup())

I'm getting the following error: RuntimeError: asyncio.run() cannot be called from a running event loop

zanieb commented 1 year ago

@samanax Please don't add new errors to existing issues.

The code

from prefect import task, flow
from prefect.client import get_client
from prefect.filesystems import Azure
import asyncio

async def client_setup():
    async with get_client() as client:
        azure_block = await Azure.load("stoprefectdev")
        block_document = await client.read_block_document_by_name(
            name=azure_block.name, block_type_slug=azure_block.block_type.slug
        )
    return Azure._from_block_document(block_document)

asyncio.run(client_setup())

Runs fine for me in a Python shell.
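The corrected version adds await to Azure.load and also swaps yield for return. An async def containing yield is an async generator function, and asyncio.run only accepts a coroutine. The original shape therefore fails before any Prefect code runs. The exact exception type and message vary across Python versions. A minimal stdlib check:

```python
import asyncio
import inspect


async def with_yield():
    yield "block"


async def with_return():
    return "block"


# Inspect the functions (not their results) to avoid un-awaited coroutines.
print(inspect.isasyncgenfunction(with_yield))    # True
print(inspect.iscoroutinefunction(with_return))  # True

try:
    asyncio.run(with_yield())
except (TypeError, ValueError) as exc:
    # Raised because an async generator object is not a coroutine.
    print(type(exc).__name__, exc)

print(asyncio.run(with_return()))  # block
```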

chris-aeviator commented 1 year ago

This happens whenever I add

from prefect.filesystems import RemoteFileSystem
remote_file_system_block = RemoteFileSystem.load("my-s3")

There is no await anywhere in my file.

zanieb commented 1 year ago

@chris-aeviator how is the file being executed?

chris-aeviator commented 1 year ago

@madkinsz It was inside my pipeline file, which I deployed via

prefect deployment build ./test-project/test.py:my_task -n automated-task --storage-block remote-file-system/my-s3

but I have since deleted and re-added my S3 block via prefect block register --file s3block.py and removed it from my test.py.

zanieb commented 1 year ago

This sounds like a bug with how Prefect is loading your deployment file. Can you share the traceback?

singletonerik commented 1 year ago

I am getting the same error when I run an app with Uvicorn (FastAPI). Here's a minimal reproduction:

error_test.py

# boilerplate fastapi helloworld
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
async def root():
    return {"message": "Hello World"}

from prefect.filesystems import GitHub
storage = GitHub.load('repo')

then in the command line:

$ uvicorn error_test.py:app

Traceback (most recent call last):
  File "/Users/erik/code_venvs/prefect-simple-webhook-_il2wvVZ-py3.9/bin/uvicorn", line 8, in <module>
    sys.exit(main())
  File "/Users/erik/code_venvs/prefect-simple-webhook-_il2wvVZ-py3.9/lib/python3.9/site-packages/click/core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
  File "/Users/erik/code_venvs/prefect-simple-webhook-_il2wvVZ-py3.9/lib/python3.9/site-packages/click/core.py", line 1055, in main
    rv = self.invoke(ctx)
  File "/Users/erik/code_venvs/prefect-simple-webhook-_il2wvVZ-py3.9/lib/python3.9/site-packages/click/core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/Users/erik/code_venvs/prefect-simple-webhook-_il2wvVZ-py3.9/lib/python3.9/site-packages/click/core.py", line 760, in invoke
    return __callback(*args, **kwargs)
  File "/Users/erik/code_venvs/prefect-simple-webhook-_il2wvVZ-py3.9/lib/python3.9/site-packages/uvicorn/main.py", line 408, in main
    run(
  File "/Users/erik/code_venvs/prefect-simple-webhook-_il2wvVZ-py3.9/lib/python3.9/site-packages/uvicorn/main.py", line 576, in run
    server.run()
  File "/Users/erik/code_venvs/prefect-simple-webhook-_il2wvVZ-py3.9/lib/python3.9/site-packages/uvicorn/server.py", line 60, in run
    return asyncio.run(self.serve(sockets=sockets))
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/Users/erik/code_venvs/prefect-simple-webhook-_il2wvVZ-py3.9/lib/python3.9/site-packages/uvicorn/server.py", line 67, in serve
    config.load()
  File "/Users/erik/code_venvs/prefect-simple-webhook-_il2wvVZ-py3.9/lib/python3.9/site-packages/uvicorn/config.py", line 479, in load
    self.loaded_app = import_from_string(self.app)
  File "/Users/erik/code_venvs/prefect-simple-webhook-_il2wvVZ-py3.9/lib/python3.9/site-packages/uvicorn/importer.py", line 21, in import_from_string
    module = importlib.import_module(module_str)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1030, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1007, in _find_and_load
  File "<frozen importlib._bootstrap>", line 972, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
  File "<frozen importlib._bootstrap>", line 1030, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1007, in _find_and_load
  File "<frozen importlib._bootstrap>", line 986, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 680, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 790, in exec_module
  File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
  File "./error_test.py", line 11, in <module>
    storage = GitHub.load('repo')
  File "/Users/erik/code_venvs/prefect-simple-webhook-_il2wvVZ-py3.9/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 198, in wrapper
    raise RuntimeError(
RuntimeError: A 'sync_compatible' method was called from a context that was previously async but is now sync. The sync call must be changed to run in a worker thread to support sending the coroutine for 'load' to the main thread.

zanieb commented 1 year ago

Thanks for the reproduction! This is because when Uvicorn loads the file it does so from a thread with an event loop. If you place your calls in a synchronous function, things will work as you desire:

from fastapi import FastAPI
app = FastAPI()

@app.get("/")
async def root():
    return {"message": "Hello World"}

@app.get("/foo")
def foo():
    from prefect.filesystems import GitHub
    storage = GitHub.load('repo')

If you're going to use it from an async function, you should be using await GitHub.load('repo') instead.
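The sync endpoint works because FastAPI runs plain def path operations in a worker threadpool instead of on the event loop's thread. That is also what the error message asks for ("run in a worker thread"). The stdlib sketch below imitates that dispatch with asyncio.to_thread (Python 3.9+). It uses the same kind of hypothetical in_async_main_thread stand-in and does not use FastAPI or Prefect.

```python
import asyncio
import threading


def in_async_main_thread():
    # Hypothetical stand-in for the first check sync_compatible performs.
    if threading.current_thread() is not threading.main_thread():
        return False
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


def foo():
    # Like the sync `foo` endpoint: plain code that would call GitHub.load(...).
    return in_async_main_thread()


async def server():
    # Direct call on the loop's thread, like module-level code during import.
    on_loop_thread = foo()
    # Analogous to how a `def` endpoint is dispatched (Starlette uses its own
    # threadpool helper rather than asyncio.to_thread).
    in_worker = await asyncio.to_thread(foo)
    return on_loop_thread, in_worker


print(asyncio.run(server()))  # (True, False)
```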

rushter commented 1 year ago

It would be great to have a way to explicitly call the sync version of these functions. I call the same sync functions from both sync and async environments, and the async detection misfires whenever an event loop is running. My function is sync and fetches secrets via Secret.load. As far as I can tell, there is currently no way to force sync mode, so I have to pass secrets in as parameters instead.
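One way to provide the explicit sync mode requested here is a per-call keyword that skips detection entirely. The sketch below is a toy. toy_sync_compatible and its force_sync keyword are hypothetical names, and it does not reproduce Prefect's decorator. The forced path runs the coroutine on a fresh loop in a worker thread and blocks the caller until it finishes.

```python
import asyncio
import concurrent.futures
import functools


def toy_sync_compatible(async_fn):
    """Hypothetical decorator (not Prefect's implementation): return a
    coroutine when a loop is running, otherwise run to completion; the
    hypothetical `force_sync=True` keyword always returns a plain result."""

    @functools.wraps(async_fn)
    def wrapper(*args, force_sync=False, **kwargs):
        if force_sync:
            # Run on a fresh loop in a worker thread, so this works even if the
            # current thread already has a running loop (blocks until done).
            with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
                future = pool.submit(lambda: asyncio.run(async_fn(*args, **kwargs)))
                return future.result()
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No running loop: safe to drive the coroutine ourselves.
            return asyncio.run(async_fn(*args, **kwargs))
        # A loop is running: hand the coroutine back for the caller to await.
        return async_fn(*args, **kwargs)

    return wrapper


@toy_sync_compatible
async def load_secret(name):
    await asyncio.sleep(0)  # stand-in for a network call
    return f"secret-for-{name}"


def sync_helper():
    # A sync function that must work from both sync and async callers.
    return load_secret("api-key", force_sync=True)


async def main():
    from_sync = sync_helper()  # plain string, despite the running loop
    from_async = await load_secret("api-key")
    return from_sync, from_async


print(sync_helper())        # secret-for-api-key
print(asyncio.run(main()))  # ('secret-for-api-key', 'secret-for-api-key')
```

The trade-off is that a forced sync call made on the loop's thread stalls that loop while the worker thread runs. This is acceptable for a caller that is blocking anyway, but it should be avoided in latency-sensitive async code.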

aaazzam commented 2 weeks ago

This is addressed in Prefect 3, where sync_compatible allows a force_sync keyword.

Thanks!