dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Unlucky workload balance after Bag.groupby() #2461

Open stuarteberg opened 5 years ago

stuarteberg commented 5 years ago

[Disclaimer: I'm not 100% sure I'm interpreting these results correctly, so let me know if this is, in fact, expected behavior.]

In certain "unlucky" (but maybe not uncommon) scenarios, Bag.groupby() can lead to very unbalanced workers in a cluster.

Let's start with a simple test case that works fine. I'll create a bag of 100 ints, and then "group" them trivially -- every element is in a group by itself. Then I'll use get_worker() to see how the results are balanced across my cluster of 30 workers.

Good grouping example:

```python
In [1]: from distributed import Client
   ...: from distributed.worker import get_worker
   ...: from dask_jobqueue import LSFCluster
   ...: from toolz import frequencies
   ...: import dask.bag as db

In [2]: cluster = LSFCluster(cores=1, processes=1, memory='15GB',
   ...:                      walltime='01:00', log_directory='worker-logs')
   ...: cluster.scale(30)
   ...: client = Client(cluster)

In [3]: bag = db.from_sequence(np.arange(100))

In [4]: groups = bag.groupby(lambda x: x)

In [5]: workers = groups.map(lambda t: get_worker().address).compute()

In [6]: frequencies(workers)
Out[6]: {'tcp://10.36.111.32:40190': 3, 'tcp://10.36.111.36:33107': 3, 'tcp://10.36.111.39:37165': 3, 'tcp://10.36.110.32:40587': 4, 'tcp://10.36.111.33:45234': 3, 'tcp://10.36.111.42:41962': 4, 'tcp://10.36.110.40:44968': 4, 'tcp://10.36.111.36:39365': 3, 'tcp://10.36.111.23:37433': 3, 'tcp://10.36.111.32:36918': 3, 'tcp://10.36.111.27:37787': 3, 'tcp://10.36.111.21:45552': 4, 'tcp://10.36.111.36:35714': 5, 'tcp://10.36.110.32:35934': 3, 'tcp://10.36.110.38:44253': 4, 'tcp://10.36.111.27:46136': 4, 'tcp://10.36.110.35:37716': 4, 'tcp://10.36.111.32:43984': 4, 'tcp://10.36.111.18:40827': 2, 'tcp://10.36.111.21:33749': 4, 'tcp://10.36.110.32:40599': 3, 'tcp://10.36.111.16:46645': 4, 'tcp://10.36.111.18:39199': 3, 'tcp://10.36.110.40:41659': 3, 'tcp://10.36.110.38:38503': 3, 'tcp://10.36.110.38:38701': 3, 'tcp://10.36.111.21:33351': 3, 'tcp://10.36.111.39:44899': 3, 'tcp://10.36.110.40:32966': 3, 'tcp://10.36.110.35:42767': 2}

In [7]: len(_)
Out[7]: 30
```


So far, so good. All 30 workers end up with some data. The balance isn't perfect, but it's reasonable.

Now let's make a tiny change. This time, I'll multiply each int by 64:

Bad grouping example:

```python
In [8]: bag = db.from_sequence(64*np.arange(100))

In [9]: groups = bag.groupby(lambda x: x)

In [10]: workers = groups.map(lambda t: get_worker().address).compute()

In [11]: frequencies(workers)
Out[11]: {'tcp://10.36.111.27:37787': 12, 'tcp://10.36.110.40:44968': 4, 'tcp://10.36.110.38:38701': 8, 'tcp://10.36.111.36:33107': 12, 'tcp://10.36.110.32:35934': 8, 'tcp://10.36.111.39:37165': 8, 'tcp://10.36.111.21:33749': 8, 'tcp://10.36.111.21:33351': 4, 'tcp://10.36.111.32:43984': 4, 'tcp://10.36.111.18:39199': 8, 'tcp://10.36.111.39:44899': 4, 'tcp://10.36.111.33:45234': 4, 'tcp://10.36.110.32:40599': 4, 'tcp://10.36.111.36:35714': 4, 'tcp://10.36.111.32:40190': 4, 'tcp://10.36.110.32:40587': 4}

In [12]: len(_)
Out[12]: 16
```


As you can see, nearly half of my workers ended up with no data at all, and a few of the others got 3x their "fair share" of data. In my real-world code, I'm manipulating large images, and this unfortunate balancing causes my workers to run out of RAM (or at least, I think that's what's happening).

I wonder if this has something to do with the hash() function.

FWIW, I've seen a similar issue in pyspark before, too, though in that case it resulted in WAY worse balancing. I vaguely recall them doing something to work around it, but I'm having trouble finding mention of it in their issue tracker.

In the pyspark case, the problem occurred when using 2^N workers to group tuples whose elements were offset by a power of two. It boils down to a surprising (to me) property of the Python hash function: although the data in these tuples hash to different values, the hashes are all identical mod 2^N, so pyspark assigned all of these tuples to a single worker.

Python hash demo (simulate pyspark worker assignments):

```python
In [1]: tuples = [(i,) for i in 64*np.arange(5)]
   ...:
   ...: hashes = [hash(t) for t in tuples]
   ...: print('hashes:\n', hashes, '\n')
   ...:
   ...: # Use modulo of hash to assign worker
   ...: WORKER_COUNT = 32
   ...: worker_ids = [h % WORKER_COUNT for h in hashes]
   ...: print('worker assignments:\n', worker_ids)
   ...:
hashes:
 [3430018387555, 3429954387363, 3430146387939, 3430082387747, 3430274388323]

worker assignments:
 [3, 3, 3, 3, 3]
```


Clearly, the "bad balancing" in the dask example above isn't THAT bad, so maybe I'm on the wrong track with this hashing theory.

Interestingly, dask.DataFrame.groupby() does NOT exhibit the bad balance that Bag.groupby() exhibits:

DataFrame Example:

```python
In [62]: df = ddf.from_pandas(pd.DataFrame({'x': 64*np.arange(100)}),
    ...:                      npartitions=100).set_index('x')

In [63]: workers = df.groupby('x')\
    ...:     .apply(lambda _: pd.DataFrame({'worker': [get_worker().address]}),
    ...:            meta=pd.DataFrame({'worker': ['']}))\
    ...:     .compute()['worker']

In [64]: frequencies(workers)
Out[64]: {'tcp://10.36.111.27:37787': 4, 'tcp://10.36.111.18:39199': 3, 'tcp://10.36.111.36:39365': 5, 'tcp://10.36.110.35:37716': 3, 'tcp://10.36.111.16:46645': 3, 'tcp://10.36.111.39:44899': 3, 'tcp://10.36.110.32:35934': 3, 'tcp://10.36.111.42:41962': 3, 'tcp://10.36.110.38:38701': 3, 'tcp://10.36.110.38:38503': 3, 'tcp://10.36.111.21:33351': 3, 'tcp://10.36.110.32:40587': 3, 'tcp://10.36.110.32:40599': 3, 'tcp://10.36.110.35:42767': 3, 'tcp://10.36.110.38:44253': 3, 'tcp://10.36.110.40:32966': 3, 'tcp://10.36.110.40:41659': 3, 'tcp://10.36.110.40:44968': 3, 'tcp://10.36.111.18:40827': 3, 'tcp://10.36.111.21:33749': 3, 'tcp://10.36.111.21:45552': 3, 'tcp://10.36.111.23:37433': 3, 'tcp://10.36.111.27:46136': 4, 'tcp://10.36.111.32:36918': 4, 'tcp://10.36.111.32:40190': 4, 'tcp://10.36.111.32:43984': 4, 'tcp://10.36.111.33:45234': 4, 'tcp://10.36.111.36:33107': 4, 'tcp://10.36.111.36:35714': 4, 'tcp://10.36.111.39:37165': 3}

In [65]: len(_)
Out[65]: 30
```


I'm not yet familiar enough with the scheduler internals to know which code to look at to understand how workers are chosen for groupby() results.

mrocklin commented 5 years ago

I agree with your assessment that a bad hashing function might account for this behavior. It looks like we're just using the standard Python hash function, which passes integers through unchanged. It would probably be good to investigate other options for this hashing function that have nicer distribution properties while still being robust and fast.
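
For what it's worth, here is a minimal sketch of that mechanism (illustrative only, not Dask's actual partitioning code; the `NPARTITIONS` value is made up): Python's `hash()` returns small ints unchanged, so strided keys collapse onto very few buckets, whereas a byte-level hash such as md5 spreads the same keys much more evenly.

```python
import hashlib

NPARTITIONS = 32                      # hypothetical partition count
keys = [64 * i for i in range(100)]   # same strided keys as the "bad" example above

# hash(n) == n for small ints, so every multiple of 64 is 0 modulo 32:
# all keys collapse into a single bucket.
builtin_buckets = {hash(k) % NPARTITIONS for k in keys}
print(len(builtin_buckets))   # -> 1

def byte_hash(x):
    # Hash the bytes of the value's repr instead of relying on hash().
    digest = hashlib.md5(repr(x).encode()).digest()
    return int.from_bytes(digest[:8], 'little')

better_buckets = {byte_hash(k) % NPARTITIONS for k in keys}
print(len(better_buckets))    # -> close to all 32 buckets
```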

mrocklin commented 5 years ago

Dask dataframe uses the hashing built into Pandas, so it is both faster and doesn't have this issue.
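
For comparison, here is a quick sketch (assuming `pandas.util.hash_pandas_object` is the relevant utility, with a made-up bucket count of 32) showing how pandas' vectorized hash spreads the same strided keys that Python's `hash()` collapses onto a single bucket:

```python
import numpy as np
import pandas as pd
from toolz import frequencies

keys = pd.Series(64 * np.arange(100))   # the "bad" keys from the examples above
NBUCKETS = 32                           # hypothetical partition count

# pandas' hash scrambles integer values, so the buckets are well populated.
pandas_buckets = frequencies(pd.util.hash_pandas_object(keys, index=False) % NBUCKETS)
print(len(pandas_buckets))              # -> most of the 32 buckets

# Python's hash(n) == n for ints, so 64*i % 32 == 0 for every key.
python_buckets = frequencies(hash(int(k)) % NBUCKETS for k in keys)
print(len(python_buckets))              # -> 1
```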

jakirkham commented 5 years ago

cc @rabernat (as I was re-reading your older comment, which sounds similar)

stuarteberg commented 5 years ago

Related, I think: Lately I'm seeing an even more exaggerated version of this behavior. In this case, the bad balance occurs before I even use groupby(). All I'm using here is from_sequence() and map_partitions().

Here are the stats of the job I'm running right now:

- Bag size (total items): 511
- num_partitions: 257
- Number of dask workers: 124

I had hoped that the tasks would be evenly distributed to my 124 workers, with each one being given a single partition. When I started the job, all tasks were assigned to a single worker! A minute later, I took the screenshot below. By that point, some of the tasks had been handed off to one other worker, but the balance was still quite bad: most of my workers were sitting idle, while only two were processing any data.

[screenshot: dask dashboard showing tasks concentrated on two workers while the rest sit idle]

Any tips on how to debug this behavior and achieve a better balance would be helpful.

I know that I should switch to DataFrames (or Arrays) to benefit from improved hashing (as mentioned above), but in this case I don't think hashing is responsible. For instance, I'm specifically avoiding powers-of-two when choosing my partition size and number of workers, so I think ordinary Python hashing ought to be well-behaved in this instance.

FWIW, the specific workload here is fairly simple and not very compute-intensive: each partition contains a few integers, and the work is just loading a single PNG file named after each integer (0.png, 1.png, 2.png, etc.). At this stage, most of the time is spent on disk I/O.
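
In case it helps with debugging placement, one option (a sketch only; `load_items` is a hypothetical stand-in for the real PNG-loading function, and `client` is the existing Client from above) is to persist the bag and ask the scheduler directly which worker holds each partition:

```python
from distributed import wait
from distributed.client import futures_of
from toolz import frequencies
import dask.bag as db

def load_items(partition):
    # Hypothetical stand-in for the real per-partition work (loading PNGs).
    return list(partition)

bag = db.from_sequence(range(511), npartitions=257).map_partitions(load_items)
persisted = client.persist(bag)
wait(futures_of(persisted))            # block until all partitions are in memory

# Map each resulting key to the worker(s) that hold it, then count per worker.
who = client.who_has(futures_of(persisted))
placement = frequencies(addr for addrs in who.values() for addr in addrs)
print(len(placement), "workers hold data")
print(placement)
```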

mrocklin commented 5 years ago

I had hoped that the tasks would be evenly distributed to my 124 workers, with each one being given a single partition. When I started the job, all tasks were assigned to a single worker!

This seems like a different problem from this issue: Dask doesn't determine data location by hashing. A new issue with an MCVE would be welcome.
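
For anyone who wants to file that, a minimal self-contained sketch of the kind of MCVE being asked for might look like this (LocalCluster instead of LSF, and the worker/partition counts are made up):

```python
from distributed import Client, LocalCluster
from distributed.worker import get_worker
from toolz import frequencies
import dask.bag as db

if __name__ == '__main__':
    with Client(LocalCluster(n_workers=8, threads_per_worker=1)) as client:
        bag = db.from_sequence(range(511), npartitions=257)
        # Record which worker executed each partition.
        addresses = bag.map_partitions(lambda part: [get_worker().address]).compute()
        print(frequencies(addresses))   # ideally roughly uniform across the workers
```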

migichen commented 1 year ago

Hello, we have experienced almost the same situation as shown in the image posted above by @stuarteberg, while working on embarrassingly parallel tasks. We found that just calling

client.rebalance()

every second works. However, we hesitate to adopt this as a final solution because we are not sure whether it will instead jam the network. It would be of great help if anyone with more experience could comment on this approach.
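
For reference, a minimal sketch of the periodic-rebalance workaround described above (the interval and the stop mechanism are made up; note that frequent rebalancing does add scheduler and network traffic, which is exactly the concern raised here):

```python
import threading

def start_periodic_rebalance(client, interval=1.0):
    """Call client.rebalance() every `interval` seconds until the returned stop() is called."""
    stop_event = threading.Event()

    def _loop():
        # Event.wait() returns False on timeout, True once stop() is called.
        while not stop_event.wait(interval):
            client.rebalance()   # move keys between workers to even out memory

    threading.Thread(target=_loop, daemon=True).start()
    return stop_event.set        # call this to stop rebalancing

# Usage (hypothetical):
# stop = start_periodic_rebalance(client, interval=1.0)
# ... run the workload ...
# stop()
```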