PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

prefect.utilities.annotations.quote doesn't work on map results #13585

Open williamjamir opened 4 months ago

williamjamir commented 4 months ago

Bug summary

I've noticed that prefect.utilities.annotations.quote doesn't work as expected when it wraps the output of a map call.

In the example below, you can see that the tasks receive a PrefectFuture rather than the resolved value.

Reproduction

from prefect.utilities.annotations import quote
from prefect import flow, task

@task(task_run_name='task1 with {value}')
def task1(value: str) -> str:
    print('Inside task1 with value: ', value)
    return f'{value}-processed'

@task(task_run_name='task2 with {value}')
def task2(value: str) -> None:
    print('Inside task2 with value: ', value)

@task(task_run_name='task3 with {value}')
def task3(value: str) -> None:
    print('Inside task3 with value: ', value)

@flow(flow_run_name='Example flow')
def run_flow():
    items = task1.map(quote(['acme', 'bar']))
    task2.map(quote(items))
    task3(quote(items[0]))
    print(items)

run_flow()

Error

17:04:41.864 | INFO    | prefect.engine - Created flow run 'teal-corgi' for flow 'run-flow'
17:04:41.866 | INFO    | Flow run 'teal-corgi' - View at http://127.0.0.1:4200/flow-runs/flow-run/fcb77134-84c4-4e4e-886a-55b94ba85e48
17:04:41.988 | INFO    | Flow run 'Example flow' - Created task run 'task1-0' for task 'task1'
17:04:41.988 | INFO    | Flow run 'Example flow' - Submitted task run 'task1-0' for execution.
17:04:41.995 | INFO    | Flow run 'Example flow' - Created task run 'task3-0' for task 'task3'
17:04:41.996 | INFO    | Flow run 'Example flow' - Executing 'task3-0' immediately...
17:04:41.997 | INFO    | Flow run 'Example flow' - Created task run 'task1-1' for task 'task1'
17:04:41.998 | INFO    | Flow run 'Example flow' - Submitted task run 'task1-1' for execution.
17:04:42.027 | INFO    | Flow run 'Example flow' - Created task run 'task2-1' for task 'task2'
17:04:42.028 | INFO    | Flow run 'Example flow' - Submitted task run 'task2-1' for execution.
Inside task1 with value:  acme
Inside task1 with value:  bar
Inside task3 with value:  PrefectFuture('task1-0')
17:04:42.066 | INFO    | Task run 'task1 with acme' - Finished in state Completed()
17:04:42.070 | INFO    | Task run 'task1 with bar' - Finished in state Completed()
17:04:42.075 | INFO    | Task run "task3 with PrefectFuture('task1-0')" - Finished in state Completed()
[PrefectFuture('task1-0'), PrefectFuture('task1-1')]
Inside task2 with value:  PrefectFuture('task1-1')
17:04:42.095 | INFO    | Task run "task2 with PrefectFuture('task1-1')" - Finished in state Completed()
17:04:42.130 | INFO    | Flow run 'Example flow' - Created task run 'task2-0' for task 'task2'
17:04:42.131 | INFO    | Flow run 'Example flow' - Submitted task run 'task2-0' for execution.
Inside task2 with value:  PrefectFuture('task1-0')
17:04:42.169 | INFO    | Task run "task2 with PrefectFuture('task1-0')" - Finished in state Completed()
17:04:42.182 | INFO    | Flow run 'teal-corgi' - Finished in state Completed('All states completed.')

Versions

Version:             2.16.9
API version:         0.8.4
Python version:      3.11.5
Git commit:          083def52
Built:               Thu, Apr 4, 2024 3:11 PM
OS/Arch:             darwin/arm64
Profile:             default
Server type:         server

Additional context

No response

zzstoatzz commented 4 months ago

hi @williamjamir - this is actually the intended behavior of quote, i.e. it skips all introspection of the incoming value, which includes resolving futures.

what's your use case for quote here? perhaps you want something like: task3(quote(items[0].result()))
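To make the intended behavior concrete, here is a simplified, pure-Python model of how parameter resolution interacts with quoting. It is not Prefect's implementation: `Quote` and `resolve_parameters` are hypothetical stand-ins for `prefect.utilities.annotations.quote` and Prefect's parameter handling, and `concurrent.futures.Future` stands in for `PrefectFuture`. The idea it illustrates is that a quoted value is unwrapped without being traversed, so a future inside it reaches the task unresolved.

```python
from concurrent.futures import Future
from typing import Any


class Quote:
    """Hypothetical stand-in for prefect.utilities.annotations.quote."""

    def __init__(self, value: Any) -> None:
        self.value = value


def resolve_parameters(value: Any) -> Any:
    """Simplified model: replace futures with their results, except inside Quote."""
    if isinstance(value, Quote):
        # Quoted values are unwrapped but never inspected, so futures stay futures.
        return value.value
    if isinstance(value, Future):
        return value.result()
    if isinstance(value, list):
        return [resolve_parameters(item) for item in value]
    if isinstance(value, dict):
        return {key: resolve_parameters(item) for key, item in value.items()}
    return value


def completed_future(result: Any) -> Future:
    """Build an already-finished future, standing in for an upstream task run."""
    future: Future = Future()
    future.set_result(result)
    return future


item = completed_future("acme-processed")

print(resolve_parameters(item))  # prints acme-processed
print(type(resolve_parameters(Quote(item))).__name__)  # prints Future (left unresolved)
print(resolve_parameters(Quote(item.result())))  # prints acme-processed
```

The last line mirrors the suggestion above: calling `.result()` before quoting hands the task the value itself while still skipping traversal of that value.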

williamjamir commented 4 months ago

Thanks for the quick reply @zzstoatzz!

I just wanted to clarify that my primary use case is actually task2, i.e. using the output of one map as the input for another map. I only included task3 to show that the behavior is consistent.

Here's my specific use case: I'm extracting multiple dataframes from API calls and then passing these dataframes through my filter/business logic before uploading them.

@flow
def myflow():
    # Extracting 
    dfs = extract_dfs.map(my_params_and_inputs)

    # Filters from my flow
    cleaned_data = filters_and_businnes_logic.map(dfs)

    # Rest of my flow

> this is actually the intended behavior of quote i.e. it skips all introspection of the incoming value, which includes resolving futures.

Just a suggestion: neither the warning raised by Prefect nor the docs make it clear that quote also prevents futures from being resolved; only the introspection aspect is mentioned.

Task parameter introspection took 190.735 seconds , exceeding 
`PREFECT_TASK_INTROSPECTION_WARN_THRESHOLD` of 10.0. 

Try wrapping large task parameters with `prefect.utilities.annotations.quote` 
for increased performance, e.g. `my_task(quote(param))`. 
To disable this message set `PREFECT_TASK_INTROSPECTION_WARN_THRESHOLD=0`

In my view, the warning message suggests that the most "obvious" solution is to wrap the parameter with quote, as in the example below:

@flow
def myflow():
    # Extracting 
    dfs = extract_dfs.map(my_params_and_inputs)

    # Filters from my flow
    cleaned_data = filters_and_businnes_logic.map(quote(dfs))

Which, in this case, doesn't work =/

Would the team consider moving this logic into the quote implementation to handle such scenarios? I can clearly see quote being misused, and, even worse, the unresolved future propagating to other tasks, like this:


from prefect import flow, task
from prefect.utilities.annotations import quote

@task()
def task1(value: str) -> str:
    print('Inside task1 with value: ', value)
    return f'{value}-processed'

@task()
def task2(value: str) -> str:
    print('Inside task2 with value: ', value)
    return f'{value}-final'

@task()
def task3(value: str) -> None:
    print('Inside task3 with value: ', value)

@flow(flow_run_name='Example flow')
def run_flow():
    items = task1.map(quote(['acme', 'bar']))
    items_2 = task2.map(quote(items))
    task3.map(items_2)

>>>  run_flow()
Inside task1 with value:  acme
Inside task2 with value:  PrefectFuture('task1-0')
Inside task2 with value:  PrefectFuture('task1-1')    
Inside task3 with value:  PrefectFuture('task1-0')-final
Inside task1 with value:  bar
Inside task3 with value:  PrefectFuture('task1-1')-final
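Why does `task2.map(quote(items))` hand each call a `PrefectFuture`? The sketch below is a simplified, hypothetical model (not Prefect's source) of how a map call can treat a quoted iterable: it unwraps the quote to find the elements to map over, then re-wraps each element in a quote before the individual call. Under that assumption, every mapped call receives a quoted future, and since quoted values are never resolved, the future itself reaches the task. `Quote`, `map_call_arguments`, and the use of `concurrent.futures.Future` in place of `PrefectFuture` are all stand-ins.

```python
from concurrent.futures import Future
from typing import Any, List


class Quote:
    """Hypothetical stand-in for prefect.utilities.annotations.quote."""

    def __init__(self, value: Any) -> None:
        self.value = value


def map_call_arguments(iterable_arg: Any) -> List[Any]:
    """Model of splitting one mapped argument into per-call arguments.

    A quoted iterable is unwrapped so its elements can be iterated, and each
    element is re-wrapped in Quote so the annotation still applies per call.
    """
    if isinstance(iterable_arg, Quote):
        return [Quote(element) for element in iterable_arg.value]
    return list(iterable_arg)


def finished(result: Any) -> Future:
    """Build an already-finished future, standing in for a task1 run."""
    future: Future = Future()
    future.set_result(result)
    return future


# Stand-ins for the futures returned by task1.map(...)
items = [finished("acme-processed"), finished("bar-processed")]

per_call = map_call_arguments(Quote(items))
# Each call gets Quote(<Future>); a quoted value is passed through unresolved,
# so the task body would see the future object instead of "acme-processed".
print([type(arg.value).__name__ for arg in per_call])  # ['Future', 'Future']

# Resolving first (as suggested in the thread) quotes the plain values instead.
per_call_resolved = map_call_arguments(Quote([f.result() for f in items]))
print([arg.value for arg in per_call_resolved])  # ['acme-processed', 'bar-processed']
```

The `-final` suffix in the output above follows from the same behavior: `task2` formats the unresolved future with an f-string, which embeds the future's string form, and that string is what `task3` then receives.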

Okay, so to confirm, to handle this case correctly I should call result before passing it to the quote?

@flow
def myflow():
    # Extracting 
    dfs = extract_dfs.map(my_params_and_inputs)

    # Filters from my flow
    cleaned_data = filters_and_businnes_logic.map(quote([i.result() for i in dfs]))

Or should it be the job of the task filters_and_businnes_logic to check for PrefectFuture and call .result?


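from prefect import task
from prefect.futures import PrefectFuture

@task
def filters_and_businnes_logic(df):
    # quote() skips future resolution, so resolve the future manually here
    if isinstance(df, PrefectFuture):
        df = df.result()
    # ... implementation

zzstoatzz commented 4 months ago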

> Okay, so to confirm, to handle this case correctly I should call result before passing it to the quote?

> Or should it be the job of the task filters_and_businnes_logic to check for PrefectFuture and call .result?

I would say whichever way makes the most sense to you works fine. I don't think there's a strong practical difference between resolving the futures outside or inside the task (I might lean toward the former, i.e. outside, to keep tasks portable as pure Python).
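A minimal sketch of the two options, assuming `concurrent.futures` as a stand-in for Prefect's futures, a hypothetical `filter_rows` function in place of `filters_and_businnes_logic`, a hypothetical `extract` in place of `extract_dfs`, and plain lists of dicts in place of dataframes. It shows the portability argument for resolving outside: the business logic only ever sees plain values, so it can be called and tested without any future type.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Dict, List

Row = Dict[str, Any]


def filter_rows(rows: List[Row]) -> List[Row]:
    """Hypothetical business logic: pure Python, knows nothing about futures."""
    return [row for row in rows if row["amount"] > 0]


def filter_rows_resolving_inside(value: Any) -> List[Row]:
    """Option 2: the function itself checks for a future and resolves it."""
    if isinstance(value, Future):
        value = value.result()
    return filter_rows(value)


def extract(source: str) -> List[Row]:
    """Hypothetical extraction step standing in for extract_dfs."""
    return [{"source": source, "amount": 1}, {"source": source, "amount": -5}]


with ThreadPoolExecutor() as pool:
    futures = [pool.submit(extract, name) for name in ["acme", "bar"]]

    # Option 1: resolve outside, then hand plain values to the logic.
    resolved_outside = [filter_rows(f.result()) for f in futures]

    # Option 2: pass futures in and let the function resolve them.
    resolved_inside = [filter_rows_resolving_inside(f) for f in futures]

print(resolved_outside == resolved_inside)  # True
```

Both produce the same result; the difference is that `filter_rows` can be unit-tested with plain lists, while the resolving-inside variant couples the logic to a future type.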

Agreed on the callout around the warning message; we could make it clearer that adding quote prevents futures from being resolved. I can update that.