dask / dask-gateway

A multi-tenant server for securely deploying and managing Dask clusters.
https://gateway.dask.org/
BSD 3-Clause "New" or "Revised" License

Scheduler plugins created through dask-gateway don't have their close() method called #439

Open orf opened 3 years ago

orf commented 3 years ago

What happened:

Given a small SchedulerPlugin running on Kubernetes:

from dask_gateway import GatewayCluster
import dask.distributed

class SomePlugin(dask.distributed.SchedulerPlugin):
    async def close(self):
        print("Closing")

with GatewayCluster(...) as cluster:
    client = cluster.get_client()
    client.register_scheduler_plugin(SomePlugin())

The close() method is never called on the SchedulerPlugin. The start() method is, however.

What you expected to happen:

close() should be called

Minimal Complete Verifiable Example:

from dask_gateway import GatewayCluster
import dask.distributed

class SomePlugin(dask.distributed.SchedulerPlugin):
    async def close(self):
        print("Closing")

with GatewayCluster(...) as cluster:
    client = cluster.get_client()
    client.register_scheduler_plugin(SomePlugin())
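Since print output from the scheduler pod is easy to lose when the pod is torn down, here is a small, purely illustrative variant of the plugin (not part of the original report) that logs its lifecycle through the scheduler's logger, which may make it easier to confirm from the pod logs whether close() is ever reached:

import logging

import dask.distributed

# Log through the scheduler's logger so the messages show up in the
# scheduler pod's log output alongside the other scheduler messages.
logger = logging.getLogger("distributed.scheduler")


class LoggingPlugin(dask.distributed.SchedulerPlugin):
    """Illustrative plugin that logs start/close so the scheduler log
    shows whether close() ever runs."""

    def start(self, scheduler):
        logger.info("LoggingPlugin started")

    async def close(self):
        logger.info("LoggingPlugin closing")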

Anything else we need to know?:

Environment:

jcrist commented 3 years ago

Hmmm, that's interesting. I don't see anything obvious that dask-gateway does that should prevent this. What versions of dask-gateway, dask, and distributed are you using on your scheduler node? Do the scheduler plugins successfully have their close methods called for other dask deployment backends (e.g. dask-kubernetes)?

orf commented 3 years ago

Unfortunately I'm not in a position to test with dask-kubernetes, but I think I've narrowed it down a bit.

When dask-gateway kills a pod, the code here (https://github.com/dask/distributed/blob/31afb54cf02141050ae1cc7cbd7c3919557637fe/distributed/cli/dask_scheduler.py#L203-L205) throws a CancelledError that's printed to stdout from somewhere.

We created our own SchedulerPlugin like so:

import asyncio

import dask.distributed
from distributed import Scheduler
from distributed.cli.utils import install_signal_handlers


class SignalPlugin(dask.distributed.SchedulerPlugin):
    name = "signal-workaround-plugin"

    def start(self, scheduler: Scheduler):
        async def shutdown_signal(signal):
            print(f"Received signal {signal}. Closing plugins")
            try:
                # Close every registered scheduler plugin before the loop stops.
                await asyncio.gather(
                    *[plugin.close() for plugin in list(scheduler.plugins.values())]
                )
            except Exception as e:
                print("Error closing plugins:", e)

        install_signal_handlers(loop=scheduler.loop, cleanup=shutdown_signal)

which correctly invokes the plugins' close methods. It seems like the default signal handlers just close the loop rather than ending up on a path that does a graceful shutdown of the scheduler. I'm fairly new to Dask internals, so I reported this issue here rather than on the distributed repo because it felt like it had to be related to how dask-gateway + k8s interacts with the scheduler. Please let me know if this would be better reported to distributed or another repo.
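For reference, a minimal sketch of how this workaround plugin could be registered from the client side, mirroring the original example (this assumes the SignalPlugin class and its imports are also available in the scheduler environment):

from dask_gateway import GatewayCluster

# Register the workaround plugin on the scheduler; the plugin is serialized
# and sent to the scheduler, so distributed and its CLI utilities must be
# importable there.
with GatewayCluster(...) as cluster:
    client = cluster.get_client()
    client.register_scheduler_plugin(SignalPlugin())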

We're running:

But this issue also appeared on the previous version of dask and distributed.

We're seeing other issues that I think are caused by graceful shutdowns not happening on workers; I'm trying to triage them now and I'll update this issue with anything I find. Any areas you think I could explore to diagnose this would be much appreciated - as I said before, I'm fairly new to Dask, so it's a bit daunting to work out how signals interact with the schedulers and workers.

orf commented 3 years ago

Possibly related, I can reproduce part of the exception when running some test cases locally using the test backend:

distributed.preloading - INFO - Import preload module: dask_gateway.scheduler_preload
distributed.scheduler - INFO - -----------------------------------------------
distributed.preloading - INFO - Import preload module: dask_gateway.scheduler_preload
distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:     tcp://127.0.0.1:60778
distributed.scheduler - INFO -   dashboard at:           127.0.0.1:60777
distributed.preloading - INFO - Run preload setup click command: dask_gateway.scheduler_preload
dask_gateway.scheduler_preload - INFO - Requesting scale to 3 workers from 0
distributed.scheduler - INFO - End scheduler at 'tcp://127.0.0.1:60778'
Traceback (most recent call last):
  File "/Users/tom/PycharmProjects/github/orf/dask-gateway/venv/bin/dask-scheduler", line 33, in <module>
    sys.exit(load_entry_point('distributed', 'console_scripts', 'dask-scheduler')())
  File "/Users/tom/PycharmProjects/github/orf/dask-gateway/venv/lib/python3.8/site-packages/distributed/cli/dask_scheduler.py", line 217, in go
    main()
  File "/Users/tom/PycharmProjects/github/orf/dask-gateway/venv/lib/python3.8/site-packages/click/core.py", line 1137, in __call__
    return self.main(*args, **kwargs)
  File "/Users/tom/PycharmProjects/github/orf/dask-gateway/venv/lib/python3.8/site-packages/click/core.py", line 1062, in main
    rv = self.invoke(ctx)
  File "/Users/tom/PycharmProjects/github/orf/dask-gateway/venv/lib/python3.8/site-packages/click/core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/Users/tom/PycharmProjects/github/orf/dask-gateway/venv/lib/python3.8/site-packages/click/core.py", line 763, in invoke
    return __callback(*args, **kwargs)
  File "/Users/tom/PycharmProjects/github/orf/dask-gateway/venv/lib/python3.8/site-packages/distributed/cli/dask_scheduler.py", line 208, in main
    loop.run_sync(run)
  File "/Users/tom/PycharmProjects/github/orf/dask-gateway/venv/lib/python3.8/site-packages/tornado/ioloop.py", line 529, in run_sync
    raise TimeoutError("Operation timed out after %s seconds" % timeout)
tornado.util.TimeoutError: Operation timed out after None seconds

hyenal commented 1 year ago

I can confirm that I could reproduce the error from time to time when closing my cluster in the following order:

client.close()
gateway_cluster.shutdown()

I solved this by closing in the opposite order (I suspect skipping client.close() entirely works as well):

gateway_cluster.shutdown()
client.close()
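Putting that together, a minimal sketch of the shutdown order that avoided the error for me (the GatewayCluster arguments are elided as in the earlier examples):

from dask_gateway import GatewayCluster

cluster = GatewayCluster(...)
client = cluster.get_client()

# ... run work on the cluster ...

# Shut the cluster down before closing the client; closing the client first
# is what intermittently triggered the error described above.
cluster.shutdown()
client.close()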

I am using:

dask==2023.2.1
distributed==2023.2.1
dask_gateway==2022.6.1