dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

raise CommClosedError() distributed.comm.core.CommClosedError with tornado.iostream.StreamClosedError: Stream is closed #4103

Open agniszczotka opened 4 years ago

agniszczotka commented 4 years ago

What happened: When creating large dataframes and saving them to parquet, the following error is raised:

Task exception was never retrieved
future: <Task finished coro=<connect..() done, defined at /Users/agnieszkaszczotka/.pyenv/versions/anaconda3-2019.10/envs/bayesian/lib/python3.6/site-packages/distributed/comm/core.py:279> exception=CommClosedError()>
Traceback (most recent call last):
  File "/Users/agnieszkaszczotka/.pyenv/versions/anaconda3-2019.10/envs/bayesian/lib/python3.6/site-packages/distributed/comm/core.py", line 288, in
    handshake = await asyncio.wait_for(comm.read(), 1)
  File "/Users/agnieszkaszczotka/.pyenv/versions/anaconda3-2019.10/envs/bayesian/lib/python3.6/asyncio/tasks.py", line 351, in wait_for
    yield from waiter
concurrent.futures._base.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/agnieszkaszczotka/.pyenv/versions/anaconda3-2019.10/envs/bayesian/lib/python3.6/site-packages/distributed/comm/core.py", line 295, in _
    raise CommClosedError()
distributed.comm.core.CommClosedError

distributed.worker - ERROR - Worker stream died during communication: tcp://127.0.0.1:57234
Traceback (most recent call last):
  File "/Users/agnieszkaszczotka/.pyenv/versions/anaconda3-2019.10/envs/bayesian/lib/python3.6/site-packages/distributed/comm/tcp.py", line 186, in read
    n_frames = await stream.read_bytes(8)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/agnieszkaszczotka/.pyenv/versions/anaconda3-2019.10/envs/bayesian/lib/python3.6/site-packages/distributed/worker.py", line 1983, in gather_dep
    self.rpc, deps, worker, who=self.address
  File "/Users/agnieszkaszczotka/.pyenv/versions/anaconda3-2019.10/envs/bayesian/lib/python3.6/site-packages/distributed/worker.py", line 3258, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/Users/agnieszkaszczotka/.pyenv/versions/anaconda3-2019.10/envs/bayesian/lib/python3.6/site-packages/distributed/utils_comm.py", line 390, in retry_operation
    operation=operation,
  File "/Users/agnieszkaszczotka/.pyenv/versions/anaconda3-2019.10/envs/bayesian/lib/python3.6/site-packages/distributed/utils_comm.py", line 370, in retry
    return await coro()
  File "/Users/agnieszkaszczotka/.pyenv/versions/anaconda3-2019.10/envs/bayesian/lib/python3.6/site-packages/distributed/worker.py", line 3245, in _get_data
    max_connections=max_connections,
  File "/Users/agnieszkaszczotka/.pyenv/versions/anaconda3-2019.10/envs/bayesian/lib/python3.6/site-packages/distributed/core.py", line 666, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/Users/agnieszkaszczotka/.pyenv/versions/anaconda3-2019.10/envs/bayesian/lib/python3.6/site-packages/distributed/comm/tcp.py", line 201, in read
    convert_stream_closed_error(self, e)
  File "/Users/agnieszkaszczotka/.pyenv/versions/anaconda3-2019.10/envs/bayesian/lib/python3.6/site-packages/distributed/comm/tcp.py", line 125, in convert_stream_closed_error
    raise CommClosedError("in %s: %s" % (obj, exc)) from exc
distributed.comm.core.CommClosedError: in : Stream is closed

What you expected to happen: no error

Minimal Complete Verifiable Example:

    from dask.distributed import Client
    from scipy import sparse
    import numpy as np
    import pandas as pd
    import dask.dataframe as dd

    client = Client()
    n = 10000
    columns1 = np.arange(n).astype(str)
    columns2 = np.arange(start=1000, stop=n + 1000).astype(str)

    df1 = pd.DataFrame.sparse.from_spmatrix(sparse.eye(n), index=columns1, columns=columns1)  # Create a pandas df
    df2 = pd.DataFrame.sparse.from_spmatrix(sparse.eye(n), index=columns2, columns=columns2)  # Create a pandas df
    ddf1 = dd.from_pandas(df1, chunksize=1000, sort=True)  # Create a dask.dataframe with known divisions
    ddf2 = dd.from_pandas(df2, chunksize=1000, sort=True)  # Create a dask.dataframe with known divisions

    ddfs = ddf1.add(ddf2)
    ddfs = ddfs.map_partitions(lambda x: x.sparse.to_dense().fillna(0.0))
    print(ddfs.head())
    ddfs.to_parquet(path='save', compute=True)

Anything else we need to know?: The variable n sets the size of the dataframes; when n is small (e.g. 100) the error does not show up.

Environment:
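
A minimal sketch of one way to collect the relevant versions from the cluster, assuming a running `Client` as in the example above (`Client.get_versions` is part of the distributed API):

    from dask.distributed import Client

    client = Client()  # or reuse the client created in the example above
    # get_versions reports the package versions seen by the client,
    # the scheduler and every worker
    print(client.get_versions(check=False))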

quasiben commented 4 years ago

Thanks for the report @agniszczotka. When I ran it, I got a different error:

distributed.worker - WARNING -  Compute Failed
Function:  execute_task
args:      ((<function apply at 0x7ff1f6bd40d0>, <function ArrowEngine.write_partition at 0x7ff17fb1a430>, [                       0    1   10  100  1000  10000  10001  10002  10003  10004  10005  10006  10007  10008  10009  1001  10010  10011  10012  10013  ...  9981  9982  9983  9984  9985  9986  9987  9988  9989  999  9990  9991  9992  9993  9994  9995  9996  9997  9998  9999
__null_dask_index__                                                                                                                                    ...
8199                 0.0  0.0  0.0  0.0   0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0   0.0    0.0    0.0    0.0    0.0  ...   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0  0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0
82                   0.0  0.0  0.0  0.0   0.0    0.0    0.0    0.0    0.0    0.0
kwargs:    {}
Exception: ArrowTypeError('Did not pass numpy.dtype object', 'Conversion failed for column 0 with type float64')

I probably need to upgrade arrow to 1.X (I currently have 0.17 installed). What version of arrow are you using? Or if not arrow, which version of fastparquet?
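
For reference, a quick way to check the installed parquet engine versions (assuming both libraries are importable in your environment):

    import pyarrow
    import fastparquet

    # Print the versions of both parquet engines in the current environment
    print("pyarrow:", pyarrow.__version__)
    print("fastparquet:", fastparquet.__version__)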

quasiben commented 4 years ago

After upgrading arrow I was not able to reproduce on Linux:

In [10]: ddfs.to_parquet(path='save', compute=True)

In [11]: import dask, distributed, pyarrow

In [12]: dask.__version__, distributed.__version__, pyarrow.__version__
Out[12]: ('2.25.0', '2.25.0', '1.0.1')

agniszczotka commented 4 years ago

I do use fastparquet. Could you check with fastparquet?
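
A minimal sketch of forcing the fastparquet engine in the example above; it assumes fastparquet is installed and reuses the `ddfs` dataframe from the reproduction (`engine` is the standard `to_parquet` keyword):

    # Write with fastparquet instead of the auto-selected pyarrow engine
    ddfs.to_parquet(path='save', engine='fastparquet', compute=True)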