dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Timed out trying to connect to host after 10 s #2880

Closed seanlaw closed 4 years ago

seanlaw commented 5 years ago

I have a dask distributed cluster up and running on 40 workers:

dask_client = Client('localhost:8786')
dask_client.restart()
dask_client

I've restarted everything so no tasks are queued and the scheduler log shows:

distributed.scheduler - INFO - Clear task state

I have a large csr sparse matrix that I am scattering to the cluster:

csr_future = dask_client.scatter(csr, broadcast=True)

After a few seconds, I see:

distributed.scheduler - INFO - Remove worker tcp://10.157.169.65:38615
distributed.core - INFO - Removing comms to tcp://10.157.169.65:38615
distributed.scheduler - INFO - Remove worker tcp://10.157.169.65:33352
distributed.core - INFO - Removing comms to tcp://10.157.169.65:33352
distributed.scheduler - INFO - Register tcp://10.157.169.65:38051
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.157.169.65:38051
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://10.157.169.65:46414
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.157.169.65:46414
distributed.core - INFO - Starting established connection

So, it looks like some workers are being removed and new workers are being added back to replace those workers. Around 30 seconds after this, I see multiple tornado errors:

tornado.application - ERROR - Multiple exceptions in yield list
Traceback (most recent call last):
  File "/app/home/miniconda3/lib/python3.7/site-packages/distributed/comm/core.py", line 218, in connect
    quiet_exceptions=EnvironmentError,
  File "/app/home/miniconda3/lib/python3.7/site-packages/tornado/gen.py", line 729, in run
    value = future.result()
tornado.util.TimeoutError: Timeout

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/app/home/miniconda3/lib/python3.7/site-packages/tornado/gen.py", line 501, in callback
    result_list.append(f.result())
  File "/app/home/miniconda3/lib/python3.7/site-packages/tornado/gen.py", line 736, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/app/home/miniconda3/lib/python3.7/site-packages/distributed/core.py", line 736, in send_recv_from_rpc
    comm = yield self.pool.connect(self.addr)
  File "/app/home/miniconda3/lib/python3.7/site-packages/tornado/gen.py", line 729, in run
    value = future.result()
  File "/app/home/miniconda3/lib/python3.7/site-packages/tornado/gen.py", line 736, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/app/home/miniconda3/lib/python3.7/site-packages/distributed/core.py", line 864, in connect
    connection_args=self.connection_args,
  File "/app/home/miniconda3/lib/python3.7/site-packages/tornado/gen.py", line 729, in run
    value = future.result()
  File "/app/home/miniconda3/lib/python3.7/site-packages/tornado/gen.py", line 736, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/app/home/miniconda3/lib/python3.7/site-packages/distributed/comm/core.py", line 230, in connect
    _raise(error)
  File "/app/home/miniconda3/lib/python3.7/site-packages/distributed/comm/core.py", line 207, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to 'tcp://10.157.169.65:33352' after 10 s: in <distributed.comm.tcp.TCPConnector object at 0x7f2e39007c88>: ConnectionRefusedError: [Errno 111] Connection refused
distributed.core - ERROR - Timed out trying to connect to 'tcp://10.157.169.65:38615' after 10 s: in <distributed.comm.tcp.TCPConnector object at 0x7f2e3900f4a8>: ConnectionRefusedError: [Errno 111] Connection refused
Traceback (most recent call last):
  File "/app/home/miniconda3/lib/python3.7/site-packages/distributed/comm/core.py", line 218, in connect
    quiet_exceptions=EnvironmentError,
  File "/app/home/miniconda3/lib/python3.7/site-packages/tornado/gen.py", line 729, in run
    value = future.result()
tornado.util.TimeoutError: Timeout

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/app/home/miniconda3/lib/python3.7/site-packages/distributed/core.py", line 412, in handle_comm
    result = yield result
  File "/app/home/miniconda3/lib/python3.7/site-packages/tornado/gen.py", line 729, in run
    value = future.result()
  File "/app/home/miniconda3/lib/python3.7/site-packages/tornado/gen.py", line 736, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/app/home/miniconda3/lib/python3.7/site-packages/distributed/scheduler.py", line 2496, in scatter
    yield self.replicate(keys=keys, workers=workers, n=n)
  File "/app/home/miniconda3/lib/python3.7/site-packages/tornado/gen.py", line 729, in run
    value = future.result()
  File "/app/home/miniconda3/lib/python3.7/site-packages/tornado/gen.py", line 736, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/app/home/miniconda3/lib/python3.7/site-packages/distributed/scheduler.py", line 2903, in replicate
    for w, who_has in gathers.items()
  File "/app/home/miniconda3/lib/python3.7/site-packages/tornado/gen.py", line 729, in run
    value = future.result()
  File "/app/home/miniconda3/lib/python3.7/site-packages/tornado/gen.py", line 501, in callback
    result_list.append(f.result())
  File "/app/home/miniconda3/lib/python3.7/site-packages/tornado/gen.py", line 736, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/app/home/miniconda3/lib/python3.7/site-packages/distributed/core.py", line 736, in send_recv_from_rpc
    comm = yield self.pool.connect(self.addr)
  File "/app/home/miniconda3/lib/python3.7/site-packages/tornado/gen.py", line 729, in run
    value = future.result()
  File "/app/home/miniconda3/lib/python3.7/site-packages/tornado/gen.py", line 736, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/app/home/miniconda3/lib/python3.7/site-packages/distributed/core.py", line 864, in connect
    connection_args=self.connection_args,
  File "/app/home/miniconda3/lib/python3.7/site-packages/tornado/gen.py", line 729, in run
    value = future.result()
  File "/app/home/miniconda3/lib/python3.7/site-packages/tornado/gen.py", line 736, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/app/home/miniconda3/lib/python3.7/site-packages/distributed/comm/core.py", line 230, in connect
    _raise(error)
  File "/app/home/miniconda3/lib/python3.7/site-packages/distributed/comm/core.py", line 207, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to 'tcp://10.157.169.65:38615' after 10 s: in <distributed.comm.tcp.TCPConnector object at 0x7f2e3900f4a8>: ConnectionRefusedError: [Errno 111] Connection refused

It looks like the timeout/connection-refused errors refer to the same IP addresses/ports that the scheduler was removing comms from earlier above. I can't seem to resolve this.

In case it matters, I am running these commands from jupyterlab=0.35.5, which runs alongside the dask-scheduler, and we are running tornado=6.0.2 with dask=1.2.2.

seanlaw commented 5 years ago

Also, executing dask_client.get_versions(check=True) returns a dictionary of version info for the various workers without any warnings.

quasiben commented 5 years ago

@seanlaw how was the cluster initially set up?

seanlaw commented 5 years ago

@quasiben I have a set of bash scripts that basically execute:

dask-scheduler --pid-file /tmp/PID.DASK  # Scheduler node

on the scheduler node and, for the worker nodes, we ssh onto each server and run:

ncores=`grep -c ^processor /proc/cpuinfo`
daskhost=`hostname`
daskdomain=`dnsdomainname`
daskhostdomain="$daskhost.$daskdomain"
nohup dask-worker --pid-file /tmp/PID.DASK --nprocs $ncores --nthreads 1 --local-directory /tmp/dask-worker-space $daskhostdomain:8786 > /dev/null 2> /dev/null &

This is how I've consistently started my clusters, unless you are referring to the conda installation?

So, unlike many other timeout issues where the scheduler has problems connecting to workers at startup, my issue is post-startup. I can scatter (with broadcasting) a single variable or a small list, but a larger file causes this strange behavior where the scheduler seems to time out when trying to communicate with a worker whose comms it has already begun removing and which it has already replaced with a new worker.

jcrist commented 5 years ago

Hmmm, I'm not sure what would be causing that. From the scheduler logs it looks like the workers may be dying and restarting, and scatter may not be equipped to handle that properly. Can you look at the worker logs to see if there's any revealing output? Workers may die if they use too much memory, or if deserialization leads to irrecoverable errors (e.g. segfaults).

mrocklin commented 5 years ago

I would watch the dashboard during scattering to see if memory is increasing. Although, if it were a memory problem I would also expect that to show up in the logs.

seanlaw commented 5 years ago

@jcrist I don't see any obvious errors in the worker logs. In case it matters, all of the worker servers are identical and have the same hardware resources.

I am going to try the following:

  1. Kill the scheduler and all 40 workers (spread across 5 servers with 8 workers each)
  2. Start the scheduler, start 8 workers on a single worker server, scatter the csr matrix
  3. See if there is a timeout error
  4. Repeat step 2 on the same server
  5. Repeat steps 2-4 for the remaining 4 worker servers

seanlaw commented 5 years ago

@mrocklin Indeed, the scattering is increasing the memory on all of the workers, and each shows only about 1GB (out of an available 8GB). 1GB is the upper limit for the memory size of the matrix, so we should be fine there and not running out of memory. It is able to successfully scatter on one of the servers with 8 workers, and this completes in seconds.

seanlaw commented 5 years ago

So, on individual servers, there are tests where the first trial passes (i.e., scattering is successful without error) and, on the second pass (where we do dask_client.restart()), we fail with the same error as above.

Occasionally, I see a new AssertionError with the exact same command:

INFO - Remove worker tcp://10.157.169.67:39401
distributed.core - INFO - Removing comms to tcp://10.157.169.67:39401
distributed.scheduler - INFO - Register tcp://10.157.169.67:44945
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.157.169.67:44945
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Remove worker tcp://10.157.169.67:44341
distributed.core - INFO - Removing comms to tcp://10.157.169.67:44341
distributed.scheduler - INFO - Register tcp://10.157.169.67:40134
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.157.169.67:40134
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Remove worker tcp://10.157.169.67:36744
distributed.core - INFO - Removing comms to tcp://10.157.169.67:36744
distributed.scheduler - WARNING - Communication failed during replication: {'status': 'missing-data', 'keys': {'csr_matrix-62581362972a12d8acd999eee5692bfe': ('tcp://10.157.169.67:44341', 'tcp://10.157.169.67:36744', 'tcp://10.157.169.67:39401')}}
distributed.scheduler - WARNING - Communication failed during replication: {'status': 'missing-data', 'keys': {'csr_matrix-62581362972a12d8acd999eee5692bfe': ('tcp://10.157.169.67:44341', 'tcp://10.157.169.67:36744', 'tcp://10.157.169.67:39401')}}
distributed.scheduler - WARNING - Communication failed during replication: {'status': 'missing-data', 'keys': {'csr_matrix-62581362972a12d8acd999eee5692bfe': ('tcp://10.157.169.67:44341', 'tcp://10.157.169.67:36744', 'tcp://10.157.169.67:39401')}}
distributed.core - ERROR -
Traceback (most recent call last):
  File "/app/home/miniconda3/lib/python3.7/site-packages/distributed/core.py", line 412, in handle_comm
    result = yield result
  File "/app/home/miniconda3/lib/python3.7/site-packages/tornado/gen.py", line 729, in run
    value = future.result()
  File "/app/home/miniconda3/lib/python3.7/site-packages/tornado/gen.py", line 736, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/app/home/miniconda3/lib/python3.7/site-packages/distributed/scheduler.py", line 2496, in scatter
    yield self.replicate(keys=keys, workers=workers, n=n)
  File "/app/home/miniconda3/lib/python3.7/site-packages/tornado/gen.py", line 729, in run
    value = future.result()
  File "/app/home/miniconda3/lib/python3.7/site-packages/tornado/gen.py", line 742, in run
    yielded = self.gen.send(value)
  File "/app/home/miniconda3/lib/python3.7/site-packages/distributed/scheduler.py", line 2896, in replicate
    assert count > 0
AssertionError
distributed.scheduler - INFO - Register tcp://10.157.169.67:37024
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.157.169.67:37024
distributed.core - INFO - Starting established connection

And this can show up even when the runs before AND after are successful at scattering.

seanlaw commented 5 years ago

  1. Kill the scheduler and all 40 workers (spread across 5 servers with 8 workers each)
  2. Start the scheduler, start 8 workers on a single worker server, scatter the csr matrix
  3. See if there is a timeout error
  4. Repeat step 2 on the same server
  5. Repeat steps 2-4 for the remaining 4 worker servers

Here are the scattering results for each of the 5 servers (where there was a failure, I would restart and re-run until it passed):

Server 1: Fail, Fail, Pass, Pass
Server 2: Pass, Pass
Server 3: Pass, Fail, Pass
Server 4: Pass, Pass, Fail, Pass
Server 5: Pass, Pass, Fail, Fail, Fail, Pass

I am starting to question whether the problem is caused by running the dask scheduler on the same server as JupyterLab.

seanlaw commented 5 years ago

Interestingly enough, manually "scattering" the data works without any issues:

hosts = list(dask_client.ncores().keys())
csr_futures = []
for host in hosts:
    csr_future = dask_client.scatter(csr, workers=[host])
    csr_futures.append(csr_future)

Of course, this is undesirable since I now have a list of futures that point to "different" copies of the data. With dask_client.scatter(csr, broadcast=True), a single future is returned that can be referenced from any of the servers.
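
As a hypothetical illustration of the difference (process below is a made-up function, not from this thread): the single broadcast future lets any task reference one key, while the per-worker futures each have their own key and need to be matched to their worker:

def process(m):  # hypothetical task; stands in for the real computation
    return m.shape

# Single broadcast future: the scheduler can run the task wherever a copy lives
result = dask_client.submit(process, csr_future)

# One future per worker: each future has a different key, so tasks have to be
# paired explicitly with the copy on the worker they run on
results = [dask_client.submit(process, fut, workers=[host])
           for host, fut in zip(hosts, csr_futures)]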

seanlaw commented 5 years ago

From the scheduler logs it looks like the workers may be dying and restarting, and scatter may not be equipped to handle that properly

@jcrist I think you may be onto something. After I scatter the data manually and then go on to perform my delayed computations, I still see:

distributed.scheduler - INFO - Remove worker tcp://10.157.169.67:45644
distributed.core - INFO - Removing comms to tcp://10.157.169.67:45644

But this doesn't cause the delayed computations to throw an exception like we saw with scattering. All of the delayed computations are able to finish.

seanlaw commented 5 years ago

For better efficiency, I switched over to Dask's replicate function instead, which allows me to keep the same future instead of having multiple futures:

hosts = list(dask_client.ncores().keys())
csr_future = dask_client.scatter(csr, workers=[hosts[0]])
dask_client.replicate(csr_future)

This also has the added benefit of the data being copied over in a tree fashion, so it is lightning fast!

mrocklin commented 5 years ago

Interestingly, all that scatter(..., broadcast=True) does is call scatter once, and then call replicate. It's odd that there would be a behavior change between these two approaches.
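
For illustration, a rough sketch of the equivalence being described here, reusing the dask_client and csr from earlier in the thread:

# One call that places the data and replicates it to every worker
csr_future = dask_client.scatter(csr, broadcast=True)

# ...is described above as roughly: scatter once, then replicate
csr_future = dask_client.scatter(csr)
dask_client.replicate(csr_future)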

seanlaw commented 4 years ago

Interesting indeed! Are TCP timeout errors somehow handled differently in scatter? It seems like, during scatter, if there is a problem with a worker then:

  1. that ("old") worker is removed
  2. a "new" worker is added
  3. data is sent to the "new" worker
  4. scatter continues to wait to communicate with the "old" worker but, since the "old" worker has been removed, tcp communication times out, and scatter errors out

Is this mental model correct or incorrect?

athornton commented 4 years ago

This appears to be what I am seeing as well. In my case the super-annoying bit is that my namespace is running out of resources because the workers are not getting reaped after 60s as they should be. I think maybe the unhandled exception on timeout is keeping the exception from bubbling up to whatever layer kills the worker after the 60s threshold.
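
For context, a minimal sketch of where that 60 s threshold normally comes from, assuming a worker started directly through distributed's Python API (scheduler-host:8786 is a placeholder address; the CLI equivalent is the dask-worker --death-timeout flag):

import asyncio
from distributed import Worker

async def start_worker(scheduler_address="tcp://scheduler-host:8786"):
    # With death_timeout set, the worker closes itself if it cannot reach the
    # scheduler within 60 seconds.
    worker = await Worker(scheduler_address, death_timeout=60)
    await worker.finished()  # returns once the worker has shut down

# asyncio.run(start_worker())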

Here's the stacktrace:

tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7feb77f9b278>>, <Task finished coro=<Worker.heartbeat() done, defined at /opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/worker.py:866> exception=OSError("Timed out trying to connect to 'tcp://10.46.128.14:36210' after 10 s: in <distributed.comm.tcp.TCPConnector object at 0x7feb07444240>: ConnectionRefusedError: [Errno 111] Connection refused")>)
Traceback (most recent call last):
  File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/comm/core.py", line 215, in connect
    quiet_exceptions=EnvironmentError,
tornado.util.TimeoutError: Timeout

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/worker.py", line 875, in heartbeat
    metrics=await self.get_metrics(),
  File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/core.py", line 747, in send_recv_from_rpc
    comm = await self.pool.connect(self.addr)
  File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/core.py", line 874, in connect
    connection_args=self.connection_args,
  File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/comm/core.py", line 227, in connect
    _raise(error)
  File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/comm/core.py", line 204, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to 'tcp://10.46.128.14:36210' after 10 s: in <distributed.comm.tcp.TCPConnector object at 0x7feb07444240>: ConnectionRefusedError: [Errno 111] Connection refused
athornton commented 4 years ago

Here's something. I ran the problematic call (just a len on a large dataframe) under pdb and eventually got this (sometime overnight):

RuntimeError: Task <Task pending coro=<Client._update_scheduler_info() running at /opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/client.py:1065> cb=[IOLoop.add_future.<locals>.<lambda>() at /opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/tornado/ioloop.py:690]> got Future <Future pending> attached to a different loop
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <zmq.eventloop.ioloop.ZMQIOLoop object at 0x7f2d19eea9b0>>, <Task finished coro=<Client._update_scheduler_info() done, defined at /opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/client.py:1061> exception=RuntimeError('Task <Task pending coro=<Client._update_scheduler_info() running at /opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/client.py:1065> cb=[IOLoop.add_future.<locals>.<lambda>() at /opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/tornado/ioloop.py:690]> got Future <Future pending> attached to a different loop')>)

I have no idea what other event loop I could have in there.

athornton commented 4 years ago

More testing seems to show that the behavior is roughly linear in the size of the input. Which is odd, since I thought that was the reason to use dask. We're using 1751 parquet files. I can do smaller collections of them without issue (reading two columns and doing a length; each parquet file is approximately a million rows, and typically between 250MB and 500MB although the largest is 861MB and the smallest 49MB). At some place dependent on size (between 128 and 256 of these parquet files per core/3GB) the scheduler goes out to lunch and doesn't come back when it's deciding how to hand out work to its workers.

Here's the code (I'm using kubespawner to schedule worker nodes; this particular test is with the scheduler in one 12GB/3-core container, and each of two worker nodes in a 12GB/3-core container):

%pdb on

import glob
import os

import dask.dataframe as dd

topdir = os.getenv('HOME') + "/parquetry"

for i in range(12):
    print("Testing with max {} parquet files".format(2**i))
    ddir = topdir + "/{}-files".format(i)
    df = dd.read_parquet(glob.glob(ddir + "/*"), columns=['l', 'b'], engine='fastparquet')
    ll = len(df)
    print("Len: {}".format(ll))

Here's the output, which looks like you would expect until it stops.

Automatic pdb calling has been turned ON
Testing with max 1 parquet files
Len: 1301974
Testing with max 2 parquet files
Len: 1651509
Testing with max 4 parquet files
Len: 2780223
Testing with max 8 parquet files
Len: 5543115
Testing with max 16 parquet files
Len: 12794175
Testing with max 32 parquet files
Len: 26406009
Testing with max 64 parquet files
Len: 59929083
Testing with max 128 parquet files
Len: 122619429
Testing with max 256 parquet files
Len: 246875365
Testing with max 512 parquet files

Here's the eventual error:

tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <zmq.eventloop.ioloop.ZMQIOLoop object at 0x7f13b6654e80>>, <Task finished coro=<ProfileTimePlot.trigger_update.<locals>.cb() done, defined at /opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/dashboard/components.py:551> exception=CommClosedError('in <closed TCP>: Stream is closed')>)
Traceback (most recent call last):
  File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/comm/tcp.py", line 184, in read
    n_frames = await stream.read_bytes(8)
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/dashboard/components.py", line 561, in cb
    prof, metadata = await asyncio.gather(prof, metadata)
  File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/tornado/gen.py", line 742, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/scheduler.py", line 4602, in get_profile
    w: self.rpc(w).profile(start=start, stop=stop, key=key) for w in workers
  File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/tornado/gen.py", line 501, in callback
    result_list.append(f.result())
  File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/core.py", line 738, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/core.py", line 531, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/comm/tcp.py", line 204, in read
    convert_stream_closed_error(self, e)
  File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/comm/tcp.py", line 132, in convert_stream_closed_error
    raise CommClosedError("in %s: %s" % (obj, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <zmq.eventloop.ioloop.ZMQIOLoop object at 0x7f13b6654e80>>, <Task finished coro=<ProfileTimePlot.trigger_update.<locals>.cb() done, defined at /opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/dashboard/components.py:551> exception=CommClosedError('in <closed TCP>: Stream is closed')>)
Traceback (most recent call last):
  File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/comm/tcp.py", line 184, in read
    n_frames = await stream.read_bytes(8)
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/dashboard/components.py", line 561, in cb
    prof, metadata = await asyncio.gather(prof, metadata)
  File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/tornado/gen.py", line 742, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/scheduler.py", line 4602, in get_profile
    w: self.rpc(w).profile(start=start, stop=stop, key=key) for w in workers
  File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/tornado/gen.py", line 501, in callback
    result_list.append(f.result())
  File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/core.py", line 738, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/core.py", line 531, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/comm/tcp.py", line 204, in read
    convert_stream_closed_error(self, e)
  File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/comm/tcp.py", line 132, in convert_stream_closed_error
    raise CommClosedError("in %s: %s" % (obj, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed
athornton commented 4 years ago

I may be on to something here.

If you load a directory with read_parquet, or you load too broad a glob (specifically, one that includes _metadata)...I think you end up with a recursive traversal of _metadata, which causes a lot of churning. Maybe.
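
If that is the case, a minimal sketch of a workaround, assuming the directory layout from the test code above and sidecar files named _metadata/_common_metadata:

import glob
import dask.dataframe as dd

ddir = "/path/to/9-files"  # placeholder; e.g. one of the per-run directories from the loop above

# Build an explicit file list that skips the _metadata/_common_metadata sidecar
# files that a bare "/*" glob would pick up.
files = [f for f in sorted(glob.glob(ddir + "/*"))
         if not f.endswith(("_metadata", "_common_metadata"))]
df = dd.read_parquet(files, columns=['l', 'b'], engine='fastparquet')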

athornton commented 4 years ago

That's not the whole problem, although it was certainly not helping matters.

It sure looks like, when you do a scatter() or a persist(), it's trying to do all the work on the scheduler head node rather than the workers.

Any pointers to where I should start digging would be appreciated.

athornton commented 4 years ago

Dask 1.2.2/Distributed 1.28.1/Dask-kubernetes 0.8.0 doesn't appear to have this issue.

I'm pretty sure it's in the 1.x->2.x transition or in the dask-kubernetes changes to follow that, but I haven't gotten any farther nailing it down.

athornton commented 4 years ago

Further investigation: the breakage comes between dask 2.1.0 and dask 2.2.0. I suspect that outsourcing the file handling to fsspec is the underlying problem. That's unfortunate, since fsspec is clearly a good idea in the general case.

athornton commented 4 years ago

I also have to keep dask-kubernetes at 0.9.1; I think it's the Adaptive stuff in 0.9.2 that is problematic.

mrocklin commented 4 years ago

I suspect that outsourcing the file handling to fsspec is the underlying problem.

cc @martindurant

@athornton if you wanted to verify this you might consider trying out git bisect

For kubernetes and adaptive stuff you might also be interested in trying the rewrite in https://github.com/dask/dask-kubernetes/pull/162

athornton commented 4 years ago

Started down the bisect path, but maintenance at our long-term data facility plus incompatibility with distributed 2.5 (security attribute disappeared, I think?) meant I didn't get all the way through it today.

My hunch (but it remains to be proven) is that it's going to be between 9aec779 and 77d51a1. The failure mode is simply that nothing ever gets farmed out to the workers, and memory keeps climbing on the scheduler node until it keels over. What I am doing is opening 1750 files at once with read_parquet(); they collectively comprise the Parquet representation of GAIA DR2, the total data is on the order of 1 TB, and the memory available in the container is 12GB. Simultaneously opening that many huge files, whose aggregate is nowhere close to fitting into memory if I were to read them all, is a pretty niche use case (but quite typical of the expected LSST workload, so it's not like this is a silly artificial benchmark for me). So I would not be at all surprised to find that this was a case that fsspec hadn't really planned for.

After I get that done I will try the dask-kubernetes PR and see how that works for me. This is all stuff that now will not happen until next week though.

mrocklin commented 4 years ago

The security bug has been fixed in a patch release. 2.5.1 is available on PyPI and is building now on conda-forge.

bolliger32 commented 4 years ago

We are seeing what I think is a related issue in dask-kubernetes: sometimes when a scheduler is removed (e.g. by shutting down the kernel), a few workers can be stranded. It looks like they are stuck in a loop trying to find the scheduler, but some IOError prevents them from triggering the death timeout feature. The repeated traceback we see (every 10 s or so) is:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/distributed/comm/core.py", line 222, in connect
    _raise(error)
  File "/opt/conda/lib/python3.6/site-packages/distributed/comm/core.py", line 205, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to 'tcp://10.12.14.135:37109' after 10 s: in <distributed.comm.tcp.TCPConnector object at 0x7f9334372ac8>: ConnectionRefusedError: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "/opt/conda/lib/python3.6/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "/opt/conda/lib/python3.6/site-packages/distributed/worker.py", line 870, in heartbeat
    metrics=await self.get_metrics(),
  File "/opt/conda/lib/python3.6/site-packages/distributed/core.py", line 745, in send_recv_from_rpc
    comm = await self.pool.connect(self.addr)
  File "/opt/conda/lib/python3.6/site-packages/distributed/core.py", line 879, in connect
    connection_args=self.connection_args,
  File "/opt/conda/lib/python3.6/site-packages/distributed/comm/core.py", line 231, in connect
    _raise(error)
  File "/opt/conda/lib/python3.6/site-packages/distributed/comm/core.py", line 205, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to 'tcp://10.12.14.135:37109' after 10 s: Timed out trying to connect to 'tcp://10.12.14.135:37109' after 10 s: in <distributed.comm.tcp.TCPConnector object at 0x7f9334372ac8>: ConnectionRefusedError: [Errno 111] Connection refused

Haven't been able to diagnose why this isn't triggering a death timeout, and it certainly doesn't happen with all workers or on every scheduler shutdown. Any ideas? This is with distributed 2.8.0, dask-kubernetes 0.10.0.

TomAugspurger commented 4 years ago

@bolliger32 we're also seeing that in https://github.com/pangeo-data/pangeo-cloud-federation/issues/408 (also dask-kubernetes). I may be able to reproduce locally, so I'll look into this for a bit.

TomAugspurger commented 4 years ago

Seems like this fixes it for me locally:

diff --git a/distributed/worker.py b/distributed/worker.py
index e3ef6b26..e5666aca 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -882,6 +882,12 @@ class Worker(ServerNode):
                 )
                 self.bandwidth_workers.clear()
                 self.bandwidth_types.clear()
+            except IOError as e:
+                # Scheduler is gone. Respect distributed.comm.timeouts.connect
+                if "Timed out trying to connect" in str(e):
+                    await self.close(report=False)
+                else:
+                    raise e
             except CommClosedError:
                 logger.warning("Heartbeat to scheduler failed")
             finally:

I start a scheduler with dask-scheduler, connect a worker, then kill the scheduler. Going to write up a test now and make a PR

cc @scottyhq

bolliger32 commented 4 years ago

thanks @TomAugspurger !!

alvarouc commented 3 years ago

I am using a local cluster and got the same error. Commenting this change out worked for me. As an alternative, could you advise on how to change the "distributed.comm.timeouts.connect" setting?
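
For reference, a minimal sketch of one way to raise that timeout through Dask's configuration system ("60s" is just an illustrative value; set it before creating the cluster/client):

import dask

# Raise the connect timeout from its default of "10s"
dask.config.set({"distributed.comm.timeouts.connect": "60s"})

from dask.distributed import Client, LocalCluster

client = Client(LocalCluster())

The same key can also be set in a Dask YAML config file (e.g. ~/.config/dask/distributed.yaml) or, as far as I know, via the DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT environment variable.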