fjetter opened 2 years ago
The issue is a race condition in a narrow time window after the worker has established the scheduler connection but before the `Server` base class has finished its start.
Specifically, the `Worker` sets the `status` attribute to `running` in
https://github.com/dask/distributed/blob/ef13425ff77a0ae3ec14e0cb3bc6c3a87c363dd6/distributed/worker.py#L1179
and the `Server` base class sets it again in
https://github.com/dask/distributed/blob/ef13425ff77a0ae3ec14e0cb3bc6c3a87c363dd6/distributed/core.py#L491
During this window, which is more or less one event loop tick, a `closing`/`closed` status set in the meantime can be reset by the server start, such that subsequent `Worker.close` calls are not caught by the guard.
This is exacerbated if slow plugins are around that need additional ticks to start up.
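To make the mechanism concrete, here is a minimal, self-contained sketch (hypothetical, heavily simplified `Server`/`Worker` classes, not the real distributed implementation) of how a `closing` status set between the subclass part of the start and the base-class part gets clobbered, so a later `close()` no longer hits the guard:

```python
import asyncio
from enum import Enum


class Status(Enum):
    init = "init"
    running = "running"
    closing = "closing"


class Server:
    """Simplified stand-in for the Server base class (not the real one)."""

    status = Status.init

    async def start(self):
        await self.start_subclass()
        # Roughly one event loop tick later, the base class sets the status
        # again, clobbering whatever happened in between.
        await asyncio.sleep(0)
        self.status = Status.running


class Worker(Server):
    async def start_subclass(self):
        # The subclass already flips the status to running here.
        self.status = Status.running

    async def close(self):
        if self.status == Status.closing:
            # Guard: a close is already in progress, do nothing.
            return
        self.status = Status.closing
        print("closing for real")


async def main():
    w = Worker()
    start = asyncio.create_task(w.start())
    await asyncio.sleep(0)  # let the subclass part of start() run
    await w.close()         # first close: sets status to closing
    await start             # base class start resets status to running
    await w.close()         # guard misses: a second close runs


asyncio.run(main())
```

Run as-is, this prints "closing for real" twice: the second `close()` is not caught by the guard because the base-class start reset the status to `running` in between, which is the same pattern as the `Worker`/`Server` interplay described above.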
Below is a reproducer. Interestingly, the `Nanny.restart` API does not suffer from this problem since it does not rely on a done callback but instantiates the worker explicitly.
```python
import asyncio

import pytest

from distributed import Nanny
from distributed.core import ConnectionPool, Status
from distributed.utils_test import gen_cluster


@pytest.mark.parametrize("api", ["restart", "kill"])
@gen_cluster(client=True, nthreads=[("", 1)], Worker=Nanny)
async def test_restart_stress(c, s, a, api):
    async def keep_killing():
        pool = await ConnectionPool()
        try:
            rpc = pool(a.address)
            for _ in range(10):
                try:
                    meth = getattr(rpc, api)
                    await meth(reason="scheduler-restart")
                except OSError:
                    break
                await asyncio.sleep(0.1)
        finally:
            await pool.close()

    kill_task = asyncio.create_task(keep_killing())
    await kill_task
    assert a.status == Status.running
```
FWIW, spawning multiple `keep_killing` tasks is also fine because `Nanny.instantiate`, which runs behind `Nanny.restart`, is idempotent (see the sketch below).
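For completeness, a sketch of that variation, assuming the same test scaffolding as the reproducer above (the number of tasks is arbitrary):

```python
# Replace the tail of the reproducer above with several concurrent killers;
# Nanny.instantiate behind Nanny.restart is idempotent, so this is safe too.
kill_tasks = [asyncio.create_task(keep_killing()) for _ in range(5)]
await asyncio.gather(*kill_tasks)
assert a.status == Status.running
```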
When restarting a cluster of Nannies, we expect every worker to be restarted and to re-connect to the scheduler.
There appears to be a race condition where the worker instead closes for good and tears down the Nanny with it.
This has been frequently observed in the coiled-runtime benchmarks; see https://github.com/coiled/coiled-runtime/issues/468
Some sample logs (I added line numbers so I can reference the log lines below):
What this translates to is:

1. The `Nanny` already initiates a restart
2. `Worker.close(nanny=False)` runs for the first time
3. A `stream-close` message goes to the scheduler (TODO: verify if possible) such that the batched stream is aborted
4. `Worker.handle_stream` encounters an exception due to the closed stream
5. `Worker.close(nanny=True)` is called due to the severed connection. This should wait for the first close to terminate and do nothing. However, it initiates another close procedure, this time with the nanny, which closes the server for good.

What I do not understand is why the second close is not waiting for `Server.finished` in https://github.com/dask/distributed/blob/ef13425ff77a0ae3ec14e0cb3bc6c3a87c363dd6/distributed/worker.py#L1491-L1498. This can only happen if the status after the first close (L46) is altered. As far as I can tell, all status mutations trigger a log message (either on the scheduler or the worker side), but I couldn't find anything. The entire thing is over in slightly under 2s.
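For reference, here is a condensed, self-contained paraphrase of the guard the permalink above points at; this is a sketch of the assumed behaviour (an in-flight close flips `status` to `closing`, later callers only wait on `Server.finished`), not the real `Worker.close`:

```python
import asyncio
from enum import Enum


class Status(Enum):
    running = "running"
    closing = "closing"
    closed = "closed"


class Worker:
    """Condensed sketch of the close() guard discussed above."""

    def __init__(self):
        self.status = Status.running
        self._finished = asyncio.Event()

    async def finished(self):
        # Stand-in for Server.finished: block until the first close completed.
        await self._finished.wait()

    async def close(self, nanny=True):
        if self.status in (Status.closing, Status.closed):
            # A close is already in flight: wait for it and do nothing else.
            await self.finished()
            return
        self.status = Status.closing
        await asyncio.sleep(0.1)  # stand-in for the actual teardown work
        self.status = Status.closed
        self._finished.set()
```

For the second `close(nanny=True)` to start a full teardown as seen in the logs, `status` must have been flipped back to `running` between the two calls, which is exactly the unexplained status mutation described above.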