dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

worker config set by config.set is not read by worker #3882

Open samaust opened 4 years ago

samaust commented 4 years ago

Setting configuration directly within Python is explained in the documentation here: Configuration - Directly within Python

When using dask.config.set, I expect the worker to use those values. Instead, the worker reads the default values and does not use the values set using dask.config.set.

I modified distributed\worker.py as below to print the values received by the worker.

if "memory_spill_fraction" in kwargs:
    self.memory_spill_fraction = kwargs.pop("memory_spill_fraction")
    print("self.memory_spill_fraction from kwargs = {}".format(self.memory_spill_fraction))
else:
    self.memory_spill_fraction = dask.config.get(
        "distributed.worker.memory.spill"
    )
    print("self.memory_spill_fraction from dask.config = {}".format(self.memory_spill_fraction))

import dask
import dask.dataframe as dd

from dask.distributed import Client, LocalCluster

import pandas as pd

cluster = LocalCluster()
client = Client(cluster)

new = {"distributed.worker.memory.target": 0.1,
       "distributed.worker.memory.spill": 0.2,
       "distributed.worker.memory.pause": 0.3}

with dask.config.set(new):
    print(dask.config.get("distributed.worker.memory"))
    timestamp =  pd.date_range('2018-01-01', periods=4, freq='S')
    col1 = pd.Series(["1", "3", "5", "7"], dtype="string")  
    df = pd.DataFrame({"timestamp": timestamp,"col1": col1}).set_index('timestamp')
    ddf = dd.from_pandas(df, npartitions=1)
    ddf.compute()
    ddf.head(2)

Outputs

self.memory_spill_fraction from dask.config = 0.7
self.memory_spill_fraction from dask.config = 0.7
self.memory_spill_fraction from dask.config = 0.7
self.memory_spill_fraction from dask.config = 0.7
{'target': 0.1, 'spill': 0.2, 'pause': 0.3, 'terminate': 0.4}

Note the value 0.7, which is the default.

Passing the configuration as keyword arguments works.

import dask
import dask.dataframe as dd

from dask.distributed import Client, LocalCluster

import pandas as pd

cluster = LocalCluster(
    memory_target_fraction=0.1, 
    memory_spill_fraction=0.2,
    memory_pause_fraction=0.3)
client = Client(cluster)

timestamp =  pd.date_range('2018-01-01', periods=4, freq='S')
col1 = pd.Series(["1", "3", "5", "7"], dtype="string")  
df = pd.DataFrame({"timestamp": timestamp,"col1": col1}).set_index('timestamp')
ddf = dd.from_pandas(df, npartitions=1)
ddf.compute()
ddf.head(2)

Outputs

self.memory_spill_fraction from kwargs = 0.2
self.memory_spill_fraction from kwargs = 0.2
self.memory_spill_fraction from kwargs = 0.2
self.memory_spill_fraction from kwargs = 0.2

Environment:

mrocklin commented 4 years ago

That's correct. Calling dask.config.set only affects the local process, and not subprocesses. Propagating it to subprocesses might be a reasonable change that we could consider. In the meantime I recommend using a yaml file in your .config/dask/ directory if that is accessible to you.
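As a toy illustration of the point above (not Dask's actual implementation), the sketch below uses a plain dict as a stand-in for a process-wide config store. The names DEFAULTS, CHILD_CODE and spill_seen_by_child are hypothetical. It shows that an override held only in the parent's memory is invisible to a freshly started interpreter, which rebuilds its settings from its own defaults.

```python
import subprocess
import sys

# Stand-in for a process-wide config store such as dask.config.
# This is an assumption for illustration, not how Dask stores its config.
DEFAULTS = {"spill": 0.7}

# The child interpreter starts from the same defaults and knows nothing
# about changes made in the parent's memory.
CHILD_CODE = 'DEFAULTS = {"spill": 0.7}; print(DEFAULTS["spill"])'


def spill_seen_by_child():
    """Start a fresh interpreter and return the value it reports."""
    result = subprocess.run(
        [sys.executable, "-c", CHILD_CODE],
        capture_output=True,
        text=True,
        check=True,
    )
    return float(result.stdout.strip())


config = dict(DEFAULTS)
config["spill"] = 0.2  # in-memory override, parent process only

parent_value = config["spill"]
child_value = spill_seen_by_child()
print("parent:", parent_value, "child:", child_value)
```

The child only sees settings that arrive through something it reads at startup, such as a file, an environment variable, or explicit arguments.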

samaust commented 4 years ago

That's correct. Calling dask.config.set only affects the local process, and not subprocesses.

In that case, it's not a bug but intended behaviour. I think it's not obvious to new users, and documenting it would help. I suggest adding that information to the dask documentation in the section I linked to previously, or explaining it in the distributed documentation and linking to it from the dask documentation. I'm not sure what the exact wording should be.

quasiben commented 4 years ago

@samaust you should be able to control this with dask.config.set if it is called before creating the cluster.

GFleishman commented 3 years ago

@samaust @mrocklin

FYI this caught me recently as well. I spent this morning trying to figure out why my dask-jobqueue LSFCluster workers were being killed with:

OSError: Timed out during handshake while connecting to tcp://10.36.110.11:38453 after 10 s

This happened even after setting:

dask.config.set(
    {'distributed.comm.timeouts.connect':'60s',
     'distributed.comm.timeouts.tcp':'120',}
)

from the scheduler process.

I thought the config.set function was broken. IMHO this should be mentioned in the documentation or, even better, subprocesses should inherit configuration changes from their parents.
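One mechanism that child processes do inherit is the environment. Dask's configuration docs describe DASK_-prefixed environment variables, with double underscores marking nesting. The sketch below is a hedged illustration using only the standard library: to_env_name is a hypothetical helper, the timeout values are illustrative, and the child merely echoes the variable rather than importing Dask. It only helps processes that actually inherit this environment, so workers launched on other machines by a job scheduler would need the variables exported in their own job environment.

```python
import os
import subprocess
import sys


def to_env_name(key):
    """Map a dotted config key to a DASK_* environment variable name.

    Hypothetical helper following the documented convention: upper case,
    "__" between nesting levels, "_" in place of "-".
    """
    return "DASK_" + key.upper().replace(".", "__").replace("-", "_")


# Illustrative values only.
settings = {
    "distributed.comm.timeouts.connect": "60s",
    "distributed.comm.timeouts.tcp": "120s",
}

# Copy the current environment and add the DASK_* variables to the copy.
env = dict(os.environ)
env.update({to_env_name(key): value for key, value in settings.items()})

# A child started with this environment can see the variables.
child = subprocess.run(
    [
        sys.executable,
        "-c",
        "import os; print(os.environ['DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT'])",
    ],
    env=env,
    capture_output=True,
    text=True,
    check=True,
)
child_value = child.stdout.strip()
print(child_value)
```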

fjetter commented 3 years ago

I believe this has been closed with https://github.com/dask/distributed/pull/4378 assuming you're using a Nanny / default config.

Note that you should set the configuration before you start the processes / local cluster. Propagating config changes once the cluster is up is something we do not support at the moment.

    import dask
    from dask.distributed import Client, LocalCluster

    new = {"distributed.worker.memory.target": 0.1,
        "distributed.worker.memory.spill": 0.2,
        "distributed.worker.memory.pause": 0.3}

    def get_config(dask_worker):
        return {
            "distributed.worker.memory.target": dask_worker.memory_target_fraction,
            "distributed.worker.memory.spill": dask_worker.memory_spill_fraction,
            "distributed.worker.memory.pause": dask_worker.memory_pause_fraction,
        }

    with dask.config.set(new):
        cluster = LocalCluster()
        client = Client(cluster)
        results = client.run(get_config)
        print(results)
        for _, worker_config in results.items():
            assert worker_config == new

jrbourbeau commented 3 years ago

After setting ... from the scheduler process

As Matt pointed out in https://github.com/dask/distributed/issues/3882#issuecomment-642338414, for clusters that span multiple machines we recommend using a yaml configuration file in your .config/dask/ directory if that is accessible to you.

I'm not sure if dask-jobqueue supports a built-in way to forward configuration files to nodes in the cluster. Perhaps @andersy005 knows what the recommended best practices are for dask-jobqueue specifically.
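For the yaml file recommended above, a minimal sketch of what such a file could contain, using the memory settings from the original report. The helper write_dask_yaml and the file name memory.yaml are hypothetical. The demo writes into a temporary directory so it is safe to run; for real use the target would presumably be the .config/dask/ directory in the home directory on every machine that runs workers.

```python
import tempfile
from pathlib import Path

# Nested YAML equivalent of the dotted keys
# distributed.worker.memory.{target,spill,pause}.
YAML_TEXT = """\
distributed:
  worker:
    memory:
      target: 0.1
      spill: 0.2
      pause: 0.3
"""


def write_dask_yaml(config_dir, filename="memory.yaml"):
    """Write the settings into config_dir and return the file path.

    The file name is an arbitrary choice made for this example.
    """
    config_dir = Path(config_dir)
    config_dir.mkdir(parents=True, exist_ok=True)
    path = config_dir / filename
    path.write_text(YAML_TEXT)
    return path


# Demo against a throwaway directory instead of the real home directory.
with tempfile.TemporaryDirectory() as tmp:
    written = write_dask_yaml(Path(tmp) / "dask")
    content = written.read_text()

print(content)
```

Processes read such a file when they start, so workers that are already running would need a restart to pick it up.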

Christian8491 commented 2 years ago

Late to the party, but this works for me:

from dask.distributed import Client
import dask.distributed

#print(dask.config.config)
dask.config.set({'distributed.deploy.lost-worker-timeout': '10ms'})
print(dask.config.get('distributed.deploy.lost-worker-timeout'))

client = Client('<local_ip>:<port>')

Output: 10ms