PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Create DaskTaskRunnerConfig #12980

Open ahuang11 opened 1 year ago

ahuang11 commented 1 year ago

Originally from Evgeny:

from prefect.blocks.core import Block
from prefect_dask import DaskTaskRunner


class DaskTaskRunnerConfig(Block):
    """Dask Task Runner Config.

    Attributes:
        address (str, optional): Address of a currently running dask scheduler;
            if one is not provided, a temporary cluster will be created in
            `DaskTaskRunner.start()`.  Defaults to `None`.
        cluster_class (str, optional): The cluster class to use when creating
            a temporary dask cluster. Should be the full class name
            (e.g. `"distributed.LocalCluster"`).
        cluster_kwargs (dict, optional): Additional kwargs to pass to the
            `cluster_class` when creating a temporary dask cluster.
        adapt_kwargs (dict, optional): Additional kwargs to pass to
            `cluster.adapt` when creating a temporary dask cluster. Note that
            adaptive scaling is only enabled if `adapt_kwargs` are provided.
        client_kwargs (dict, optional): Additional kwargs to use when creating a
            `dask.distributed.Client`,
            https://distributed.dask.org/en/latest/api.html#client.
    """
    _block_type_name = 'Dask Task Runner Config'

    address: str | None = None
    cluster_class: str | None = None
    cluster_kwargs: dict | None = None
    adapt_kwargs: dict | None = None
    client_kwargs: dict | None = None

    def create_task_runner(self) -> DaskTaskRunner:
        return DaskTaskRunner(
            address=self.address,
            cluster_class=self.cluster_class,
            cluster_kwargs=self.cluster_kwargs,
            adapt_kwargs=self.adapt_kwargs,
            client_kwargs=self.client_kwargs,
        )

However, I wonder how this would work if it's used inside a flow.

zanieb commented 1 year ago

If you do @flow(task_runner=DaskTaskRunnerConfig.load("foo").create_task_runner()) it'll work since we execute the script at runtime.
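To illustrate why this works: decorator arguments are ordinary expressions, so they are evaluated when the module is executed, before the flow function is ever called. Below is a minimal stand-alone sketch of that behavior. It assumes nothing about Prefect internals; `FakeConfig`, `FakeTaskRunner`, the toy `flow` decorator, and the saved address are hypothetical stand-ins, not Prefect's real classes.

```python
from dataclasses import dataclass
from typing import Callable, Optional

# Hypothetical stand-ins; these are NOT Prefect's real classes or storage.
_SAVED_BLOCKS = {"foo": {"address": "tcp://127.0.0.1:8786"}}
LOAD_CALLS = []  # records each lookup so we can see when it happens


@dataclass
class FakeTaskRunner:
    address: Optional[str] = None


@dataclass
class FakeConfig:
    address: Optional[str] = None

    @classmethod
    def load(cls, name: str) -> "FakeConfig":
        LOAD_CALLS.append(name)
        return cls(**_SAVED_BLOCKS[name])

    def create_task_runner(self) -> FakeTaskRunner:
        return FakeTaskRunner(address=self.address)


def flow(task_runner: FakeTaskRunner) -> Callable:
    """Toy decorator that attaches the runner to the decorated function."""
    def decorator(fn: Callable) -> Callable:
        fn.task_runner = task_runner
        return fn
    return decorator


# The argument expression runs here, while the script is being executed,
# not later when my_flow() is called.
@flow(task_runner=FakeConfig.load("foo").create_task_runner())
def my_flow() -> str:
    return "done"
```

In this sketch the lookup happens exactly once per script execution, so calling `my_flow()` repeatedly does not reload the saved config.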

We'll probably want to explore a real first-class pattern for this. Maybe we can find a way to get this in people's hands without committing to it as the final pattern.

desertaxle commented 1 year ago

This is very interesting!

Definitely worth exploring a first-class pattern since it could make the configuration of task runners easier. Could we create a type of block similar to an infra block where its job is to store config and create a specific type of task runner? If we could, it would probably involve creating a new block capability and adding a new task argument to accept those types of blocks.

There are probably other ways to approach this, but this is definitely worth exploring!

zanieb commented 1 year ago

> Could we create a type of block similar to an infra block where its job is to store config and create a specific type of task runner?

I've talked about this before, but basically I think that task runners should become WorkerInfrastructure blocks. You'd then just specify @flow(worker_infrastructure="dask-cluster/foo")
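A rough sketch of how such a slug could be resolved, purely for illustration. `worker_infrastructure` is only a proposal in this thread, and `REGISTRY`, `resolve_task_runner`, `DaskClusterConfig`, and `DaskRunnerStub` below are hypothetical names, not Prefect APIs.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class DaskRunnerStub:
    """Hypothetical stand-in for a task runner built from stored config."""
    address: Optional[str] = None
    cluster_kwargs: dict = field(default_factory=dict)


@dataclass
class DaskClusterConfig:
    """Hypothetical config block: stores settings and builds a runner."""
    address: Optional[str] = None
    cluster_kwargs: Optional[dict] = None

    def create_task_runner(self) -> DaskRunnerStub:
        return DaskRunnerStub(
            address=self.address,
            cluster_kwargs=self.cluster_kwargs or {},
        )


# Hypothetical in-memory store: (block type slug, block name) -> saved config.
REGISTRY = {
    ("dask-cluster", "foo"): DaskClusterConfig(cluster_kwargs={"n_workers": 2}),
}


def resolve_task_runner(slug: str) -> DaskRunnerStub:
    """Turn a '<block-type>/<name>' slug into a runner via the saved config."""
    block_type, sep, name = slug.partition("/")
    if not sep or not block_type or not name:
        raise ValueError(f"expected '<block-type>/<name>', got {slug!r}")
    try:
        config = REGISTRY[(block_type, name)]
    except KeyError:
        raise LookupError(f"no saved config for {slug!r}") from None
    return config.create_task_runner()


runner = resolve_task_runner("dask-cluster/foo")
```

With a string slug, the lookup could be deferred until the flow actually runs, instead of happening when the decorator argument is evaluated.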