PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Callable task name generators (3.0 vs. 2.x) #15747

Closed dmichaelcarter closed 1 day ago

dmichaelcarter commented 1 week ago

Bug summary

I am doing some spike work on migrating our flows from v2.19 to v3.0. In doing this, I have found what appears to be a bug with callable task name generators. I often use task parameters (more specifically, values from a dictionary argument to the task) to name task runs. This is essential for making sense of complex graphs that include dozens, if not hundreds, of mapped tasks. I often name a task run after a value such as a table name during ETL processes; when runs are named by thread ID instead, it is much harder to gauge the impact of failed tasks.
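
For context, here is a minimal sketch of the pattern I mean, as it works for us in v2.19 (the load_table task and the table names are just illustrative, not our real code):

from prefect import flow, task
from prefect.runtime import task_run

def table_run_name() -> str:
    # v2.19: the resolved task parameters are available via prefect.runtime
    return f'{task_run.task_name} - {task_run.parameters["table"]["name"]}'

@task(task_run_name=table_run_name)
def load_table(table: dict) -> None:
    print(f'loading {table["name"]}')

@flow
def etl() -> None:
    # each mapped run is named after the table it processes
    load_table.map(table=[{'name': 'orders'}, {'name': 'customers'}])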

The error I am seeing with my callable name generator (which works in Prefect v2.19): `TypeError: 'PrefectConcurrentFuture' object is not subscriptable`

A much more generic error is also printed during these flow failures:

Please wait for all submitted tasks to complete before exiting your flow by calling `.wait()` on the `PrefectFuture` returned from your `.submit()` calls.

Example:

from prefect import flow, task

@task
def say_hello(name):
    print(f"Hello, {name}!")

@flow
def example_flow():
    future = say_hello.submit(name="Marvin")
    future.wait()

example_flow()

Here is a simple flow which replicates my problem:

from time import sleep
from random import randint
from prefect import task, flow
from prefect.runtime import task_run

def generate_task_run_name() -> str:
    return f'{task_run.task_name} - input["val"]:  {task_run.parameters["input"]["val"]}'

@task(log_prints=True)
def pre_task() -> bool:
    return True

@task(log_prints=True, task_run_name=generate_task_run_name)
def task_1(input: dict) -> dict:
    input_val = input['val']
    input['val'] = input_val + 1
    print(f'task_1 - input["val"]:  {input_val}, output:  {input}')

    sleep(randint(5,20))
    return input

@task(log_prints=True, task_run_name=generate_task_run_name)
def task_2(input: dict) -> dict:
    input_val = input['val']
    input['val'] = input_val * 10
    print(f'task_2 - input["val"]:  {input_val}, output:  {input}')

    sleep(randint(5,20))
    return input

@task(log_prints=True, task_run_name=generate_task_run_name)
def task_3(input: dict) -> None:
    input_val = input['val']
    print(f'task_3 - input["val"]: {input_val}')

    sleep(randint(5,20))
    return

@flow
def my_flow() -> None:
    pre_task()
    inputs: list = [
        {'val': 1, 'something_else': True}
        ,{'val': 2, 'something_else': True}
        ,{'val': 3, 'something_else': True}
        ,{'val': 4, 'something_else': True}
        ,{'val': 5, 'something_else': True}
        ,{'val': 6, 'something_else': True}
        ,{'val': 7, 'something_else': True}
        ,{'val': 8, 'something_else': True}
    ]

    result_1: dict = task_1.map(input=inputs)
    result_2: dict = task_2.map(input=result_1)
    final_result = task_3.map(input=result_2)

    final_result.wait()

if __name__ == '__main__':
    my_flow()

Version info (prefect version output)

Git commit:          0894bad4
Built:               Thu, Oct 10, 2024 10:17 AM
OS/Arch:             darwin/arm64
Profile:             default
Server type:         cloud
Pydantic version:    2.9.2
Integrations:
  prefect-dask:      0.2.9

Additional context

Here is the flow graph when the task name generators are disabled; the tasks have arbitrary IDs as names. I've attached this screenshot to stress my use case for chaining mapped tasks together, which I do for the sake of concurrency and input streaming. Note that downstream tasks kick off as soon as their inputs become available, resulting in the most efficient timing of the individual task "threads"/"branches".

[screenshot: flow graph with mapped task runs named by arbitrary run IDs]

zzstoatzz commented 2 days ago

hi @dmichaelcarter - thank you for the issue!

this turned out to be a little tricky due to how tasks are orchestrated in the 3.x engine

in the open PR linked above, I fix the template syntax for doing this

from random import uniform
from time import sleep
from typing import Any

from prefect import task

@task(log_prints=True, task_run_name="running with input: {input[val]}")
def task_1(input: dict[str, Any]) -> dict[str, Any]:
    input["val"] += 1
    print(f"task_1 - input:  {input['val']}")

    sleep(uniform(1, 5))
    return input

and propose an alternative to retrieving the parameters from the runtime module

from prefect.runtime import task_run

def generate_task_run_name(parameters: dict) -> str:
    return f'{task_run.task_name} - input: {parameters["input"]["number"]}'
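
to make the wiring concrete, here's a rough sketch of that alternative attached to a task, assuming the behavior in the open PR (the `double` task and `demo` flow below are just illustrative, not from the PR):

from prefect import flow, task
from prefect.runtime import task_run

def generate_task_run_name(parameters: dict) -> str:
    # prefect resolves the task's parameters and passes them in here
    return f'{task_run.task_name} - input: {parameters["input"]["number"]}'

@task(task_run_name=generate_task_run_name)
def double(input: dict) -> int:
    return input["number"] * 2

@flow
def demo() -> None:
    # each mapped run would be named like "double - input: 1", "double - input: 2"
    double.map(input=[{"number": 1}, {"number": 2}]).wait()

if __name__ == "__main__":
    demo()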

I'm going to see if I can preserve access to runtime.task_run.parameters for backwards compatibility, but what do you think about these alternatives?

dmichaelcarter commented 1 day ago

@zzstoatzz Not sure I totally follow this alternative. How would I pass parameters into generate_task_run_name()?

zzstoatzz commented 1 day ago

hi @dmichaelcarter - you wouldn't have to. as the author of `generate_task_run_name`, you just assume that we pass in the rendered parameters dict, so you can use it to generate your name as desired

here's an example of how to use this:

https://github.com/PrefectHQ/prefect/pull/15773/files#r1813200042

dmichaelcarter commented 1 day ago

@zzstoatzz okay, so the alternative you are proposing is to use this method?

dmichaelcarter commented 1 day ago

@zzstoatzz sorry, I think I misunderstood you. I can still use my original task_run method, but I just need to add the specific task arg name to the generator function?

zzstoatzz commented 1 day ago

> I can still use my original task_run method, but I just need to add the specific task arg name to the generator function?

yes! we're just changing where you grab that parameters dict from

you can either do that strategy (a "template string") or the new DX I'll illustrate below - the test I linked shows that you can use either one

I'll use your original example against main for reference. the only change I need to make: instead of using prefect.runtime.task_run.parameters, I assume that prefect will pass me the right parameters if I define my custom function to accept a parameters argument

so all I'm changing is the following:

before

def generate_task_run_name() -> str:
    return f'{task_run.task_name} - input["val"]:  {task_run.parameters["input"]["val"]}'

after

def generate_task_run_name(parameters: dict) -> str:
    return f'{task_run.task_name} - input["val"]:  {parameters["input"]["val"]}'

and prefect will make sure your parameters are resolved and passed to this generate_task_run_name when it's time to render the custom name

i.e. prefect will inject parameters if and only if you set task_run_name to a callable that accepts a parameters kwarg

Your original example using the new DX:

from time import sleep
from random import randint
from prefect import task, flow
from prefect.runtime import task_run

def generate_task_run_name(parameters: dict) -> str:
    return f'{task_run.task_name} - input["val"]:  {parameters["input"]["val"]}'

@task(log_prints=True)
def pre_task() -> bool:
    return True

@task(log_prints=True, task_run_name=generate_task_run_name)
def task_1(input: dict) -> dict:
    input_val = input['val']
    input['val'] = input_val + 1
    print(f'task_1 - input["val"]:  {input_val}, output:  {input}')

    sleep(randint(5,20))
    return input

@task(log_prints=True, task_run_name=generate_task_run_name)
def task_2(input: dict) -> dict:
    input_val = input['val']
    input['val'] = input_val * 10
    print(f'task_2 - input["val"]:  {input_val}, output:  {input}')

    sleep(randint(5,20))
    return input

@task(log_prints=True, task_run_name=generate_task_run_name)
def task_3(input: dict) -> None:
    input_val = input['val']
    print(f'task_3 - input["val"]: {input_val}')

    sleep(randint(5,20))
    return

@flow
def my_flow() -> None:
    pre_task()
    inputs: list = [
        {'val': 1, 'something_else': True}
        ,{'val': 2, 'something_else': True}
        ,{'val': 3, 'something_else': True}
        ,{'val': 4, 'something_else': True}
        ,{'val': 5, 'something_else': True}
        ,{'val': 6, 'something_else': True}
        ,{'val': 7, 'something_else': True}
        ,{'val': 8, 'something_else': True}
    ]

    result_1: dict = task_1.map(input=inputs)
    result_2: dict = task_2.map(input=result_1)
    final_result = task_3.map(input=result_2)

    final_result.wait()

if __name__ == '__main__':
    my_flow()