dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

RuntimeError: cannot schedule new futures after shutdown #6846

Open graingert opened 2 years ago

graingert commented 2 years ago

It's possible to get a "cannot schedule new futures after shutdown" error out of getaddrinfo when running in a thread.

--- Logging error ---
Traceback (most recent call last):
  File "/home/julia/distributed/distributed/comm/tcp.py", line 223, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/julia/distributed/distributed/client.py", line 1392, in _handle_report
    msgs = await self.scheduler_comm.comm.read()
  File "/home/julia/distributed/distributed/comm/tcp.py", line 239, in read
    convert_stream_closed_error(self, e)
  File "/home/julia/distributed/distributed/comm/tcp.py", line 144, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) Client->Scheduler local=tcp://127.0.0.1:43050 remote=tcp://127.0.0.1:44163>: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/julia/distributed/distributed/utils.py", line 778, in wrapper
    return await func(*args, **kwargs)
  File "/home/julia/distributed/distributed/client.py", line 1211, in _reconnect
    await self._ensure_connected(timeout=timeout)
  File "/home/julia/distributed/distributed/client.py", line 1241, in _ensure_connected
    comm = await connect(
  File "/home/julia/distributed/distributed/comm/core.py", line 291, in connect
    comm = await asyncio.wait_for(
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/tasks.py", line 479, in wait_for
    return fut.result()
  File "/home/julia/distributed/distributed/comm/tcp.py", line 449, in connect
    stream = await self.client.connect(
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/site-packages/tornado/tcpclient.py", line 265, in connect
    addrinfo = await self.resolver.resolve(host, port, af)
  File "/home/julia/distributed/distributed/comm/tcp.py", line 434, in resolve
    for fam, _, _, _, address in await asyncio.get_running_loop().getaddrinfo(
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/base_events.py", line 856, in getaddrinfo
    return await self.run_in_executor(
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/base_events.py", line 814, in run_in_executor
    executor.submit(func, *args), loop=self)
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/concurrent/futures/thread.py", line 167, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/logging/__init__.py", line 1086, in emit
    stream.write(msg + self.terminator)
ValueError: I/O operation on closed file.
Call stack:
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/threading.py", line 930, in _bootstrap
    self._bootstrap_inner()
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/threading.py", line 973, in _bootstrap_inner
    self.run()
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/threading.py", line 910, in run
    self._target(*self._args, **self._kwargs)
  File "/home/julia/distributed/distributed/utils.py", line 485, in run_loop
    loop.start()
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/site-packages/tornado/platform/asyncio.py", line 199, in start
    self.asyncio_loop.run_forever()
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/base_events.py", line 596, in run_forever
    self._run_once()
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/base_events.py", line 1890, in _run_once
    handle._run()
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "/home/julia/distributed/distributed/utils.py", line 778, in wrapper
    return await func(*args, **kwargs)
  File "/home/julia/distributed/distributed/client.py", line 1400, in _handle_report
    await self._reconnect()
  File "/home/julia/distributed/distributed/utils.py", line 778, in wrapper
    return await func(*args, **kwargs)
  File "/home/julia/distributed/distributed/utils.py", line 804, in __exit__
    logger.exception(exc_value)
Message: RuntimeError('cannot schedule new futures after shutdown')
Arguments: ()
cannot schedule new futures after shutdown
Traceback (most recent call last):
  File "/home/julia/distributed/distributed/comm/tcp.py", line 223, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/julia/distributed/distributed/client.py", line 1392, in _handle_report
    msgs = await self.scheduler_comm.comm.read()
  File "/home/julia/distributed/distributed/comm/tcp.py", line 239, in read
    convert_stream_closed_error(self, e)
  File "/home/julia/distributed/distributed/comm/tcp.py", line 144, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) Client->Scheduler local=tcp://127.0.0.1:43050 remote=tcp://127.0.0.1:44163>: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/julia/distributed/distributed/utils.py", line 778, in wrapper
    return await func(*args, **kwargs)
  File "/home/julia/distributed/distributed/client.py", line 1400, in _handle_report
    await self._reconnect()
  File "/home/julia/distributed/distributed/utils.py", line 778, in wrapper
    return await func(*args, **kwargs)
  File "/home/julia/distributed/distributed/client.py", line 1211, in _reconnect
    await self._ensure_connected(timeout=timeout)
  File "/home/julia/distributed/distributed/client.py", line 1241, in _ensure_connected
    comm = await connect(
  File "/home/julia/distributed/distributed/comm/core.py", line 291, in connect
    comm = await asyncio.wait_for(
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/tasks.py", line 479, in wait_for
    return fut.result()
  File "/home/julia/distributed/distributed/comm/tcp.py", line 449, in connect
    stream = await self.client.connect(
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/site-packages/tornado/tcpclient.py", line 265, in connect
    addrinfo = await self.resolver.resolve(host, port, af)
  File "/home/julia/distributed/distributed/comm/tcp.py", line 434, in resolve
    for fam, _, _, _, address in await asyncio.get_running_loop().getaddrinfo(
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/base_events.py", line 856, in getaddrinfo
    return await self.run_in_executor(
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/base_events.py", line 814, in run_in_executor
    executor.submit(func, *args), loop=self)
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/concurrent/futures/thread.py", line 167, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
--- Logging error ---
Traceback (most recent call last):
  File "/home/julia/distributed/distributed/comm/tcp.py", line 223, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/julia/distributed/distributed/client.py", line 1392, in _handle_report
    msgs = await self.scheduler_comm.comm.read()
  File "/home/julia/distributed/distributed/comm/tcp.py", line 239, in read
    convert_stream_closed_error(self, e)
  File "/home/julia/distributed/distributed/comm/tcp.py", line 144, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) Client->Scheduler local=tcp://127.0.0.1:43050 remote=tcp://127.0.0.1:44163>: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/julia/distributed/distributed/client.py", line 1521, in _close
    await asyncio.wait_for(asyncio.shield(handle_report_task), 0.1)
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/tasks.py", line 479, in wait_for
    return fut.result()
  File "/home/julia/distributed/distributed/utils.py", line 778, in wrapper
    return await func(*args, **kwargs)
  File "/home/julia/distributed/distributed/client.py", line 1400, in _handle_report
    await self._reconnect()
  File "/home/julia/distributed/distributed/utils.py", line 778, in wrapper
    return await func(*args, **kwargs)
  File "/home/julia/distributed/distributed/client.py", line 1211, in _reconnect
    await self._ensure_connected(timeout=timeout)
  File "/home/julia/distributed/distributed/client.py", line 1241, in _ensure_connected
    comm = await connect(
  File "/home/julia/distributed/distributed/comm/core.py", line 291, in connect
    comm = await asyncio.wait_for(
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/tasks.py", line 479, in wait_for
    return fut.result()
  File "/home/julia/distributed/distributed/comm/tcp.py", line 449, in connect
    stream = await self.client.connect(
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/site-packages/tornado/tcpclient.py", line 265, in connect
    addrinfo = await self.resolver.resolve(host, port, af)
  File "/home/julia/distributed/distributed/comm/tcp.py", line 434, in resolve
    for fam, _, _, _, address in await asyncio.get_running_loop().getaddrinfo(
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/base_events.py", line 856, in getaddrinfo
    return await self.run_in_executor(
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/base_events.py", line 814, in run_in_executor
    executor.submit(func, *args), loop=self)
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/concurrent/futures/thread.py", line 167, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/logging/__init__.py", line 1086, in emit
    stream.write(msg + self.terminator)
ValueError: I/O operation on closed file.
Call stack:
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/threading.py", line 930, in _bootstrap
    self._bootstrap_inner()
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/threading.py", line 973, in _bootstrap_inner
    self.run()
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/threading.py", line 910, in run
    self._target(*self._args, **self._kwargs)
  File "/home/julia/distributed/distributed/utils.py", line 485, in run_loop
    loop.start()
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/site-packages/tornado/platform/asyncio.py", line 199, in start
    self.asyncio_loop.run_forever()
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/base_events.py", line 596, in run_forever
    self._run_once()
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/base_events.py", line 1890, in _run_once
    handle._run()
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "/home/julia/distributed/distributed/client.py", line 1554, in _close
    self.scheduler = None
  File "/home/julia/distributed/distributed/utils.py", line 804, in __exit__
    logger.exception(exc_value)
Message: RuntimeError('cannot schedule new futures after shutdown')
Arguments: ()
--- Logging error ---
Traceback (most recent call last):
  File "/home/julia/distributed/distributed/comm/tcp.py", line 223, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/julia/distributed/distributed/client.py", line 1392, in _handle_report
    msgs = await self.scheduler_comm.comm.read()
  File "/home/julia/distributed/distributed/comm/tcp.py", line 239, in read
    convert_stream_closed_error(self, e)
  File "/home/julia/distributed/distributed/comm/tcp.py", line 144, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) Client->Scheduler local=tcp://127.0.0.1:43046 remote=tcp://127.0.0.1:44163>: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/julia/distributed/distributed/utils.py", line 778, in wrapper
    return await func(*args, **kwargs)
  File "/home/julia/distributed/distributed/client.py", line 1211, in _reconnect
    await self._ensure_connected(timeout=timeout)
  File "/home/julia/distributed/distributed/client.py", line 1241, in _ensure_connected
    comm = await connect(
  File "/home/julia/distributed/distributed/comm/core.py", line 291, in connect
    comm = await asyncio.wait_for(
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/tasks.py", line 479, in wait_for
    return fut.result()
  File "/home/julia/distributed/distributed/comm/tcp.py", line 449, in connect
    stream = await self.client.connect(
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/site-packages/tornado/tcpclient.py", line 265, in connect
    addrinfo = await self.resolver.resolve(host, port, af)
  File "/home/julia/distributed/distributed/comm/tcp.py", line 434, in resolve
    for fam, _, _, _, address in await asyncio.get_running_loop().getaddrinfo(
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/base_events.py", line 856, in getaddrinfo
    return await self.run_in_executor(
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/base_events.py", line 814, in run_in_executor
    executor.submit(func, *args), loop=self)
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/concurrent/futures/thread.py", line 167, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/logging/__init__.py", line 1086, in emit
    stream.write(msg + self.terminator)
ValueError: I/O operation on closed file.
Call stack:
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/threading.py", line 930, in _bootstrap
    self._bootstrap_inner()
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/threading.py", line 973, in _bootstrap_inner
    self.run()
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/threading.py", line 910, in run
    self._target(*self._args, **self._kwargs)
  File "/home/julia/distributed/distributed/utils.py", line 485, in run_loop
    loop.start()
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/site-packages/tornado/platform/asyncio.py", line 199, in start
    self.asyncio_loop.run_forever()
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/base_events.py", line 596, in run_forever
    self._run_once()
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/base_events.py", line 1890, in _run_once
    handle._run()
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "/home/julia/distributed/distributed/utils.py", line 778, in wrapper
    return await func(*args, **kwargs)
  File "/home/julia/distributed/distributed/client.py", line 1400, in _handle_report
    await self._reconnect()
  File "/home/julia/distributed/distributed/utils.py", line 778, in wrapper
    return await func(*args, **kwargs)
  File "/home/julia/distributed/distributed/utils.py", line 804, in __exit__
    logger.exception(exc_value)
Message: RuntimeError('cannot schedule new futures after shutdown')
Arguments: ()
cannot schedule new futures after shutdown
Traceback (most recent call last):
  File "/home/julia/distributed/distributed/comm/tcp.py", line 223, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/julia/distributed/distributed/client.py", line 1392, in _handle_report
    msgs = await self.scheduler_comm.comm.read()
  File "/home/julia/distributed/distributed/comm/tcp.py", line 239, in read
    convert_stream_closed_error(self, e)
  File "/home/julia/distributed/distributed/comm/tcp.py", line 144, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) Client->Scheduler local=tcp://127.0.0.1:43046 remote=tcp://127.0.0.1:44163>: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/julia/distributed/distributed/utils.py", line 778, in wrapper
    return await func(*args, **kwargs)
  File "/home/julia/distributed/distributed/client.py", line 1400, in _handle_report
    await self._reconnect()
  File "/home/julia/distributed/distributed/utils.py", line 778, in wrapper
    return await func(*args, **kwargs)
  File "/home/julia/distributed/distributed/client.py", line 1211, in _reconnect
    await self._ensure_connected(timeout=timeout)
  File "/home/julia/distributed/distributed/client.py", line 1241, in _ensure_connected
    comm = await connect(
  File "/home/julia/distributed/distributed/comm/core.py", line 291, in connect
    comm = await asyncio.wait_for(
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/tasks.py", line 479, in wait_for
    return fut.result()
  File "/home/julia/distributed/distributed/comm/tcp.py", line 449, in connect
    stream = await self.client.connect(
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/site-packages/tornado/tcpclient.py", line 265, in connect
    addrinfo = await self.resolver.resolve(host, port, af)
  File "/home/julia/distributed/distributed/comm/tcp.py", line 434, in resolve
    for fam, _, _, _, address in await asyncio.get_running_loop().getaddrinfo(
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/base_events.py", line 856, in getaddrinfo
    return await self.run_in_executor(
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/base_events.py", line 814, in run_in_executor
    executor.submit(func, *args), loop=self)
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/concurrent/futures/thread.py", line 167, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown

Originally posted by @jsignell in https://github.com/dask/distributed/issues/6745#issuecomment-1195676661
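
The last frames of the traceback above can be reproduced in isolation. The following is a minimal sketch, assuming only that the event loop's default executor has already been shut down when `getaddrinfo` is awaited; it does not show how the executor ends up shut down inside distributed. The function names and the port number are illustrative, not part of distributed.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def resolve_after_executor_shutdown() -> str:
    """Await loop.getaddrinfo() after the loop's default executor is shut down."""
    loop = asyncio.get_running_loop()

    # Install our own default executor so that we can shut it down explicitly.
    executor = ThreadPoolExecutor(max_workers=1)
    loop.set_default_executor(executor)
    executor.shutdown(wait=True)

    try:
        # The stdlib event loop runs socket.getaddrinfo via
        # run_in_executor(None, ...), even when the host is an IP literal.
        await loop.getaddrinfo("127.0.0.1", 8786)  # port is arbitrary
    except RuntimeError as exc:
        return str(exc)
    return "no error"


def reproduce() -> str:
    """Run the coroutine on a fresh event loop and return the error message."""
    return asyncio.run(resolve_after_executor_shutdown())


if __name__ == "__main__":
    print(reproduce())
```

The error is raised synchronously by `executor.submit`, before any lookup is attempted, which matches the `thread.py` frame at the bottom of the traceback.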

graingert commented 2 years ago

It's particularly frustrating here because 127.0.0.1 is already an IP address, so resolving it doesn't actually need a thread:
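
As an illustration of that point, an IP literal can be recognised without touching an executor. This is a hedged sketch only: `resolve_ip_literal` is a hypothetical helper, not something that exists in distributed or Tornado, and the simplified `(family, sockaddr)` return shape is an assumption made for the example.

```python
import ipaddress
import socket


def resolve_ip_literal(host: str, port: int):
    """Return [(family, sockaddr)] if host is an IP literal, else None.

    Hypothetical helper: a caller could fall back to the thread-based
    getaddrinfo only when this returns None.
    """
    try:
        ip = ipaddress.ip_address(host)
    except ValueError:
        # Not an IP literal (e.g. "localhost"); a real lookup is needed.
        return None
    if ip.version == 4:
        return [(socket.AF_INET, (host, port))]
    # IPv6 sockaddr tuples also carry flowinfo and scope_id.
    return [(socket.AF_INET6, (host, port, 0, 0))]


if __name__ == "__main__":
    print(resolve_ip_literal("127.0.0.1", 44163))
    print(resolve_ip_literal("localhost", 44163))
```

With a check like this in front of the resolver, a reconnect to `tcp://127.0.0.1:44163` would never reach `run_in_executor`, so the shutdown state of the executor would not matter for that address.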

graingert commented 2 years ago

This was first fixed in the specific case of interpreter shutdown: