coiled / feedback

A place to provide Coiled feedback
14 stars 3 forks source link

TimeoutError when using existing cluster #123

Closed rubenvdg closed 3 years ago

rubenvdg commented 3 years ago

I have a cluster running ("test-cluster"), which is in use by a Python process (and currently running stuff).

If (in another Python session) I try to connect to the same cluster I catch a timeout:

import coiled
cluster = coiled.Cluster(name="test-cluster")
Using existing cluster: 'test-cluster'

--------------------------------------------------------
TimeoutError           Traceback (most recent call last)
/opt/conda/envs/coiled/lib/python3.8/site-packages/distributed/comm/core.py in connect()

/opt/conda/envs/coiled/lib/python3.8/asyncio/tasks.py in wait_for()

TimeoutError: 

The above exception was the direct cause of the following exception:

OSError                Traceback (most recent call last)
<ipython-input-7-ef47d0fe196a> in <module>
      1 import coiled
      2 
----> 3 cluster = coiled.Cluster(name="test-cluster")

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/coiled/cluster.py in __init__(self, n_workers, configuration, software, worker_cpu, worker_gpu, worker_memory, worker_class, worker_options, scheduler_cpu, scheduler_memory, scheduler_class, scheduler_options, name, asynchronous, cloud, account, shutdown_on_close, backend_options, credentials)
    153         self._name = "coiled.Cluster"  # Used in Dask's Cluster._ipython_display_
    154         if not self.asynchronous:
--> 155             self.sync(self._start)
    156 
    157     @property

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/distributed/deploy/cluster.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    187             return future
    188         else:
--> 189             return sync(self.loop, func, *args, **kwargs)
    190 
    191     def _log(self, log):

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    338     if error[0]:
    339         typ, exc, tb = error[0]
--> 340         raise exc.with_traceback(tb)
    341     else:
    342         return result[0]

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/distributed/utils.py in f()
    322             if callback_timeout is not None:
    323                 future = asyncio.wait_for(future, callback_timeout)
--> 324             result[0] = yield future
    325         except Exception as exc:
    326             error[0] = sys.exc_info()

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/coiled/cluster.py in _start(self)
    225             )
    226 
--> 227             await self._send_credentials()
    228 
    229             await super()._start()

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/coiled/cluster.py in _send_credentials(self)
    403                 if creds:
    404                     # TODO: set up TTL, and update these credentials periodically
--> 405                     await self.scheduler_comm.aws_update_credentials(
    406                         credentials={
    407                             k: creds.get(k)

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/distributed/core.py in send_recv_from_rpc(**kwargs)
    788                 comm = await self.live_comm()
    789                 comm.name = "rpc." + key
--> 790                 result = await send_recv(comm=comm, op=key, **kwargs)
    791             except (RPCClosed, CommClosedError) as e:
    792                 raise e.__class__(

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
    658         if comm.deserialize:
    659             typ, exc, tb = clean_exception(**response)
--> 660             raise exc.with_traceback(tb)
    661         else:
    662             raise Exception(response["text"])

/opt/conda/envs/coiled/lib/python3.8/site-packages/distributed/core.py in handle_comm()

https://cloud.coiled.io/preloads/aws-credentials.py in update_credentials()

/opt/conda/envs/coiled/lib/python3.8/site-packages/distributed/scheduler.py in register_worker_plugin()

/opt/conda/envs/coiled/lib/python3.8/site-packages/distributed/scheduler.py in broadcast()

/opt/conda/envs/coiled/lib/python3.8/site-packages/distributed/utils.py in All()

/opt/conda/envs/coiled/lib/python3.8/site-packages/distributed/scheduler.py in send_message()

/opt/conda/envs/coiled/lib/python3.8/site-packages/distributed/core.py in connect()

/opt/conda/envs/coiled/lib/python3.8/site-packages/distributed/comm/core.py in connect()

OSError: Timed out trying to connect to tls://10.2.11.147:33283 after 10 s

I'm running the default coiled env.

Is this expected behavior? I.e. is it impossible to connect to an existing cluster if it's in use by another process?

FabioRosado commented 3 years ago

Hello, Ruben thank you for your question. You should be able to do so. I tested by running the quickstart and attempted to connect to the cluster using its name while getting the tip amount's mean.

It might be useful to know what the Python process is doing - I had a look at your logs and seen a KeyError on some workers and then a TLS handshake failed with remote log.

Looking at some Worker logs I've noticed this log:

Event loop was unresponsive in Worker for 14.20s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

What I'm guessing here is that perhaps the Python process made the loop unresponsive at a similar time that you tried to connect and that caused the SSL handshake issue and the timeout?

Can you attempt to launch a cluster, run some computation and attempt to connect to the same cluster in the middle of the computation to see if you can do so? If you , that could prove my suspicion 🤔

rubenvdg commented 3 years ago

Thanks! Will look into it. The workers are indeed reading large (compressed) chunks of data.

FabioRosado commented 3 years ago

Ruben have you seen this issue popup again? Or were you able to do your computations without issues?

rubenvdg commented 3 years ago

Thanks for checking in. We are still a bit in doubt how/where to set the configs (with timeouts) of distributed clusters.

We now do something along the lines of:

cluster = coiled.Cluster(name="clustert")
client = Client(cluster)

def set_config(key: str, value: str) -> str:
    import dask
    dask.config.set({key: value})
    return dask.config.get(key)

client.run_on_scheduler(set_config, "distributed.comm.timeouts.connect", "300s")
client.run(set_config, "distributed.comm.timeouts.connect", "300s")

which seems to work, but we're still unsure if this is the proper way to go.

What is best practice for setting dask config? Should it be set on the workers as well? Also, you can set a timeout via Client, but that seems to be ignored for a distributed cluster.

All in all, we're making progress :-).

rubenvdg commented 3 years ago

We also set distributed.comm.timeouts.tcp to 600 btw. But from the dask github issues, you can see that the timeout handling is a problem more people run into (especially when running long-running tasks on workers).

FabioRosado commented 3 years ago

Thank you for the update - I've also noticed that we might get a timeout error with the latest distributed version when trying to connect to a running cluster. Just something to be aware of if your cluster is running the latest version and locally you are running an older version.

Looking at how you are setting up your timeouts I'd say that's a good way to do it. I will double-check with the team to see if we can set this up in a different way

FabioRosado commented 3 years ago

Hello Ruben, just wanted to update my previous comment. This is the correct way to update the timeout settings on the scheduler, I've added a quick troubleshooting article to our knowledge base, once we implement #75 you will be able to change the configuration for the scheduler or/and workers with the coiled.Cluster constructor.

I am closing this issue now, but if you need any further help please feel free to open a new issue or reach out to us.