dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Scheduler deadlocks after asynchronous `client.who_has` call #5144

Open pfackeldey opened 3 years ago

pfackeldey commented 3 years ago

What happened:

Dear dask-distributed developers,

First of all, thank you for this wonderful project! We are using dask-distributed on our local HTCondor computing cluster. In our use case we periodically kill and spawn dask-workers in HTCondor jobs, so that jobs from other users can slide in between our computing runs. We also work with heavy inputs, which we distribute to the dask-workers beforehand using `client.scatter`. Naturally we want to replicate these inputs as soon as new dask-workers are spawned, so we added an asynchronous periodic callback to the client's IOLoop that takes care of this replication. Unfortunately we noticed that the `client.who_has(..., asynchronous=True)` call deadlocks our scheduler (without producing a stack trace). Any subsequent connection to the scheduler then times out.

What you expected to happen:

We expected to be able to add an asynchronous callback that uses `client.who_has(..., asynchronous=True)` to the client's IOLoop without deadlocking the scheduler.

Minimal Complete Verifiable Example:

This is a minimal reproducible example that shows the above-mentioned problem. Since it also occurs on a LocalCluster, the problem seems to be batch-system-agnostic.

# coding: utf-8

from tornado.ioloop import PeriodicCallback
from dask.distributed import LocalCluster, Client
import numpy as np

class Heavy:
    def __init__(self):
        self.state = np.arange(1000000)

    def __call__(self, arg):
        return np.sum(self.state * arg)

heavy = Heavy()

async def replicate() -> None:
    print("In callback")
    workers = client.ncores(asynchronous=True)
    print("Workers", await workers)
    avail = client.who_has([func], asynchronous=True)  # <--- This seems to deadlock the scheduler
    print("Avail", await avail)
    missing = set((await workers).keys()) - set((await avail)[func.key])
    print("Missing", missing)
    if missing:
        try:
            await client.replicate(func, asynchronous=True)
            print("replicate: success!")
        except:
            print("replicate: failed!")

if __name__ == "__main__":
    cluster = LocalCluster()
    client = Client(cluster)

    func = client.scatter(heavy, broadcast=True, hash=False, direct=None)

    pc = PeriodicCallback(replicate, callback_time=100)
    client.loop.add_callback(pc.start)

The output (only once!):

In callback
Workers {'tls://127.0.0.1:17693': 8, 'tls://127.0.0.1:17747': 8, 'tls://127.0.0.1:18651': 8, 'tls://127.0.0.1:20033': 8, 'tls://127.0.0.1:25061': 8, 'tls://127.0.0.1:4257': 8, 'tls://127.0.0.1:6413': 8, 'tls://127.0.0.1:9189': 8}

Afterwards the scheduler is stuck.

Anything else we need to know?:

-

Environment:

client.get_versions(check=True) does not throw an error and outputs:

 'client': {'host': {'python': '3.8.10.final.0',
   'python-bits': 64,
   'OS': 'Linux',
   'OS-release': '5.4.0-80-generic',
   'machine': 'x86_64',
   'processor': 'x86_64',
   'byteorder': 'little',
   'LC_ALL': 'en_US.UTF-8',
   'LANG': 'en_US.UTF-8'},
  'packages': {'python': '3.8.10.final.0',
   'dask': '2021.06.2',
   'distributed': '2021.06.2',
   'msgpack': '1.0.2',
   'cloudpickle': '1.6.0',
   'tornado': '6.2.dev1',
   'toolz': '0.11.1',
   'numpy': '1.19.5',
   'lz4': '3.1.3',
   'blosc': '1.10.4'}}}

Thank you very much in advance for your input and help!

Best, Peter

jrbourbeau commented 3 years ago

Thanks for raising an issue @pfackeldey. Looking at your example, `client.who_has` doesn't support an `asynchronous` keyword argument. Whether client operations are blocking or asynchronous is dictated by the `asynchronous=` keyword argument when constructing your `Client` object (see https://distributed.dask.org/en/latest/client.html#async-await-operation; the default is `asynchronous=False`). If you need your client operations to be asynchronous, you should pass `asynchronous=True` when you create the `Client` instead of to individual methods.
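
For reference, here is a minimal sketch (not part of the original reproducer) of how the replication loop might look with a client constructed asynchronously; it assumes the same `Heavy` class as in the example above, and the names and timing are illustrative only:

```python
import asyncio

from dask.distributed import LocalCluster, Client


async def main():
    # Both the cluster and the client run on the current asyncio event loop
    async with LocalCluster(asynchronous=True) as cluster:
        async with Client(cluster, asynchronous=True) as client:
            # Scatter the heavy object to all currently known workers
            func = await client.scatter(Heavy(), broadcast=True, hash=False)

            while True:
                workers = await client.ncores()
                avail = await client.who_has([func])
                missing = set(workers) - set(avail[func.key])
                if missing:
                    # Re-replicate onto workers that joined after the scatter
                    await client.replicate(func)
                await asyncio.sleep(0.1)

# asyncio.run(main())
```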

jrbourbeau commented 3 years ago

I should also add that active memory management in Dask is actively being worked on (xref https://github.com/dask/distributed/issues/4982), so this kind of replica tracking should become much more transparent in the future.

pfackeldey commented 3 years ago

Thank you for your fast reply @jrbourbeau !

The documentation link https://distributed.dask.org/en/latest/client.html#async-await-operation states in the second part:

If you want to reuse the same client in asynchronous and synchronous environments you can apply the asynchronous=True keyword at each method call.

That is why I expected the above-mentioned code to work. If that part of the documentation is misleading, I am happy to help update or remove it so that others won't run into the same misunderstanding. What is your opinion here?
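
For context, a minimal sketch (not from the issue) of the per-call pattern that sentence describes, using a method that does accept the keyword in this version (`client.gather` does); the names are illustrative only:

```python
from dask.distributed import Client

client = Client()  # a regular, synchronous client

future = client.submit(sum, [1, 2, 3])

# Synchronous environment: this blocks until the result is available
print(client.gather(future))

# Asynchronous environment: with asynchronous=True the same call returns an
# awaitable, provided the coroutine runs on the client's event loop (for
# example when scheduled via a tornado PeriodicCallback as in the reproducer)
async def show_result():
    result = await client.gather(future, asynchronous=True)
    print(result)
```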

Also, thank you very much for pointing out the ongoing work on active memory management; I'll keep an eye on it!

Best, Peter