PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0
17.51k stars, 1.64k forks

Allow caching of side effect tasks with no return value #4056

Closed bnaul closed 1 year ago

bnaul commented 3 years ago

It's debatable whether this is a bug or an enhancement, but I found it surprising. For a task that has a result configured but returns None, the result is effectively ignored: the UI shows either NoResultType or the correct GCSResult with no location. I believe which one you get depends on whether the task succeeded (these are mapped runs of the same task).

Simple example:

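from prefect import Flow, task
from prefect.engine.results import GCSResult

# Local flow.run() only honors targets when checkpointing is enabled,
# e.g. by setting PREFECT__FLOWS__CHECKPOINTING=true
@task(target="tmp/{task_name}.txt", checkpoint=True)
def task1():
    return 1

# My real code uses a custom retry_task decorator here; it is swapped
# for the standard task decorator so this snippet runs on its own
@task(target="tmp/{task_name}.txt", checkpoint=True)
def task2(x):
    print("Did some side effect stuff")
    return None  # no value, so no checkpoint file gets written

with Flow("blah", result=GCSResult("model_prefect_tmp")) as f:
    x = task1()
    y = task2(x)

f.run()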

Running this twice will hit the cache for task1 but not task2.

There's an obvious, simple workaround: just return True, "Success", or whatever. But in my opinion it's not intuitive that the result configuration is ignored depending on the function's output. Furthermore, "this task does an expensive side effect that we don't want to re-run" seems like a great use case for caching, even if the stored result is just cloudpickle.dumps(None).
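A toy model of the behavior described above may make the workaround concrete. It is not Prefect's implementation: run_with_target is a hypothetical helper, a local file plays the role of the target, and stdlib pickle stands in for cloudpickle. A None return is never written, so the side effect reruns every time, while the True-returning workaround hits the cache on the second run.

```python
import os
import pickle
import tempfile
from typing import Any, Callable, Tuple


def run_with_target(fn: Callable[[], Any], target: str) -> Tuple[Any, bool]:
    """Hypothetical toy checkpoint cache; returns (value, cache_hit)."""
    if os.path.exists(target):
        # A target file exists, so treat it as a cache hit and skip fn.
        with open(target, "rb") as fh:
            return pickle.load(fh), True
    value = fn()
    if value is None:
        # Mirrors the reported behavior: None is not written,
        # so the next run misses the cache again.
        return value, False
    with open(target, "wb") as fh:
        pickle.dump(value, fh)
    return value, False


calls = {"count": 0}


def side_effect_none() -> None:
    calls["count"] += 1  # stands in for the expensive side effect


def side_effect_sentinel() -> bool:
    calls["count"] += 1
    return True  # the workaround: return any non-None marker


with tempfile.TemporaryDirectory() as tmp:
    none_target = os.path.join(tmp, "task_none.txt")
    hits_none = [run_with_target(side_effect_none, none_target)[1] for _ in range(2)]
    sentinel_target = os.path.join(tmp, "task_sentinel.txt")
    hits_sentinel = [run_with_target(side_effect_sentinel, sentinel_target)[1] for _ in range(2)]

print(hits_none)       # [False, False]
print(hits_sentinel)   # [False, True]
print(calls["count"])  # 3: the None task ran twice, the sentinel task once
```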

bnaul commented 3 years ago

This just bit us again. Does anyone (@cicdw or @madkinsz, maybe?) have thoughts on whether this behavior should change? I might be missing something, but it seems like it's probably as simple as removing an if result.value: check somewhere. I'd be happy to dig in and open a PR if this seems like a reasonable change.
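If the check really is a bare truthiness test (the thread doesn't confirm the exact code), it would skip more than None. This sketch, with hypothetical helper names, compares a truthiness test against an explicit is-not-None test over a few sample values.

```python
from typing import Any


def should_write_truthy(value: Any) -> bool:
    # A bare truthiness test, like the if result.value: check guessed at above.
    return bool(value)


def should_write_not_none(value: Any) -> bool:
    # An explicit None test only skips a missing return value.
    return value is not None


candidates = [None, 0, "", [], False, 1, "OK", True]
skipped_truthy = [v for v in candidates if not should_write_truthy(v)]
skipped_not_none = [v for v in candidates if not should_write_not_none(v)]

print(skipped_truthy)    # [None, 0, '', [], False]
print(skipped_not_none)  # [None]
```

Under this assumption, a truthiness check would also drop legitimate falsy results such as 0 or an empty list, not just a missing return value.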

zanieb commented 3 years ago

@bnaul I'll talk it through with the team and follow up with you tomorrow. I feel like with an explicit target this should probably still write your checkpoint file, but we'll also want to consider whether a change would break anything.

bnaul commented 3 years ago

@madkinsz just checking in, any updates on this 👍 👎 ?

cicdw commented 3 years ago

Hey @bnaul - @madkinsz did raise this internally and it's actually closely related to a milestone we have around result / cache policies and making those much more transparent, configurable and streamlined.

For background, the reason for the current implementation is that many ETL pipelines don't exchange data, and Prefect's auto-checkpointing would then write None to tons of files and create lots of file noise which I originally wanted to avoid.

That being said, your use case makes a lot of sense and I think should be a configurable option -- instead of changing that one line and running the risk of breaking other folks, I'd like to wrap this up into our result policy idea and use this as a reason to fast track it. In the meantime, I recommend returning True from your tasks to unblock your current workflows.
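One way to reconcile the two positions in this thread (write the checkpoint when an explicit target is set, but avoid auto-checkpoint noise from None results) is a small policy object. ResultPolicy, persist_none, and should_checkpoint are hypothetical names for illustration, not an actual Prefect API.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class ResultPolicy:
    # Hypothetical opt-in: persist None results instead of skipping them.
    persist_none: bool = False


def should_checkpoint(value: Any, target: Optional[str], policy: ResultPolicy) -> bool:
    """Decide whether a task result gets written (hypothetical rule)."""
    if target is not None:
        # An explicit target means the user asked for a file, so always write it.
        return True
    if value is None:
        # Auto-checkpointing skips None by default to avoid file noise.
        return policy.persist_none
    return True


default = ResultPolicy()
print(should_checkpoint(None, "tmp/task2.txt", default))          # True
print(should_checkpoint(None, None, default))                     # False
print(should_checkpoint(None, None, ResultPolicy(persist_none=True)))  # True
```

Keeping the default at persist_none=False preserves the existing no-noise behavior for pipelines that don't exchange data, while an explicit target or the opt-in flag covers side-effect tasks.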

github-actions[bot] commented 1 year ago

This issue is stale because it has been open 30 days with no activity. To keep this issue open remove stale label or comment.

zanieb commented 1 year ago

This is available in Prefect 2.

wtfzambo commented 1 year ago

Hey @madkinsz, sorry for necroposting, but I'm struggling with caching a function that returns None.

With a toy example I can make it work, but not with my use case. What conditions are required for caching to work?

I have a function that takes a list of dicts and saves the records to DuckDB. No matter what I try, it never seems to get cached.

What am I missing?

Here's the code:

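from typing import Any, Literal

import pandas as pd
from prefect import task
from prefect.tasks import task_input_hash

# DuckDBManager, CREATE_TABLE_IF_NOT_EXISTS and CREATE_OR_REPLACE_TABLE
# are defined elsewhere in my project

@task(
    name="Add entries to Duckdb",
    task_run_name="Add {table} to Duckdb",
    cache_key_fn=task_input_hash,
)
def add_records_to_duckdb(
    records: list[dict[str, Any]], table: Literal["submissions", "comments"]
):
    df = pd.DataFrame.from_records(records)

    # Drop a per-table column we don't store; skip it if it's absent
    try:
        match table:
            case "submissions":
                df.drop("preview", axis=1, inplace=True)
            case "comments":
                df.drop("all_awardings", axis=1, inplace=True)
    except KeyError as e:
        print(f"{e}, continuing...")

    # DuckDB resolves "df_header" and "df" in the SQL by Python variable name
    df_header = df.head(0)  # noqa: F841 (only referenced from SQL)
    con = DuckDBManager().duckdb_con
    con.sql(CREATE_TABLE_IF_NOT_EXISTS.format(table=table, df="df_header"))
    con.sql(CREATE_OR_REPLACE_TABLE.format(table=table, df="df"))
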
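With input-based cache keys, one general thing to check is whether the inputs are actually identical between runs. The sketch below is a simplified stand-in for an input-hash key (input_hash is a hypothetical helper, not Prefect's task_input_hash): it hashes the task name plus the JSON-serialized arguments. The same content yields the same key, but any field that changes between runs, such as a score or a fetched timestamp, yields a new key and therefore a cache miss.

```python
import hashlib
import json
from typing import Any


def input_hash(fn_name: str, arguments: dict[str, Any]) -> str:
    # Simplified stand-in for an input-based cache key.
    # sort_keys makes dict key order irrelevant (list order still matters);
    # default=repr is a crude fallback for values JSON can't encode.
    payload = json.dumps({"fn": fn_name, "args": arguments}, sort_keys=True, default=repr)
    return hashlib.sha256(payload.encode()).hexdigest()


k1 = input_hash("add_records_to_duckdb",
                {"records": [{"id": "abc", "score": 3}], "table": "comments"})
k2 = input_hash("add_records_to_duckdb",
                {"records": [{"score": 3, "id": "abc"}], "table": "comments"})
k3 = input_hash("add_records_to_duckdb",
                {"records": [{"id": "abc", "score": 4}], "table": "comments"})

print(k1 == k2)  # True: same content, different key order
print(k1 == k3)  # False: one changed field produces a new key
```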
bnaul commented 1 year ago

We now just always return True or "OK" or whatever 🤷‍♂️

zanieb commented 1 year ago

@wtfzambo could you provide an MRE (minimal reproducible example) in a new issue, please? This issue is for Prefect 1.