dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

cleanup exceptions when running client and workers with mpirun #6302

Open guziy opened 2 years ago

guziy commented 2 years ago

What happened: When the program completes, exceptions are thrown.

RuntimeError: cannot schedule new futures after interpreter shutdown

Please see the log file: test.log

What you expected to happen: I expect the scheduler and workers to exit without exceptions.

Minimal Complete Verifiable Example: I run this example with the following command:

mpirun -n 4 python test.py
# test.py
import logging

from dask_mpi import initialize
from distributed import Client
import dask
from dask.distributed import progress
from distributed.worker import logger as worker_log
from distributed.batched import logger as batched_log

def square(x):
    return x ** 2

def main():
    dask.config.set({
        "distributed.worker.use-file-locking": True,
        "logging.distributed": "error",
    })
    worker_log.setLevel(logging.CRITICAL)
    batched_log.setLevel(logging.CRITICAL)

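    # dask_mpi.initialize: rank 0 becomes the scheduler, rank 1 continues
    # running this script as the client, and the remaining ranks become workers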
    initialize(nthreads=1, dashboard=False)

    with Client() as client:
        futures = client.map(square, range(1000))
        progress(futures)
        results = client.gather(futures)
        client.shutdown()

    print(results[:40])

if __name__ == '__main__':
    main()

Anything else we need to know?:

Environment:

Cluster Dump State:
guziy commented 2 years ago

If you want to disable all of the logs, I found that this works (placed just above the initialize call):

    logging.getLogger("distributed.worker").setLevel(logging.CRITICAL)                                                                                                                 
    logging.getLogger("distributed.core").setLevel(logging.CRITICAL)                                                                                                                   
    logging.getLogger("distributed.batched").setLevel(logging.CRITICAL)                                                                                                                
    logging.getLogger("distributed.scheduler").setLevel(logging.CRITICAL)

I guess the correct way would be to remove all existing handlers and attach a file handler instead, in case someone needs more detailed logs; a rough sketch is below.
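
For illustration, here is a minimal sketch of that idea, assuming a hypothetical redirect_distributed_logs helper called just above initialize(). The logger names are the same ones silenced above, and the log path would need to be made unique per MPI rank in practice:

    import logging

    def redirect_distributed_logs(path="distributed.log", level=logging.INFO):
        # Send records from the distributed loggers to a file instead of stderr.
        file_handler = logging.FileHandler(path)
        file_handler.setFormatter(
            logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
        for name in ("distributed.worker", "distributed.core",
                     "distributed.batched", "distributed.scheduler"):
            logger = logging.getLogger(name)
            # Drop the default stream handlers, then attach the file handler.
            for handler in list(logger.handlers):
                logger.removeHandler(handler)
            logger.addHandler(file_handler)
            logger.setLevel(level)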