dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org

Broken Pipe when shutting down a simple Client() on KeyboardInterrupt #3384

Open jordaniac89 opened 4 years ago

jordaniac89 commented 4 years ago

Python 3.6.2 dask==2.9.1 distributed==2.9.1

I'm trying to shut down a client when the main thread catches a KeyboardInterrupt, with the following code:

from dask.distributed import Client

if __name__ == "__main__":
    client = None  # initialized so the except block can test it
    try:
        client = Client()

        # keep it running
        while True:
            pass

    except KeyboardInterrupt as ke:
        if client is not None:
            print('Shutting down client')
            client.shutdown()

However, I receive several connection errors when I do this:

^CShutting down client
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Restarting worker
distributed.nanny - ERROR - Nanny failed to start process
Traceback (most recent call last):
  File "/Users/jordanmiles/Documents/CTPDataLake/env/lib/python3.6/site-packages/distributed/nanny.py", line 522, in start
    await self.process.start()
  File "/Users/jordanmiles/Documents/CTPDataLake/env/lib/python3.6/site-packages/distributed/process.py", line 34, in _call_and_set_future
    res = func(*args, **kwargs)
  File "/Users/jordanmiles/Documents/CTPDataLake/env/lib/python3.6/site-packages/distributed/process.py", line 202, in _start
    process.start()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 105, in start
    self._popen = self._Popen(self)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/context.py", line 291, in _Popen
    return Popen(process_obj)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/popen_forkserver.py", line 35, in __init__
    super().__init__(process_obj)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/popen_fork.py", line 20, in __init__
    self._launch(process_obj)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/popen_forkserver.py", line 51, in _launch
    self.sentinel, w = forkserver.connect_to_new_process(self._fds)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/forkserver.py", line 66, in connect_to_new_process
    client.connect(self._forkserver_address)
ConnectionRefusedError: [Errno 61] Connection refused
(The "Nanny failed to start process" traceback above repeats three more times, once per worker, each ending in ConnectionRefusedError: [Errno 61] Connection refused.)
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/queues.py", line 247, in _feed
    send_bytes(obj)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
(The BrokenPipeError traceback above repeats twice more.)
Task exception was never retrieved
future: <Task finished coro=<BaseTCPConnector.connect() done, defined at /Users/jordanmiles/Documents/CTPDataLake/env/lib/python3.6/site-packages/distributed/comm/tcp.py:354> exception=CommClosedError('in <distributed.comm.tcp.TCPConnector object at 0x1189a0198>: ConnectionRefusedError: [Errno 61] Connection refused',)>
Traceback (most recent call last):
  File "/Users/jordanmiles/Documents/CTPDataLake/env/lib/python3.6/site-packages/distributed/comm/tcp.py", line 361, in connect
    ip, port, max_buffer_size=MAX_BUFFER_SIZE, **kwargs
  File "/Users/jordanmiles/Documents/CTPDataLake/env/lib/python3.6/site-packages/tornado/tcpclient.py", line 280, in connect
    af, addr, stream = await connector.start(connect_timeout=timeout)
  File "/Users/jordanmiles/Documents/CTPDataLake/env/lib/python3.6/site-packages/tornado/tcpclient.py", line 143, in on_connect_done
    stream = future.result()
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/jordanmiles/Documents/CTPDataLake/env/lib/python3.6/site-packages/distributed/comm/tcp.py", line 373, in connect
    convert_stream_closed_error(self, e)
  File "/Users/jordanmiles/Documents/CTPDataLake/env/lib/python3.6/site-packages/distributed/comm/tcp.py", line 130, in convert_stream_closed_error
    raise CommClosedError("in %s: %s: %s" % (obj, exc.__class__.__name__, exc))
distributed.comm.core.CommClosedError: in <distributed.comm.tcp.TCPConnector object at 0x1189a0198>: ConnectionRefusedError: [Errno 61] Connection refused

I've tried both client.shutdown() and client.stop(), but neither fixes it. I also thought the scheduler might be getting closed before the workers, so I attempted to shut down the workers first with

workers = list(client.scheduler_info()['workers'])
client.run_on_scheduler(lambda scheduler=None: scheduler.retire_workers(workers, close_workers=True))

client.shutdown()

But that just gives me 'NoneType' object has no attribute 'retire_workers', along with the stack trace above. Any help would be greatly appreciated.
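
For reference, a minimal sketch of the same loop using Client as a context manager, which closes the client (and the LocalCluster it created) on exit, including on KeyboardInterrupt:

from dask.distributed import Client

if __name__ == "__main__":
    try:
        with Client() as client:
            # keep it running
            while True:
                pass
    except KeyboardInterrupt:
        print("Shutting down client")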

jordaniac89 commented 4 years ago

FYI, I fixed the 'NoneType' object has no attribute 'retire_workers' error: the lambda's argument should be named "dask_scheduler" instead of "scheduler". I'm still receiving the broken pipe stack traces, though.
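
A minimal sketch of the corrected call, assuming the same workers list as in the original snippet: run_on_scheduler injects the scheduler instance into the argument named dask_scheduler, and it accepts coroutine functions, which is needed here since Scheduler.retire_workers is async.

async def retire(dask_scheduler=None):
    # retire_workers is a coroutine on the scheduler, so await it here
    await dask_scheduler.retire_workers(workers=workers, close_workers=True)

client.run_on_scheduler(retire)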

TomAugspurger commented 4 years ago

Can you share a bit more about how you start your scheduler and workers? I don't see any unusual output when running your code locally.

jordaniac89 commented 4 years ago

Hi Tom, the first block of code I shared is basically it: just the LocalCluster that is created when constructing the Client with no arguments. I ran it both in PyCharm and through the python3 CLI and received the same error both ways. I may also try upgrading my Python version to see if that resolves it.

mayanksatnalika commented 4 years ago

I can confirm this issue, running in an IPython shell on Python 3:

from dask.distributed import Client, LocalCluster

cluster = LocalCluster()
client = Client(cluster)

exit

This throws the same error as posted above.

jrbourbeau commented 4 years ago

@jordaniac89 or @mayanksatnalika would either of you be willing to try using the master branch of distributed? I'm not able to reproduce the broken pipe error on Python 3.6 or 3.7 when using the dev version of distributed.

mayanksatnalika commented 4 years ago

Hi, for me the issue happened in a fresh conda environment (Python 3.6.7), where I installed dask with conda and then dask[complete] with pip. Do you want me to build from source using the master branch?

jrbourbeau commented 4 years ago

Yeah, that'd be great if you have the time. You can install the current master branch version of Dask into your environment with pip install git+https://github.com/dask/dask.git
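
(For the broken pipe itself, which comes from distributed, the analogous command is pip install git+https://github.com/dask/distributed.git.)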

jordaniac89 commented 4 years ago

I will also give this a shot when I get a chance.

bebosudo commented 4 years ago

Hi, I'm seeing the same errors even with the latest version from git (dask==2.10.1+17.g2627a7d). I set up a for loop and I'm getting broken pipe errors at program termination in around 20-30% of the runs.

The errors I'm getting range from the simple:

Traceback (most recent call last):
  File "/opt/software/linux-centos7-ivybridge/gcc/8.2.0/python/3.7.2-cbouetvcy45ux7gwez3367vnomy7fajl/lib/python3.7/multiprocessing/queues.py", line 242, in _feed
    send_bytes(obj)
  File "/opt/software/linux-centos7-ivybridge/gcc/8.2.0/python/3.7.2-cbouetvcy45ux7gwez3367vnomy7fajl/lib/python3.7/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/opt/software/linux-centos7-ivybridge/gcc/8.2.0/python/3.7.2-cbouetvcy45ux7gwez3367vnomy7fajl/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/opt/software/linux-centos7-ivybridge/gcc/8.2.0/python/3.7.2-cbouetvcy45ux7gwez3367vnomy7fajl/lib/python3.7/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe

to the more complex:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/python/3.7.2/lib/python3.7/multiprocessing/forkserver.py", line 243, in main
Traceback (most recent call last):
    fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
  File "/python/3.7.2/lib/python3.7/multiprocessing/reduction.py", line 155, in recvfds
  File "/python/3.7.2/lib/python3.7/multiprocessing/queues.py", line 242, in _feed
    send_bytes(obj)
  File "/python/3.7.2/lib/python3.7/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/python/3.7.2/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/python/3.7.2/lib/python3.7/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
    raise EOFError
EOFError
distributed.nanny - ERROR - Nanny failed to start process
Traceback (most recent call last):
  File "~/.venvs/ogs_test_daskfromgit/lib/python3.7/site-packages/distributed/nanny.py", line 522, in start
    await self.process.start()
  File "~/.venvs/ogs_test_daskfromgit/lib/python3.7/site-packages/distributed/process.py", line 34, in _call_and_set_future
    res = func(*args, **kwargs)
  File "~/.venvs/ogs_test_daskfromgit/lib/python3.7/site-packages/distributed/process.py", line 202, in _start
    process.start()
  File "/python/3.7.2/lib/python3.7/multiprocessing/process.py", line 112, in start
    self._popen = self._Popen(self)
  File "/python/3.7.2/lib/python3.7/multiprocessing/context.py", line 291, in _Popen
    return Popen(process_obj)
  File "/python/3.7.2/lib/python3.7/multiprocessing/popen_forkserver.py", line 35, in __init__
    super().__init__(process_obj)
  File "/python/3.7.2/lib/python3.7/multiprocessing/popen_fork.py", line 20, in __init__
    self._launch(process_obj)
  File "/python/3.7.2/lib/python3.7/multiprocessing/popen_forkserver.py", line 51, in _launch
    self.sentinel, w = forkserver.connect_to_new_process(self._fds)
  File "/python/3.7.2/lib/python3.7/multiprocessing/forkserver.py", line 75, in connect_to_new_process
    reduction.sendfds(client, allfds)
  File "/python/3.7.2/lib/python3.7/multiprocessing/reduction.py", line 145, in sendfds
    sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
OSError: [Errno 9] Bad file descriptor

My script is now set up like this:

import errno
import logging
import socket

from dask.distributed import Client

lgg = logging.getLogger(__name__)


def main(client, args):
    try:
        do_business_logic(client, args)

    # https://docs.python.org/3/library/signal.html#note-on-sigpipe
    except BrokenPipeError:
        lgg.warning("BrokenPipeError captured")
    except socket.error as exc:
        # exceptions are not subscriptable in Python 3, so check exc.args[0]
        if isinstance(exc.args, tuple) and exc.args[0] == errno.EPIPE:
            lgg.warning("Broken pipe errno encountered")


if __name__ == "__main__":
    args = parse_cli()
    # The dask client needs to be configured in an `if __name__ == "__main__"` clause.
    with Client(n_workers=args.num_processes, threads_per_worker=1) as client:
        lgg.info("Using dask with {}".format(client))

        main(client, args)
        lgg.info("Completed")

That try/except in the main function seems to be ignored: the error shows up after "Completed" appears in the logs, i.e. after the program has completely finished its logic. I'd be fine just having a way to silence these errors.
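
One partial workaround, assuming the shutdown noise flows through the distributed and tornado loggers; the BrokenPipeError tracebacks will not be affected, since multiprocessing's queue feeder thread prints them straight to stderr rather than through logging:

import logging

# Quiet the nanny/worker/tornado errors emitted during shutdown.
for name in ("distributed.nanny", "distributed.worker", "tornado.application"):
    logging.getLogger(name).setLevel(logging.CRITICAL)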

mrocklin commented 4 years ago

There is some chance that this has been resolved with the recent change from forkserver to spawn. If anyone has time to try master (just updated) and report back I would appreciate it.
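
For anyone who wants to try the start-method change without waiting on a release, a sketch that assumes the multiprocessing-method key in distributed's worker config (set it before the cluster starts):

import dask

# Choose the start method used to launch nanny/worker processes.
# "spawn" is the method that replaced "forkserver" in the change above.
dask.config.set({"distributed.worker.multiprocessing-method": "spawn"})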

bebosudo commented 4 years ago

I've been trying the latest versions of both dask and distributed (dask==2.10.1+19.g5f61f7f and distributed==2.10.0+31.g49328dc) and ran my script in a for loop. With the previous forkserver multiprocessing method I was getting a broken pipe around a quarter of the time, while for the past hour it has been running smoothly with the spawn method.

I've only encountered a much rarer "Stream is closed" exception, also at the termination of the script:

tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x2aeacb636c18>>, <Task finished coro=<Worker.heartbeat() done, defined at ~/.venvs/daskfromgit/lib/python3.7/site-packages/distributed/worker.py:883> exception=CommClosedError('in <closed TCP>: Stream is closed')>)
Traceback (most recent call last):
  File "~/.venvs/daskfromgit/lib/python3.7/site-packages/distributed/comm/tcp.py", line 188, in read
    n_frames = await stream.read_bytes(8)
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "~/.venvs/daskfromgit/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "~/.venvs/daskfromgit/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "~/.venvs/daskfromgit/lib/python3.7/site-packages/distributed/worker.py", line 920, in heartbeat
    raise e
  File "~/.venvs/daskfromgit/lib/python3.7/site-packages/distributed/worker.py", line 893, in heartbeat
    metrics=await self.get_metrics(),
  File "~/.venvs/daskfromgit/lib/python3.7/site-packages/distributed/utils_comm.py", line 391, in retry_operation
    operation=operation,
  File "~/.venvs/daskfromgit/lib/python3.7/site-packages/distributed/utils_comm.py", line 379, in retry
    return await coro()
  File "~/.venvs/daskfromgit/lib/python3.7/site-packages/distributed/core.py", line 757, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "~/.venvs/daskfromgit/lib/python3.7/site-packages/distributed/core.py", line 540, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "~/.venvs/daskfromgit/lib/python3.7/site-packages/distributed/comm/tcp.py", line 208, in read
    convert_stream_closed_error(self, e)
  File "~/.venvs/daskfromgit/lib/python3.7/site-packages/distributed/comm/tcp.py", line 123, in convert_stream_closed_error
    raise CommClosedError("in %s: %s" % (obj, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed

Might it still be related? I've found a hundred issues around "stream is closed", so perhaps it's unrelated?

mrocklin commented 4 years ago

With the previous forkserver multiprocessing method I was getting a Broken pipe around 1/4th of the times, while for the past hour it has been running smoothly with the spawn method.

Woo! Thank you for taking the time to test this out @bebosudo

mrocklin commented 4 years ago

I've only encountered a much rarer "Stream is closed exception", also at the termination of the script:

From the traceback it looks like maybe we shut things down during a heartbeat. Probably we should try-except around this and avoid raising the exception if we're in the process of closing.
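
A rough sketch of that idea, not the actual distributed code (_send_heartbeat here is a hypothetical stand-in for the body of Worker.heartbeat):

from distributed.comm.core import CommClosedError

async def heartbeat(self):
    try:
        await self._send_heartbeat()  # hypothetical stand-in for the real heartbeat body
    except CommClosedError:
        # A heartbeat racing with shutdown is expected to fail;
        # swallow the error if the worker is already closing.
        if self.status in ("closing", "closed"):
            return
        raise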

bebosudo commented 4 years ago

I've only encountered a much rarer "Stream is closed exception", also at the termination of the script:

From the traceback it looks like maybe we shut things down during a heartbeat. Probably we should try-except around this and avoid raising the exception if we're in the process of closing.

Let me know whether I can help testing out new stuff (feel free to ping me)!

I've done some investigation of the errors: the second exception is raised at distributed/comm/tcp.py:208, which sits inside an if not shutting_down() check; shutting_down is the function defined at https://github.com/dask/distributed/blob/12a4f2d38f131ddb63849d1be0805f414fa6d816/distributed/utils.py#L982-L985

So it means that the interpreter (the main one, or one of the spawned ones?) sometimes does not detect that it's shutting down and lets the CommClosedError propagate.

There's also an else branch (for Python < 3.5 compatibility, I guess) with a more "handcrafted" way to detect interpreter shutdown, by registering a function with atexit: https://github.com/dask/distributed/blob/12a4f2d38f131ddb63849d1be0805f414fa6d816/distributed/utils.py#L988-L997 I'll try forcing the else branch to see whether the "old way" works better.
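
For reference, that fallback amounts to roughly this pattern (paraphrased, not a verbatim copy of the linked code):

import atexit

_shutting_down = [False]

def _at_shutdown(flag=_shutting_down):
    # atexit runs this when the interpreter begins a normal shutdown.
    flag[0] = True

atexit.register(_at_shutdown)

def shutting_down(flag=_shutting_down):
    return flag[0]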