dask / dask-yarn

Deploy dask on YARN clusters
http://yarn.dask.org
BSD 3-Clause "New" or "Revised" License
69 stars 41 forks source link

Using dask-yarn + EMR encountering InvalidStateError when doing cluster.close() #127

Open amyzheng1995 opened 4 years ago

amyzheng1995 commented 4 years ago

Thank you for reporting an issue.

When reporting an issue, please include the following:

cluster = YarnCluster() client = Client(cluster)

Scale up to 3 workers

cluster.scale(3)

cluster.close()



- **Relevant logs/tracebacks**
```---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
/home/hadoop/miniconda/lib/python3.7/site-packages/dask_yarn/core.py in _watch_worker_status(self, comm)
    902                 try:
--> 903                     msgs = await comm.read()
    904                 except OSError:

/home/hadoop/miniconda/lib/python3.7/site-packages/distributed/comm/tcp.py in read(self, deserializers)
    185         try:
--> 186             n_frames = await stream.read_bytes(8)
    187             n_frames = struct.unpack("Q", n_frames)[0]

CancelledError: 

During handling of the above exception, another exception occurred:

InvalidStateError                         Traceback (most recent call last)
<ipython-input-8-f7593bfa8275> in <module>()
----> 1 cluster.close()

/home/hadoop/miniconda/lib/python3.7/site-packages/dask_yarn/core.py in close(self, **kwargs)
    740         shutdown
    741         """
--> 742         return self.shutdown(**kwargs)
    743 
    744     def __del__(self):

/home/hadoop/miniconda/lib/python3.7/site-packages/dask_yarn/core.py in shutdown(self, status, diagnostics)
    727             return self._stop_internal(status=status, diagnostics=diagnostics)
    728         if self.loop.asyncio_loop.is_running() and not sys.is_finalizing():
--> 729             self._sync(self._stop_internal(status=status, diagnostics=diagnostics))
    730         else:
    731             # Always run this!

/home/hadoop/miniconda/lib/python3.7/site-packages/dask_yarn/core.py in _sync(self, task)
    699         future = asyncio.run_coroutine_threadsafe(task, self.loop.asyncio_loop)
    700         try:
--> 701             return future.result()
    702         except BaseException:
    703             future.cancel()

/home/hadoop/miniconda/lib/python3.7/concurrent/futures/_base.py in result(self, timeout)
    433                 raise CancelledError()
    434             elif self._state == FINISHED:
--> 435                 return self.__get_result()
    436             else:
    437                 raise TimeoutError()

/home/hadoop/miniconda/lib/python3.7/concurrent/futures/_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

/home/hadoop/miniconda/lib/python3.7/site-packages/dask_yarn/core.py in _stop_internal(self, status, diagnostics)
    630                 self._stop_async(status=status, diagnostics=diagnostics)
    631             )
--> 632         await self._stop_task
    633 
    634     async def _stop_async(self, status="SUCCEEDED", diagnostics=None):

/home/hadoop/miniconda/lib/python3.7/site-packages/IPython/core/interactiveshell.py in run_code(self, code_obj, result)
   2876                 self.hooks.pre_run_code_hook()
   2877                 #rprint('Running code', repr(code_obj)) # dbg
-> 2878                 exec(code_obj, self.user_global_ns, self.user_ns)
   2879             finally:
   2880                 # Reset our crash handler in place

<ipython-input-5-f7593bfa8275> in <module>()
----> 1 cluster.close()

/home/hadoop/miniconda/lib/python3.7/site-packages/dask_yarn/core.py in close(self, **kwargs)
    740         shutdown
    741         """
--> 742         return self.shutdown(**kwargs)
    743 
    744     def __del__(self):

/home/hadoop/miniconda/lib/python3.7/site-packages/dask_yarn/core.py in shutdown(self, status, diagnostics)
    727             return self._stop_internal(status=status, diagnostics=diagnostics)
    728         if self.loop.asyncio_loop.is_running() and not sys.is_finalizing():
--> 729             self._sync(self._stop_internal(status=status, diagnostics=diagnostics))
    730         else:
    731             # Always run this!

/home/hadoop/miniconda/lib/python3.7/site-packages/dask_yarn/core.py in _sync(self, task)
    699         future = asyncio.run_coroutine_threadsafe(task, self.loop.asyncio_loop)
    700         try:
--> 701             return future.result()
    702         except BaseException:
    703             future.cancel()

/home/hadoop/miniconda/lib/python3.7/concurrent/futures/_base.py in result(self, timeout)
    433                 raise CancelledError()
    434             elif self._state == FINISHED:
--> 435                 return self.__get_result()
    436             else:
    437                 raise TimeoutError()

/home/hadoop/miniconda/lib/python3.7/concurrent/futures/_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

/home/hadoop/miniconda/lib/python3.7/site-packages/dask_yarn/core.py in _stop_internal(self, status, diagnostics)
    630                 self._stop_async(status=status, diagnostics=diagnostics)
    631             )
--> 632         await self._stop_task
    633 
    634     async def _stop_async(self, status="SUCCEEDED", diagnostics=None):

/home/hadoop/miniconda/lib/python3.7/site-packages/dask_yarn/core.py in _stop_async(self, status, diagnostics)
    643 
    644         if self._watch_worker_status_task is not None:
--> 645             await cancel_task(self._watch_worker_status_task)
    646             self._watch_worker_status_task = None
    647 

/home/hadoop/miniconda/lib/python3.7/site-packages/dask_yarn/core.py in cancel_task(task)
     63     task.cancel()
     64     try:
---> 65         await task
     66     except asyncio.CancelledError:
     67         pass

/home/hadoop/miniconda/lib/python3.7/site-packages/dask_yarn/core.py in _watch_worker_status(self, comm)
    922                     self = None
    923         finally:
--> 924             await comm.close()
    925 
    926     def _widget_status(self):

/home/hadoop/miniconda/lib/python3.7/site-packages/tornado/gen.py in wrapper(*args, **kwargs)
    324                 try:
    325                     orig_stack_contexts = stack_context._state.contexts
--> 326                     yielded = next(result)
    327                     if stack_context._state.contexts is not orig_stack_contexts:
    328                         yielded = _create_future()

/home/hadoop/miniconda/lib/python3.7/site-packages/distributed/comm/tcp.py in close(self)
    279             finally:
    280                 self._finalizer.detach()
--> 281                 stream.close()
    282 
    283     def abort(self):

/home/hadoop/miniconda/lib/python3.7/site-packages/tornado/iostream.py in close(self, exc_info)
    634             self.close_fd()
    635             self._closed = True
--> 636         self._maybe_run_close_callback()
    637 
    638     def _maybe_run_close_callback(self):

/home/hadoop/miniconda/lib/python3.7/site-packages/tornado/iostream.py in _maybe_run_close_callback(self)
    653                 self._ssl_connect_future = None
    654             for future in futures:
--> 655                 future.set_exception(StreamClosedError(real_error=self.error))
    656                 future.exception()
    657             if self._close_callback is not None:

InvalidStateError: invalid state```

- **Version information**
    - Python version 3.7.7
    - Dask-Yarn version 0.8.1
    https://github.com/dask/dask-yarn/blob/master/deployment_resources/aws-emr/bootstrap-dask