dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Future with no references reused #1860

Open jakirkham opened 6 years ago

jakirkham commented 6 years ago

It appears that when launching tasks with the Distributed Client, Futures are sometimes held onto a little too long, and their results may end up being reused for subsequent tasks that have the same name. A trivial example is shown below.

Test script:

```python
# test_dist_fut.py
from dask.delayed import Delayed
import distributed

if __name__ == "__main__":
    c = distributed.Client()

    v = Delayed("v", {"v": 0})
    v = v.persist()
    print(v.compute())

    del v

    v = Delayed("v", {"v": 1})
    v = v.persist()
    print(v.compute())
```


Running script:

```bash
$ python test_dist_fut.py
0
0
```


Environment:

```yaml
name: test
channels:
  - conda-forge
  - defaults
dependencies:
  - ca-certificates=2018.1.18=0
  - click=6.7=py_1
  - cloudpickle=0.5.2=py_0
  - cytoolz=0.9.0.1=py36_0
  - dask-core=0.17.2=py_0
  - distributed=1.21.4=py36_0
  - heapdict=1.0.0=py36_0
  - msgpack-python=0.5.5=py36_0
  - ncurses=5.9=10
  - openssl=1.0.2n=0
  - psutil=5.4.3=py36_0
  - python=3.6.4=0
  - pyyaml=3.12=py36_1
  - readline=7.0=0
  - six=1.11.0=py36_1
  - sortedcontainers=1.5.9=py36_0
  - sqlite=3.20.1=2
  - tblib=1.3.2=py36_0
  - tk=8.6.7=0
  - toolz=0.9.0=py_0
  - tornado=5.0.1=py36_1
  - xz=5.2.3=0
  - yaml=0.1.7=0
  - zict=0.1.3=py_0
  - zlib=1.2.11=0
```
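As an aside, here is a minimal workaround sketch of my own (not from the original report), assuming the collision comes from both Delayed objects sharing the literal key "v": derive the key from the value with dask.base.tokenize so that distinct data never reuses a key.

```python
# Hypothetical workaround sketch (not from the original report): derive the key
# from the value so that different data never shares a key on the scheduler.
from dask.base import tokenize
from dask.delayed import Delayed
import distributed

def make_delayed(value):
    # tokenize() hashes the value, so each distinct value gets its own key
    key = "v-" + tokenize(value)
    return Delayed(key, {key: value})

if __name__ == "__main__":
    c = distributed.Client()

    v = make_delayed(0)
    print(v.persist().compute())  # 0

    del v

    v = make_delayed(1)
    print(v.persist().compute())  # 1 -- its key differs from the first value's
```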
jakirkham commented 6 years ago

I should add that sometimes, when running this, the value does change as it should, but that is infrequent.

Also, sometimes a GeneratorExit exception is thrown and ignored, resulting in a RuntimeError like so.

```python
Exception ignored in: <generator object add_client at 0x106f19f68>
RuntimeError: generator ignored GeneratorExit
```


Occasionally a different error crops up, suggesting the cluster did not get shut down appropriately somehow. The error is shown below. Sometimes there are longer versions of this error involving more StreamClosedErrors.

```python
Future exception was never retrieved
future: <Future finished exception=CommClosedError('in : Stream is closed',)>
Traceback (most recent call last):
  File "/zopt/conda2/envs/test/lib/python3.6/site-packages/distributed/comm/tcp.py", line 179, in read
    n_frames = yield stream.read_bytes(8)
  File "/zopt/conda2/envs/test/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/zopt/conda2/envs/test/lib/python3.6/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/zopt/conda2/envs/test/lib/python3.6/site-packages/distributed/comm/tcp.py", line 200, in read
    convert_stream_closed_error(self, e)
  File "/zopt/conda2/envs/test/lib/python3.6/site-packages/distributed/comm/tcp.py", line 128, in convert_stream_closed_error
    raise CommClosedError("in %s: %s" % (obj, exc))
distributed.comm.core.CommClosedError: in : Stream is closed
```


These two errors may or may not show up together, or at all, and may or may not correspond to correct or incorrect output, all of which suggests they are probably separate issues.
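On the shutdown noise, here is an illustrative sketch (again my own, not from the report), assuming the errors come from the client and its local cluster being torn down only at interpreter exit: closing the Client deliberately, e.g. by using it as a context manager, might avoid them.

```python
# Hypothetical variant of the test script (not from the original report): close
# the client deliberately so scheduler/worker comms are shut down before the
# interpreter exits, which may avoid the CommClosedError noise above.
from dask.delayed import Delayed
import distributed

if __name__ == "__main__":
    with distributed.Client() as c:  # the client (and its local cluster) closes on exit
        v = Delayed("v", {"v": 0})
        print(v.persist().compute())
```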

mrocklin commented 6 years ago

We might also just consider this a misuse of dask.

jakirkham commented 6 years ago

Honestly, that's a fair point, and it gives rise to the discussion in issue ( https://github.com/dask/dask/issues/3322 ). I have more to say on this subject, but have already said it in that issue.

That said, while this example is a bit contrived, it does minimally reproduce the problem I'm seeing, and in the process it also shows a few other issues I have seen with Distributed that have been hard to reproduce (particularly those in the second comment). Regardless of whether this is reasonable behavior, it provides a lot of information that is hopefully useful in addressing some of these issues.