dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Possible race condition in `replicate()` causing `ValueError("Sample larger than population or is negative")` #6713

Open. noloerino opened this issue 2 years ago

noloerino commented 2 years ago

What happened: While running CI for another project, we encountered a `ValueError("Sample larger than population or is negative")` originating from this line of code in `scheduler.py`, which in turn is triggered by the user invoking `Client.replicate()`. The relevant CI run can be found here; the full stack trace is also pasted below.

Per @mvashishtha, a possible cause is that a Dask worker is paused between calculating `count` and computing `tuple(workers - ts._who_has)`, causing a crash when `random.sample` is called. We do not have a minimal example because we're unsure how to reliably set up this scenario.
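The suspected failure mode can be sketched in isolation. This is a hedged illustration with made-up worker names, not the actual scheduler code: `count` is derived from a snapshot of the cluster, but the candidate pool is only materialized later, after a worker has dropped out of the eligible set.

```python
import random

# Illustrative sketch of the suspected race (names are hypothetical, not
# the distributed internals).
workers = {"tcp://w1:1234", "tcp://w2:1234", "tcp://w3:1234"}
who_has = {"tcp://w1:1234"}  # workers already holding the key

# `count` is computed while all three workers are still eligible...
count = 2

# ...but a worker pauses before the candidates are materialized.
workers.discard("tcp://w3:1234")

candidates = tuple(workers - who_has)  # only one candidate remains
try:
    targets = random.sample(candidates, count)
except ValueError as exc:
    print(exc)  # Sample larger than population or is negative
```

If `count` exceeds the (now smaller) candidate population, `random.sample` raises exactly the `ValueError` seen in the trace.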

Full stack trace:
============================= test session starts ==============================
platform linux -- Python 3.8.13, pytest-7.1.2, pluggy-1.0.0
benchmark: 3.4.1 (defaults: timer=time.perf_counter disable_gc=False min_rounds=5 min_time=0.000005 max_time=1.0 calibration_precision=10 warmup=False warmup_iterations=100000)
rootdir: /home/runner/work/modin/modin, configfile: setup.cfg
plugins: cov-2.11.0, benchmark-3.4.1, forked-1.4.0, xdist-2.5.0, Faker-13.15.0
gw0 I / gw1 I
gw0 [488] / gw1 [488]
........................................................................ [ 14%]
........................................................................ [ 29%]
........................................................................ [ 44%]
........................F............................................... [ 59%]
........................................................................ [ 73%]
........................................................................ [ 88%]
........................................................                 [100%]
=================================== FAILURES ===================================
________________ test_pivot[None--None-float_nan_data] _________________
[gw1] linux -- Python 3.8.13 /usr/share/miniconda3/envs/modin/bin/python
data = {'col1': [nan, 15.495823294762069, 54.18965011208873, 84.29175834609546, 52.475393862603724, 3.5983596208683966, ...],...': [nan, nan, nan, nan, 25.291907676386415, nan, ...], 'col12': [nan, nan, nan, nan, 4.509197191137804, nan, ...], ...}
PytestBenchmarkWarning: Benchmarks are automatically disabled because xdist plugin is active. Benchmarks cannot be performed reliably in a parallelized environment.
index = None, columns = <function <lambda> at 0x7f4854de0820>, values = None
    @pytest.mark.parametrize("data", test_data_values, ids=test_data_keys)
    @pytest.mark.parametrize(
        "index", [lambda df: df.columns[0], lambda df: df[df.columns[0]].values, None]
    )
    @pytest.mark.parametrize("columns", [lambda df: df.columns[len(df.columns) // 2]])
    @pytest.mark.parametrize(
        "values", [lambda df: df.columns[-1], lambda df: df.columns[-2:], None]
    )
    def test_pivot(data, index, columns, values):
>       eval_general(
            *create_test_dfs(data),
            lambda df, *args, **kwargs: df.pivot(*args, **kwargs),
            index=index,
            columns=columns,
            values=values,
            check_exception_type=None,
        )
modin/pandas/test/dataframe/test_default.py:489: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
modin/pandas/test/utils.py:767: in eval_general
    values = execute_callable(
modin/pandas/test/utils.py:746: in execute_callable
    md_result = fn(modin_df, **md_kwargs)
modin/pandas/test/dataframe/test_default.py:491: in <lambda>
    lambda df, *args, **kwargs: df.pivot(*args, **kwargs),
modin/logging/logger_decorator.py:128: in run_and_log
    return obj(*args, **kwargs)
modin/pandas/dataframe.py:1532: in pivot
    query_compiler=self._query_compiler.pivot(
modin/logging/logger_decorator.py:128: in run_and_log
    return obj(*args, **kwargs)
modin/core/storage_formats/pandas/query_compiler.py:2968: in pivot
    unstacked = reindexed.unstack(level=columns, fill_value=None)
modin/logging/logger_decorator.py:128: in run_and_log
    return obj(*args, **kwargs)
modin/core/storage_formats/pandas/query_compiler.py:1234: in unstack
    new_modin_frame = obj._modin_frame.apply_full_axis(
modin/logging/logger_decorator.py:128: in run_and_log
    return obj(*args, **kwargs)
modin/core/dataframe/pandas/dataframe/dataframe.py:115: in run_f_on_minimally_updated_metadata
    result = f(self, *args, **kwargs)
modin/core/dataframe/pandas/dataframe/dataframe.py:1906: in apply_full_axis
    return self.broadcast_apply_full_axis(
modin/logging/logger_decorator.py:128: in run_and_log
    return obj(*args, **kwargs)
modin/core/dataframe/pandas/dataframe/dataframe.py:115: in run_f_on_minimally_updated_metadata
    result = f(self, *args, **kwargs)
modin/core/dataframe/pandas/dataframe/dataframe.py:2340: in broadcast_apply_full_axis
    new_axes = [
modin/core/dataframe/pandas/dataframe/dataframe.py:2341: in <listcomp>
    self._compute_axis_labels(i, new_partitions)
modin/logging/logger_decorator.py:128: in run_and_log
    return obj(*args, **kwargs)
modin/core/dataframe/pandas/dataframe/dataframe.py:429: in _compute_axis_labels
    return self._partition_mgr_cls.get_indices(
modin/core/dataframe/pandas/partitioning/partition_manager.py:853: in get_indices
    func = cls.preprocess_func(index_func)
modin/core/dataframe/pandas/partitioning/partition_manager.py:121: in preprocess_func
    return cls._partition_class.preprocess_func(map_func)
modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py:248: in preprocess_func
    return DaskWrapper.put(func, hash=False, broadcast=True)
modin/core/execution/dask/common/engine_wrapper.py:89: in put
    return client.scatter(data, **kwargs)
/usr/share/miniconda3/envs/modin/lib/python3.8/site-packages/distributed/client.py:2354: in scatter
    return self.sync(
/usr/share/miniconda3/envs/modin/lib/python3.8/site-packages/distributed/utils.py:309: in sync
    return sync(
/usr/share/miniconda3/envs/modin/lib/python3.8/site-packages/distributed/utils.py:363: in sync
    raise exc.with_traceback(tb)
/usr/share/miniconda3/envs/modin/lib/python3.8/site-packages/distributed/utils.py:348: in f
    result[0] = yield future
/usr/share/miniconda3/envs/modin/lib/python3.8/site-packages/tornado/gen.py:769: in run
    value = future.result()
/usr/share/miniconda3/envs/modin/lib/python3.8/site-packages/distributed/client.py:2239: in _scatter
    await self.scheduler.scatter(
/usr/share/miniconda3/envs/modin/lib/python3.8/site-packages/distributed/core.py:900: in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
/usr/share/miniconda3/envs/modin/lib/python3.8/site-packages/distributed/core.py:693: in send_recv
    raise exc.with_traceback(tb)
/usr/share/miniconda3/envs/modin/lib/python3.8/site-packages/distributed/core.py:520: in handle_comm
    result = await result
/usr/share/miniconda3/envs/modin/lib/python3.8/site-packages/distributed/scheduler.py:5847: in scatter
    await self.replicate(keys=keys, workers=workers, n=n)
/usr/share/miniconda3/envs/modin/lib/python3.8/site-packages/distributed/scheduler.py:6644: in replicate
    for ws in random.sample(tuple(workers - ts._who_has), count):
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
>   raise ValueError("Sample larger than population or is negative")
E   ValueError: Sample larger than population or is negative
/usr/share/miniconda3/envs/modin/lib/python3.8/random.py:363: ValueError
----------------------------- Captured stderr call -----------------------------
distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 312.40 MiB -- Worker memory limit: 476.84 MiB
distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 314.01 MiB -- Worker memory limit: 476.84 MiB
distributed.worker - WARNING - Worker is at 85% memory usage. Pausing worker.  Process memory: 407.79 MiB -- Worker memory limit: 476.84 MiB
distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 382.26 MiB -- Worker memory limit: 476.84 MiB
distributed.worker - WARNING - Worker is at 79% memory usage. Resuming worker. Process memory: 378.01 MiB -- Worker memory limit: 476.84 MiB
distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 378.01 MiB -- Worker memory limit: 476.84 MiB
distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 378.01 MiB -- Worker memory limit: 476.84 MiB
distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 377.76 MiB -- Worker memory limit: 476.84 MiB
distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 319.11 MiB -- Worker memory limit: 476.84 MiB
distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 377.76 MiB -- Worker memory limit: 476.84 MiB
[... the previous warning repeated 21 more times ...]
distributed.worker - WARNING - Worker is at 80% memory usage. Pausing worker.  Process memory: 382.70 MiB -- Worker memory limit: 476.84 MiB
distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 377.76 MiB -- Worker memory limit: 476.84 MiB
distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 382.70 MiB -- Worker memory limit: 476.84 MiB
distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 377.76 MiB -- Worker memory limit: 476.84 MiB
distributed.core - ERROR - Exception while handling op scatter
Traceback (most recent call last):
  File "/usr/share/miniconda3/envs/modin/lib/python3.8/site-packages/distributed/core.py", line 520, in handle_comm
    result = await result
  File "/usr/share/miniconda3/envs/modin/lib/python3.8/site-packages/distributed/scheduler.py", line 5847, in scatter
    await self.replicate(keys=keys, workers=workers, n=n)
  File "/usr/share/miniconda3/envs/modin/lib/python3.8/site-packages/distributed/scheduler.py", line 6644, in replicate
    for ws in random.sample(tuple(workers - ts._who_has), count):
  File "/usr/share/miniconda3/envs/modin/lib/python3.8/random.py", line 363, in sample
    raise ValueError("Sample larger than population or is negative")
ValueError: Sample larger than population or is negative
distributed.worker - WARNING - Worker is at 78% memory usage. Resuming worker. Process memory: 376.08 MiB -- Worker memory limit: 476.84 MiB
--------------------------- Captured stderr teardown ---------------------------
distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 377.76 MiB -- Worker memory limit: 476.84 MiB
distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 376.08 MiB -- Worker memory limit: 476.84 MiB
distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 377.76 MiB -- Worker memory limit: 476.84 MiB
[... the previous warning repeated 16 more times ...]
---------- coverage: platform linux, python 3.8.13-final-0 -----------
Coverage XML written to file coverage.xml
=========================== short test summary info ============================
FAILED modin/pandas/test/dataframe/test_default.py::test_pivot[None--None-float_nan_data]
=========== 1 failed, 487 passed, 1361 warnings in 380.33s (0:06:20) ===========
Error: Process completed with exit code 1.

What you expected to happen: the above crash does not occur.

Minimal Complete Verifiable Example: N/A; we are unsure how to set up the relevant conditions.

Anything else we need to know?:

Environment:

quasiben commented 2 years ago

Is it possible to upgrade your version of Dask? The most recent version of Dask is 2022.7.0.

Though it's worth noting that particular code path still exists: https://github.com/dask/distributed/blob/02f9c8f21adcda6026207942f14376ffeb0b4c62/distributed/scheduler.py#L5781-L5783
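For reference, one defensive variant of that sampling step would clamp the sample size to whatever the candidate pool currently contains. This is only a sketch of the guard with a hypothetical helper name, not a patch against the actual scheduler:

```python
import random

def pick_replica_targets(workers, who_has, count):
    """Hypothetical helper (not the distributed API): sample replica
    targets, never asking random.sample for more elements than the
    candidate pool holds, even if `count` came from a stale snapshot."""
    candidates = tuple(workers - who_has)
    return random.sample(candidates, min(count, len(candidates)))

# Cannot raise, even with a stale/oversized count:
print(pick_replica_targets({"w1", "w2"}, {"w1"}, 3))  # ['w2']
```

Clamping trades the crash for silently replicating to fewer workers than requested, which may or may not be the behavior the scheduler wants.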

noloerino commented 2 years ago

I'm not sure if I'm able to bump the Dask version without breaking anything, but it's hard to tell if doing so would fix the problem. Like you said, the code path is still around, and more importantly, the failure seems to be very transient (as far as I know it's the first time it's appeared in our CI, and rerunning the test made the error go away).

gjoseph92 commented 2 years ago

There are plans to re-implement `replicate` using AMM (the Active Memory Manager), since the current implementation already has a number of issues: https://github.com/dask/distributed/issues/6578. We can add this one to the list. cc @crusaderky.

zmbc commented 8 months ago

I had this happen as well, but only after a very long sequence of homogeneous tasks had been processed successfully, with lots of worker pausing on the cluster (which aligns with @mvashishtha's theory). So I can confirm the race condition is still possible in version 2024.2.0, though I don't know how to reproduce it.