dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

dask not respecting worker resources #2832

Open nmatare opened 5 years ago

nmatare commented 5 years ago

I believe this issue is related to this SO post.

I'm finding that the dask-scheduler will assign keys to worker nodes even when the resource restrictions I specify should rule those workers out.

For example:

from dask.distributed import LocalCluster, Client
cluster = LocalCluster(n_workers=0)
cluster.start_worker(ncores=1)  # note: started without resources={'CPU': 1}, so this worker has no resources to compute with

client = Client(cluster.scheduler.address)
fut = client.submit(lambda x=1: x+1, resources={'CPU':1})
client.who_has()  # here we expect to see nothing as no workers have resources

fut2 = client.submit(lambda x=1: x+1)
client.who_has() # yep, assigned because it doesn't need resources

import dask.dataframe as dd
import pandas.util.testing as tm
import os 
files = dd.from_pandas(tm.makeTimeSeries(1000, freq='10ms'), npartitions=4).to_csv('example*.csv')

# simulate some pipeline where data is read and transformed 
fut3 = client.compute(dd.read_csv('example*.csv').to_delayed(), resources={'CPU':1})
client.who_has()  
# now we see that the scheduler has placed the delayed keys 
# onto the worker even though it has no resources to compute.
#  "('from-delayed-238ec9c6404d8e52399becf66971834c', 2)": ('tcp://127.0.0.1:44429',),
#  "('pandas_read_text-read-block-from-delayed-238ec9c6404d8e52399becf66971834c', 2)": (),
[os.remove(f) for f in files]
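As a sanity check on the setup above, one way to confirm what resources each worker actually advertises is to ask the workers directly via client.run. This is only a small sketch; I'm assuming the Worker object exposes total_resources/available_resources, which may vary by distributed version:

# Small sketch: ask each worker what resources it advertises.
# Assumption: Worker exposes `total_resources` / `available_resources`
# (attribute names may differ across distributed versions).
def report_resources(dask_worker):
    return {
        "total": dict(dask_worker.total_resources),
        "available": dict(dask_worker.available_resources),
    }

print(client.run(report_resources))
# Expected here: empty dicts, since the worker above was started without resources.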

Likewise, I'm finding similar behavior with methods like read_parquet.

dask.__version__
'1.2.2'
quasiben commented 5 years ago

@nmatare, thanks for the reproducible report!

I just tried with dask>=2.0 -- is it possible for you to upgrade?

nmatare commented 5 years ago

Sure. Here's the report with dask 2.1.0.

from dask.distributed import LocalCluster, Client
cluster = LocalCluster(n_workers=0)
cluster.scale(n=1)  

client = Client(cluster.scheduler.address)
client.scheduler_info()['workers']  # the scaled-up worker was not given resources={'CPU': 1}, so it has no resources to compute with

fut = client.submit(lambda x=1: x+1, resources={'CPU':1})
client.who_has()  # here we expect to see nothing as no workers have resources

fut2 = client.submit(lambda x=1: x+1)
client.who_has() # yep, assigned because it doesn't need resources

import dask.dataframe as dd
import pandas.util.testing as tm
import os 
files = dd.from_pandas(tm.makeTimeSeries(1000, freq='10ms'), npartitions=4).to_csv('example*.csv')

# simulate some pipeline where data is read and transformed 
fut3 = client.compute(dd.read_csv('example*.csv').to_delayed(), resources={'CPU':1})
client.who_has()  
# now we see that the scheduler has placed the delayed keys 
# onto the worker even though it has no resources to compute.
#  "('from-delayed-238ec9c6404d8e52399becf66971834c', 2)": ('tcp://127.0.0.1:44429',),
#  "('pandas_read_text-read-block-from-delayed-238ec9c6404d8e52399becf66971834c', 2)": (),
[os.remove(f) for f in files]
import dask
dask.__version__
# 2.1.0

I'm getting the same results as with version '1.2.2'.

mrocklin commented 5 years ago

The resources keyword only affects the final result tasks by default. There isn't a great way to restrict the entire computation today.
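For reference, the "Resources with collections" section of the worker-resources documentation describes a partial workaround: the resources dict can map a tuple of a collection's keys to a resource constraint, so that keys beyond the final outputs get tagged. A rough sketch, assuming a running client as in the snippets above and that the call still matches the docs (process_partition is a placeholder):

# Rough sketch of the "resources with collections" workaround; exact behaviour
# may vary by version. `client` is the distributed.Client from above, and
# process_partition is a placeholder function.
import dask.dataframe as dd

def process_partition(df):      # placeholder
    return df

x = dd.read_csv('example*.csv')
y = x.map_partitions(process_partition)    # the stage we want restricted
z = y.sum()

fut = client.compute(
    z,
    resources={tuple(y.__dask_keys__()): {'CPU': 1}},  # tag the intermediate collection's keys
    optimize_graph=False,   # assumption: keeps fusion from renaming the tagged keys
)

Note that this still only constrains the keys you enumerate, so deeper tasks (like the read-block keys in the report above) would remain unconstrained.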


nmatare commented 5 years ago

Hmm. From what we've put together, the downstream effect for us is the situation below:

We have several workers with special hardware and others without. The workers without the hardware do processing before transferring their results to the hardware workers via a distributed Pub/Sub channel. When we trigger computation, the scheduler places some keys onto the nodes with the special hardware, which we would prefer not to happen. We also run other, non-Python code on those nodes, so their memory fills up quickly. The keys placed on them then get stuck: the workers pause at high memory and stop accepting new tasks, which effectively bricks the cluster because the other workers can't retrieve those keys to finish their computation. We could increase the memory capacity on these nodes so we don't hit the limits, but that would be an inefficient use of our resources.

I imagine that sub-setting the cluster with workers= would produce similar results? Are resources tracked at the hidden-attribute level (i.e., __dask_keys__)? Could I inject the resources into each task before triggering the final call to compute?
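For the workers= idea mentioned above, a minimal sketch of pinning work to a subset of workers (all addresses below are placeholders); submit, compute, and persist all accept workers= and allow_other_workers=:

# Minimal sketch of restricting work to a subset of workers with workers=;
# the scheduler and worker addresses below are placeholders.
from dask.distributed import Client

client = Client('tcp://scheduler-address:8786')    # placeholder scheduler address
cpu_only_workers = ['tcp://10.0.0.1:40000',        # placeholders: nodes without special hardware
                    'tcp://10.0.0.2:40000']

fut = client.submit(lambda x: x + 1, 41,
                    workers=cpu_only_workers,
                    allow_other_workers=False)     # hard restriction to this set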

syagev commented 4 years ago

I'm a bit confused; I have a similar issue but am not sure whether it's identical.

My pipeline is roughly:

  (read_parquet)       (some_delayed_func)
        |                      |
 (to_delayed-1..n)             |
        |                      |
        \                      |
         ------ [delayed(to_delayed-1, some_delayed_func),
                 delayed(to_delayed-2, some_delayed_func),
                 ...
                 delayed(to_delayed-n, some_delayed_func)]
                               |
                         (from_delayed)
                               |
                          (to_parquet)

I call to_parquet with compute=False and get back the write_metadata Delayed object. I then call its .compute method with the resources keyword set to a dict whose single key is a tuple of all the delayed tasks in the list before from_delayed (I dug those keys out using .__dask_graph__().layers).

It seems I'm setting the right keys, because when I look at a task in the dashboard the resources are listed correctly (see the attached screenshot of the task page).

However, the scheduler simply over-assigns these tasks (!!), as shown in the attached dashboard screenshot.

Is this the same issue as described here? Can I somehow constrain those intermediate tasks so that no more than one of them is assigned to a single worker?
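In case it is useful to anyone hitting this later: newer Dask releases (roughly 2.23 onward, if I remember correctly) added dask.annotate, which attaches annotations such as resource requirements to the tasks built inside the context manager, and that may be a cleaner way to tag those intermediate delayed tasks. A sketch under those assumptions, with placeholder paths, a placeholder my_func, and workers assumed to have been started with resources={'core': 1}:

# Sketch using dask.annotate (newer Dask releases) to tag the intermediate
# delayed tasks with a resource requirement; paths and my_func are placeholders,
# and a distributed Client is assumed to be connected already.
import dask
import dask.dataframe as dd

def my_func(df):                                  # placeholder for some_delayed_func
    return df

ddf = dd.read_parquet('input/*.parquet')          # placeholder input path

with dask.annotate(resources={'core': 1}):
    parts = [dask.delayed(my_func)(p) for p in ddf.to_delayed()]

out = dd.from_delayed(parts, meta=ddf._meta)      # assumes my_func keeps the schema
write_task = out.to_parquet('output/', compute=False)   # placeholder output path
# optimize_graph=False keeps graph optimization from dropping the annotations
write_task.compute(optimize_graph=False)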

TomAugspurger commented 4 years ago

@syagev how many processes do you have per worker?

And we'd be grateful if you're able to provide a reproducible example of these resources not being honored.

syagev commented 4 years ago

The cluster is spawned this way:

import tempfile
from dask.distributed import LocalCluster
LocalCluster(processes=True, threads_per_worker=1, resources={'core': 1},
             local_directory=tempfile.gettempdir(), scheduler_port=8786)

But just to make sure I understand - there shouldn't be a case where a worker shows "Consumed" > "Total" right?

I'll do my best to generate a repro. Thanks!

TomAugspurger commented 4 years ago

Thanks, that setup looks correct.

there shouldn't be a case where a worker shows "Consumed" > "Total" right?

That seems logical to me, but I've never used resources personally :)

bnaul commented 4 years ago

I've also started seeing the same behavior as @syagev (starting maybe within the last month or so?); see the attached dashboard screenshot. Our workflow runs into this case fairly reliably, but I'm not quite sure how to come up with a reproducible example (ours runs in k8s with many other tasks in flight at the same time). I guess it's possible that the dashboard information was incorrect before as well, but recently workers that show over-provisioned resources like this deadlock and never finish any of those tasks.

nuKs commented 3 years ago

I did check with htop, and resources are respected in my case: even though the worker's "Processing" tab displays multiple "ongoing" tasks, only the top task in the worker's task list is actually being processed.

I see Consumed > Total as well. I think it is normal, as the consumed count accounts for all tasks that have moved through the worker.

bw4sz commented 2 years ago

My experience here is the same as above: Consumed > Total, but if you look at the call stacks, only the top task is actually being computed, which is good news.

(screenshots of the dashboard and worker call stacks)

My conclusion is that the resources behavior is working properly, but that the 'Consumed' label is just a bit confusing. My intuition was that dask would label the other tasks as 'waiting' rather than 'processing', because all resources are occupied. Hope this helps others.
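For anyone else double-checking this, the call stack is a quick way to see what is actually executing on each worker, as opposed to what is merely assigned or queued there. A small sketch, assuming client is the connected distributed.Client:

# Quick sketch: Client.call_stack reports the tasks currently executing on each
# worker (as opposed to tasks merely assigned/queued there). With workers started
# with resources={'core': 1}, we would expect at most one running task per worker.
stacks = client.call_stack()          # client: the connected distributed.Client
for worker_addr, running in stacks.items():
    print(worker_addr, list(running))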

BrunoBelucci commented 2 months ago

Today I still observe the same behavior described above, which seems odd at first. I have even seen 'Consumed' shown as a negative number. Although I do not understand exactly what 'Consumed' is supposed to account for, it seems, as @bw4sz said, that the resources are respected and only one task runs at a time. I am wondering whether this behavior has something to do with adjusting or disabling queuing when using custom resources.
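For what it's worth, my understanding (which may be off) is that the newer scheduler-side queuing is controlled by the distributed.scheduler.worker-saturation setting, and that it can be effectively disabled by setting it to infinity before the cluster is created; whether and how it interacts with custom resources is exactly the open question here. A hedged sketch:

# Hedged sketch: effectively disable scheduler-side queuing via the
# worker-saturation setting. This must be set before the scheduler/cluster is
# created; its interaction with custom resources is an assumption to verify.
import dask
from dask.distributed import LocalCluster, Client

dask.config.set({"distributed.scheduler.worker-saturation": float("inf")})
cluster = LocalCluster(n_workers=1, threads_per_worker=1, resources={'core': 1})
client = Client(cluster)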