PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Make it possible for TaskRunner to get access to corresponding flow run. #10373

Open ashtuchkin opened 1 year ago

ashtuchkin commented 1 year ago

Prefect Version

2.x

Describe the current behavior

Currently, a TaskRunner cannot know anything about the corresponding flow run when it is started (when its .start() method is called), because FlowRunContext is not set at that point. Having this context available would make it possible to customize task runners depending on the flow run.

Describe the proposed behavior

Either pass flow_run as an argument to the start() method, or set FlowRunContext before start() is called.

Example Use

We're using DaskTaskRunner, backed by ephemeral Dask clusters (one per flow run). It would be nice if we could use the flow run name as the corresponding Dask cluster name, which would make observability and debugging easier. Additionally, we could use other information from the flow run, such as tags, to modify the underlying clusters.

Note this should ideally work both in deployments and when running the flows locally.

cicdw commented 1 year ago

Hi @ashtuchkin - we actually do expose many fields dynamically for this exact reason within prefect.runtime. See the original PR for an example of configuring a task runner dynamically based on parameters.

Let me know if this covers your use case and we can close the issue if so!

ashtuchkin commented 1 year ago

Yes, I'm aware of both prefect.runtime and FlowRunContext. The problem is that neither is set when the task runner is being started. Let me try to give you a minimal reproducible example.

ashtuchkin commented 1 year ago

Here's an example flow; you can save it to a Python file and run it:

from distributed import LocalCluster
from prefect import flow
from prefect.context import FlowRunContext
from prefect.runtime import flow_run
from prefect_dask.task_runners import DaskTaskRunner

class MyLocalCluster(LocalCluster):
    def __init__(self, **kwargs):
        # Here I'd like to set the name of the cluster to be equal to the name of the flow
        # But I don't know how to access the flow run name from here. All contexts are None.
        assert FlowRunContext.get() is None
        assert flow_run.name is None

        name = "i_want_this_to_be_the_flow_run_name"
        super().__init__(name=name, n_workers=1, **kwargs)

@flow(task_runner=DaskTaskRunner(cluster_class=MyLocalCluster))
def my_flow():
    print("Hello world!")

if __name__ == "__main__":
    my_flow()

ashtuchkin commented 1 year ago

Note that the cluster constructor is called in the _start function of DaskTaskRunner, which is why I refer to start() in the description. This issue is not specific to DaskTaskRunner.

cicdw commented 1 year ago

You should try that same code with prefect.runtime.flow_run.name

ashtuchkin commented 1 year ago

I'm already doing this in the code above, and it's None:

from prefect.runtime import flow_run
# ...
        assert flow_run.name is None

cicdw commented 1 year ago

Ah, I missed that. OK, I know what's going on, but I'm not sure yet what the best way to proceed is. Your task runner is being initialized when you define your flow, before any run exists. The run is created (and the name set) when you call the flow in your main block. If this flow were being run via a deployment, your code would work because the run would have been created prior to loading the file.

ashtuchkin commented 1 year ago

AFAIK it's going through the following stages:

  1. The Task Runner instance is constructed (initialized) when we define the flow (in the @flow decorator or before that).
  2. The Task Runner is started (its start() method is called) after the flow_run has been created, in the begin_flow_run function. In the example above, this is the point where DaskTaskRunner creates MyLocalCluster, and this is where it would be nice to have the flow run available. Note that at this point FlowRunContext is either None if it's a regular flow, or still contains the parent flow's context if it's a subflow.
  3. FlowRunContext for the current flow is set later, in the orchestrate_flow_run function.

So the ask is to propagate the current flow_run into the TaskRunner.start() method, not into the task runner constructor (as the flow_run is not available at that time).

zzstoatzz commented 8 months ago

hi @ashtuchkin - as mentioned above, prefect.runtime.flow_run.name will work if you are running this flow from a deployment (via a worker or a flow that you serve) but does not if you are calling my_flow yourself in the main block because, as you mention, the flow run does not yet exist at the time you call it.

So if you are in a situation where you are calling the flow yourself, how does something like this work for you?

from prefect import task, flow
from prefect_dask import DaskTaskRunner

@task
def foo():
    pass

@flow
def my_flow(some_param: str):
    foo.submit()

def get_flow_run_name_from_parameters(parameters: dict) -> str:
    """any transform you need to produce a meaningful name from the params"""
    return f"some-name-{parameters['some_param']}"

if __name__ == "__main__":
    parameters = {"some_param": "foo-value"}

    flow_run_name = get_flow_run_name_from_parameters(parameters)

    my_new_flow = my_flow.with_options(
        flow_run_name=flow_run_name,
        task_runner=DaskTaskRunner(
            cluster_kwargs=dict(name=flow_run_name)
        )
    )

    my_new_flow(**parameters)

Here I am producing a flow_run_name from the parameters (this is optional; you could of course hard-code it or set it however you want) and then setting that value as the name of the cluster via cluster_kwargs.

ashtuchkin commented 8 months ago

I guess it could work, but it's pretty verbose, and we'd need to set the cluster name in two places to support deployment mode. We have more than a dozen flows, so putting this boilerplate into each one of them is not great. My hope was to create a reusable TaskRunner class that can be used directly and would work in both deployment mode and local mode.
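
For the local case only, the per-flow boilerplate from the with_options suggestion could be folded into one shared helper. The sketch below is an assumption, not an existing Prefect utility: run_with_named_cluster and its parameter names are hypothetical, and the only flow behavior it relies on is the with_options(flow_run_name=..., task_runner=...) call shown earlier in this thread. It does not address deployment mode.

```python
from typing import Any, Callable, Dict


def run_with_named_cluster(
    flow_obj: Any,
    parameters: Dict[str, Any],
    name_from_parameters: Callable[[Dict[str, Any]], str],
    make_task_runner: Callable[[str], Any],
) -> Any:
    """Hypothetical helper: run a flow locally with one shared name.

    The same name is used for the flow run and handed to the task runner
    factory, so each flow does not need to repeat the wiring.
    """
    run_name = name_from_parameters(parameters)
    configured = flow_obj.with_options(
        flow_run_name=run_name,
        task_runner=make_task_runner(run_name),
    )
    return configured(**parameters)


# Possible usage with the names from the example above (needs prefect and
# prefect_dask installed, so it is left as a comment here):
#
# run_with_named_cluster(
#     my_flow,
#     {"some_param": "foo-value"},
#     get_flow_run_name_from_parameters,
#     lambda name: DaskTaskRunner(cluster_kwargs=dict(name=name)),
# )
```

Passing the task runner as a factory keeps the helper independent of Dask, so the same function could serve flows that use other task runners.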