PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0
16.37k stars 1.59k forks source link

Make Flow Version Available in the Runtime #14886

Closed BlakeEriks closed 2 months ago

BlakeEriks commented 2 months ago

Describe the current behavior

Currently, the runtime only supports a subset of the information for a flow on a given flow run.

Namely:

[
    "id",
    "tags",
    "scheduled_start_time",
    "name",
    "flow_name",
    "parameters",
    "parent_flow_run_id",
    "parent_deployment_id",
    "run_count",
    "api_url",
    "ui_url",
]

Describe the proposed behavior

I would like more information on the flow to be available in the runtime, namely the version for the flow associated with the current flow_run.

Example Use

A user could leverage this information from the runtime within their result_storage_key for a flow.

@flow(
  result_storage_key=“v{flow_run.flow_version}/cached_result.json“
)

This would allow me to invalidate the result storage cache by incrementing my flow version.

Additional context

No response

zzstoatzz commented 2 months ago

hi @BlakeEriks - thanks for the issue!

Assuming you mean to pass result_storage_key as a task setting, you should be able to do this today with get_run_context and with_options, something like

from prefect import flow, task
from prefect.context import get_run_context

@task
def some_task():
    return "hello"

@flow(version="somespecialflowversion")
def some_flow():
    current_flow_version = get_run_context().flow.version
    some_task.with_options(result_storage_key=current_flow_version)()

if __name__ == "__main__":
    some_flow()
running an example ```python » python flows/proofs/flow_version_result_key.py 09:41:52.106 | INFO | prefect.engine - Created flow run 'brown-aardwolf' for flow 'some-flow' 09:41:52.690 | INFO | Task run 'some_task-0' - Created task run 'some_task-0' for task 'some_task' 09:41:53.073 | INFO | Task run 'some_task-0' - Finished in state Completed() 09:41:53.218 | INFO | Flow run 'brown-aardwolf' - Finished in state Completed() » ls ~/.prefect/storage | rg somespeci somespecialflowversion ```

if you're thinking that there would be a preferable DX, feel free to express that here!


EDIT

I see the motivating slack thread now!