microsoft / LightGBM

A fast, distributed, high performance gradient boosting (GBT, GBDT, GBRT, GBM or MART) framework based on decision tree algorithms, used for ranking, classification and many other machine learning tasks.
https://lightgbm.readthedocs.io/en/latest/
MIT License

[dask] [python-package] DaskLGBMRegressor training error: 'binding port 12402 failed' #3753

Closed jameslamb closed 3 years ago

jameslamb commented 3 years ago

How are you using LightGBM?

LightGBM component: Python package

Environment info

Operating System: Ubuntu 18.04

C++ compiler version: 9.3.0

CMake version: 3.16.3

Python version: 3.8.5

LightGBM version or commit hash: master (https://github.com/microsoft/LightGBM/commit/78d31d9ae34c9c8ab2b0f8704c6962c8d3510062)

Error message and / or logs

Training with DaskLGBMRegressor often fails with an error like this:

LightGBMError: Binding port 12402 failed

The error doesn't ALWAYS happen, and training sometimes succeeds. It also doesn't always reference port 12402. I've found that the first time I call DaskLGBMRegressor.fit(), I don't see this error. After that, subsequent tries often result in the error.

Here's the result of 10 calls of .fit(), with client.restart() run after each one.

  1. SUCCESS
  2. FAIL (Binding port 12403 failed)
  3. FAIL (Binding port 12402 failed)
  4. SUCCESS
  5. FAIL (Binding port 12402 failed)
  6. FAIL (Binding port 12403 failed)
  7. FAIL (Binding port 12401 failed)
  8. FAIL (Binding port 12403 failed)
  9. FAIL (Binding port 12403 failed)
  10. SUCCESS

full traceback:
---------------------------------------------------------------------------
LightGBMError                             Traceback (most recent call last)
<ipython-input-17-aea611c2313b> in <module>
     27 )
     28 
---> 29 dask_reg.fit(
     30     client=client,
     31     X=data,

/opt/conda/lib/python3.8/site-packages/lightgbm/dask.py in fit(self, X, y, sample_weight, client, **kwargs)
    263     def fit(self, X, y=None, sample_weight=None, client=None, **kwargs):
    264         """Docstring is inherited from the lightgbm.LGBMRegressor.fit."""
--> 265         return self._fit(LGBMRegressor, X, y, sample_weight, client, **kwargs)
    266     fit.__doc__ = LGBMRegressor.fit.__doc__
    267 

/opt/conda/lib/python3.8/site-packages/lightgbm/dask.py in _fit(self, model_factory, X, y, sample_weight, client, **kwargs)
    209 
    210         params = self.get_params(True)
--> 211         model = _train(client, X, y, params, model_factory, sample_weight, **kwargs)
    212 
    213         self.set_params(**model.get_params())

/opt/conda/lib/python3.8/site-packages/lightgbm/dask.py in _train(client, data, label, params, model_factory, weight, **kwargs)
    151                            for worker, list_of_parts in worker_map.items()]
    152 
--> 153     results = client.gather(futures_classifiers)
    154     results = [v for v in results if v]
    155     return results[0]

/opt/conda/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1983             else:
   1984                 local_worker = None
-> 1985             return self.sync(
   1986                 self._gather,
   1987                 futures,

/opt/conda/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    829             return future
    830         else:
--> 831             return sync(
    832                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    833             )

/opt/conda/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    338     if error[0]:
    339         typ, exc, tb = error[0]
--> 340         raise exc.with_traceback(tb)
    341     else:
    342         return result[0]

/opt/conda/lib/python3.8/site-packages/distributed/utils.py in f()
    322             if callback_timeout is not None:
    323                 future = asyncio.wait_for(future, callback_timeout)
--> 324             result[0] = yield future
    325         except Exception as exc:
    326             error[0] = sys.exc_info()

/opt/conda/lib/python3.8/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

/opt/conda/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1848                             exc = CancelledError(key)
   1849                         else:
-> 1850                             raise exception.with_traceback(traceback)
   1851                         raise exc
   1852                     if errors == "skip":

/opt/conda/lib/python3.8/site-packages/lightgbm/dask.py in _train_part()
     77     try:
     78         model = model_factory(**params)
---> 79         model.fit(data, label, sample_weight=weight, **kwargs)
     80     finally:
     81         _safe_call(_LIB.LGBM_NetworkFree())

/opt/conda/lib/python3.8/site-packages/lightgbm/sklearn.py in fit()
    768             callbacks=None, init_model=None):
    769         """Docstring is inherited from the LGBMModel."""
--> 770         super().fit(X, y, sample_weight=sample_weight, init_score=init_score,
    771                     eval_set=eval_set, eval_names=eval_names, eval_sample_weight=eval_sample_weight,
    772                     eval_init_score=eval_init_score, eval_metric=eval_metric,

/opt/conda/lib/python3.8/site-packages/lightgbm/sklearn.py in fit()
    610             init_model = init_model.booster_
    611 
--> 612         self._Booster = train(params, train_set,
    613                               self.n_estimators, valid_sets=valid_sets, valid_names=eval_names,
    614                               early_stopping_rounds=early_stopping_rounds,

/opt/conda/lib/python3.8/site-packages/lightgbm/engine.py in train()
    226     # construct booster
    227     try:
--> 228         booster = Booster(params=params, train_set=train_set)
    229         if is_valid_contain_train:
    230             booster.set_train_data_name(train_data_name)

/opt/conda/lib/python3.8/site-packages/lightgbm/basic.py in __init__()
   2060                     else:
   2061                         raise ValueError("Invalid machines in params.")
-> 2062                     self.set_network(machines,
   2063                                      local_listen_port=params.get("local_listen_port", 12400),
   2064                                      listen_time_out=params.get("listen_time_out", 120),

/opt/conda/lib/python3.8/site-packages/lightgbm/basic.py in set_network()
   2196             Booster with set network.
   2197         """
-> 2198         _safe_call(_LIB.LGBM_NetworkInit(c_str(machines),
   2199                                          ctypes.c_int(local_listen_port),
   2200                                          ctypes.c_int(listen_time_out),

/opt/conda/lib/python3.8/site-packages/lightgbm/basic.py in _safe_call()
     50     """
     51     if ret != 0:
---> 52         raise LightGBMError(_LIB.LGBM_GetLastError().decode('utf-8'))
     53 
     54 

LightGBMError: Binding port 12401 failed

Steps to reproduce

This is based on https://github.com/jameslamb/talks/tree/main/recent-developments-in-lightgbm, but copying in the steps in case that repo changes in the future.

  1. Create a file Dockerfile with the following contents
FROM daskdev/dask-notebook:latest

# based on
# https://github.com/microsoft/LightGBM/pull/3515#pullrequestreview-543175003
ENV LIGHTGBM_DIR=${HOME}/LightGBM
ENV LIGHTGBM_URL="https://github.com/microsoft/LightGBM.git"
ENV LIGHTGBM_COMMIT="78d31d9ae34c9c8ab2b0f8704c6962c8d3510062"

RUN apt update && \
    apt-get install -y \
        build-essential \
        cmake

RUN git clone --recursive ${LIGHTGBM_URL} ${LIGHTGBM_DIR} && \
    cd ${LIGHTGBM_DIR} && \
    git checkout ${LIGHTGBM_COMMIT}

RUN conda install -y -c conda-forge \
    dask-cloudprovider \
    scikit-learn \
    numpy \
    pandas && \
    # install LightGBM
    cd ${LIGHTGBM_DIR}/python-package && \
    python setup.py sdist && \
    pip install dist/lightgbm*.tar.gz && \
    conda clean --all

WORKDIR /home/jovyan/testing
  2. Build an image from that file
docker build --no-cache -t dask-lgbm-testing:1 -f Dockerfile .
  3. Run Jupyter Lab in a container built with that image.
docker run \
    -v $(pwd):/home/jovyan/testing \
    -p 8888:8888 \
    -p 8787:8787 \
    --name dask-lgbm-test \
    dask-lgbm-testing:1
  4. When that starts up, it will print a message like the one shown below. Copy the URL starting with 127.0.0.1 and paste it into a web browser, to access Jupyter Lab.
[C 22:10:48.337 LabApp] 

    To access the notebook, open this file in a browser:
        file:///home/jovyan/.local/share/jupyter/runtime/nbserver-19-open.html
    Or copy and paste one of these URLs:
        http://4cfa66d00717:8888/?token=0122fc992c4f4485937224a29d0d65e798e2168c5e20286e
     or http://127.0.0.1:8888/?token=0122fc992c4f4485937224a29d0d65e798e2168c5e20286e
  5. In a notebook, run the following code one time.
import dask.array as da
from dask.distributed import Client, LocalCluster, wait
from lightgbm.dask import DaskLGBMRegressor

n_workers = 3
cluster = LocalCluster(n_workers=n_workers)
client = Client(cluster)
client.wait_for_workers(n_workers)

print(f"View the dashboard: {cluster.dashboard_link}")

That should succeed, and if you click the printed link, you should see the Dask diagnostic dashboard.


  6. In that same notebook, run the code below multiple times. This code starts with a client.restart(), which clears the memory on all worker processes and removes any work from the scheduler.
client.restart()

num_rows = 1e6
num_features = 1e2
num_partitions = 10
rows_per_chunk = num_rows / num_partitions

data = da.random.random((num_rows, num_features), (rows_per_chunk, num_features))

labels = da.random.random((num_rows, 1), (rows_per_chunk, 1))

data = data.persist()
labels = labels.persist()
_ = wait(data)
_ = wait(labels)

dask_reg = DaskLGBMRegressor(
    silent=False,
    max_depth=5,
    random_state=708,
    objective="regression_l2",
    learning_rate=0.1,
    tree_learner="data",
    n_estimators=10,
    min_child_samples=1,
    n_jobs=-1,
)

dask_reg.fit(
    client=client,
    X=data,
    y=labels,
)

I expect that you'll see a similar pattern to the one noted above: training will sometimes succeed, but will often fail with an error like "Binding port XXXX failed".
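To script those repeated runs, a loop like the minimal sketch below can be used. This is only a convenience wrapper around the cell in step 6 (it assumes the `client` created in step 5); `LightGBMError` is caught so the loop records each outcome instead of stopping at the first failure.

```python
import dask.array as da
from dask.distributed import wait
from lightgbm.basic import LightGBMError
from lightgbm.dask import DaskLGBMRegressor

num_rows = 1_000_000
num_features = 100
rows_per_chunk = num_rows // 10

outcomes = []
for i in range(10):
    # clear worker memory and scheduler state, then re-create the persisted arrays
    client.restart()
    data = da.random.random((num_rows, num_features), chunks=(rows_per_chunk, num_features))
    labels = da.random.random((num_rows, 1), chunks=(rows_per_chunk, 1))
    data = data.persist()
    labels = labels.persist()
    _ = wait(data)
    _ = wait(labels)

    dask_reg = DaskLGBMRegressor(
        silent=False,
        max_depth=5,
        random_state=708,
        objective="regression_l2",
        learning_rate=0.1,
        tree_learner="data",
        n_estimators=10,
        min_child_samples=1,
        n_jobs=-1,
    )
    try:
        dask_reg.fit(client=client, X=data, y=labels)
        outcomes.append(f"{i + 1}. SUCCESS")
    except LightGBMError as err:
        outcomes.append(f"{i + 1}. FAIL ({err})")

print("\n".join(outcomes))
```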

Other information

I've noticed that often when this happens, it seems like maybe some of the worker processes were killed and restarted. I don't see messages about that in the logs, but the memory utilization for the workers is really uneven.


I've observed this behavior on FargateClusters from dask-cloudprovider and on the dask-kubernetes clusters from Saturn Cloud. So I don't think this issue is specific to the Dask docker image I used in the example above, or to the use of LocalCluster.

I've also observed this behavior using dask-lightgbm built from current master, with LightGBM 3.0.0.

Given all of that, my best guess is that there is some race condition where workers join the LightGBM cluster in a nondeterministic order, or maybe where two workers claim the same rank.

jameslamb commented 3 years ago

@ffineis I've assigned this to myself because I'm actively working on it right now. If you have ideas of stuff to try, leave a comment here.

@StrikerRUS I know you're somewhat of a LightGBM historian :joy: . If you remember similar issues with other LightGBM parallel training, please link them here.

StrikerRUS commented 3 years ago

I know you're somewhat of a LightGBM historian 😂 .

🤣

If you remember similar issues with other LightGBM parallel training, please link them here.

Unfortunately, I don't... Perhaps @imatiach-msft has faced something similar.

As a side note, I'm not sure, but I guess --depth=1 will not allow executing git checkout ${LIGHTGBM_COMMIT} for any LIGHTGBM_COMMIT other than the latest commit in the repo at the time of cloning.

jameslamb commented 3 years ago

As a side note, I'm not sure, but I guess --depth=1 will not allow executing git checkout ${LIGHTGBM_COMMIT} for any LIGHTGBM_COMMIT other than the latest commit in the repo at the time of cloning.

ha oh yeah, guess I got lucky with the commit I chose. Fixed.

jameslamb commented 3 years ago

This morning, I tested the theory that LightGBM just isn't cleaning up the network correctly. I tested this by changing local_listen_port to a new value on each training run, so that each run used a different, non-overlapping range of ports. But that did not work.

I added this to the beginning of each test iteration

local_listen_port += n_workers

And then ran 10 times in a row again.

n_workers = 3
local_listen_port = 12400
...
client.restart()

local_listen_port += n_workers
print(f"local_listen_port: {local_listen_port}")

num_rows = 1e6
num_features = 1e2
num_partitions = 10
rows_per_chunk = num_rows / num_partitions

data = da.random.random((num_rows, num_features), (rows_per_chunk, num_features))

labels = da.random.random((num_rows, 1), (rows_per_chunk, 1))

data = data.persist()
labels = labels.persist()
_ = wait(data)
_ = wait(labels)

dask_reg = DaskLGBMRegressor(
    silent=False,
    max_depth=5,
    random_state=708,
    objective="regression_l2",
    learning_rate=0.1,
    tree_learner="data",
    n_estimators=10,
    min_child_samples=1,
    n_jobs=-1,
    local_listen_port=local_listen_port
)

dask_reg.fit(client=client, X=data, y=labels)
  1. SUCCESS
  2. FAIL (Binding port 12409 failed)
  3. SUCCESS
  4. FAIL (Binding port 12412 failed)
  5. FAIL (Binding port 12415 failed)
  6. SUCCESS
  7. FAIL (Binding port 12421 failed)
  8. SUCCESS
  9. FAIL (Binding port 12427 failed)
  10. FAIL (Binding port 12430 failed)

jameslamb commented 3 years ago

NOTE I'm going to keep posting debugging findings here, but please feel free to unsubscribe from this issue. I'll open a PR or @ people when I think I've found a solution.


Summary

I tried the MPI version of LightGBM and found that this issue doesn't occur, but only because lightgbm.dask doesn't correctly use the MPI version of LightGBM. That can be a separate issue.

How I tested and what I found

I tried building the MPI version of LightGBM (https://lightgbm.readthedocs.io/en/latest/Installation-Guide.html?highlight=mpi#build-mpi-version). I added this to the Dockerfile

```shell
RUN apt install -y openmpi-bin libopenmpi-dev
```

and changed the install

```shell
pip install dist/lightgbm*.tar.gz --install-option=--mpi
```

I found that with the MPI version of LightGBM, **this issue does not happen**! 10 straight successful runs.

It's confusing, though: I see information in the logs saying that only one worker is participating:

```text
[LightGBM] [Info] Local rank: 0, total number of machines: 1
[LightGBM] [Warning] num_threads is set=2, n_jobs=-1 will be ignored. Current value: num_threads=2
[LightGBM] [Warning] Accuracy may be bad since you didn't explicitly set num_leaves OR 2^max_depth > num_leaves. (num_leaves=31).
[LightGBM] [Info] Local rank: 0, total number of machines: 1
[LightGBM] [Info] Local rank: 0, total number of machines: 1
...
[LightGBM] [Warning] Only find one worker, will switch to serial tree learner
```

But I can see in the Dask diagnostic dashboard that all three workers in my cluster are participating in training. I'm worried that maybe for the MPI version, the three worker processes are training three independent models, and then `lightgbm` is just ignoring two of them. When I run `dask_reg.to_local().booster_.trees_to_dataframe()`, I can see that the root node in the first tree only has 4e5 records. That's 40% of the training data, which is about what I'd expect one of the workers to have, since my example has three workers splitting a 1e6-row dataset that has 10 partitions.

![image](https://user-images.githubusercontent.com/7608904/104491574-0d368980-5598-11eb-8adc-86d0587787a5.png)

When training succeeds in the socket-based version of `lightgbm`, I can see that all three workers' work was combined into the final model because the `count` for the root node on the first tree is 1e6 (the total number of rows in the training data).

![image](https://user-images.githubusercontent.com/7608904/104491755-4c64da80-5598-11eb-9bf2-f69098a2356c.png)
imatiach-msft commented 3 years ago

@jameslamb I think this is all expected; I don't see anything wrong with lightgbm itself - ports could be in use for a variety of reasons. The problem is with the dask-lightgbm code, since it doesn't try different ports. In mmlspark, we have special retry logic that tries a specific port and, if it doesn't work, tries the next port.

imatiach-msft commented 3 years ago

@jameslamb this is written in scala but it might be useful:

https://github.com/Azure/mmlspark/blob/master/src/main/scala/com/microsoft/ml/spark/lightgbm/TrainUtils.scala#L411

note how we try different ports starting from a user-specified default listen port up to the number of tasks on the machine - we don't assume that some port range will just always work. Once we have the ports we are interested in we send them back to the driver which aggregates them and then sends them back to all of the workers, which then call network init and start the parallel training.
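Translated to Python, that retry idea looks roughly like the sketch below (the helper name and the retry cap are made up for illustration; this is not the mmlspark or lightgbm.dask code). The key point is that the only reliable availability check is to actually try binding.

```python
import socket


def find_open_port(start_port: int, max_tries: int = 100) -> int:
    """Return the first port >= start_port that can currently be bound on this machine."""
    for port in range(start_port, start_port + max_tries):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            try:
                sock.bind(("", port))
            except OSError:
                # port is in use (or not yet released from a previous run); try the next one
                continue
            return port
    raise RuntimeError(f"no open port found in [{start_port}, {start_port + max_tries})")


# Each worker would probe locally like this, report the port it found back to the
# driver/scheduler, and the aggregated (ip, port) pairs would then be passed to
# LGBM_NetworkInit on every worker.
print(find_open_port(12400))
```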

ffineis commented 3 years ago

Nice! Then it just sounds like the _build_network_params method in dask.py needs to be made a little more adaptable than the existing

addr_port_map = {addr: (local_listen_port + i) for i, addr in enumerate(worker_addresses)}
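To make the failure mode concrete, here is a minimal sketch of that fixed mapping (the worker addresses below are made up; local_listen_port defaults to 12400, per the basic.py call in the traceback above). Whichever worker is handed a port that is already in use fails with the "Binding port X failed" error, even if other ports are free.

```python
# Sketch of the existing fixed port-assignment scheme (illustrative worker addresses only).
local_listen_port = 12400  # lightgbm.dask / basic.py default, per the traceback above
worker_addresses = [
    "tcp://127.0.0.1:39001",
    "tcp://127.0.0.1:39002",
    "tcp://127.0.0.1:39003",
]

addr_port_map = {addr: local_listen_port + i for i, addr in enumerate(worker_addresses)}
print(addr_port_map)
# {'tcp://127.0.0.1:39001': 12400, 'tcp://127.0.0.1:39002': 12401, 'tcp://127.0.0.1:39003': 12402}
# If anything is already bound to one of these ports (or a socket from a previous run
# hasn't been released yet), the worker assigned to it fails with
# "LightGBMError: Binding port <port> failed".
```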

If/when the fix for finding available ports is being developed, it'd be worth keeping in mind this related issue I've been coming across when worker memory reaches a critical mass and Dask restarts the worker. Perhaps this issue could be tackled alongside the port-retry logic?


When Dask restarts a worker, the new worker has a different port number, so the call to get_worker().address in _train_part falls out of sync with the list of worker addresses defined in _train, and we get a KeyError (the old worker address no longer refers to a current Dask worker).

This issue was called out in dask-lightgbm just prior to #3515. Just mentioning it because it seems related to the issue at hand here.
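To make that failure mode concrete, here is a hypothetical illustration (made-up names and addresses, not the real lightgbm.dask internals): the mapping built when training starts is keyed by worker address, so a worker that Dask restarts on a new port no longer appears in it.

```python
# Hypothetical illustration of the stale-address problem described above.
worker_map = {
    "tcp://127.0.0.1:36525": "partitions assigned when training started",
}

# Dask killed and restarted this worker (e.g. it exceeded its memory limit); the
# restarted process listens on a different port, so its address no longer matches:
restarted_worker_address = "tcp://127.0.0.1:40011"

try:
    worker_map[restarted_worker_address]
except KeyError as err:
    print(f"stale worker address, not in worker_map: {err}")
```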

jameslamb commented 3 years ago

thanks for noting that! and to @imatiach-msft for the ideas. I think that one issue I'm seeing is that lightgbm.dask is returning before it's cleaned up the sockets it created.

I tried running training with my example above, and in a shell I ran the following

netstat -an | grep 124

During training on a LocalCluster with three workers, I get a result like this

tcp    99154      0 127.0.0.1:55612         127.0.0.1:12402         ESTABLISHED
tcp        0 198308 127.0.0.1:12402         127.0.0.1:55612         ESTABLISHED
tcp        0      0 127.0.0.1:39462         127.0.0.1:12401         ESTABLISHED
tcp        0  39434 127.0.0.1:12401         127.0.0.1:39462         ESTABLISHED
tcp      412      0 127.0.0.1:55614         127.0.0.1:12402         ESTABLISHED
tcp        0    412 127.0.0.1:12402         127.0.0.1:55614         ESTABLISHED

Once training ends, that same command returns this

tcp        0      0 127.0.0.1:12402         127.0.0.1:55612         TIME_WAIT  
tcp        0      0 127.0.0.1:12401         127.0.0.1:39462         TIME_WAIT  
tcp        0      0 127.0.0.1:12402         127.0.0.1:55614         TIME_WAIT

I ran that every few seconds and it returned that result for about 2 minutes, then returned nothing. Once I saw that that command didn't return anything, re-running training succeeded.

So I think there are two issues:

  1. sockets created by LightGBM are still around in TIME_WAIT state for a few minutes after training ends
  2. lightgbm.dask is trying a fixed set of ports

I think the "look for available ports first" solution is going to fix this. Hopefully I'll have a PR up shortly :grinning:
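As a sketch of what the Dask side of that could look like (illustrative only; the function names here are made up and this is not the eventual fix): ask every worker to probe for a free port with client.run, then build LightGBM's machines string from the results instead of assuming local_listen_port + i is available. There is still a small window between a worker releasing the probed port and LightGBM binding it, so retry logic on top of this would still be useful.

```python
import socket

from dask.distributed import Client


def _find_open_port() -> int:
    # bind to port 0 so the OS picks any currently-free port, then report it
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(("", 0))
        return sock.getsockname()[1]


def build_machines_param(client: Client) -> str:
    # client.run() executes the probe on every worker and returns
    # {worker_address: open_port}, e.g. {'tcp://127.0.0.1:39462': 41523, ...}
    worker_to_port = client.run(_find_open_port)
    return ",".join(
        f"{address.split('://')[-1].rsplit(':', 1)[0]}:{port}"
        for address, port in worker_to_port.items()
    )


# machines = build_machines_param(client)
# params.update({"machines": machines, "num_machines": len(machines.split(","))})
```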

More details

I'm not that familiar with the low-level details of TCP, but this blog post seems relevant to what's happening: http://www.serverframework.com/asynchronousevents/2011/01/time-wait-and-its-design-implications-for-protocols-and-scalable-servers.html.

The reason that TIME_WAIT can affect system scalability is that one socket in a TCP connection that is shut down cleanly will stay in the TIME_WAIT state for around 4 minutes. If many connections are being opened and closed quickly then sockets in TIME_WAIT may begin to accumulate on a system; you can view sockets in TIME_WAIT using netstat.

imatiach-msft commented 3 years ago

@jameslamb I agree with your findings - it would be great to find a cleaner way to close ports if possible. However, the current implementation is wrong anyway because it assumes all ports in those ranges are available. In practice, any application could be running and using some of those ports already - and the only way to find out whether they are already used is actually to try and bind to them. So even if the ports are closed in a cleaner way, it would still be better to find open ports first rather than assuming they are available.

jameslamb commented 3 years ago

it would still be better to find open ports first rather than assuming they are available.

totally agree, thanks for pointing me in the right direction!! I almost have something working right now, will open a PR soon.

github-actions[bot] commented 1 year ago

This issue has been automatically locked since there has not been any recent activity since it was closed. To start a new related discussion, open a new issue at https://github.com/microsoft/LightGBM/issues including a reference to this.