dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Scatter with broadcast=True does not broadcast #8657

Closed: gsakkis closed this issue 1 week ago

gsakkis commented 1 month ago

Describe the issue:

Running client.scatter with broadcast=True does not broadcast the data to all workers; it sends it to just one worker.

Minimal Complete Verifiable Example:

In [1]: from dask.distributed import Client

In [2]: import pandas as pd

In [3]: c = Client()

In [4]: f = c.scatter(pd.Series(range(100000)), broadcast=True)

In [5]: f
Out[5]: <Future: finished, type: pandas.core.series.Series, key: Series-21dce51035fa9ac9c482c30ab2fd3c3f>

In [6]: c.who_has()
Out[6]: {'Series-21dce51035fa9ac9c482c30ab2fd3c3f': ('tcp://127.0.0.1:53066',)}

In [7]: c.cluster.workers
Out[7]:
{0: <Nanny: tcp://127.0.0.1:53066, threads: 2>,
 1: <Nanny: tcp://127.0.0.1:53068, threads: 2>,
 2: <Nanny: tcp://127.0.0.1:53065, threads: 2>,
 3: <Nanny: tcp://127.0.0.1:53067, threads: 2>}
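Comparing the who_has output against the worker list by eye gets tedious with more workers. A small helper can do the check instead. This is a minimal sketch, assuming who_has has the shape shown above (a mapping from key to a tuple of worker addresses). The function name missing_replicas is hypothetical and not part of the distributed API. It reports, for each key, the workers that do not hold a copy.

```python
from collections.abc import Iterable, Mapping


def missing_replicas(
    who_has: Mapping[str, Iterable[str]], workers: Iterable[str]
) -> dict[str, set[str]]:
    """Hypothetical helper: map each key to the workers that lack a copy.

    An empty result means every key is held by every worker, which is
    what scatter(..., broadcast=True) is expected to produce.
    """
    all_workers = set(workers)
    missing = {}
    for key, holders in who_has.items():
        absent = all_workers - set(holders)
        if absent:
            missing[key] = absent
    return missing


# Values copied from the IPython session above.
who_has = {
    "Series-21dce51035fa9ac9c482c30ab2fd3c3f": ("tcp://127.0.0.1:53066",),
}
workers = [
    "tcp://127.0.0.1:53066",
    "tcp://127.0.0.1:53068",
    "tcp://127.0.0.1:53065",
    "tcp://127.0.0.1:53067",
]

# Three of the four workers are reported as missing the scattered Series.
print(missing_replicas(who_has, workers))
```

Against a live cluster, the same check could be run as missing_replicas(c.who_has(), c.scheduler_info()["workers"]), assuming scheduler_info() lists every worker address as a key of its "workers" entry.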

Environment:

jrbourbeau commented 4 weeks ago

Thanks for the issue @gsakkis. According to the scatter API docs, it looks like you need to disable the Active Memory Manager (AMM) (https://distributed.dask.org/en/latest/active_memory_manager.html#enabling-the-active-memory-manager), specifically its ReduceReplicas policy, to use broadcast=True. Otherwise that policy removes the extra replicas that the broadcast creates. Here's a toy example:

In [1]: from distributed import Client

In [2]: c = Client(processes=True)

In [3]: c.amm.stop()

In [4]: f = c.scatter(123, broadcast=True)

In [5]: f
Out[5]: <Future: finished, type: int, key: int-4951b9977632a52fcd6f0cc65c57bb33>

In [6]: c.who_has()
Out[6]:
{'int-4951b9977632a52fcd6f0cc65c57bb33': ('tcp://127.0.0.1:55535',
  'tcp://127.0.0.1:55544',
  'tcp://127.0.0.1:55539',
  'tcp://127.0.0.1:55538')}
gsakkis commented 1 week ago

Hi @jrbourbeau, I missed the note about the AMM. Looks like it works as documented, thanks!