dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

`Scheduler`/`Worker` threads hang when calling `sys.exit()` #8644

Open jacobtomlinson opened 1 month ago

jacobtomlinson commented 1 month ago

Describe the issue:

When the Scheduler or Worker class is used to start cluster components and the program is then exited with sys.exit() (as dask-mpi does), the Python process hangs, most likely because a background thread is keeping the process alive.
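For context on the "background thread keeping the process alive" suspicion, here is a stdlib-only sketch of the general CPython behaviour (it does not show which thread distributed leaves behind). When sys.exit() is called, the interpreter waits to join every non-daemon thread before exiting, so a non-daemon thread that never finishes makes the process hang, while a daemon thread does not. The helper exits_promptly and the 3-second timeout are hypothetical choices for illustration.

```python
import subprocess
import sys
import textwrap

# Child program: start a background thread that waits forever, then call sys.exit().
CHILD = textwrap.dedent("""
    import sys
    import threading

    stop = threading.Event()
    t = threading.Thread(target=stop.wait, daemon={daemon})
    t.start()
    sys.exit(0)
""")


def exits_promptly(daemon: bool, timeout: float = 3.0) -> bool:
    """Return True if the child process exits within `timeout` seconds."""
    code = CHILD.format(daemon=daemon)
    try:
        subprocess.run([sys.executable, "-c", code], timeout=timeout, check=False)
        return True
    except subprocess.TimeoutExpired:
        # subprocess.run kills the child before re-raising TimeoutExpired.
        return False


if __name__ == "__main__":
    print("daemon thread:    ", exits_promptly(daemon=True))   # True
    print("non-daemon thread:", exits_promptly(daemon=False))  # False: shutdown waits to join the thread
```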

Minimal Complete Verifiable Example:

import sys
from distributed import Client, Scheduler
from distributed.utils import LoopRunner

async def main():
    async with Scheduler() as scheduler:
        async with Client(scheduler.address, asynchronous=True) as client:
            await client.shutdown()
    print("Done, exiting")
    sys.exit()  # Hangs at this line, comment this out and the program exits as expected

loop_runner = LoopRunner(loop=None, asynchronous=False)
loop_runner.run_sync(main)

I stripped this down as far as I could while still reproducing the issue. Notably, the hang does not occur when using asyncio.run(main()) instead of the LoopRunner that is commonly used in distributed.
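The thread does not explain why the runner matters. One relevant difference, sketched with the standard library only: asyncio.run(main()) executes the coroutine on the calling (main) thread, whereas a runner that drives its event loop from a background thread executes it on that thread, and a SystemExit raised in a non-main thread ends only that thread, not the process. The helper run_in_background_loop is hypothetical and a simplified stand-in for the loop-in-a-thread pattern; it is not distributed's LoopRunner, and whether LoopRunner(asynchronous=False) behaves exactly like this is an assumption.

```python
import asyncio
import threading


async def which_thread() -> str:
    # Report the name of the thread this coroutine is executing on.
    return threading.current_thread().name


def run_in_background_loop(coro_fn):
    """Hypothetical helper: run a coroutine on an event loop owned by a
    separate thread and block the caller until it finishes."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, name="loop-thread", daemon=True)
    thread.start()
    try:
        future = asyncio.run_coroutine_threadsafe(coro_fn(), loop)
        return future.result()
    finally:
        # Stop the loop from this thread, then wait for the loop thread to exit.
        loop.call_soon_threadsafe(loop.stop)
        thread.join()
        loop.close()


print("asyncio.run:    ", asyncio.run(which_thread()))           # MainThread
print("background loop:", run_in_background_loop(which_thread))  # loop-thread
```

In the background-loop case, a sys.exit() inside the coroutine would raise SystemExit on "loop-thread" rather than on the main thread.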

Anything else we need to know?:

Environment:

jacobtomlinson commented 1 month ago

I found a workaround that uses a signal to defer the shutdown: instead of calling sys.exit() inside main(), send the process SIGINT and call sys.exit() from the signal handler. This example does not hang.

+import os
+import signal
 import sys
 from distributed import Client, Scheduler
 from distributed.utils import LoopRunner

 async def main():
     async with Scheduler() as scheduler:
         async with Client(scheduler.address, asynchronous=True) as client:
             await client.shutdown()
     print("Done, exiting")
-    sys.exit()  # Hangs at this line, comment this out and the program exits as expected
+    os.kill(os.getpid(), signal.SIGINT)  # Shutdown using a signal instead

+signal.signal(signal.SIGINT, lambda *_: sys.exit())  # Exit gracefully on signal
 loop_runner = LoopRunner(loop=None, asynchronous=False)
 loop_runner.run_sync(main)
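The workaround appears to rely on a documented CPython rule: Python-level signal handlers always run in the main thread, even when the signal is sent from another thread. The sketch below (POSIX only, stdlib only; demo is a hypothetical name, and it must be called from the main thread) sends SIGINT from a worker thread with os.kill and records where the handler runs and where the resulting SystemExit is raised. It does not exercise distributed itself.

```python
import os
import signal
import sys
import threading
import time


def demo() -> list:
    """Send SIGINT from a worker thread and record where the handler and the
    resulting SystemExit run. POSIX only; call from the main thread."""
    record = []

    def on_sigint(signum, frame):
        # CPython always executes Python-level signal handlers in the main thread.
        record.append("handler ran in " + threading.current_thread().name)
        sys.exit()  # so SystemExit is raised in the main thread, as in the workaround

    previous = signal.signal(signal.SIGINT, on_sigint)
    worker = threading.Thread(
        target=lambda: os.kill(os.getpid(), signal.SIGINT), name="worker"
    )
    try:
        worker.start()
        deadline = time.monotonic() + 2.0
        while time.monotonic() < deadline:  # give the main thread a chance to run the handler
            time.sleep(0.01)
    except SystemExit:
        record.append("SystemExit raised in " + threading.current_thread().name)
    finally:
        worker.join()
        signal.signal(signal.SIGINT, previous)  # restore the original handler
    return record


print(demo())
# ['handler ran in MainThread', 'SystemExit raised in MainThread']
```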