dask / dask-kubernetes

Native Kubernetes integration for Dask
https://kubernetes.dask.org
BSD 3-Clause "New" or "Revised" License

KubeCluster(scheduler_service_wait_timeout) parameter is not working. #367

Closed mb-qco closed 3 years ago

mb-qco commented 3 years ago

Hi,

Thank you for providing Dask functionality on Kubernetes.

I am encountering a problem with KubeCluster() when deploying with deploy_mode="remote", passing a scheduler_pod_template of type kubernetes.client.V1Pod, and configuring Dask with a scheduler service of type LoadBalancer.

This issue concerns the scheduler_service_wait_timeout parameter in particular. When I set it to 300 (three hundred seconds, as an int), KubeCluster() still times out after 30 s and raises an error, even though my scheduler Pod and Service are running without issue. Regardless of which value I pass to the parameter, it times out after 30 seconds. The timeout then triggers a cleanup that kills the Pod and Service in my EKS cluster.

I've read through the code and I cannot see where this timeout actually occurs within the dask-kubernetes package; it appears to be defined in distributed instead (in particular in distributed/deploy/spec.py).
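For reference, the 30 second figure matches distributed's default connect timeout, which you can inspect from the Dask config (a quick check, assuming distributed is importable in the client environment):

import dask
import distributed  # importing distributed registers its config defaults

print(dask.config.get("distributed.comm.timeouts.connect"))  # "30s" by default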

What happened:

I get a Timeout error after 30 seconds.

What you expected to happen:

I (hopefully) do not get a Timeout error, since provisioning an ELB in EKS normally takes roughly 120 seconds. Ideally there are no errors at all, but simply demonstrating that the service timeout parameter is honoured would be enough to consider the issue solved.

Minimal Complete Verifiable Example:


import dask
from kubernetes import client as k8sclient
from dask_kubernetes import KubeCluster, KubeConfig

scheduler_pod = k8sclient.V1Pod(
                    metadata=k8sclient.V1ObjectMeta(annotations={}),
                    spec=k8sclient.V1PodSpec(
                        containers=[
                            k8sclient.V1Container(
                                name="scheduler",
                                image="daskdev/dask:latest",
                                args=[
                                    "dask-scheduler"
                                ])],
                        tolerations=[
                            k8sclient.V1Toleration(
                                effect="NoSchedule",
                                operator="Equal",
                                key="nodeType",
                                value="scheduler")]
                    )
                )

dask.config.set({"kubernetes.scheduler-service-type": "LoadBalancer"})
dask.config.set({"kubernetes.scheduler-service-wait-timeout": 300})

auth = KubeConfig(config_file="~/.kube/config")

cluster = KubeCluster(pod_template="worker.yaml",
                      namespace='dask',
                      deploy_mode="remote",
                      n_workers=0,
                      scheduler_service_wait_timeout=300,
                      scheduler_pod_template=scheduler_pod)

Anything else we need to know?:

My EKS cluster is private. The Jupyter Lab machines are in a different subnet from the EKS cluster, but in the same VPC. Deployment mode is set to "remote" in KubeCluster().

We do not want to use port-forwarding from the Jupyter Lab machines.

Environment:

jacobtomlinson commented 3 years ago

Thanks for raising this, I'll dig into it. Can you please share the timeout error too?

mb-qco commented 3 years ago

Yes, there are two different error messages. This one is directly from the scheduler logs:

Traceback (most recent call last):
  File "/opt/conda/bin/dask-scheduler", line 11, in <module>
    sys.exit(go())
  File "/opt/conda/lib/python3.8/site-packages/distributed/cli/dask_scheduler.py", line 217, in go
    main()
  File "/opt/conda/lib/python3.8/site-packages/click/core.py", line 1137, in __call__
    return self.main(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/click/core.py", line 1062, in main
    rv = self.invoke(ctx)
  File "/opt/conda/lib/python3.8/site-packages/click/core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/opt/conda/lib/python3.8/site-packages/click/core.py", line 763, in invoke
    return __callback(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/distributed/cli/dask_scheduler.py", line 208, in main
    loop.run_sync(run)
  File "/opt/conda/lib/python3.8/site-packages/tornado/ioloop.py", line 529, in run_sync
    raise TimeoutError("Operation timed out after %s seconds" % timeout)
tornado.util.TimeoutError: Operation timed out after None seconds
rpc error: code = Unknown desc = Error: No such container: 5bbb2b30edb95cc08166c14098909c37cc7269d8484de28e47

This is the error from the Jupyter Notebook:

gaierror                                  Traceback (most recent call last)
/opt/miniconda3/envs/mattia_env/lib/python3.8/site-packages/distributed/comm/core.py in connect(addr, timeout, deserialize, handshake_overrides, **connection_args)
    282         try:
--> 283             comm = await asyncio.wait_for(
    284                 connector.connect(loc, deserialize=deserialize, **connection_args),

/opt/miniconda3/envs/mattia_env/lib/python3.8/asyncio/tasks.py in wait_for(fut, timeout, loop)
    493         if fut.done():
--> 494             return fut.result()
    495         else:

/opt/miniconda3/envs/mattia_env/lib/python3.8/site-packages/distributed/comm/tcp.py in connect(self, address, deserialize, **connection_args)
    390         try:
--> 391             stream = await self.client.connect(
    392                 ip, port, max_buffer_size=MAX_BUFFER_SIZE, **kwargs

/opt/miniconda3/envs/mattia_env/lib/python3.8/site-packages/tornado/tcpclient.py in connect(self, host, port, af, ssl_options, max_buffer_size, source_ip, source_port, timeout)
    264         else:
--> 265             addrinfo = await self.resolver.resolve(host, port, af)
    266         connector = _Connector(

/opt/miniconda3/envs/mattia_env/lib/python3.8/site-packages/distributed/_concurrent_futures_thread.py in run(self)
     65         try:
---> 66             result = self.fn(*self.args, **self.kwargs)
     67         except BaseException as e:

/opt/miniconda3/envs/mattia_env/lib/python3.8/site-packages/tornado/netutil.py in resolve(self, host, port, family)
    443     ) -> List[Tuple[int, Any]]:
--> 444         return _resolve_addr(host, port, family)
    445 

/opt/miniconda3/envs/mattia_env/lib/python3.8/site-packages/tornado/netutil.py in _resolve_addr(host, port, family)
    381     # so the addresses we return should still be usable with SOCK_DGRAM.
--> 382     addrinfo = socket.getaddrinfo(host, port, family, socket.SOCK_STREAM)
    383     results = []

/opt/miniconda3/envs/mattia_env/lib/python3.8/socket.py in getaddrinfo(host, port, family, type, proto, flags)
    917     addrlist = []
--> 918     for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
    919         af, socktype, proto, canonname, sa = res

gaierror: [Errno -2] Name or service not known

The above exception was the direct cause of the following exception:

OSError                                   Traceback (most recent call last)
/opt/miniconda3/envs/mattia_env/lib/python3.8/site-packages/distributed/deploy/spec.py in _start(self)
    317         try:
--> 318             await super()._start()
    319         except Exception as e:

/opt/miniconda3/envs/mattia_env/lib/python3.8/site-packages/distributed/deploy/cluster.py in _start(self)
     77     async def _start(self):
---> 78         comm = await self.scheduler_comm.live_comm()
     79         await comm.write({"op": "subscribe_worker_status"})

/opt/miniconda3/envs/mattia_env/lib/python3.8/site-packages/distributed/core.py in live_comm(self)
    752         if not open or comm.closed():
--> 753             comm = await connect(
    754                 self.address,

/opt/miniconda3/envs/mattia_env/lib/python3.8/site-packages/distributed/comm/core.py in connect(addr, timeout, deserialize, handshake_overrides, **connection_args)
    306     else:
--> 307         raise OSError(
    308             f"Timed out trying to connect to {addr} after {timeout} s"

OSError: Timed out trying to connect to tcp://*.elb.amazonaws.com:8786 after 30 s

The above exception was the direct cause of the following exception:

RuntimeError                              Traceback (most recent call last)
/tmp/ipykernel_10671/3868758728.py in <module>
      3 dask.config.get("kubernetes.scheduler-service-wait-timeout")
      4 
----> 5 cluster = KubeCluster(pod_template="worker.yaml",
      6                       namespace='dask',
      7                       deploy_mode="remote",

/opt/miniconda3/envs/mattia_env/lib/python3.8/site-packages/dask_kubernetes/core.py in __init__(self, pod_template, name, namespace, n_workers, host, port, env, auth, idle_timeout, deploy_mode, interface, protocol, dashboard_address, security, scheduler_service_wait_timeout, scheduler_pod_template, **kwargs)
    464         self.auth = auth
    465         self.kwargs = kwargs
--> 466         super().__init__(**self.kwargs)
    467 
    468     def _get_pod_template(self, pod_template, pod_type):

/opt/miniconda3/envs/mattia_env/lib/python3.8/site-packages/distributed/deploy/spec.py in __init__(self, workers, scheduler, worker, asynchronous, loop, security, silence_logs, name, shutdown_on_close, scheduler_sync_interval)
    281         if not self.asynchronous:
    282             self._loop_runner.start()
--> 283             self.sync(self._start)
    284             self.sync(self._correct_state)
    285 

/opt/miniconda3/envs/mattia_env/lib/python3.8/site-packages/distributed/deploy/cluster.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    212             return future
    213         else:
--> 214             return sync(self.loop, func, *args, **kwargs)
    215 
    216     def _log(self, log):

/opt/miniconda3/envs/mattia_env/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    324     if error[0]:
    325         typ, exc, tb = error[0]
--> 326         raise exc.with_traceback(tb)
    327     else:
    328         return result[0]

/opt/miniconda3/envs/mattia_env/lib/python3.8/site-packages/distributed/utils.py in f()
    307             if callback_timeout is not None:
    308                 future = asyncio.wait_for(future, callback_timeout)
--> 309             result[0] = yield future
    310         except Exception:
    311             error[0] = sys.exc_info()

/opt/miniconda3/envs/mattia_env/lib/python3.8/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

/opt/miniconda3/envs/mattia_env/lib/python3.8/site-packages/dask_kubernetes/core.py in _start(self)
    593         self.name = self.pod_template.metadata.generate_name
    594 
--> 595         await super()._start()
    596 
    597     @classmethod

/opt/miniconda3/envs/mattia_env/lib/python3.8/site-packages/distributed/deploy/spec.py in _start(self)
    320             self.status = Status.failed
    321             await self._close()
--> 322             raise RuntimeError(f"Cluster failed to start. {str(e)}") from e
    323 
    324     def _correct_state(self):

RuntimeError: Cluster failed to start. Timed out trying to connect to tcp://*.elb.amazonaws.com:8786 after 30 s
jacobtomlinson commented 3 years ago

Ah ok, that looks like AWS is provisioning the ELB but then the ELB takes time to actually become responsive. So the Dask Kubernetes service is being created successfully and then Distributed is timing out trying to connect to the ELB.

Instead of setting the Dask Kubernetes timeout, you should be setting distributed.comm.timeouts.connect.
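For example (a sketch; the value accepts an int in seconds or a duration string, and the same option can also be set via the DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT environment variable):

import dask

dask.config.set({"distributed.comm.timeouts.connect": "300s"})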

mb-qco commented 3 years ago

Will try this @jacobtomlinson and get back to you ASAP.

mb-qco commented 3 years ago

@jacobtomlinson So, the ELB has been up for over 5 minutes, and no timeout errors after using:

dask.config.set({"distributed.comm.timeouts.connect": 300})

However, in my Jupyter Notebook, the cell with the KubeCluster() invocation is still hanging. The scheduler pod logs show the following:

distributed.comm.tcp - INFO - Connection from tcp://*.*.*.*:24088 closed before handshake completed
distributed.comm.tcp - INFO - Connection from tcp://*.*.*.*:5657 closed before handshake completed
distributed.comm.tcp - INFO - Connection from tcp://*.*.*.*:57676 closed before handshake completed

I think this is a networking issue within the cluster. Reviewing the IPs, they come from a subnet within the EKS cluster.
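One way to check where a peer IP comes from is to match it against pod IPs using the kubernetes Python client (a hypothetical helper; peer_ip is a placeholder for an address from the scheduler logs):

from kubernetes import client, config

config.load_kube_config()
v1 = client.CoreV1Api()

peer_ip = "10.0.1.23"  # placeholder for an IP seen in the scheduler logs
for pod in v1.list_pod_for_all_namespaces().items:
    if pod.status.pod_ip == peer_ip:
        print(pod.metadata.namespace, pod.metadata.name)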

jacobtomlinson commented 3 years ago

Can you confirm that the Python and Dask versions are the same in the Jupyter Notebook and the container image that you are using (looks like the default daskdev/dask:latest)?

mb-qco commented 3 years ago

@jacobtomlinson they could be a few months off. I will try converging them to the same version and let you know.

EDIT: Here is the conda list output of the Dask-related packages I am using on the machine running Jupyter Notebook / Lab. It turns out they are all as recent as they can be, except Python.

dask: 2021.9.1
dask-kubernetes: 2021.3.1
kubernetes: 18.20.0
python: 3.8.11
distributed: 2021.9.1

I think the only difference is the Python version, which is 3.8.12 inside daskdev/dask:latest.

mb-qco commented 3 years ago

More details:

from distributed.versions import get_versions

get_versions()

returns the following from within Jupyter Lab (client side):

{'host': {'python': '3.8.11.final.0',
  'python-bits': 64,
  'OS': 'Linux',
  'OS-release': '5.11.0-1017-aws',
  'machine': 'x86_64',
  'processor': 'x86_64',
  'byteorder': 'little',
  'LC_ALL': 'None',
  'LANG': 'C.UTF-8'},
 'packages': {'python': '3.8.11.final.0',
  'dask': '2021.09.1',
  'distributed': '2021.09.1',
  'msgpack': '1.0.2',
  'cloudpickle': '2.0.0',
  'tornado': '6.1',
  'toolz': '0.11.1',
  'numpy': '1.21.2',
  'pandas': '1.3.3',
  'lz4': None,
  'blosc': None}}
mb-qco commented 3 years ago

@jacobtomlinson checking the IPs with which the scheduler is trying to do the TCP handshake: all of them refer to the aws-node-* DaemonSet pods created on each Kubernetes node by default. I checked the version of the CNI they use, and here is the output:

jacobtomlinson commented 3 years ago

While the cluster is hanging are you able to open another notebook and connect a Client object directly to tcp://*.elb.amazonaws.com:8786?

mb-qco commented 3 years ago

@jacobtomlinson turns out this gave me a Timeout error. I opened a new notebook and did:

from dask.distributed import Client
client = Client(address="tcp://*.elb.amazonaws.com:8786")

And got:

---------------------------------------------------------------------------
TimeoutError                              Traceback (most recent call last)
/opt/miniconda3/lib/python3.8/site-packages/distributed/comm/core.py in connect(addr, timeout, deserialize, handshake_overrides, **connection_args)
    285         try:
--> 286             comm = await asyncio.wait_for(
    287                 connector.connect(loc, deserialize=deserialize, **connection_args),

/opt/miniconda3/lib/python3.8/asyncio/tasks.py in wait_for(fut, timeout, loop)
    489             await _cancel_and_wait(fut, loop=loop)
--> 490             raise exceptions.TimeoutError()
    491     finally:

TimeoutError: 

The above exception was the direct cause of the following exception:

OSError                                   Traceback (most recent call last)
/tmp/ipykernel_4113/2974483473.py in <module>
----> 1 client = Client(address="tcp://*.elb.amazonaws.com:8786")

/opt/miniconda3/lib/python3.8/site-packages/distributed/client.py in __init__(self, address, loop, timeout, set_as_default, scheduler_file, security, asynchronous, name, heartbeat_interval, serializers, deserializers, extensions, direct_to_workers, connection_limit, **kwargs)
    762             ext(self)
    763 
--> 764         self.start(timeout=timeout)
    765         Client._instances.add(self)
    766 

/opt/miniconda3/lib/python3.8/site-packages/distributed/client.py in start(self, **kwargs)
   1008             self._started = asyncio.ensure_future(self._start(**kwargs))
   1009         else:
-> 1010             sync(self.loop, self._start, **kwargs)
   1011 
   1012     def __await__(self):

/opt/miniconda3/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    324     if error[0]:
    325         typ, exc, tb = error[0]
--> 326         raise exc.with_traceback(tb)
    327     else:
    328         return result[0]

/opt/miniconda3/lib/python3.8/site-packages/distributed/utils.py in f()
    307             if callback_timeout is not None:
    308                 future = asyncio.wait_for(future, callback_timeout)
--> 309             result[0] = yield future
    310         except Exception:
    311             error[0] = sys.exc_info()

/opt/miniconda3/lib/python3.8/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

/opt/miniconda3/lib/python3.8/site-packages/distributed/client.py in _start(self, timeout, **kwargs)
   1098 
   1099         try:
-> 1100             await self._ensure_connected(timeout=timeout)
   1101         except (OSError, ImportError):
   1102             await self._close()

/opt/miniconda3/lib/python3.8/site-packages/distributed/client.py in _ensure_connected(self, timeout)
   1155 
   1156         try:
-> 1157             comm = await connect(
   1158                 self.scheduler.address, timeout=timeout, **self.connection_args
   1159             )

/opt/miniconda3/lib/python3.8/site-packages/distributed/comm/core.py in connect(addr, timeout, deserialize, handshake_overrides, **connection_args)
    308             await asyncio.sleep(backoff)
    309     else:
--> 310         raise OSError(
    311             f"Timed out trying to connect to {addr} after {timeout} s"
    312         ) from active_exception

OSError: Timed out trying to connect to tcp://*.elb.amazonaws.com:8786 after 30 s
jacobtomlinson commented 3 years ago

It definitely sounds like the load balancer is not pointing to the Kubernetes service correctly.

mb-qco commented 3 years ago

Hmm, that is interesting. Are there other ways to override the scheduler-service-template before creating the cluster? For example, adding annotations? I ask because Kubernetes is not complaining about the template as defined, and the ELB's external URL resolves to an IP after about 2.5 minutes, indicating that it should be able to forward traffic from the ELB to the ClusterIP.

jacobtomlinson commented 3 years ago

kubernetes.scheduler-service-template is a configurable option.
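You can inspect the default and modify it before creating the cluster, e.g. (a sketch, assuming the defaults are registered when dask_kubernetes is imported):

import dask
import dask_kubernetes  # registers the kubernetes.* config defaults

print(dask.config.get("kubernetes.scheduler-service-template"))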

mb-qco commented 3 years ago

@jacobtomlinson I can confirm that this is an issue with how KubeCluster() works, and not with the way the LoadBalancer works, as the dashboard is perfectly accessible on port 8787 of the ELB external IP.

However, the distributed.Client connection that is automatically attempted when invoking KubeCluster() fails, and arguably that connection shouldn't even be necessary.

Regarding kubernetes.scheduler-service-template, I just leave this at the default, except for changing the type from ClusterIP to LoadBalancer. The EKS Cloud Controller Manager (CCM) handles the LB thereafter.

There are a couple of things which I would explore:

  1. Why would the scheduler contact the Container Network Interface (CNI) pods associated with the EKS CNI DaemonSet? Is this to become aware of IP addresses for "future" workers? Isn't it the workers that register with the scheduler? And since these are pods, the node IPs should not be relevant, as the pod IPs differ. Also, these EKS-CNI <-> scheduler connections simply close, and the documentation does not explain why this should happen.
  2. There are no configuration options available under dask.config in distributed.comm.tcp or kubernetes that map to this CNI communication in particular.
  3. The KubeCluster() invocation I currently use does not necessarily do anything wrong. I do get a working LoadBalancer Service which I can use to access the Dask scheduler dashboard, and the pod keeps running until the Client connection is attempted. Once the Client connection fails, both the Service and the running pod are terminated. (See the connectivity check sketched below.)
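A minimal low-level check, independent of Dask, is to try opening a raw TCP socket to the comm port through the ELB (the hostname below is the masked placeholder from the tracebacks above):

import socket

# Raises an exception on failure; success means the ELB is at least
# forwarding TCP on port 8786 to something that accepts connections
sock = socket.create_connection(("*.elb.amazonaws.com", 8786), timeout=10)
print("TCP connect OK:", sock.getpeername())
sock.close()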
jacobtomlinson commented 3 years ago

If the dashboard is working then that's great. But if you are unable to connect a client on port 8786 via the LB then things aren't working correctly.

  1. I'm not sure why this would be happening. When does the scheduler try to do this? When you try to connect a client?
  2. The scheduler and workers will try to communicate directly over pod networking. What configuration options are you looking for?
  3. It sounds like communication from wherever you are creating KubeCluster to the scheduler isn't being set up correctly and it can't speak to it. Either there is an issue with the way the LB/service/pod is being created, or there is some new feature in the CNI that Dask Kubernetes is handling incorrectly.
mb-qco commented 3 years ago

@jacobtomlinson Turns out this can indeed be a security group rule issue. I just realized that I am actually accessing the dashboard from a different subnet than the one where the distributed.Client invocation takes place.

I am testing the security group configuration changes now and will let you know whether this works :smile:

mb-qco commented 3 years ago

After changing the ingress rules, I encountered another issue. I need the LoadBalancer's IP to be allocated from one of the private subnets associated with my EKS cluster. However, it seems kubernetes.scheduler-service-template does not support annotations at the moment.

Here is my config before running KubeCluster():

# Set all Kubernetes configuration options here
# Define the kubernetes.client.V1Pod for the scheduler
import dask
from kubernetes import client as k8sclient

# We need to construct the specification using ObjectMeta, PodSpec, Container
scheduler_pod = k8sclient.V1Pod(
                    kind="Pod",
                    metadata=k8sclient.V1ObjectMeta(annotations={}),
                    spec=k8sclient.V1PodSpec(
                        containers=[
                            k8sclient.V1Container(
                                name="scheduler",
                                image="daskdev/dask:latest",
                                args=[
                                    "dask-scheduler"
                                ])],
                        tolerations=[
                            k8sclient.V1Toleration(
                                effect="NoSchedule",
                                operator="Equal",
                                key="nodeType",
                                value="dask-scheduler")]
                    )
                )
# List Pod
scheduler_pod

dask_scheduler_service_template = {'apiVersion': 'v1',
                                   'kind': 'Service',
                                   'annotations' : {
                                      'service.beta.kubernetes.io/aws-load-balancer-internal': "true",
                                   },
                                   'spec': {
                                       'selector': {
                                           'dask.org/cluster-name': '',
                                           'dask.org/component': 'scheduler'
                                       },
                                      'ports': [{
                                          'name': 'comm',
                                          'protocol': 'TCP',
                                          'port': 8786,
                                          'targetPort': 8786
                                      },
                                       {'name': 'dashboard',
                                        'protocol': 'TCP',
                                        'port': 8787,
                                        'targetPort': 8787}]}}

dask.config.set({"kubernetes.scheduler-service-type": "LoadBalancer"})
dask.config.set({"kubernetes.scheduler-service-wait-timeout": 50})
dask.config.set({"kubernetes.namespace": "dask"})
dask.config.set({"kubernetes.scheduler-template": scheduler_pod})
dask.config.set({"kubernetes.scheduler-service-template": dask_scheduler_service_template})

dask.config.get("kubernetes.scheduler-service-template")

When I then invoke KubeCluster() like this:

cluster = KubeCluster(pod_template="worker.yaml",
                      deploy_mode="remote",
                      n_workers=0,
                      scheduler_pod_template=scheduler_pod)

It only creates a public IP for the LoadBalancer.
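One way to verify whether the annotation actually made it onto the created Service is to list it with the kubernetes Python client (a sketch; the "dask" namespace matches the config above):

from kubernetes import client, config

config.load_kube_config()
v1 = client.CoreV1Api()
for svc in v1.list_namespaced_service("dask").items:
    print(svc.metadata.name, svc.metadata.annotations)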

mb-qco commented 3 years ago

@jacobtomlinson thanks for the help so far. Could you point me to where in the package I would make changes so it can actually add the annotation to the service?

jacobtomlinson commented 3 years ago

Your template isn't quite right: annotations isn't a top-level key, it should be under the metadata key. So it should probably look like this:

dask.config.set(
    {
        "kubernetes.scheduler-service-template": {
            "apiVersion": "v1",
            "kind": "Service",
            "metadata": {
                "annotations": {
                    "service.beta.kubernetes.io/aws-load-balancer-internal": "true",
                },
            },
            "spec": {
                "selector": {
                    "dask.org/cluster-name": "",
                    "dask.org/component": "scheduler",
                },
                "ports": [
                    {
                        "name": "comm",
                        "protocol": "TCP",
                        "port": 8786,
                        "targetPort": 8786,
                    },
                    {
                        "name": "dashboard",
                        "protocol": "TCP",
                        "port": 8787,
                        "targetPort": 8787,
                    },
                ],
            },
        }
    }
)
mb-qco commented 3 years ago

I just noticed this myself as well :100: Config-heaven, haha! Thanks @jacobtomlinson.

mb-qco commented 3 years ago

@jacobtomlinson I can now confirm that I was able to get KubeCluster() to work. Further functionality testing is now taking place. In the end, I had to change some network firewall rules to make this work and avoid the TimeoutError.

Many thanks for the help!