dask / dask-kubernetes

Native Kubernetes integration for Dask
https://kubernetes.dask.org
BSD 3-Clause "New" or "Revised" License
312 stars 148 forks source link

xgboost "Allreduce failed" with the dask operator specifically #898

Closed droctothorpe closed 4 months ago

droctothorpe commented 4 months ago

Hey, @jacobtomlinson!

We're trying to run a training job that uses xgboost + dask + distributed + dask-kubernetes. It works fine as long as provision the dask cluster with KubeCluster classic. As soon as we provision the cluster with KubeCluster operator, the same exact computation FAILS with the following error:

│ main     raise XGBoostError(py_str(_LIB.XGBGetLastError()))                                                                                                                                                                                                                  
│ main xgboost.core.XGBoostError: [18:22:16] ../rabit/include/rabit/internal/utils.h:86: Allreduce failed                                                                                                                                                                      
│ main time="2024-07-19T18:22:18.609Z" level=info msg="sub-process exited" argo=true error="<nil>"                                                                                                                                                                             
│ main Error: exit status 1

All other dependencies are identical! We're even using the same version of dask-kubernetes (2023.10.0).

I'm sorry for not providing MCVE. It would be really hard to extricate the replicate the proprietary dataset.

Have you ever run into anything like this before? Any advice would be greatly appreciated. Thank you!

Environment:

droctothorpe commented 4 months ago

So it turns out that this error can surface when the memory-limit flag is not passed to the worker command. The operator's make_cluster_spec doesn't set or explicitly expose this argument (link), whereas classic did (link) and even recommended setting it to the same value as requests (and vice versa) (link).

How would you feel about exposing it via make_cluster_spec? Would you prefer that it default to auto? Happy to contribute if you think this contribution makes sense (bandwidth permitting).

jacobtomlinson commented 4 months ago

With the operator the flag is not set, so it defaults to auto. It would be interesting to know what value that results in on your cluster. If you're setting your Pod memory resource limit then I would expect auto to correctly detect that limit and we shouldn't need to specify that. Could you check the worker info and find out what value it detects?

If I remember correctly we added setting things explicitly in the classic mode because in the early days Kubernetes/linux cgroups didn't always correctly report the right memory limit and would report the limit of the whole node. I wouldn't expect that to be an issue these days, but perhaps the problem you ran into here shows that it does.

I'm very conservative about adding new options to make_cluster_spec and KubeCluster because it is a slippery slope. If you want to set that memory limit flag with the operator you can do it like this:

from dask_kubernetes.operator import KubeCluster, make_cluster_spec

cluster_spec = make_cluster_spec(name="foo")
cluster_spec["spec"]["worker"]["spec"]["containers"][0]["args"] += ["--memory-limit", "4GB"]

cluster = KubeCluster(custom_cluster_spec=cluster_spec)

I've been thinking lately about making the customizing your cluster API a little more pleasant, and I think that would be useful here. I'll open a separate issue for that.

droctothorpe commented 4 months ago

Thanks, @jacobtomlinson!

Could you check the worker info and find out what value it detects?

Dumb question: how do I check what "auto" evaluates to?

jacobtomlinson commented 4 months ago

You should be able to see the worker memory on the dashboard.

<your dashboard url>/info/main/workers.html

droctothorpe commented 4 months ago

Looks like it was inferred to a reasonable value and setting it manually was a red herring. Thanks, @jacobtomlinson!

jacobtomlinson commented 4 months ago

So did you find what was causing the Allreduce failed error?

droctothorpe commented 4 months ago

Turns out it was cluster autoscaler trying to bin-pack workers and the specific xgboost computation (RFE) not being resilient to worker loss.

Interestingly, we didn't see this behavior in classic when conducting the same computation. Take this with a huge grain of salt, but I'm wondering if it has something to do with these annotations. Were they excluded from the operator spec for any specific reason?

In any case, adding spec["metadata"]["annotations"]["cluster-autoscaler.kubernetes.io/safe-to-evict"] = "false" appears to have resolved the problem. We're adding it to the DaskClusterConfig class I mentioned as a no_evict property and setting it to True by default. Would it make sense a sensible default for KubeCluster as well? Also spec["metadata"]["annotations"]["karpenter.sh/do-not-evict"] == "true" for Karpenter (it's funny that Karpenter inverts the boolean).

jacobtomlinson commented 4 months ago

Were they excluded from the operator spec for any specific reason?

Those tolerations are only there so that users can set up dedicated Dask nodes by adding the corresponding taint. Features like this were added by users who needed specific functionality. When we wrote the operator we intentionally left out a load of things like this because it's much easier to just add them yourself now and it helps keep the package more simple.

Would it make sense a sensible default for KubeCluster as well?

Generally Dask workers should be safe to evict. When they recieve a SIGTERM they attempt to hand off any tasks and memory to another worker before shutting down. I think it's actually XGBoost that isn't resillient to this, so I don't think we should add a default just for that single use case.

I think this definitely feeds into #899. Adding a concenience method to make it easy to add annotations would be very helpful.