PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Adaptive Dask Executor workers don't scale up when running many quick tasks #5483

Open Adam-D-Lewis opened 2 years ago

Adam-D-Lewis commented 2 years ago

Description

When mapping over a large set of inputs, which creates many tasks, an adaptive DaskExecutor will not scale up its workers if the mapped tasks are quick. Scale-up does occur when the tasks take longer.

Reproduction

from dask_gateway import Gateway
from distributed import Client
from prefect import task, Flow
import time
from prefect.executors import DaskExecutor

# Define tasks
@task
def my_quick_task(x):
    return

@task
def my_slow_task(x):
    time.sleep(5)
    return

large_iterable_to_map_over = list(range(100_000))

with Flow('Quick Task Flow') as quick_task_flow:
    my_quick_task.map(large_iterable_to_map_over)

with Flow('Slow Task Flow') as slow_task_flow:
    my_slow_task.map(large_iterable_to_map_over)

# Create new adaptive dask cluster for DaskExecutor
with Gateway() as gateway:
    options = gateway.cluster_options()
    with gateway.new_cluster(options) as cluster, Client(cluster) as client:
        cluster.adapt(1, 5)
        my_executor = DaskExecutor(cluster.scheduler_address, client_kwargs={"security": cluster.security})
        client.wait_for_workers(1)

        # Adaptive Dask Cluster won't scale up (at least not within 5 minutes)
        print('starting quick_task_flow')
        quick_task_flow.run(executor=my_executor)

        # Adaptive Dask Cluster will scale up (within ~5 seconds)
        print('starting slow_task_flow')
        slow_task_flow.run(executor=my_executor)

        # Adaptive Dask Cluster does scale up (within ~40 seconds)
        futures = client.map(my_quick_task.__wrapped__, large_iterable_to_map_over)

Environment

{
  "config_overrides": {
    "backend": true
  },
  "env_vars": [],
  "system_information": {
    "platform": "Linux-5.10.98-x86_64-with-glibc2.2.5",
    "prefect_backend": "server",
    "prefect_version": "0.15.13",
    "python_version": "3.8.12"
  }
}

dask == 2021.08.01
distributed == 2021.08.01
dask-gateway == 0.9.0

zanieb commented 2 years ago

Hi! The scaling here should be fully handled by Dask. Do you see scaling occur if you submit tasks to Dask directly rather than through Prefect?
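
For clarity, submitting the same work to Dask directly (no Prefect in the loop) could look roughly like the sketch below; plain_quick_task and the 1-to-5 adaptive bounds are illustrative assumptions mirroring the reproduction above, not a confirmed setup.

from dask_gateway import Gateway
from distributed import Client

def plain_quick_task(x):
    # Plain Python function, not a Prefect task
    return x

with Gateway() as gateway:
    options = gateway.cluster_options()
    with gateway.new_cluster(options) as cluster, Client(cluster) as client:
        cluster.adapt(minimum=1, maximum=5)  # adaptive scaling handled entirely by Dask
        futures = client.map(plain_quick_task, list(range(100_000)))  # direct submission
        client.gather(futures)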

Adam-D-Lewis commented 2 years ago

@madkinsz Yeah, I do see Dask scale up when using client.map directly, though it took 40-50 seconds. I updated the example above to note how long it takes for the additional worker pods to show up after the client.wait_for_workers(1) line; what I'm seeing is recorded in the comments in the example.
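
One possible way to quantify the scale-up delay (a sketch added for illustration; the watch_workers helper, its 5-second poll interval, and the background-thread usage are assumptions, not from the original report) is to poll the scheduler's worker count while a flow runs:

import threading
import time

def watch_workers(client, interval=5, duration=300):
    # Print the current worker count every few seconds so the scale-up time is visible
    start = time.time()
    while time.time() - start < duration:
        n_workers = len(client.scheduler_info()["workers"])
        print(f"{time.time() - start:6.1f}s: {n_workers} worker(s)")
        time.sleep(interval)

# Usage sketch, inside the `with ... Client(cluster) as client:` block from the reproduction:
# threading.Thread(target=watch_workers, args=(client,), daemon=True).start()
# quick_task_flow.run(executor=my_executor)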