dask / dask-xgboost


Cannot assign requested address #23

Closed Mestalbet closed 4 years ago

Mestalbet commented 6 years ago

Hi,

On a simple Dask XGBoost run I get the error in the subject. The sample code looks like:

import dask.dataframe as dd
from dask_ml.xgboost import XGBRegressor

est = XGBRegressor(...)
x = dd.read_csv('somedata.csv')
y = x.y
del x['y']
est.fit(x, y)

And the error is as follows:

---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
<ipython-input-4-d50d84593355> in <module>()
      5 y = x.y
      6 del x['y']
----> 7 est.fit(x, y)

/opt/conda/lib/python3.6/site-packages/dask_xgboost/core.py in fit(self, X, y)
    239         xgb_options = self.get_xgb_params()
    240         self._Booster = train(client, xgb_options, X, y,
--> 241                               num_boost_round=self.n_estimators)
    242         return self
    243 

/opt/conda/lib/python3.6/site-packages/dask_xgboost/core.py in train(client, params, data, labels, dmatrix_kwargs, **kwargs)
    167     """
    168     return sync(client.loop, _train, client, params, data,
--> 169                 labels, dmatrix_kwargs, **kwargs)
    170 
    171 

/opt/conda/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    273             e.wait(10)
    274     if error[0]:
--> 275         six.reraise(*error[0])
    276     else:
    277         return result[0]

/opt/conda/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

/opt/conda/lib/python3.6/site-packages/distributed/utils.py in f()
    258             yield gen.moment
    259             thread_state.asynchronous = True
--> 260             result[0] = yield make_coro()
    261         except Exception as exc:
    262             error[0] = sys.exc_info()

/opt/conda/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1097 
   1098                     try:
-> 1099                         value = future.result()
   1100                     except Exception:
   1101                         self.had_exception = True

/opt/conda/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1105                     if exc_info is not None:
   1106                         try:
-> 1107                             yielded = self.gen.throw(*exc_info)
   1108                         finally:
   1109                             # Break up a reference to itself

/opt/conda/lib/python3.6/site-packages/dask_xgboost/core.py in _train(client, params, data, labels, dmatrix_kwargs, **kwargs)
    122     env = yield client._run_on_scheduler(start_tracker,
    123                                          host.strip('/:'),
--> 124                                          len(worker_map))
    125 
    126     # Tell each worker to train on the chunks/parts that it has locally

/opt/conda/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1097 
   1098                     try:
-> 1099                         value = future.result()
   1100                     except Exception:
   1101                         self.had_exception = True

/opt/conda/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1111                             exc_info = None
   1112                     else:
-> 1113                         yielded = self.gen.send(value)
   1114 
   1115                     if stack_context._state.contexts is not orig_stack_contexts:

/opt/conda/lib/python3.6/site-packages/distributed/client.py in _run_on_scheduler(self, function, *args, **kwargs)
   1911                                                      kwargs=dumps(kwargs))
   1912         if response['status'] == 'error':
-> 1913             six.reraise(*clean_exception(**response))
   1914         else:
   1915             raise gen.Return(response['result'])

/opt/conda/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    690                 value = tp()
    691             if value.__traceback__ is not tb:
--> 692                 raise value.with_traceback(tb)
    693             raise value
    694         finally:

/opt/conda/lib/python3.6/site-packages/dask_xgboost/core.py in start_tracker()
     30     """ Start Rabit tracker """
     31     env = {'DMLC_NUM_WORKER': n_workers}
---> 32     rabit = RabitTracker(hostIP=host, nslave=n_workers)
     33     env.update(rabit.slave_envs())
     34 

/opt/conda/lib/python3.6/site-packages/dask_xgboost/tracker.py in __init__()
    166         for port in range(port, port_end):
    167             try:
--> 168                 sock.bind((hostIP, port))
    169                 self.port = port
    170                 break

OSError: [Errno 99] Cannot assign requested address

Any help will be greatly appreciated.

Thanks.

mrocklin commented 6 years ago

Hrm, interesting. On line 171 we catch OSErrors and handle them, so I'm a bit surprised that this affected you at all. What is the result of the following code on your machine?

import socket
socket.error is OSError

Mestalbet commented 6 years ago

The result is True. Thanks for looking into it. FYI: the environment is Dask + Google GKE.

Mestalbet commented 6 years ago

Any ideas?

Mestalbet commented 6 years ago

Any news?

TomAugspurger commented 6 years ago

It's a bit hard since neither of us can reproduce it. A few things to try:

  1. How are you creating your client?
  2. Print out the host IP and port, and try creating a socket and connecting to it manually (see the sketch below)
  3. Try xgboost in distributed mode without dask (https://xgboost.readthedocs.io/en/latest/tutorials/aws_yarn.html)

IIRC, this code is taken as is from https://github.com/dmlc/rabit. Perhaps there are similar issues there.
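
For item 2, a minimal sketch (the scheduler address is a placeholder, and get_address_host_port is assumed to be importable from distributed.comm.addressing as in recent distributed versions):

import socket

from distributed import Client
from distributed.comm.addressing import get_address_host_port

client = Client('scheduler-ip:8786')  # placeholder scheduler address

# Print the host/port that the tracker code will derive from the client.
host, port = get_address_host_port(client.scheduler.address)
print(host, port)

# Try a raw TCP connection to that host/port from the client side.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
sock.close()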

Mestalbet commented 6 years ago

Hi,

I create the client exactly as in the example.

from distributed import Client
client = Client('scheduler-ip:8786')

I can successfully connect a socket manually:

import socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect(("scheduler-ip",8786))
message = input('Input lowercase sentence:')
client_socket.sendto(message.encode(),("scheduler-ip", 8786))

Inputting "test" returns 4 as expected.

Printing the hostIP and port return the expected scheduler-ip and port 8786.

I really am not sure where the problem is. I can't run your demo code.

TomAugspurger commented 6 years ago

I'm not sure where the problem is either. It'd be nice if you could make a small example using just XGBoost's distributed mode. That would either pinpoint dask-xgboost as the problem or, if it turns out to be an xgboost issue, let us reach out to that community.

aymankh86 commented 6 years ago

Hi, any update on this issue? It works well when I run it on a local cluster, but I get the same error when I deploy and run it on the cluster:

OSError: [Errno 99] Cannot assign requested address

TomAugspurger commented 6 years ago

I haven't reproduced it yet. I can try on a cluster later.

@aymankh86 can you post your full traceback?

aymankh86 commented 6 years ago
OSError                                   Traceback (most recent call last)
<ipython-input-15-1e550ef8a81e> in <module>()
----> 1 v.get().result()

~/Envs/env/lib/python3.6/site-packages/distributed/client.py in result(self, timeout)
    191                                   raiseit=False)
    192         if self.status == 'error':
--> 193             six.reraise(*result)
    194         elif self.status == 'cancelled':
    195             raise result

~/Envs/env/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    690                 value = tp()
    691             if value.__traceback__ is not tb:
--> 692                 raise value.with_traceback(tb)
    693             raise value
    694         finally:

/usr/local/lib/python3.6/site-packages/forecast_api-0.2-py3.6.egg/forecast_api/tasks.py in train_models()

/usr/local/lib/python3.6/site-packages/forecast_api-0.2-py3.6.egg/forecast_api/src/pipeline/training_pipeline.py in run()

/usr/local/lib/python3.6/site-packages/forecast_api-0.2-py3.6.egg/forecast_api/src/models/xgboost.py in fit()

/usr/local/lib/python3.6/site-packages/dask_xgboost/core.py in train()

/usr/local/lib/python3.6/site-packages/distributed/utils.py in sync()

/usr/local/lib/python3.6/site-packages/six.py in reraise()

/usr/local/lib/python3.6/site-packages/distributed/utils.py in f()

/usr/local/lib/python3.6/site-packages/tornado/gen.py in run()

/usr/local/lib/python3.6/site-packages/tornado/gen.py in run()

/usr/local/lib/python3.6/site-packages/dask_xgboost/core.py in _train()

/usr/local/lib/python3.6/site-packages/tornado/gen.py in run()

/usr/local/lib/python3.6/site-packages/tornado/gen.py in run()

/usr/local/lib/python3.6/site-packages/distributed/client.py in _run_on_scheduler()

/usr/local/lib/python3.6/site-packages/six.py in reraise()

/usr/local/lib/python3.6/site-packages/dask_xgboost/core.py in start_tracker()

/usr/local/lib/python3.6/site-packages/dask_xgboost/tracker.py in __init__()

OSError: [Errno 99] Cannot assign requested address
TomAugspurger commented 6 years ago

@Mestalbet, @aymankh86 can you share some details on the networking of your cluster? I was able to reproduce this recently with a Helm deployment.

According to my client pod, the scheduler can be reached at tcp://dask-scheduler:8786. But, on the scheduler pod, the address dask-scheduler doesn't resolve to itself.

If I instead ask the scheduler what its IP is and pass that through, things work.

Could you try out this notebook? https://gist.github.com/c68c06df1f7e68c1ecba6dba2bf67710 (may have to adjust how you connect to the client).

The only real change is accepting a host to pass through to start_tracker.


I'm hopeful we'll be able to resolve this from within dask-xgboost / distributed, rather than having to update your networking settings.

mrocklin commented 6 years ago

So it could be that the scheduler's address is only available on a particular port. When we try to connect on a new port on that same host the address may not be available. Currently it looks like we use the hostname of the scheduler address used by the client, and then some other port. It may be that that port isn't open though.

We should probably open things up a bit to allow users to specify host=, interface=, or a specific port. @TomAugspurger my guess is that, for your case of running with Dask's Helm chart on a cloud provider, you would probably want to specify one of the network interfaces that are internal to the cloud provider, something like

train(..., interface='...', ...)

where interface is the name of the network interface that allows nodes on your cloud provider to talk to each other (though I have no idea what this would be).
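
To see what interface names are available on a node, a quick sketch using psutil (already a dependency of distributed); the names printed (e.g. eth0) will vary by environment:

import socket
import psutil

# List each network interface with its IPv4 addresses, so the one that is
# routable between cluster nodes (often something like eth0) can be picked.
for name, addrs in psutil.net_if_addrs().items():
    ipv4 = [a.address for a in addrs if a.family == socket.AF_INET]
    print(name, ipv4)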

DigitalPig commented 5 years ago

Hi, I am running the Helm Dask chart and have the same issue. Using the gist from @TomAugspurger works fine. Any idea how I can run dask-xgboost from Helm then?

palpen commented 5 years ago

I am also running Dask via Helm in a cluster on Kubernetes Engine in GCP and am experiencing the same issue.

Here's the error:

---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
<ipython-input-63-5da7f2eb79e2> in <module>
----> 1 model = dxgb.train(client, params, X_train_, svar_sum_train)
      2 model

/opt/conda/lib/python3.7/site-packages/dask_xgboost/core.py in train(client, params, data, labels, dmatrix_kwargs, **kwargs)
    167     """
    168     return sync(client.loop, _train, client, params, data,
--> 169                 labels, dmatrix_kwargs, **kwargs)
    170 
    171 

/opt/conda/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    275             e.wait(10)
    276     if error[0]:
--> 277         six.reraise(*error[0])
    278     else:
    279         return result[0]

/opt/conda/lib/python3.7/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

/opt/conda/lib/python3.7/site-packages/distributed/utils.py in f()
    260             if timeout is not None:
    261                 future = gen.with_timeout(timedelta(seconds=timeout), future)
--> 262             result[0] = yield future
    263         except Exception as exc:
    264             error[0] = sys.exc_info()

/opt/conda/lib/python3.7/site-packages/tornado/gen.py in run(self)
   1131 
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

/opt/conda/lib/python3.7/site-packages/tornado/gen.py in run(self)
   1139                     if exc_info is not None:
   1140                         try:
-> 1141                             yielded = self.gen.throw(*exc_info)
   1142                         finally:
   1143                             # Break up a reference to itself

/opt/conda/lib/python3.7/site-packages/dask_xgboost/core.py in _train(client, params, data, labels, dmatrix_kwargs, **kwargs)
    122     env = yield client._run_on_scheduler(start_tracker,
    123                                          host.strip('/:'),
--> 124                                          len(worker_map))
    125 
    126     # Tell each worker to train on the chunks/parts that it has locally

/opt/conda/lib/python3.7/site-packages/tornado/gen.py in run(self)
   1131 
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

/opt/conda/lib/python3.7/site-packages/tornado/gen.py in run(self)
   1145                             exc_info = None
   1146                     else:
-> 1147                         yielded = self.gen.send(value)
   1148 
   1149                     if stack_context._state.contexts is not orig_stack_contexts:

/opt/conda/lib/python3.7/site-packages/distributed/client.py in _run_on_scheduler(self, function, *args, **kwargs)
   2059                                                      wait=wait)
   2060         if response['status'] == 'error':
-> 2061             six.reraise(*clean_exception(**response))
   2062         else:
   2063             raise gen.Return(response['result'])

/opt/conda/lib/python3.7/site-packages/six.py in reraise(tp, value, tb)
    690                 value = tp()
    691             if value.__traceback__ is not tb:
--> 692                 raise value.with_traceback(tb)
    693             raise value
    694         finally:

/opt/conda/lib/python3.7/site-packages/dask_xgboost/core.py in start_tracker()
     30     """ Start Rabit tracker """
     31     env = {'DMLC_NUM_WORKER': n_workers}
---> 32     rabit = RabitTracker(hostIP=host, nslave=n_workers)
     33     env.update(rabit.slave_envs())
     34 

/opt/conda/lib/python3.7/site-packages/dask_xgboost/tracker.py in __init__()
    166         for port in range(port, port_end):
    167             try:
--> 168                 sock.bind((hostIP, port))
    169                 self.port = port
    170                 break

OSError: [Errno 99] Cannot assign requested address

For what it's worth, here is the content of the YAML file I used when installing Dask with Helm on the cluster:

worker:
  replicas: 80
  resources:
    limits:
      cpu: 1
      memory: 4G
    requests:
      cpu: 1
      memory: 4G
  env:
    - name: EXTRA_CONDA_PACKAGES
      value: numba dask-xgboost xarray -c conda-forge
    - name: EXTRA_PIP_PACKAGES
      value: xgboost dask-xgboost pyarrow gcsfs s3fs dask-ml --upgrade

# We want to keep the same packages on the worker and jupyter environments
jupyter:
  enabled: true
  env:
    - name: EXTRA_CONDA_PACKAGES
      value: numba dask-xgboost xarray matplotlib -c conda-forge
    - name: EXTRA_PIP_PACKAGES
      value: xgboost dask-xgboost pyarrow gcsfs s3fs dask-ml --upgrade

scheduler:
  env:
    - name: EXTRA_CONDA_PACKAGES
      value: numba dask-xgboost xarray matplotlib -c conda-forge
    - name: EXTRA_PIP_PACKAGES
      value: xgboost dask-xgboost pyarrow gcsfs s3fs dask-ml --upgrade
TomAugspurger commented 5 years ago

I'm not really sure what a good solution is here.

Short-term, we can accept an optional host argument to train and pass it through:

# in _train
if host is None:
    host, port = get_address_host_port(client.scheduler.address)

Then (sophisticated) users can get the real IP address of the scheduler with something like

import subprocess

def get_my_ip():
    return subprocess.check_output(['hostname', '-i']).decode().strip()

host = client.run_on_scheduler(get_my_ip)

I wouldn't recommend that as a real solution, but it may be an OK stopgap.
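
Putting the two together, usage might look like the sketch below; note that host= is the keyword proposed above and does not exist in released dask-xgboost, and the scheduler address, data, and parameters are illustrative:

import subprocess

import dask_xgboost as dxgb
from dask_ml.datasets import make_regression
from distributed import Client

client = Client('scheduler-ip:8786')  # placeholder scheduler address

def get_my_ip():
    # Ask the scheduler pod for its own IP rather than using the service name.
    return subprocess.check_output(['hostname', '-i']).decode().strip()

host = client.run_on_scheduler(get_my_ip)

X, y = make_regression(n_samples=1000, chunks=100)
params = {'objective': 'reg:squarederror'}

# host= is the proposed keyword argument discussed above, not a released API.
bst = dxgb.train(client, params, X, y, host=host)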

palpen commented 5 years ago

Thanks @TomAugspurger. Out of curiosity, is this issue unique to Helm deployment of Dask on Kubernetes?

TomAugspurger commented 5 years ago

I'm not sure I understand the issue well enough, but I doubt it's specific to Kubernetes.


javabrett commented 5 years ago

I hit this issue just now, deploying the stable/dask Helm chart on k8s, and adding dask-xgboost as an additional package.

The problem occurs when the Rabit tracker is started for the workers - this is done on the scheduler via client._run_on_scheduler.

https://github.com/dask/dask-xgboost/blob/4661c8a1d3a7f6b63ff994b944b6a6231e7c9f31/dask_xgboost/core.py#L139-L144

I'm a bit hazy on the whole n_workers thing here.

https://github.com/dask/dask-xgboost/blob/4661c8a1d3a7f6b63ff994b944b6a6231e7c9f31/dask_xgboost/core.py#L37-L48

Relevant part of my thread-dump:

/opt/conda/lib/python3.7/site-packages/dask_xgboost/core.py in _train(client, params, data, labels, dmatrix_kwargs, **kwargs)
    122     env = yield client._run_on_scheduler(start_tracker,
    123                                          host.strip('/:'),
--> 124                                          len(worker_map))
    125 
    126     # Tell each worker to train on the chunks/parts that it has locally
[... traceback snipped ...]
/opt/conda/lib/python3.7/site-packages/dask_xgboost/core.py in start_tracker()
     30     """ Start Rabit tracker """
     31     env = {'DMLC_NUM_WORKER': n_workers}
---> 32     rabit = RabitTracker(hostIP=host, nslave=n_workers)
     33     env.update(rabit.slave_envs())
     34 

/opt/conda/lib/python3.7/site-packages/dask_xgboost/tracker.py in __init__()
    166         for port in range(port, port_end):
    167             try:
--> 168                 sock.bind((hostIP, port))
    169                 self.port = port
    170                 break

OSError: [Errno 99] Cannot assign requested address

Error 99 is EADDRNOTAVAIL - it could be any of DNS, IPv6, permissions for low port numbers, container permissions to bind, the address already being bound, etc.

Any diagnosis tricks?
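
One way to narrow this down would be a sketch like the following (assuming a connected client; the dask-scheduler service name matches this deployment), reproducing the tracker's bind on the scheduler with the hostname it currently uses and with 0.0.0.0:

import socket

from distributed import Client

client = Client('dask-scheduler:8786')  # placeholder scheduler address

def try_bind(host, port=9091):
    # Reproduce the tracker's sock.bind() on the scheduler pod.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.bind((host, port))
        return 'bound %s:%d' % (host, port)
    except OSError as exc:
        return 'failed %s:%d -> %s' % (host, port, exc)
    finally:
        sock.close()

print(client.run_on_scheduler(try_bind, 'dask-scheduler'))
print(client.run_on_scheduler(try_bind, '0.0.0.0'))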

javabrett commented 5 years ago

The port range looks to be hard-coded to 9091:9099 ... that can't be good.

https://github.com/dask/dask-xgboost/blob/4661c8a1d3a7f6b63ff994b944b6a6231e7c9f31/dask_xgboost/tracker.py#L160-L164

javabrett commented 5 years ago

I was able to generate this logging for the failed bind; the host:port is:

dask-scheduler:9091

The problem with that, when running in a k8s cluster, is that the Rabit bind occurs on the dask-scheduler pod, but it thinks that the correct hostname to bind is dask-scheduler. That name (and the matching IP address it is attempting to bind to) belongs to the service, e.g.:

jhub          service/dask-scheduler   LoadBalancer   10.100.45.53     <redacted>    8786:30925/TCP,80:30333/TCP   18h

From a shell on the scheduler:

# hostname
dask-scheduler-6bc59d8cf-8w4fb

# ping dask-scheduler
PING dask-scheduler.jhub.svc.cluster.local (10.100.45.53) 56(84) bytes of data.

# ifconfig
eth0: flags=4163<UP,BROADCAST,RUNNING,MULTICAST>  mtu 9001
        inet 192.168.111.100  netmask 255.255.255.255  broadcast 192.168.111.100
        ether 3e:c3:39:d9:a4:dd  txqueuelen 0  (Ethernet)
        RX packets 40056  bytes 188906889 (180.1 MiB)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 23272  bytes 8636438 (8.2 MiB)
        TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0

lo: flags=73<UP,LOOPBACK,RUNNING>  mtu 65536
        inet 127.0.0.1  netmask 255.0.0.0
        loop  txqueuelen 1000  (Local Loopback)
        RX packets 0  bytes 0 (0.0 B)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 0  bytes 0 (0.0 B)

So clearly this bind for Rabit can never work, since it references addresses of the service proxy, not the actual scheduler pod/container.

Does anyone have enough knowledge of Rabit in XGBoost to know what a reasonable fix might be? Often this kind of socket is just bound to 0.0.0.0, but I still worry about how the XGBoost distributed network is going to mesh in this environment and find the running Rabit servers.

TomAugspurger commented 5 years ago

The Rabit code was copied directly from xgboost, I believe.

I think https://github.com/dask/dask-xgboost/issues/23#issuecomment-420272147 diagnosed the issue, and it seems that was your eventual conclusion?

As far as fixing this goes, I think the options are to set up the cluster networking so that dask-scheduler always resolves to the scheduler IP, even on the scheduler, or, barring that, a host keyword for users to pass things through manually.

Actually, a simpler solution comes to mind now. Can’t we just run get_address_host_port on the client early in train, and just use the resolved IP address from then on? Can someone test that?
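
A sketch of what that client-side resolution could look like (assuming plain DNS via socket.gethostbyname; the address is the k8s service name from the Helm deployments above):

import socket

from distributed.comm.addressing import get_address_host_port

# Resolve the scheduler hostname to an IP on the client before it is
# handed to start_tracker; 'dask-scheduler' is the k8s service name above.
host, port = get_address_host_port('tcp://dask-scheduler:8786')
resolved = socket.gethostbyname(host)
print(host, '->', resolved, port)

Note that in the Helm setup this may still resolve to the service IP rather than the scheduler pod's own IP, which is what the test further down suggests.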

javabrett commented 5 years ago

Yes, the issue and diagnosis are consistent with my observations ... earlier I was in a hurry and only skimmed those and pasted my logging capture, but I think the issue is fairly clear now.

In terms of a fix, it looks like there are a couple of options:

1. Route Rabit tracker traffic back through the Dask Scheduler Service

2. Route Rabit tracker traffic direct to the Dask Scheduler Pod

===

Either or both of these might require k8s networking tweaks. Option 1 is perhaps more k8s-ish, because it keeps the Dask Scheduler pod hidden behind the service. If the pod gets redeployed, might its address change?

Neither of these is tested yet; feedback most welcome.

Edit: error in the above - we aren't using the standalone starter for the Rabit tracker (start_rabit_tracker); rather, we are calling the constructor directly. The constructor does not have the auto-IP mode, so we would need to see whether that can be used, maybe by running the standalone entry path?

TomAugspurger commented 5 years ago

I'm not sure I understand your proposals, but both sound more Kubernetes-specific than necessary. I think that

Can’t we just run get_address_host_port on the client early in train, and just use the resolved IP address from then on? Can someone test that?

is worth exploring before anything else, but I won't have time to.

TomAugspurger commented 5 years ago

Can someone try out this patch?

diff --git a/dask_xgboost/core.py b/dask_xgboost/core.py
index 6bf29d78..669f401d 100644
--- a/dask_xgboost/core.py
+++ b/dask_xgboost/core.py
@@ -16,6 +16,7 @@ except ImportError:

 from dask import delayed
 from dask.distributed import wait, default_client
+from distributed.comm.addressing import resolve_address
 import dask.dataframe as dd
 import dask.array as da

@@ -137,7 +138,7 @@ def _train(client, params, data, labels, dmatrix_kwargs={}, **kwargs):
     ncores = yield client.scheduler.ncores()  # Number of cores per worker

     # Start the XGBoost tracker on the Dask scheduler
-    host, port = parse_host_port(client.scheduler.address)
+    host, port = parse_host_port(resolve_address(client.scheduler.address))
     env = yield client._run_on_scheduler(start_tracker,
                                          host.strip('/:'),
                                          len(worker_map))
javabrett commented 5 years ago

I tested this patch (thank you for it), with some extra logging, on AWS EKS, using the Dask stable/dask Helm chart with latest dask and the required additional packages. It did not resolve the issue, but I did not expect it to either.

The error/logging on the scheduler pod/container is:

distributed.worker - INFO - Run out-of-band function 'start_tracker'
ERROR:root:sock.bind 10.100.165.190:9091
ERROR:root:[Errno 99] Cannot assign requested address
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/dask_xgboost/tracker.py", line 169, in __init__
    sock.bind((hostIP, port))
OSError: [Errno 99] Cannot assign requested address
distributed.worker - WARNING -  Run Failed
Function: start_tracker
args:     ('10.100.165.190', 1)
kwargs:   {}
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 2976, in run
    result = function(*args, **kwargs)
  File "/opt/conda/lib/python3.7/site-packages/dask_xgboost/core.py", line 41, in start_tracker
    rabit = RabitTracker(hostIP=host, nslave=n_workers)
  File "/opt/conda/lib/python3.7/site-packages/dask_xgboost/tracker.py", line 169, in __init__
    sock.bind((hostIP, port))
OSError: [Errno 99] Cannot assign requested address

The IP address 10.100.165.190 that the code is attempting to bind for the Rabit tracker is the address of service/dask-scheduler in the cluster:

service/dask-scheduler   LoadBalancer   10.100.165.190   <redacted>.elb.amazonaws.com   8786:31325/TCP,80:31535/TCP   21h

Note that this is obviously not the address available on the node/pod where the actual scheduler container is running:

pod/dask-scheduler-95c894897-l66pq   1/1     Running   0          13m

If I run on that node:

kubectl exec -it dask-scheduler-95c894897-l66pq /sbin/ip address
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
    link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
    inet 127.0.0.1/8 scope host lo
       valid_lft forever preferred_lft forever
3: eth0@if24: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 9001 qdisc noqueue state UP group default
    link/ether f2:a0:f7:00:2d:a9 brd ff:ff:ff:ff:ff:ff link-netnsid 0
    inet 192.168.147.171/32 brd 192.168.147.171 scope global eth0
       valid_lft forever preferred_lft forever

The Client is (correctly) pointing back to the service address for the scheduler:

kubectl exec -it dask-jupyter-df6d669b8-zdhfk env | grep DASK_SCHEDULER_ADDRESS
DASK_SCHEDULER_ADDRESS=dask-scheduler:8786

So for this to work (with this specific Helm chart anyway), I think we have to allow the code executing on the scheduler to bind the Rabit tracker to the local interface (in this case 192.168.147.171), or to 0.0.0.0. The problem then is that this address is reachable via the dask-scheduler service, but port 9091 won't be proxied. Additionally, when the Rabit workers are started, they will need a correct environment via DMLC_TRACKER_URI and DMLC_TRACKER_PORT pointing to service/dask-scheduler. Alternatively, the Rabit workers could access the tracker via the internal address (if that works), on the basis that a Rabit/XGBoost job is fairly short-lived and you don't care about using internal/ephemeral addresses.

TomAugspurger commented 5 years ago

Thanks for testing that out. I'll confess that this is getting beyond my networking knowledge.

At a high level, this feels like the kind of thing that can be resolved inside dask-xgboost itself. I suspect it shouldn't need any modifications to the cluster networking, but I clearly don't understand the issue well enough to say for sure.

javabrett commented 5 years ago

@TomAugspurger thanks, and yes, I think you are angling at the right approach. The XGBoost cluster should be short-lived enough that it can be allowed to run with the Rabit worker -> tracker (on the Dask Scheduler node) addressing being internal, rather than via the k8s service/dask-scheduler as it currently attempts.

Perhaps I will find the time to try different bind strategies and hopefully this can be fixed in a generic way.

javabrett commented 5 years ago

Just a note that this is reproducible on a local Docker Desktop for Mac Kubernetes cluster, so it is Helm chart-specific if anything, rather than specific to a particular k8s cluster type, e.g. AWS EKS.

javabrett commented 5 years ago

Can anyone suggest a reliable test notebook, complete with data? Something that currently runs outside of k8s but (likely) fails in k8s per this issue.

TomAugspurger commented 5 years ago

I haven't tried it, but https://gist.github.com/TomAugspurger/c68c06df1f7e68c1ecba6dba2bf67710 will likely do it.

from distributed import Client
import dask
from dask_xgboost import XGBClassifier
from dask_ml.datasets import make_classification

client = Client(...)
X, y = make_classification(chunks=20)
X, y = dask.persist(X, y)
XGBClassifier().fit(X, y)
denizhankara commented 3 years ago

Any solution to this problem?

jakirkham commented 3 years ago

This library has been superseded by Dask support in XGBoost itself. I would suggest using that instead.
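
For reference, a minimal sketch of the native Dask integration in XGBoost (assuming xgboost >= 1.0; the client address, data, and parameters are illustrative):

import xgboost as xgb
from dask.distributed import Client
from dask_ml.datasets import make_classification

client = Client()  # or Client('scheduler-address:8786') for a deployed cluster

X, y = make_classification(chunks=20)

# DaskDMatrix and xgb.dask.train are part of XGBoost's built-in Dask support.
dtrain = xgb.dask.DaskDMatrix(client, X, y)
output = xgb.dask.train(
    client,
    {'objective': 'binary:logistic'},
    dtrain,
    num_boost_round=10,
)
booster = output['booster']  # a regular xgboost.Booster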