PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Task run creation blocks the event loop #14113

Open EmilRex opened 2 weeks ago

EmilRex commented 2 weeks ago

First check

Bug summary

In the example below, with @task commented out, python fibonacci.py 100 completes in a second or two. With @task uncommented, it takes a minute or two and appears to run sequentially. I would expect it to run concurrently.

Reproduction

# fibonacci.py
import asyncio
import sys
from prefect import task, flow

cache = {0: 0, 1: 1}

@task
async def fibonacci(n):
    if n not in cache:
        print(f"Calculating fibonacci({n})")
        a = fibonacci(n-1)
        b = fibonacci(n-2)
        cache[n] = await a + await b
    return cache[n]

@flow()
async def main(n):
    return await fibonacci(n)

if __name__ == "__main__":
    n = int(sys.argv[1])
    print(asyncio.run(main(n)))

Error

N/A

Versions

Version:             3.0.0rc3
API version:         0.8.4
Python version:      3.12.3
Git commit:          8a59fb49
Built:               Mon, Jun 17, 2024 9:13 AM
OS/Arch:             darwin/arm64
Profile:             sandbox
Server type:         cloud
Pydantic version:    2.7.4

Additional context

No response

desertaxle commented 1 week ago

After looking at this issue with fresh eyes, I think everything is working as expected. The time increase is due to the overhead of calling the Prefect API for each task run, and the order of execution is the same whether or not the @task decorator is present. I think the Python-only version only looks concurrent because it is so much faster. I'm going to close this, but feel free to reopen or respond if there's something I missed!
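The point about execution order can be checked with plain asyncio, without Prefect. In the minimal sketch below, work is a hypothetical stand-in for a task body. A coroutine object does nothing until it is awaited, so the expression await a + await b (the shape used in fibonacci.py) finishes a before b even starts, whereas asyncio.gather schedules both up front.

```python
import asyncio
from collections.abc import Callable, Coroutine
from typing import Any

# Log of start/end events, used to observe the order of execution.
events: list[str] = []


async def work(name: str) -> int:
    # Hypothetical stand-in for a task body: record start, yield to the loop, record end.
    events.append(f"start {name}")
    await asyncio.sleep(0.01)
    events.append(f"end {name}")
    return 1


async def sequential() -> int:
    # Same shape as fibonacci.py: both coroutines are created first, but neither
    # runs until awaited, and "await a" completes before "await b" begins.
    a = work("a")
    b = work("b")
    return await a + await b


async def concurrent() -> int:
    # gather wraps both coroutines in tasks, so they interleave on the event loop.
    a, b = await asyncio.gather(work("a"), work("b"))
    return a + b


def run(fn: Callable[[], Coroutine[Any, Any, int]]) -> list[str]:
    # Run one variant on a fresh event loop and return a copy of its event log.
    events.clear()
    asyncio.run(fn())
    return list(events)


if __name__ == "__main__":
    print(run(sequential))  # "start b" only appears after "end a"
    print(run(concurrent))  # "start b" appears before "end a"
```

The ordering in sequential() is a property of how the awaits are written, so it stays the same whether or not a decorator wraps work; the sketch only illustrates the plain-asyncio side of the observation above.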

desertaxle commented 1 week ago

Reopening because we've identified that the new engine creates task runs sequentially in async flows. See this example for a reproduction:

import asyncio

from prefect import flow, task, get_run_logger

@task
async def waiter(n: int = 60):
    get_run_logger().info(f"Waiting for {n} seconds")
    await asyncio.sleep(n)

@flow
async def main(n: int = 100):
    waiters = [waiter() for _ in range(n)]
    await asyncio.gather(*waiters)

if __name__ == "__main__":
    asyncio.run(main())

This is caused by the new engine's use of SyncPrefectClient to send the create-task-run API calls. Resolving this issue will involve making those calls non-blocking in async contexts.
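Why a synchronous client call serializes task run creation can be sketched with plain asyncio. In this minimal example, time.sleep is an assumed stand-in for a blocking HTTP request of the kind a sync client makes, and all helper names are hypothetical. A blocking call inside a coroutine holds the event loop, so asyncio.gather cannot overlap the calls; offloading the same call with asyncio.to_thread lets them run concurrently. This is one possible way to make such calls non-blocking, not necessarily the approach Prefect takes; using an async client is the other obvious option.

```python
import asyncio
import time
from collections.abc import Callable, Coroutine
from typing import Any


def blocking_api_call(delay: float) -> None:
    # Hypothetical stand-in for a synchronous HTTP request (e.g. creating a task
    # run through a sync client). time.sleep holds the thread, so while it runs
    # the event loop cannot switch to any other coroutine.
    time.sleep(delay)


async def create_run_blocking(delay: float) -> None:
    # Calling the sync function directly blocks the whole event loop.
    blocking_api_call(delay)


async def create_run_offloaded(delay: float) -> None:
    # Run the same sync call in a worker thread so the loop stays free.
    await asyncio.to_thread(blocking_api_call, delay)


async def timed_gather(
    factory: Callable[[float], Coroutine[Any, Any, None]], n: int, delay: float
) -> float:
    # Launch n "task run creations" concurrently and measure the wall-clock time.
    start = time.perf_counter()
    await asyncio.gather(*(factory(delay) for _ in range(n)))
    return time.perf_counter() - start


if __name__ == "__main__":
    n, delay = 10, 0.1
    print(f"blocking:  {asyncio.run(timed_gather(create_run_blocking, n, delay)):.2f}s")
    print(f"offloaded: {asyncio.run(timed_gather(create_run_offloaded, n, delay)):.2f}s")
```

The blocking variant takes roughly n * delay seconds because the calls run one after another, while the offloaded variant is limited mainly by the size of the default thread pool.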