dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Too many file descriptors issue from own code instantiating too many Clients. #1472

Open aavanian opened 7 years ago

aavanian commented 7 years ago

I have been using dask+distributed for a while: I have a Python script running every morning that launches a LocalCluster(n_workers=4), loads some data, processes it a bit, persists it, and publishes it as a dataset. From there, several clients (launched either from the same computer or remotely) connect and run queries against that same dataset.

Occasionally, the cluster would stop being responsive. I could never pinpoint the cause (the way I launch the cluster prevents me from easily seeing stderr and I hadn't bothered to fix that yet) and a relaunch would fix it.

Recently, however, I hit a really weird issue: after a few incidents like the above in the same day and a few cluster relaunches, I couldn't launch a LocalCluster anymore. When launched outside the process that masks stderr, I could see ValueError: too many file descriptors in select() along with a bunch of tornado.iostream.StreamClosedError: Stream is closed and distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed.

But I could still launch dask-scheduler + dask-worker without issue from the command line... Very surprisingly, even after a PC restart, this situation (no LocalCluster working but command-line launch OK) persisted.

The next day, working from a manually launched scheduler and workers, I was getting logs. The cluster stopped being responsive again, but I finally understood the issue: my code is leaking Client instances.

A typical computing function I use would do something akin to:

def compute_thing(*args, **kwargs):
    dask_client = Client(...)   # here there's some internal configuration so it knows which ip/port to use
    df = dask_client.get_dataset('dataset')
    return _compute_thing(df)  # <- this function does nothing re. scheduling. Apart from the metadata, it works as well on a pandas or dask df

And it looks like the dask_client connections don't get severed automatically (it's entirely possible the Python GC chokes on that; fair enough, that's on me). And of course, I would call such functions tens if not hundreds of times throughout the day, and at some point that's too much:

distributed.scheduler - INFO - Receive client connection: DaskClient-f889ffda-afeb-11e7-abd8-64006a6df23b
distributed.scheduler - INFO - Receive client connection: DaskClient-f8cc46dc-afeb-11e7-abd8-64006a6df23b
distributed.scheduler - INFO - Receive client connection: DaskClient-f90b0ac0-afeb-11e7-abd8-64006a6df23b
distributed.scheduler - INFO - Receive client connection: DaskClient-f948e288-afeb-11e7-abd8-64006a6df23b
distributed.scheduler - INFO - Receive client connection: DaskClient-f989ef36-afeb-11e7-abd8-64006a6df23b
distributed.scheduler - INFO - Receive client connection: DaskClient-f9ce03dc-afeb-11e7-abd8-64006a6df23b
distributed.scheduler - INFO - End scheduler at 'tcp://:8786'
Traceback (most recent call last):
  File "C:\Users\...\Scripts\dask-scheduler-script.py", line 5, in <module>
    sys.exit(distributed.cli.dask_scheduler.go())
  File "C:\Users\...\lib\site-packages\distributed\cli\dask_scheduler.py", line 153, in go
    main()
  File "C:\Users\...\lib\site-packages\click\core.py", line 722, in __call__
    return self.main(*args, **kwargs)
  File "C:\Users\...\lib\site-packages\click\core.py", line 697, in main
    rv = self.invoke(ctx)
  File "C:\Users\...\lib\site-packages\click\core.py", line 895, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "C:\Users\...\lib\site-packages\click\core.py", line 535, in invoke
    return callback(*args, **kwargs)
  File "C:\Users\...\lib\site-packages\distributed\cli\dask_scheduler.py", line 140, in main
    loop.start()
  File "C:\Users\...\lib\site-packages\tornado\ioloop.py", line 863, in start
    event_pairs = self._impl.poll(poll_timeout)
  File "C:\Users\...\lib\site-packages\tornado\platform\select.py", line 63, in poll
    self.read_fds, self.write_fds, self.error_fds, timeout)
ValueError: too many file descriptors in select()

For what it's worth, there were 260 client connections; it failed after receiving the 260th or on receiving the 261st. The scheduler was running with 4x4 workers (by the way, I was surprised that LocalCluster(n_workers=4) means 4 processes, but launching dask-worker 4 times with default parameters ended up with 16 processes; note: the PC has 4 cores).

So questions/notes:

End. Sorry wall of text.

System: Windows 10, dask 0.15.2 and 0.15.4, distributed 1.18.2 and 1.19.2

mrocklin commented 7 years ago

Yeah, I wouldn't be surprised to find circular references in the client. If you have any interest in tracking those down and cleaning this up that would be very welcome. But I understand if you have other more pressing things to work on.

There is a client singleton; I recommend using Client.current() or default_client() after you have created a client.
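For illustration, a minimal sketch of what that reuse could look like in a compute function like the one above (SCHEDULER_ADDRESS and _compute_thing here are placeholders standing in for the original snippet's internal ip/port configuration and downstream computation):

from distributed import Client, default_client

SCHEDULER_ADDRESS = 'tcp://127.0.0.1:8786'  # placeholder for the internal ip/port configuration

def _compute_thing(df):
    return df  # stand-in for the real downstream computation

def compute_thing(*args, **kwargs):
    try:
        # Reuse the process-wide default client if one is already connected.
        dask_client = default_client()
    except ValueError:
        # No client yet: create one (and only one) for this process.
        dask_client = Client(SCHEDULER_ADDRESS)
    df = dask_client.get_dataset('dataset')
    return _compute_thing(df)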

aavanian commented 7 years ago

Oh, great. Did a quick test and that will be easy to use.

As for letting clients go: I'll give it a try, but on my off-time, so it might take a while, especially as it's a bit outside my usual coding experience.

d-chambers commented 7 years ago

I am having a similar issue.

My setup: Ubuntu 16.04, Python 3.6.2, Distributed 1.19.3

If I do this:

from distributed import Client
cl1 = Client()
cl2 = Client()
cl3 = Client()

The following traceback is raised many many times until I kill it, although the directory in /proc is different in each traceback.

tornado.application - ERROR - Multiple exceptions in yield list
Traceback (most recent call last):
  File "/home/isti_ew/anaconda3/lib/python3.6/site-packages/tornado/gen.py", lik
    result_list.append(f.result())
  File "/home/isti_ew/anaconda3/lib/python3.6/site-packages/tornado/concurrent.t
    raise_exc_info(self._exc_info)
  File "", line 4, in raise_exc_info
  File "/home/isti_ew/anaconda3/lib/python3.6/site-packages/tornado/gen.py", lin
    yielded = self.gen.throw(*exc_info)
  File "/home/isti_ew/anaconda3/lib/python3.6/site-packages/distributed/deploy/r
    yield w._start()
  File "/home/isti_ew/anaconda3/lib/python3.6/site-packages/tornado/gen.py", lin
    value = future.result()
  File "/home/isti_ew/anaconda3/lib/python3.6/site-packages/tornado/concurrent.t
    raise_exc_info(self._exc_info)
  File "", line 4, in raise_exc_info
  File "/home/isti_ew/anaconda3/lib/python3.6/site-packages/tornado/gen.py", lin
    yielded = self.gen.send(value)
  File "/home/isti_ew/anaconda3/lib/python3.6/site-packages/distributed/nanny.pt
    assert self.worker_address
AssertionError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/isti_ew/anaconda3/lib/python3.6/site-packages/tornado/gen.py", lik
    result_list.append(f.result())
  File "/home/isti_ew/anaconda3/lib/python3.6/site-packages/tornado/concurrent.t
    raise_exc_info(self._exc_info)
  File "", line 4, in raise_exc_info
  File "/home/isti_ew/anaconda3/lib/python3.6/site-packages/tornado/gen.py", lir
    yielded = next(result)
  File "/home/isti_ew/anaconda3/lib/python3.6/site-packages/distributed/deploy/r
    silence_logs=self.silence_logs, **kwargs)
  File "/home/isti_ew/anaconda3/lib/python3.6/site-packages/distributed/nanny.p
    **kwargs)
  File "/home/isti_ew/anaconda3/lib/python3.6/site-packages/distributed/node.py
    deserialize=deserialize, io_loop=self.io_loop)
  File "/home/isti_ew/anaconda3/lib/python3.6/site-packages/distributed/core.py
    self.monitor = SystemMonitor()
  File "/home/isti_ew/anaconda3/lib/python3.6/site-packages/distributed/system
    self.proc = psutil.Process()
  File "/home/isti_ew/anaconda3/lib/python3.6/site-packages/psutil/__init__.py"
    self._init(pid)
  File "/home/isti_ew/anaconda3/lib/python3.6/site-packages/psutil/__init__.py"t
    self.create_time()
  File "/home/isti_ew/anaconda3/lib/python3.6/site-packages/psutil/__init__.py"e
    self._create_time = self._proc.create_time()
  File "/home/isti_ew/anaconda3/lib/python3.6/site-packages/psutil/_pslinux.py"r
    return fun(self, *args, **kwargs)
  File "/home/isti_ew/anaconda3/lib/python3.6/site-packages/psutil/_pslinux.py"e
    values = self._parse_stat_file()
  File "/home/isti_ew/anaconda3/lib/python3.6/site-packages/psutil/_common.py",r
    return fun(self)
  File "/home/isti_ew/anaconda3/lib/python3.6/site-packages/psutil/_pslinux.py"e
    with open_binary("%s/%s/stat" % (self._procfs_path, self.pid)) as f:
  File "/home/isti_ew/anaconda3/lib/python3.6/site-packages/psutil/_pslinux.py"y
    return open(fname, "rb", **kwargs)
OSError: [Errno 24] Too many open files: '/proc/38739/stat'

I realize one should not do this, but it seems like an awfully easy way to cause a catastrophic failure.
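For reference, closing each client before creating the next, or scoping the client to a with block, keeps the open-descriptor count bounded in an interactive session like this. A sketch, assuming a default LocalCluster:

from distributed import Client

# Option 1: close the client explicitly when done with it.
cl1 = Client()
cl1.close()

# Option 2: let a context manager close it at the end of the block.
with Client() as cl2:
    pass  # do work with cl2 here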

viclafargue commented 3 years ago

I know this issue is quite old, but it looks like the problem may still remain. I am running pytest tests that each require a client connection. After a certain number of connections the scheduler crashes. I suspect this may also be related to hitting the limit on the number of file descriptors. Would you know what could be causing this issue?
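As a workaround on the test side (not a fix for whatever is leaking descriptors), a session-scoped pytest fixture can share one client across all tests instead of opening a fresh connection per test. A minimal sketch, with illustrative fixture and test names, assuming the tests only need a plain client and the same scheduler-file setup as in the reproducer below:

import pytest
from dask.distributed import Client

@pytest.fixture(scope="session")
def client():
    # One client, and one set of connections, for the whole test session.
    c = Client(scheduler_file="scheduler-file.json")
    yield c
    c.close()

def test_scheduler_is_reachable(client):
    # Example test using the shared client.
    assert client.scheduler_info() is not None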

Environment:

dask                      2021.2.0           pyhd8ed1ab_0    conda-forge
distributed               2021.2.0         py38h578d9bd_0    conda-forge
ucx                       1.9.0+gcd9efd3       cuda11.0_0    rapidsai-nightly
ucx-proc                  1.0.0                       gpu    rapidsai-nightly
ucx-py                    0.18.0a210225   py38_gcd9efd3_19    rapidsai-nightly

Here is a small reproducer:

scheduler.sh:

export DASK_RMM__POOL_SIZE=500M
export DASK_UCX__CUDA_COPY=True
export DASK_UCX__TCP=True
export DASK_UCX__NVLINK=False
export DASK_UCX__INFINIBAND=True
export UCX_HANDLE_ERRORS=bt

dask-scheduler --protocol ucx --scheduler-file scheduler-file.json > scheduler.log 2>&1 &

client.py:

from dask.distributed import Client

for i in range(3000):
    client = Client(scheduler_file="scheduler-file.json")
    client.close()

Scheduler error:

[7fdc1beea28f:1156 :1:1160] sockcm_iface.c:261  Fatal: sockcm_listener: unable to create handler for new connection
==== backtrace (tid:   1160) ====
 0  /opt/conda/envs/rapids/lib/python3.8/site-packages/ucp/_libs/../../../../libucs.so.0(ucs_handle_error+0x115) [0x7fa147551ee5]
 1  /opt/conda/envs/rapids/lib/python3.8/site-packages/ucp/_libs/../../../../libucs.so.0(ucs_fatal_error_message+0x51) [0x7fa14754ec41]
 2  /opt/conda/envs/rapids/lib/python3.8/site-packages/ucp/_libs/../../../../libucs.so.0(ucs_fatal_error_format+0xf5) [0x7fa14754edf5]
 3  /opt/conda/envs/rapids/lib/python3.8/site-packages/ucp/_libs/../../../.././libuct.so.0(+0x2877d) [0x7fa14751477d]
 4  /opt/conda/envs/rapids/lib/python3.8/site-packages/ucp/_libs/../../../../libucs.so.0(+0x12364) [0x7fa14753e364]
 5  /opt/conda/envs/rapids/lib/python3.8/site-packages/ucp/_libs/../../../../libucs.so.0(+0x124fb) [0x7fa14753e4fb]
 6  /opt/conda/envs/rapids/lib/python3.8/site-packages/ucp/_libs/../../../../libucs.so.0(ucs_async_dispatch_handlers+0x3b) [0x7fa14753e6bb]
 7  /opt/conda/envs/rapids/lib/python3.8/site-packages/ucp/_libs/../../../../libucs.so.0(+0x16343) [0x7fa147542343]
 8  /opt/conda/envs/rapids/lib/python3.8/site-packages/ucp/_libs/../../../../libucs.so.0(ucs_event_set_wait+0x105) [0x7fa147557af5]
 9  /opt/conda/envs/rapids/lib/python3.8/site-packages/ucp/_libs/../../../../libucs.so.0(+0x16500) [0x7fa147542500]
10  /usr/lib/x86_64-linux-gnu/libpthread.so.0(+0x9609) [0x7fa14ef3f609]
11  /usr/lib/x86_64-linux-gnu/libc.so.6(clone+0x43) [0x7fa14ecff293]
=================================

Client error:

sock.c:451  UCX  ERROR recv(fd=1561) failed: Connection reset by peer
sockcm_ep.c:151  UCX  ERROR failed to connect to 172.17.0.2:8786
quasiben commented 3 years ago

@viclafargue I think this error, while related to file descriptors, is most likely due to issues with running UCX with SOCKCM. As you are using InfiniBand, RDMACM should be used instead of SOCKCM. We and the UCX devs are working to resolve some latent issues with using RDMACM with Dask, and this should resolve the underlying FD issue here. cc @pentschev

pentschev commented 3 years ago

I discussed this offline last week with @viclafargue, but the latest I heard from him was that he was able to reproduce this with protocol="tcp" as well. Is that really the case @viclafargue? If it is, we're better suited to discuss that case here and UCX elsewhere.

viclafargue commented 3 years ago

Thanks for your answers. Actually, after a closer look, it seems there was a problem in the way I ran the test with the TCP protocol. Indeed, the same test run against a scheduler configured with the TCP protocol does not trigger any error. Hope we will soon have a stable RDMACM.