dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Broken WorkerPlugin can break entire cluster by freezing workers #8694

Closed: Fogapod closed this issue 2 months ago

Fogapod commented 3 months ago

Describe the issue:

Broken plugins can freeze workers. Any future connections are stalled by the scheduler while the workers sit in a stuck state. I am currently experiencing this with pickling errors.

These are my plugins; one of them triggers a pickle error:

import os
from contextlib import suppress

from distributed import Client, Worker
from distributed.diagnostics.plugin import Environ, WorkerPlugin

# ServiceSettings, make_logger, LogLevels and log come from our own project code.

class SettingsPlugin(WorkerPlugin):
    async def setup(self, worker: Worker):
        worker.vascular_settings = ServiceSettings.from_env()

class LoggerPlugin(WorkerPlugin):
    async def setup(self, worker: Worker):
        worker.vascular_log = make_logger("worker", level=LogLevels.DEBUG)

async def register_plugins(client: Client):
    log.debug("register plugins")

    log.debug("unregistering existing plugins")
    # added as an attempt to force remove possibly broken plugins
    for maybe_existing in ("vascular_settings", "vascular_logger"):
        with suppress(ValueError):
            await client.unregister_worker_plugin(maybe_existing)

    log.debug("environ")
    await client.register_plugin(Environ(dict(os.environ)), name="vascular_environ")

    log.debug("settings")
    await client.register_plugin(SettingsPlugin(), name="vascular_settings")

    log.debug("logger")
    await client.register_plugin(LoggerPlugin(), name="vascular_logger")

    log.debug("loaded plugins")

This code worked until I moved files around, which caused pickle to fail. Worker logs:

2024-06-12 16:33:37,332 - distributed.nanny - INFO - Starting Nanny plugin vascular_environ
2024-06-12 16:33:37,333 - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.244.6.157:43163'
2024-06-12 16:33:39,117 - distributed.worker - INFO -       Start worker at:   tcp://10.244.6.157:38339
2024-06-12 16:33:39,117 - distributed.worker - INFO -          Listening to:   tcp://10.244.6.157:38339
2024-06-12 16:33:39,117 - distributed.worker - INFO -           Worker name: dask-primary-default-worker-b27bb8286c
2024-06-12 16:33:39,118 - distributed.worker - INFO -          dashboard at:          10.244.6.157:8788
2024-06-12 16:33:39,118 - distributed.worker - INFO - Waiting to connect to: tcp://dask-primary-scheduler.dev-local.svc.cluster.local:8786
2024-06-12 16:33:39,118 - distributed.worker - INFO - -------------------------------------------------
2024-06-12 16:33:39,118 - distributed.worker - INFO -               Threads:                         32
2024-06-12 16:33:39,118 - distributed.worker - INFO -                Memory:                  31.25 GiB
2024-06-12 16:33:39,118 - distributed.worker - INFO -       Local Directory: /tmp/dask-scratch-space/worker-v00dz10h
2024-06-12 16:33:39,118 - distributed.worker - INFO - -------------------------------------------------
2024-06-12 16:33:40,453 - distributed.worker - INFO - Starting Worker plugin shuffle
2024-06-12 16:33:40,492 - distributed.worker - INFO - Starting Worker plugin vascular_logger
2024-06-12 16:33:40,494 - distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x05\x956\x00\x00\x00\x00\x00\x00\x00\x8c\x1cbackend.vascular.src.plugins\x94\x8c\x0eSettingsPlugin\x94\x93\x94)\x81\x94.'
Traceback (most recent call last):
  File "/opt/backend/dask/bin.runfiles/rules_python~0.26.0~pip~pypi_311_distributed/site-packages/distributed/protocol/pickle.py", line 96, in loads
    return pickle.loads(x)
           ^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'backend.vascular'
2024-06-12 16:33:40,495 - distributed.worker - ERROR - No module named 'backend.vascular'
Traceback (most recent call last):
  File "/opt/backend/dask/bin.runfiles/rules_python~0.26.0~pip~pypi_311_distributed/site-packages/distributed/utils.py", line 832, in wrapper
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/backend/dask/bin.runfiles/rules_python~0.26.0~pip~pypi_311_distributed/site-packages/distributed/worker.py", line 1854, in plugin_add
    plugin = pickle.loads(plugin)
             ^^^^^^^^^^^^^^^^^^^^
  File "/opt/backend/dask/bin.runfiles/rules_python~0.26.0~pip~pypi_311_distributed/site-packages/distributed/protocol/pickle.py", line 96, in loads
    return pickle.loads(x)
           ^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'backend.vascular'
2024-06-12 16:33:40,496 - distributed.worker - INFO - Stopping worker at tcp://10.244.6.157:38339. Reason: failure-to-start-<class 'ModuleNotFoundError'>

The worker hangs indefinitely after this line and the scheduler never manages to recover it. Increasing the worker count just hangs all of them at once.

Scheduler logs from this period, including my attempt to unregister the plugins:

2024-06-12 16:33:40,451 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://10.244.6.157:38339', name: dask-primary-default-worker-b27bb8286c, status: init, memory: 0, processing: 0>
2024-06-12 16:33:40,453 - distributed.scheduler - INFO - Starting worker compute stream, tcp://10.244.6.157:38339
2024-06-12 16:33:40,453 - distributed.core - INFO - Starting established connection to tcp://10.244.6.157:33144
2024-06-12 16:41:49,742 - distributed.scheduler - WARNING - Worker failed to heartbeat for 489s; attempting restart: <WorkerState 'tcp://10.244.6.157:38339', name: dask-primary-default-worker-b27bb8286c, status: closing, memory: 0, processing: 0>
2024-06-12 16:41:49,743 - distributed.scheduler - INFO - Restarting 1 workers: ['tcp://10.244.6.157:38339'] (stimulus_id='check-worker-ttl-1718210509.7429218'
2024-06-12 16:42:19,743 - distributed.scheduler - ERROR - Workers ['tcp://10.244.6.157:38339'] did not shut down within 30s; force closing
2024-06-12 16:42:19,743 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://10.244.6.157:38339', name: dask-primary-default-worker-b27bb8286c, status: closing, memory: 0, processing: 0> (stimulus_id='check-worker-ttl-1718210509.7429218')
2024-06-12 16:42:19,743 - distributed.scheduler - INFO - Lost all workers
2024-06-12 16:42:19,745 - distributed.scheduler - ERROR - 1/1 nanny worker(s) did not shut down within 30s: {'tcp://10.244.6.157:38339'}
Traceback (most recent call last):
  File "/opt/backend/dask/bin.runfiles/rules_python~0.26.0~pip~pypi_311_distributed/site-packages/distributed/utils.py", line 832, in wrapper
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/backend/dask/bin.runfiles/rules_python~0.26.0~pip~pypi_311_distributed/site-packages/distributed/scheduler.py", line 6377, in restart_workers
    raise TimeoutError(
TimeoutError: 1/1 nanny worker(s) did not shut down within 30s: {'tcp://10.244.6.157:38339'}
2024-06-12 16:42:19,746 - distributed.scheduler - ERROR - 1/1 nanny worker(s) did not shut down within 30s: {'tcp://10.244.6.157:38339'}
Traceback (most recent call last):
  File "/opt/backend/dask/bin.runfiles/rules_python~0.26.0~pip~pypi_311_distributed/site-packages/distributed/utils.py", line 832, in wrapper
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/backend/dask/bin.runfiles/rules_python~0.26.0~pip~pypi_311_distributed/site-packages/distributed/scheduler.py", line 8432, in check_worker_ttl
    await self.restart_workers(
  File "/opt/backend/dask/bin.runfiles/rules_python~0.26.0~pip~pypi_311_distributed/site-packages/distributed/utils.py", line 832, in wrapper
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/backend/dask/bin.runfiles/rules_python~0.26.0~pip~pypi_311_distributed/site-packages/distributed/scheduler.py", line 6377, in restart_workers
    raise TimeoutError(
TimeoutError: 1/1 nanny worker(s) did not shut down within 30s: {'tcp://10.244.6.157:38339'}
2024-06-12 16:42:19,746 - tornado.application - ERROR - Exception in callback <bound method Scheduler.check_worker_ttl of <Scheduler 'tcp://10.244.6.156:8786', workers: 0, cores: 0, tasks: 0>>
Traceback (most recent call last):
  File "/opt/backend/dask/bin.runfiles/rules_python~0.26.0~pip~pypi_311_tornado/site-packages/tornado/ioloop.py", line 939, in _run
    await val
  File "/opt/backend/dask/bin.runfiles/rules_python~0.26.0~pip~pypi_311_distributed/site-packages/distributed/utils.py", line 832, in wrapper
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/backend/dask/bin.runfiles/rules_python~0.26.0~pip~pypi_311_distributed/site-packages/distributed/scheduler.py", line 8432, in check_worker_ttl
    await self.restart_workers(
  File "/opt/backend/dask/bin.runfiles/rules_python~0.26.0~pip~pypi_311_distributed/site-packages/distributed/utils.py", line 832, in wrapper
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/backend/dask/bin.runfiles/rules_python~0.26.0~pip~pypi_311_distributed/site-packages/distributed/scheduler.py", line 6377, in restart_workers
    raise TimeoutError(
TimeoutError: 1/1 nanny worker(s) did not shut down within 30s: {'tcp://10.244.6.157:38339'}
2024-06-12 16:44:05,160 - distributed.scheduler - INFO - Receive client connection: Client-fafb2a53-28da-11ef-801e-791e012ee4e6
2024-06-12 16:44:05,162 - distributed.core - INFO - Starting established connection to tcp://10.244.6.149:55272
2024-06-12 16:44:05,210 - distributed.scheduler - INFO - Registering Nanny plugin vascular_environ
2024-06-12 16:44:05,210 - distributed.scheduler - INFO - Waiting for Nannies to start {'tcp://10.244.6.157:43163'}
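
For reference, I believe the "failed to heartbeat for 489s; attempting restart" line above comes from the scheduler's worker-TTL check (check_worker_ttl in the traceback), which as far as I know is controlled by the distributed.scheduler.worker-ttl config value. Tuning it only changes how quickly the scheduler gives up on a silent worker; a sketch with an illustrative value, set before the scheduler starts:

import dask

# Illustrative value only; this shortens the heartbeat timeout, it does not
# fix the underlying plugin failure.
dask.config.set({"distributed.scheduler.worker-ttl": "120s"})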

Am I using the plugin system wrong? Is there a way to make it more durable, so that newer clients can at least replace broken plugins and recover the workers?

Minimal Complete Verifiable Example:

# don't yet have a complete example
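
In the meantime, here is a rough, untested sketch of what I think a reproducer might look like. It assumes a plugin whose setup raises can stand in for my unpickling failure, since both happen while a new worker is starting:

import asyncio

from distributed import Client, LocalCluster, Worker
from distributed.diagnostics.plugin import WorkerPlugin


class BrokenPlugin(WorkerPlugin):
    async def setup(self, worker: Worker) -> None:
        # Stand-in for the real failure (a plugin that cannot be unpickled).
        raise RuntimeError("simulated broken plugin")


async def main() -> None:
    async with LocalCluster(n_workers=0, processes=True, asynchronous=True) as cluster:
        async with Client(cluster, asynchronous=True) as client:
            await client.register_plugin(BrokenPlugin(), name="broken")
            # The new worker receives the plugin during startup and should fail
            # to start, mirroring the "failure-to-start" line in my worker log.
            cluster.scale(1)
            # Watch whether the worker hangs or gets cleaned up by the scheduler.
            await asyncio.sleep(120)


if __name__ == "__main__":
    asyncio.run(main())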

Anything else we need to know?:

The code hangs indefinitely while loading the environ plugin. So far I can only work around this by a) running the client again so that it unregisters the plugins and then killing the worker pod manually, or b) tearing down and redeploying the whole Dask cluster.
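
For now I am considering a preflight check before registering anything, roughly like the untested sketch below (check_workers_can_import and _can_import_plugins_module are names I made up; the module path is the one from the failed pickle payload in the worker log above):

import importlib.util

from distributed import Client


def _can_import_plugins_module() -> bool:
    try:
        return importlib.util.find_spec("backend.vascular.src.plugins") is not None
    except ModuleNotFoundError:
        # The parent package is missing entirely on this worker.
        return False


async def check_workers_can_import(client: Client) -> None:
    # Client.run executes the function on every currently connected worker.
    results = await client.run(_can_import_plugins_module)
    broken = [addr for addr, ok in results.items() if not ok]
    if broken:
        raise RuntimeError(f"workers cannot import the plugins module: {broken}")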

Environment:

fjetter commented 3 months ago

Can you try to assemble a minimal reproducer?

The logs suggest that everything is working as expected:

  1. The plugin fails.
  2. This prevents the worker from starting.
  3. The worker shuts down again.

Fogapod commented 2 months ago

Looks like this is working as intended