dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Error involving cpu key in scheduler #3355

Open CMCDragonkai opened 4 years ago

CMCDragonkai commented 4 years ago

I see that this error has also been reported in #3147.

This is the error I'm seeing from running the scheduler.

distributed.utils - ERROR - 'cpu'
Traceback (most recent call last):
  File "/nix/store/8zmsa97jhnns0p96ajqq20lan2hxbh7r-python3.7-distributed-2.3.0/lib/python3.7/site-packages/distributed/utils.py", line 657, in log_errors
    yield
  File "/nix/store/8zmsa97jhnns0p96ajqq20lan2hxbh7r-python3.7-distributed-2.3.0/lib/python3.7/site-packages/distributed/dashboard/scheduler.py", line 433, in update
    cpu = [int(ws.metrics["cpu"]) for ws in workers]

This is how to reproduce this...

This is the scheduler.py.

#!/usr/bin/env python3

import argparse
import asyncio
from dask.distributed import Scheduler

async def loop(host, port, dashboard_address):
    async with Scheduler(
        host=host, port=port, dashboard_address=dashboard_address
    ) as s:
        await s.finished()

def main():

    options_parser = argparse.ArgumentParser()
    options_parser.add_argument("--host")
    options_parser.add_argument("--port", type=int)
    options_parser.add_argument("--dashboard-address")

    options = options_parser.parse_args()

    asyncio.get_event_loop().run_until_complete(
        loop(
            host=options.host,
            port=options.port,
            dashboard_address=options.dashboard_address,
        )
    )

if __name__ == "__main__":
    main()

This is the worker.py

#!/usr/bin/env python3

import argparse
import asyncio
from dask.distributed import Scheduler

async def loop(host, port, dashboard_address):
    async with Scheduler(
        host=host, port=port, dashboard_address=dashboard_address
    ) as s:
        await s.finished()

def main():

    options_parser = argparse.ArgumentParser()
    options_parser.add_argument("--host")
    options_parser.add_argument("--port", type=int)
    options_parser.add_argument("--dashboard-address")

    options = options_parser.parse_args()

    asyncio.get_event_loop().run_until_complete(
        loop(
            host=options.host,
            port=options.port,
            dashboard_address=options.dashboard_address,
        )
    )

if __name__ == "__main__":
    main()

./scheduler.py --host 127.0.0.1 --port 3201 --dashboard-address :3202
./worker.py --scheduler-ip 127.0.0.1 --scheduler-port 3201 --name image-classifier-worker
./worker.py --scheduler-ip 127.0.0.1 --scheduler-port 3201 --name image-classifier-worker

On the second invocation of the worker I get:

distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:43955'
distributed.diskutils - INFO - Found stale lock file and directory '/home/cmcdragonkai/Projects/image-classifier/dask-worker-space/worker-xl3_qtz_', purging
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:46821
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:46821
distributed.worker - INFO - Waiting to connect to:       tcp://127.0.0.1:3201
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                         16
distributed.worker - INFO -                Memory:                   33.72 GB
distributed.worker - INFO -       Local Directory: /home/cmcdragonkai/Projects/image-classifier/dask-worker-space/worker-xmi581os
distributed.worker - INFO - -------------------------------------------------
distributed.nanny - ERROR - Failed to start worker
Traceback (most recent call last):
  File "/nix/store/8zmsa97jhnns0p96ajqq20lan2hxbh7r-python3.7-distributed-2.3.0/lib/python3.7/site-packages/distributed/nanny.py", line 674, in run
    await worker
  File "/nix/store/8zmsa97jhnns0p96ajqq20lan2hxbh7r-python3.7-distributed-2.3.0/lib/python3.7/site-packages/distributed/worker.py", line 1016, in start
    await self._register_with_scheduler()
  File "/nix/store/8zmsa97jhnns0p96ajqq20lan2hxbh7r-python3.7-distributed-2.3.0/lib/python3.7/site-packages/distributed/worker.py", line 835, in _register_with_scheduler
    raise ValueError("Unexpected response from register: %r" % (response,))
ValueError: Unexpected response from register: {'status': 'error', 'message': 'name taken, image-classifier-worker', 'time': 1578361153.2849612}
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP  local=tcp://127.0.0.1:46848 remote=tcp://127.0.0.1:3201>
distributed.nanny - INFO - Closing Nanny at 'tcp://127.0.0.1:43955'

Meanwhile, the scheduler starts failing on the 'cpu' key, as in the traceback at the top of this issue.
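
A minimal sketch of the failure mode in plain Python, with no dependency on distributed. It assumes (this is not confirmed anywhere in the thread) that the rejected duplicate worker leaves behind a state object whose metrics dict was never filled in by a heartbeat. `FakeWorkerState`, `cpu_strict`, and `cpu_tolerant` are hypothetical names used only for illustration; only the list comprehension in `cpu_strict` is taken from the traceback.

```python
from dataclasses import dataclass, field


@dataclass
class FakeWorkerState:
    """Hypothetical stand-in for the scheduler's per-worker state."""
    name: str
    metrics: dict = field(default_factory=dict)


def cpu_strict(workers):
    # Same expression as in the traceback: raises KeyError
    # as soon as one worker has no "cpu" entry.
    return [int(ws.metrics["cpu"]) for ws in workers]


def cpu_tolerant(workers, default=0):
    # Assumed workaround for illustration only, not the library's actual fix:
    # fall back to a default when a worker has not reported metrics yet.
    return [int(ws.metrics.get("cpu", default)) for ws in workers]


workers = [
    FakeWorkerState("image-classifier-worker", {"cpu": 12.5}),
    FakeWorkerState("rejected-duplicate"),  # assumed: never reported metrics
]

try:
    cpu_strict(workers)
except KeyError as exc:
    print("strict lookup failed:", repr(exc))

print(cpu_tolerant(workers))
```

If the assumption holds, one worker entry without metrics could be enough to break every dashboard update, which would match the scheduler staying broken after the failed registration.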

CMCDragonkai commented 4 years ago

My distributed version is:

distributed==2.3.0

quasiben commented 4 years ago

Hmm, I haven't seen this error in some time. Would it be possible to upgrade to 2.9.1?

TomAugspurger commented 4 years ago

@CMCDragonkai were you able to try with a more recent version of distributed?

Timost commented 4 years ago

I'm hitting a very similar issue, also on dask 2.9.1. I have this setup:

I can reproduce this systematically doing this:

This puts the scheduler in a corrupted state where:

After that I can't recover the scheduler. I've tried scaling down the cluster, restarting the client, creating a new client from the dask-kubernetes cluster object. Nothing has worked so far.

Here is an example stack-trace:

Traceback (most recent call last):
  File "/root/.cache/pypoetry/virtualenvs/acquisition-uD-vDiZT-py3.6/lib/python3.6/site-packages/tornado/web.py", line 1592, in _execute
    result = yield result
  File "/root/.cache/pypoetry/virtualenvs/acquisition-uD-vDiZT-py3.6/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/root/.cache/pypoetry/virtualenvs/acquisition-uD-vDiZT-py3.6/lib/python3.6/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/usr/local/lib/python3.6/types.py", line 184, in throw
    return self.__wrapped.throw(tp, *rest)
  File "/root/.cache/pypoetry/virtualenvs/acquisition-uD-vDiZT-py3.6/lib/python3.6/site-packages/bokeh/server/views/doc_handler.py", line 56, in get
    session = yield self.get_session()
  File "/root/.cache/pypoetry/virtualenvs/acquisition-uD-vDiZT-py3.6/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/root/.cache/pypoetry/virtualenvs/acquisition-uD-vDiZT-py3.6/lib/python3.6/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/usr/local/lib/python3.6/types.py", line 184, in throw
    return self.__wrapped.throw(tp, *rest)
  File "/root/.cache/pypoetry/virtualenvs/acquisition-uD-vDiZT-py3.6/lib/python3.6/site-packages/bokeh/server/views/session_handler.py", line 79, in get_session
    session = yield self.application_context.create_session_if_needed(session_id, self.request)
  File "/root/.cache/pypoetry/virtualenvs/acquisition-uD-vDiZT-py3.6/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/root/.cache/pypoetry/virtualenvs/acquisition-uD-vDiZT-py3.6/lib/python3.6/site-packages/tornado/gen.py", line 1147, in run
    yielded = self.gen.send(value)
  File "/root/.cache/pypoetry/virtualenvs/acquisition-uD-vDiZT-py3.6/lib/python3.6/site-packages/bokeh/server/contexts.py", line 222, in create_session_if_needed
    self._application.initialize_document(doc)
  File "/root/.cache/pypoetry/virtualenvs/acquisition-uD-vDiZT-py3.6/lib/python3.6/site-packages/bokeh/application/application.py", line 178, in initialize_document
    h.modify_document(doc)
  File "/root/.cache/pypoetry/virtualenvs/acquisition-uD-vDiZT-py3.6/lib/python3.6/site-packages/bokeh/application/handlers/function.py", line 133, in modify_document
    self._func(doc)
  File "/root/.cache/pypoetry/virtualenvs/acquisition-uD-vDiZT-py3.6/lib/python3.6/site-packages/distributed/dashboard/components/scheduler.py", line 1748, in status_doc
    current_load.update()
  File "/root/.cache/pypoetry/virtualenvs/acquisition-uD-vDiZT-py3.6/lib/python3.6/site-packages/bokeh/core/property/validation.py", line 97, in func
    return input_function(*args, **kwargs)
  File "/root/.cache/pypoetry/virtualenvs/acquisition-uD-vDiZT-py3.6/lib/python3.6/site-packages/distributed/dashboard/components/scheduler.py", line 646, in update
    cpu = [int(ws.metrics["cpu"]) for ws in workers]
  File "/root/.cache/pypoetry/virtualenvs/acquisition-uD-vDiZT-py3.6/lib/python3.6/site-packages/distributed/dashboard/components/scheduler.py", line 646, in <listcomp>
    cpu = [int(ws.metrics["cpu"]) for ws in workers]
KeyError: 'cpu'

GenevieveBuckley commented 2 years ago

I just tried this with a newer version of Dask (2021.09.0), and it looks like the minimal reproducible example here needs to be updated slightly.

At the line:

./worker.py --scheduler-ip 127.0.0.1 --scheduler-port 3201 --name image-classifier-worker

...I get an error indicating the way we provide command line arguments has changed.

usage: worker.py [-h] [--host HOST] [--port PORT]
                 [--dashboard-address DASHBOARD_ADDRESS]
worker.py: error: unrecognized arguments: --scheduler-ip 127.0.0.1 --scheduler-port 3201 --name image-classifier-worker

So I tried changing it to this:

./worker.py --host 127.0.0.1 --port 3201

and then this (I don't think this is right, but it felt worth checking all the combinations of options, even if I didn't think they'd work):

./worker.py --dashboard-address 127.0.0.1 --port 3201

... which gave me errors saying the address is already in use

OSError: [Errno 98] Address already in use

I imagine it shouldn't be too difficult to update this example, I'm likely just missing something obvious. @quasiben do you have suggestions?
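
One possible shape for an updated worker.py, offered as a sketch rather than the original author's script. The worker.py posted above is a copy of scheduler.py, which would explain the "unrecognized arguments" error. This version assumes the script was meant to start a Nanny (the worker log above shows "Start Nanny") and to accept the `--scheduler-ip`, `--scheduler-port`, and `--name` flags used in the reproduction commands. `build_parser`, `run_worker`, and `main` are hypothetical names, and the script has not been checked against the versions discussed in this thread.

```python
import argparse
import asyncio


def build_parser():
    # Flags mirror the reproduction commands in the original report.
    parser = argparse.ArgumentParser()
    parser.add_argument("--scheduler-ip", required=True)
    parser.add_argument("--scheduler-port", type=int, required=True)
    parser.add_argument("--name")
    return parser


async def run_worker(scheduler_ip, scheduler_port, name):
    # Imported lazily so the argument parsing can be exercised
    # on a machine without distributed installed.
    from dask.distributed import Nanny

    address = f"tcp://{scheduler_ip}:{scheduler_port}"
    # Assumption: the original worker.py started a Nanny like this.
    async with Nanny(address, name=name) as nanny:
        await nanny.finished()


def main(argv=None):
    options = build_parser().parse_args(argv)
    asyncio.run(
        run_worker(options.scheduler_ip, options.scheduler_port, options.name)
    )
```

To use it as a script, call `main()` under an `if __name__ == "__main__":` guard. Starting it twice with the same `--name` should then reach the "name taken" registration error from the original report, though that is untested here.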