pangeo-forge / gpm-imerge-hhr-feedstock

The gpm-imerge-hhr Feedstock

Slow task submission from Prefect DaskExecutor prevents cluster scaling. #1

Open sharkinsspatial opened 3 years ago

sharkinsspatial commented 3 years ago

We are experiencing extremely slow task submission via the DaskExecutor for very large mapped tasks. In previous flow tests where a task was mapped over roughly 20K items, task submission was fast enough that our Dask cluster scaled workers up to the worker limit. But with a task mapped over 400K items, DaskExecutor task submission to the scheduler appears to be rate limited, and there are never enough tasks on the scheduler to trigger the creation of more workers, so we are stuck with the cluster crawling along at the minimum number of workers.

[Screenshot: Dask dashboard, 2021-09-17 11:13 AM]
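For context, the flow is submitted to an adaptive Dask cluster through Prefect's DaskExecutor; a minimal sketch of that wiring (the cluster class and adapt bounds below are placeholders for illustration, not our actual configuration):

from prefect.executors import DaskExecutor

# Placeholder executor configuration: an adaptive dask-kubernetes cluster.
# The cluster class and adapt bounds here are assumptions, not the deployment values.
executor = DaskExecutor(
    cluster_class="dask_kubernetes.KubeCluster",
    adapt_kwargs={"minimum": 4, "maximum": 10},
)
flow.run(executor=executor)

Adaptive scaling only adds workers when the scheduler has enough pending tasks, which is why slow submission keeps the cluster pinned at the minimum.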

Also note the relatively small number of tasks the scheduler has received. Normally the number of cache_inputs tasks should grow very rapidly and the workers should be saturated, forcing the cluster to scale, but as you can see in the dashboard image below, task submission to the scheduler is slow for some reason.

[Screenshot: Dask dashboard, 2021-09-17 11:12 AM]

The Prefect Slack discussion is at https://prefect-community.slack.com/archives/C01TK7XSLT0/p1631896033088800. @rabernat This might be another topic of technical discussion for when you meet with the Prefect team.

rabernat commented 3 years ago

Can you reproduce this without Pangeo Forge? Like, if you just create a really simple flow with 400k mapped tasks, do you see the same behavior?

sharkinsspatial commented 3 years ago

☝️ @rabernat I will run a test to verify.

rabernat commented 3 years ago

Can you also explain why there are so many tasks? Is that basically just how many files there are?

sharkinsspatial commented 3 years ago

Correct. That is the total number of input files.

rabernat commented 3 years ago

I wonder if we should be batching together some of these into larger tasks. That is possible at the store_chunk stage with inputs_per_chunk, but not at the cache_inputs stage.
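For reference, a rough sketch of what inputs_per_chunk does in pangeo-forge-recipes (illustrative only; file_pattern stands in for this feedstock's actual FilePattern):

from pangeo_forge_recipes.recipes import XarrayZarrRecipe

# file_pattern stands in for the FilePattern already defined for this feedstock.
# With inputs_per_chunk=10, each store_chunk task writes 10 inputs at once,
# cutting the number of store_chunk tasks roughly 10x, while cache_inputs
# still produces one task per input file.
recipe = XarrayZarrRecipe(file_pattern, inputs_per_chunk=10)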

sharkinsspatial commented 3 years ago

@rabernat I tested with the following example flow:

from prefect import Flow, task
from time import sleep

# Each mapped task simply sleeps briefly to simulate work.
@task
def map_fn(x):
    sleep(0.25)

task_range = list(range(1, 400000))

with Flow('map_testing') as flow:
    map_fn.map(task_range)
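The flow was run against our Dask cluster through the DaskExecutor, roughly like this (the scheduler address below is a placeholder):

from prefect.executors import DaskExecutor

# Placeholder scheduler address; point this at the running Dask scheduler.
flow.run(executor=DaskExecutor(address="tcp://<scheduler-address>:8786"))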

We did not see the same slow task submission issue.

[Screenshot: Dask dashboard, 2021-09-17 3:48 PM]

However, this revealed another, deeper issue: my scheduler pod (which is configured with 10GB of memory) was killed almost instantly with an OOM error. Additionally, we see worker unmanaged memory growing rapidly. As shown in the image above, unmanaged memory has inexplicably grown to 11GB after only 311 tasks have been processed.

I'm flummoxed that this simple example map is not manageable with Prefect on instances with this resource capacity.

sharkinsspatial commented 3 years ago

@rabernat I'm quickly putting together a pure Dask demonstration to try and narrow this down to a Prefect issue.

sharkinsspatial commented 3 years ago

To verify that the issues we are experiencing are Prefect-specific (there have been some related Dask issue discussions previously, e.g. https://github.com/dask/distributed/issues/2757), I ran a small experiment to approximate the task submission behavior used by Prefect's DaskExecutor.

from dask_kubernetes import KubeCluster, make_pod_spec
from distributed import Client
from time import sleep
import weakref

# Track futures in a WeakSet to approximate how Prefect's DaskExecutor holds them.
futures = weakref.WeakSet()

# worker_image is a placeholder; set it to the Dask worker image for your deployment.
worker_image = "<your-dask-worker-image>"

pod_spec = make_pod_spec(
    image=worker_image,
    memory_limit='4G',
    memory_request='4G',
    cpu_limit=1,
    cpu_request=1,
    env={}
)
cluster = KubeCluster(pod_spec)
cluster.adapt(minimum=4, maximum=10)

client = Client(cluster)

# Each task simply sleeps briefly to simulate work.
def map_fn(x):
    sleep(0.25)

value_range = list(range(1, 400000))

# Submit each value as an individual task, mirroring a large mapped task.
for value in value_range:
    future = client.submit(map_fn, value)
    futures.add(future)

client.gather(futures)

I did not encounter the task submission delays or memory spikes demonstrated in the equivalent Prefect operation using the DaskExecutor. The operation completed in approximately 60s on 4 workers with no memory pressure.