dask / dask-yarn

Deploy dask on YARN clusters
http://yarn.dask.org
BSD 3-Clause "New" or "Revised" License

YarnCluster.shutdown() Won't Work on EMR, results in `concurrent.futures._base.CancelledError` #138

Open gallamine opened 3 years ago

gallamine commented 3 years ago

While working on #135, I encountered an issue where the YARN cluster fails to shut down when asked.

Code to reproduce:

from dask_yarn import YarnCluster

cluster = YarnCluster("python:///usr/bin/python3")
cluster.shutdown()

I'm running on an EMR cluster. Output when I run the above from a CLI while logged into the master node:

Python 3.6.12 (default, Aug 31 2020, 18:56:18)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from dask_yarn import YarnCluster
>>> cluster = YarnCluster("python:///usr/bin/python3")
21/02/09 19:59:20 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/02/09 19:59:20 INFO client.RMProxy: Connecting to ResourceManager at ip-10-174-66-52.ec2.internal/10.174.66.52:8032
21/02/09 19:59:21 INFO skein.Driver: Driver started, listening on 41769
21/02/09 19:59:21 INFO skein.Driver: Uploading application resources to hdfs://ip-10-174-66-52.ec2.internal:8020/user/wcox/.skein/application_1612889447769_0012
21/02/09 19:59:22 INFO skein.Driver: Submitting application...
21/02/09 19:59:22 INFO impl.YarnClientImpl: Submitted application application_1612889447769_0012
>>> cluster.shutdown()
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/dask_yarn/core.py", line 903, in _watch_worker_status
    msgs = await comm.read()
  File "/usr/local/lib/python3.6/site-packages/distributed/comm/tcp.py", line 187, in read
    n_frames = await stream.read_bytes(8)
concurrent.futures._base.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.6/site-packages/dask_yarn/core.py", line 729, in shutdown
    self._sync(self._stop_internal(status=status, diagnostics=diagnostics))
  File "/usr/local/lib/python3.6/site-packages/dask_yarn/core.py", line 701, in _sync
    return future.result()
  File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 432, in result
    return self.__get_result()
  File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.6/site-packages/dask_yarn/core.py", line 632, in _stop_internal
    await self._stop_task
  File "/usr/local/lib/python3.6/site-packages/dask_yarn/core.py", line 645, in _stop_async
    await cancel_task(self._watch_worker_status_task)
  File "/usr/local/lib/python3.6/site-packages/dask_yarn/core.py", line 65, in cancel_task
    await task
  File "/usr/local/lib/python3.6/site-packages/dask_yarn/core.py", line 924, in _watch_worker_status
    await comm.close()
  File "/usr/local/lib64/python3.6/site-packages/tornado/gen.py", line 326, in wrapper
    yielded = next(result)
  File "/usr/local/lib/python3.6/site-packages/distributed/comm/tcp.py", line 282, in close
    stream.close()
  File "/usr/local/lib64/python3.6/site-packages/tornado/iostream.py", line 636, in close
    self._maybe_run_close_callback()
  File "/usr/local/lib64/python3.6/site-packages/tornado/iostream.py", line 655, in _maybe_run_close_callback
    future.set_exception(StreamClosedError(real_error=self.error))
asyncio.base_futures.InvalidStateError: invalid state
>>>
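
One possible reading of the second traceback, offered as an assumption rather than a confirmed diagnosis: the pending read future was already cancelled (the first `CancelledError`), and tornado then tried to set a `StreamClosedError` on it while closing the stream. The standard-library sketch below reproduces only that asyncio behaviour. It does not use dask-yarn or tornado, and the function name `set_exception_on_cancelled_future` is made up for illustration.

```python
import asyncio


def set_exception_on_cancelled_future():
    """Cancel a future, then try to set an exception on it.

    Returns the name of the exception class raised by set_exception(),
    or None if no exception was raised.
    """
    loop = asyncio.new_event_loop()
    try:
        fut = loop.create_future()
        fut.cancel()  # the future is now in the CANCELLED state
        try:
            # Stand-in for tornado's future.set_exception(StreamClosedError(...))
            fut.set_exception(RuntimeError("stream closed"))
        except asyncio.InvalidStateError as exc:
            return type(exc).__name__
        return None
    finally:
        loop.close()


raised = set_exception_on_cancelled_future()
print(raised)
```

If this reading is right, the failure comes from how cancellation and stream close interact, not from anything EMR-specific in the reproduction script.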

Note that the YARN application also keeps running after the failed `shutdown()` call.
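
Until `shutdown()` works, the leftover application can be stopped by hand with the standard `yarn application -kill <application id>` command. The sketch below pulls the application ID out of log text like the output above and builds that command. The helper names `find_app_ids` and `kill_command` are hypothetical and not part of dask-yarn or skein.

```python
import re

# YARN application IDs look like application_<cluster timestamp>_<sequence>.
APP_ID_RE = re.compile(r"application_\d+_\d+")


def find_app_ids(log_text):
    """Return the unique application IDs found in log_text, in order of appearance."""
    seen = []
    for app_id in APP_ID_RE.findall(log_text):
        if app_id not in seen:
            seen.append(app_id)
    return seen


def kill_command(app_id):
    """Build the argument list for the YARN CLI's kill command."""
    return ["yarn", "application", "-kill", app_id]


sample_log = (
    "21/02/09 19:59:22 INFO skein.Driver: Submitting application...\n"
    "21/02/09 19:59:22 INFO impl.YarnClientImpl: "
    "Submitted application application_1612889447769_0012\n"
)

app_ids = find_app_ids(sample_log)
commands = [kill_command(app_id) for app_id in app_ids]
print(commands)
```

On a node with the `yarn` CLI on the `PATH`, each argument list can be passed to `subprocess.run(cmd, check=True)`. This only cleans up the stray application and does not fix the underlying `shutdown()` error.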

Environment:

Plozano94 commented 3 years ago

I have the same issue. Any ideas?