dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License
1.58k stars 720 forks source link

Negative occupancy causes state machine corruption #7923

Open hendrikmakait opened 1 year ago

hendrikmakait commented 1 year ago

For some reason, WorkerState.network_occ can drop below zero and cause the state machine to corrupt.

Reproducer:

import coiled 
import dask
import dask.array as da
cluster = coiled.Cluster(
    n_workers=10,
    worker_vm_types=["m6i.large"],
    worker_disk_size=128
)
client = cluster.get_client()

with dask.config.set({"array.rechunk.method": "p2p", "optimization.fuse.active": False}):
    rng = da.random.default_rng()
    x = rng.random((100000, 100000))
    x.rechunk((50000, 20)).rechunk((20, 50000)).sum().compute()

Logs:

distributed.scheduler - ERROR - Error transitioning "('rechunk-p2p-677393017a1df2b9929798754cba6a0e', 1, 2128)" from 'processing' to 'released'
Traceback (most recent call last):
  File "/opt/coiled/env/lib/python3.10/site-packages/distributed/scheduler.py", line 1911, in _transition
    recommendations, client_msgs, worker_msgs = func(
  File "/opt/coiled/env/lib/python3.10/site-packages/distributed/scheduler.py", line 2563, in transition_processing_released
    ws = self._exit_processing_common(ts)
  File "/opt/coiled/env/lib/python3.10/site-packages/distributed/scheduler.py", line 3178, in _exit_processing_common
    self.check_idle_saturated(ws)
  File "/opt/coiled/env/lib/python3.10/site-packages/distributed/scheduler.py", line 2920, in check_idle_saturated
    occ = ws.occupancy
  File "/opt/coiled/env/lib/python3.10/site-packages/distributed/scheduler.py", line 820, in occupancy
    return self._occupancy_cache or self.scheduler._calc_occupancy(
  File "/opt/coiled/env/lib/python3.10/site-packages/distributed/scheduler.py", line 1856, in _calc_occupancy
    assert occ >= 0, (res, network_occ, self.bandwidth)
AssertionError: (0.0, -4, 140927311.7922114)

because we do not properly release the task, this in turn seems to cause

Task exception was never retrieved
future: <Task finished name='Task-14887' coro=<Server._handle_comm() done, defined at /opt/coiled/env/lib/python3.10/site-packages/distributed/core.py:830> exception=AssertionError()>
Traceback (most recent call last):
  File "/opt/coiled/env/lib/python3.10/site-packages/distributed/core.py", line 924, in _handle_comm
    result = await result
  File "/opt/coiled/env/lib/python3.10/site-packages/distributed/utils.py", line 754, in wrapper
    return await func(*args, **kwargs)
  File "/opt/coiled/env/lib/python3.10/site-packages/distributed/scheduler.py", line 4303, in add_worker
    await self.handle_worker(comm, address)
  File "/opt/coiled/env/lib/python3.10/site-packages/distributed/scheduler.py", line 5669, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/opt/coiled/env/lib/python3.10/site-packages/distributed/core.py", line 1008, in handle_stream
    handler(**merge(extra, msg))
  File "/opt/coiled/env/lib/python3.10/site-packages/distributed/scheduler.py", line 5540, in handle_task_erred
    self._transitions(recommendations, client_msgs, worker_msgs, stimulus_id)
  File "/opt/coiled/env/lib/python3.10/site-packages/distributed/scheduler.py", line 2015, in _transitions
    new_recs, new_cmsgs, new_wmsgs = self._transition(key, finish, stimulus_id)
  File "/opt/coiled/env/lib/python3.10/site-packages/distributed/scheduler.py", line 1911, in _transition
    recommendations, client_msgs, worker_msgs = func(
  File "/opt/coiled/env/lib/python3.10/site-packages/distributed/scheduler.py", line 2563, in transition_processing_released
    ws = self._exit_processing_common(ts)
  File "/opt/coiled/env/lib/python3.10/site-packages/distributed/scheduler.py", line 3171, in _exit_processing_common
    assert ws
AssertionError

and on the worker:

tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7fe6a21d9780>>, <Task finished name='Task-9' coro=<Worker.handle_scheduler() done, defined at /opt/coiled/env/lib/python3.10/site-packages/distributed/worker.py:206> exception=RuntimeError('Encountered invalid state')>)
Traceback (most recent call last):
  File "/opt/coiled/env/lib/python3.10/site-packages/tornado/ioloop.py", line 740, in _run_callback
    ret = callback()
  File "/opt/coiled/env/lib/python3.10/site-packages/tornado/ioloop.py", line 764, in _discard_future_result
    future.result()
  File "/opt/coiled/env/lib/python3.10/site-packages/distributed/worker.py", line 209, in wrapper
    return await method(self, *args, **kwargs)  # type: ignore
  File "/opt/coiled/env/lib/python3.10/site-packages/distributed/worker.py", line 1294, in handle_scheduler
    await self.handle_stream(comm)
  File "/opt/coiled/env/lib/python3.10/site-packages/distributed/core.py", line 1008, in handle_stream
    handler(**merge(extra, msg))
  File "/opt/coiled/env/lib/python3.10/site-packages/distributed/worker.py", line 1909, in _
    self.handle_stimulus(event)
  File "/opt/coiled/env/lib/python3.10/site-packages/distributed/worker.py", line 222, in wrapper
    return method(self, *args, **kwargs)
  File "/opt/coiled/env/lib/python3.10/site-packages/distributed/worker.py", line 1924, in handle_stimulus
    super().handle_stimulus(*stims)
  File "/opt/coiled/env/lib/python3.10/site-packages/distributed/worker_state_machine.py", line 3718, in handle_stimulus
    instructions = self.state.handle_stimulus(*stims)
  File "/opt/coiled/env/lib/python3.10/site-packages/distributed/worker_state_machine.py", line 1360, in handle_stimulus
    recs, instr = self._handle_event(stim)
  File "/opt/coiled/env/lib/python3.10/functools.py", line 926, in _method
    return method.__get__(obj, cls)(*args, **kwargs)
  File "/opt/coiled/env/lib/python3.10/site-packages/distributed/worker_state_machine.py", line 2799, in _handle_remove_replicas
    raise RuntimeError("Encountered invalid state")  # pragma: no cover

XREF: #7538

hendrikmakait commented 1 year ago

The negative occupancy is caused by the barrier changing it's size, which causes us to subtract more from network_occ than we originally added:

distributed.scheduler - ERROR - ('shuffle-barrier-98dccb6094b5f2a50718c2178e4fe090', 24, 28)
Traceback (most recent call last):
  File "/opt/coiled/env/lib/python3.10/site-packages/distributed/scheduler.py", line 821, in add_replica
    assert nbytes == self.needs_what[ts][1], (
AssertionError: ('shuffle-barrier-98dccb6094b5f2a50718c2178e4fe090', 24, 28)

where 24 is the original size and 28 the size we try to subtract.

Also TIL: sizeof(0) == 24 and sizeof(28) == 28

FlorianBury commented 1 year ago

Hi, I keep getting a similar negative occupancy issue

  File "<...>/site-packages/distributed/scheduler.py", line 1818, in _calc_occupancy
    assert occ >= 0, occ
AssertionError: -7191.262672288433

Has this been fixed at some point ? I currently have version 2023.5.1

hendrikmakait commented 1 year ago

@FlorianBury, this has not been fixed but been circumvented in the original problem. Would you have a minimal reproducer for your problem that can help us investigate?