microsoft / LightGBM

A fast, distributed, high performance gradient boosting (GBT, GBDT, GBRT, GBM or MART) framework based on decision tree algorithms, used for ranking, classification and many other machine learning tasks.
https://lightgbm.readthedocs.io/en/latest/

[dask] Binding Port Failed when Using the Machines Parameter #4960

Open rudra0713 opened 2 years ago

rudra0713 commented 2 years ago

Description

On a single machine with 8 workers, I am trying to run distributed LightGBM. To understand the role of the machines parameter, I created a machines list with 8 new ports (all of these ports are open). However, every time I pass the list of machines, I get the error "Binding socket failed" for all of the newly added ports. I am trying to figure out what I am doing wrong here.

Reproducible example

from dask.distributed import Client, get_client, wait
from sklearn.datasets import make_classification
import dask
import dask.array as da
import lightgbm as lgb

n_samples, n_features, num_test_samples = 100000, 30, 100
dask.config.set({'distributed.worker.daemon': False})
num_of_workers = 8

def invoke_dis_lgbm(X, y, number_of_estimators, port_counter):
    client_lgbm = get_client()
    # build the machines string: one "ip:port" entry per port in the fixed pool
    additional_ports = range(port_counter, port_counter + num_of_workers)
    machines_list = ['127.0.0.1:' + str(port) for port in additional_ports]
    machines = ','.join(machines_list)

    print("machines -> ", machines)
    # 127.0.0.1:10008,127.0.0.1:10009,127.0.0.1:10010,127.0.0.1:10011,127.0.0.1:10012,127.0.0.1:10013,127.0.0.1:10014,127.0.0.1:10015

    lgbm_cls = lgb.DaskLGBMClassifier(client=client_lgbm, objective='binary',
                                      n_estimators=number_of_estimators,
                                      machines=machines)
    lgbm_cls.fit(X, y)
    return

def main(client):
    global port_counter
    print(f'n_samples={n_samples}, n_features={n_features}')
    # X and y are the train features and labels
    X_local, y_local = make_classification(n_samples=n_samples, n_features=n_features, random_state=12345)
    X = da.from_array(X_local, chunks=(n_samples // 8, n_features), name='train_feature')
    y = da.from_array(y_local, chunks=(n_samples // 8), name='train_label')

    print("x partitions count ", X.npartitions)

    # persist the training data on the workers before fitting
    X = client.persist(X)
    _ = wait(X)
    y = client.persist(y)
    _ = wait(y)
    port_counter = 10008

    # several sequential training runs, all reusing the same fixed set of ports
    for i in range(100, 105):
        invoke_dis_lgbm(X, y, i, port_counter)
    return

if __name__ == '__main__':
    with Client('127.0.0.1:8786') as client:
        main(client)

Here is the full log:

File "test_dis_lgbm_alone_final_git_2.py", line 61, in <module>
    main(client)
  File "test_dis_lgbm_alone_final_git_2.py", line 53, in main
    dis_res = invoke_dis_lgbm(X, y, 100, port_counter)
  File "test_dis_lgbm_alone_final_git_2.py", line 29, in invoke_dis_lgbm
    lgbm_cls.fit(X, y)
  File "/scratch/rudra_mysql/py_new_3.8.6/lib64/python3.8/site-packages/lightgbm/dask.py", line 1170, in fit
    return self._lgb_dask_fit(
  File "/scratch/rudra_mysql/py_new_3.8.6/lib64/python3.8/site-packages/lightgbm/dask.py", line 1050, in _lgb_dask_fit
    model = _train(
  File "/scratch/rudra_mysql/py_new_3.8.6/lib64/python3.8/site-packages/lightgbm/dask.py", line 789, in _train
    results = client.gather(futures_classifiers)
  File "/scratch/rudra_mysql/py_new_3.8.6/lib64/python3.8/site-packages/distributed/client.py", line 1966, in gather
    return self.sync(
  File "/scratch/rudra_mysql/py_new_3.8.6/lib64/python3.8/site-packages/distributed/client.py", line 860, in sync
    return sync(
  File "/scratch/rudra_mysql/py_new_3.8.6/lib64/python3.8/site-packages/distributed/utils.py", line 326, in sync
    raise exc.with_traceback(tb)
  File "/scratch/rudra_mysql/py_new_3.8.6/lib64/python3.8/site-packages/distributed/utils.py", line 309, in f
    result[0] = yield future
  File "/scratch/rudra_mysql/py_new_3.8.6/lib64/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/scratch/rudra_mysql/py_new_3.8.6/lib64/python3.8/site-packages/distributed/client.py", line 1831, in _gather
    raise exception.with_traceback(traceback)
  File "/scratch/rudra_mysql/py_new_3.8.6/lib64/python3.8/site-packages/lightgbm/dask.py", line 322, in _train_part
    model.fit(
  File "/scratch/rudra_mysql/py_new_3.8.6/lib64/python3.8/site-packages/lightgbm/sklearn.py", line 1041, in fit
    super().fit(
  File "/scratch/rudra_mysql/py_new_3.8.6/lib64/python3.8/site-packages/lightgbm/sklearn.py", line 769, in fit
    self._Booster = train(
  File "/scratch/rudra_mysql/py_new_3.8.6/lib64/python3.8/site-packages/lightgbm/engine.py", line 217, in train
    booster = Booster(params=params, train_set=train_set)
  File "/scratch/rudra_mysql/py_new_3.8.6/lib64/python3.8/site-packages/lightgbm/basic.py", line 2570, in __init__
    self.set_network(
  File "/scratch/rudra_mysql/py_new_3.8.6/lib64/python3.8/site-packages/lightgbm/basic.py", line 2714, in set_network
    _safe_call(_LIB.LGBM_NetworkInit(c_str(machines),
  File "/scratch/rudra_mysql/py_new_3.8.6/lib64/python3.8/site-packages/lightgbm/basic.py", line 127, in _safe_call
    raise LightGBMError(_LIB.LGBM_GetLastError().decode('utf-8'))
lightgbm.basic.LightGBMError: Binding port 10010 failed

In the scheduler log, I can see that port binding failed for all of the ports.

Environment information:

- OS: Linux
- LightGBM: 3.3.2.99 (built from source, but I have also tried a pip install)
- dask: 2021.9.1
- distributed: 2021.9.1
- Python: 3.8.6

jameslamb commented 2 years ago

Thanks for your interest in LightGBM. I'll be able to look at this and provide a response in the coming days.

To ensure that maintainers can quickly give you an answer, please provide the following additional information (some of which was requested in the issue template):

Please also try to reduce the size of the example code to the simplest possible example which reproduces this behavior. For example, networking issues are unlikely to be related to parameters like learning_rate or bagging_fraction.

jmoralez commented 2 years ago

@rudra0713 did you run this twice in a row? The ports take a bit of time to be released once used, so if you try to reuse them the binding will most likely fail. Take the following example:

from urllib.parse import urlparse

import dask.array as da
import lightgbm as lgb
from dask.distributed import Client

client = Client(n_workers=2, threads_per_worker=1)
worker_addresses = client.scheduler_info()['workers'].keys()
one_worker_address = next(iter(worker_addresses))
host = urlparse(one_worker_address).hostname
machines = ','.join([f'{host}:{port}' for port in (12400, 12401)])
dX = da.random.random((1_000, 2), chunks=(250, 2))
dy = da.random.random(1_000, chunks=(250,))
lgb.DaskLGBMRegressor(n_estimators=5, machines=machines).fit(dX, dy)  # succeeds
lgb.DaskLGBMRegressor(n_estimators=5, machines=machines).fit(dX, dy)
# LightGBMError: Binding port 12401 failed
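
If you wait long enough between the two fits for the OS to release the sockets (TIME_WAIT typically lasts on the order of a minute on Linux), the second bind should succeed again. A minimal sketch of that, assuming the same machines, dX and dy as above:

import time

time.sleep(120)  # give the kernel time to move the old sockets out of TIME_WAIT
lgb.DaskLGBMRegressor(n_estimators=5, machines=machines).fit(dX, dy)  # should succeed
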
rudra0713 commented 2 years ago

> @rudra0713 did you run this twice in a row? The ports take a bit of time to be released once used, so if you try to reuse them the binding will most likely fail.

No, I have run this code more than 5 times, and every time I restarted the cluster. Plus, there were significant time delays between tries.

rudra0713 commented 2 years ago

@jameslamb Sorry about the missing information. I have added environment details and simplified my code.

rudra0713 commented 2 years ago

On a related note, I have sometimes seen messages like "Connecting to rank 7 failed, waiting for 120 ms" in the scheduler log. This originates from the linkers_socket.cpp file. Can you kindly tell me what "rank" means and what "connecting to rank failed" means?

jmoralez commented 2 years ago

Ranks are associated with each machine in your cluster, so if you have 8 machines you'll have rank 0, rank 1, ..., rank 7. That message means the cluster couldn't connect to that machine, and if those messages keep printing, it most likely means that machine died.
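
As a rough illustration (a sketch of the naming convention only, not of LightGBM internals; the addresses are hypothetical), each entry in the machines string corresponds to one rank, in list order:

machines = '10.0.0.1:12400,10.0.0.2:12400,10.0.0.3:12400'

# rank k is the machine at position k in the machines string
for rank, address in enumerate(machines.split(',')):
    print(f'rank {rank}: {address}')
# rank 0: 10.0.0.1:12400
# rank 1: 10.0.0.2:12400
# rank 2: 10.0.0.3:12400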

rudra0713 commented 2 years ago

Hi @jmoralez @jameslamb, is there any update on this? My goal is to use the LightGBM socket version and submit multiple sequential calls. For example:

for i in range(100, 150):
    dis_res = invoke_dis_lgbm(X, y, i, port_counter)

LightGBM uses random open ports for each trial, but in my production system I do not have the capability to open a large pool of ports just so LightGBM can choose from them. What I need to do is give LightGBM a fixed set of ports and make sure that each trial uses the same set of ports. That's why I was relying on the machines parameter, but as I have already shown, the log says binding the port failed. Am I setting the machines parameter incorrectly?

rudra0713 commented 2 years ago

Hi @jmoralez @jameslamb, I think I have found the underlying problem behind the socket binding error. As I mentioned already, I am using the machines param so that LightGBM uses a fixed set of ports for each sequential trial. However, after each trial finishes (which takes approximately 7-8 sec in my case), it takes almost 60-70 sec before all those ports are open again! From my runs, I saw that after trial 1, 2-3 ports will be open almost instantly, but the rest are still in use for a long period of time. Does this look like a bug to you, or is it happening because of some communication issue between Dask and LightGBM?

jameslamb commented 2 years ago

> it takes almost 60-70 sec before all those ports are open again.

Implementations of the TCP protocol often place a socket into a state called TIME_WAIT for some period of time (on the order of a few minutes) after the socket has been closed on either side of the connection. That is done to prevent accidental delivery of data meant for one process to another.

This isn't something I'm deeply familiar with, but there are many discussions about this on Stack Overflow and other sites, e.g. https://serverfault.com/a/329846.
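
You can observe this from Python with a small probe (a sketch using only the standard library; port_is_free is a hypothetical helper, not part of LightGBM or Dask):

import socket

def port_is_free(host, port):
    """Return True if (host, port) can be bound right now."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            s.bind((host, port))
            return True
        except OSError:
            return False

print(port_is_free('127.0.0.1', 10010))  # False while an old socket sits in TIME_WAIT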

> Does this look like a bug...?

It does not look like a bug to me. I don't believe current maintainers or the original authors of LightGBM's distributed training anticipated a use case like yours where you want to do many back-to-back training runs in very quick succession using a fixed set of machines + ports.

> Am I incorrectly setting the machines parameter?

You are correct that machines is the mechanism to use to limit LightGBM to a specific set of ports. It was designed for exactly the situation you've described, where limited ports are available based on your organization's firewall rules.

I don't know if you're setting machines correctly, as you haven't shared with us how you are setting machines yet. All the examples you've provided so far have been using a single local Dask cluster on 127.0.0.1.
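
For a real multi-node cluster, the machines string could be built from the actual worker addresses, along these lines (a sketch extending the earlier example; the port range is a hypothetical firewall-approved pool):

from urllib.parse import urlparse
from dask.distributed import get_client

client = get_client()
# one entry per Dask worker: that worker's host paired with one allowed port
hosts = [urlparse(addr).hostname for addr in client.scheduler_info()['workers']]
allowed_ports = range(12400, 12400 + len(hosts))
machines = ','.join(f'{host}:{port}' for host, port in zip(hosts, allowed_ports))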

rudra0713 commented 2 years ago

> Does this look like a bug...?
>
> It does not look like a bug to me. I don't believe current maintainers or the original authors of LightGBM's distributed training anticipated a use case like yours where you want to do many back-to-back training runs in very quick succession using a fixed set of machines + ports.

For example, let's say I am doing hyperparameter optimization: I have some number of LightGBM configurations (where the number of estimators or the max depth of a tree is varied). I want to submit all these LightGBM calls, in parallel or sequentially in a loop, while using a fixed set of machines + ports. My dataset is large enough that I need the support of dask/distributed. What's your suggestion for how to proceed?

> I don't know if you're setting machines correctly, as you haven't shared with us how you are setting machines yet. All the examples you've provided so far have been using a single local Dask cluster on 127.0.0.1.

This is how I started using the machines parameter. port_counter is set to a known open port on my node, like 10001 or 5501. Basically, additional_ports holds a list of ports that I know to be open on my machine(s).

from dask.distributed import get_client
import lightgbm as lgb

client_lgbm = get_client()
# one hostname per worker, extracted from the worker addresses
machines_list = [key.split('//')[1].split(':')[0] for key in client_lgbm.has_what()]
additional_ports = range(port_counter, port_counter + num_of_workers)
# pair each worker's hostname with one of the known-open ports
machines_list = [machines_list[index] + ':' + str(port) for index, port in enumerate(additional_ports)]
machines = ','.join(machines_list)
lgbm_cls = lgb.DaskLGBMClassifier(client=client_lgbm, objective='binary',
                                  n_estimators=number_of_estimators,
                                  machines=machines)

jameslamb commented 2 years ago

Are you familiar with TCP sockets and C++?

I'd support a change to make LightGBM's distributed training more resilient to the situation where it tries to bind to a port that has an existing socket on it in a status like TIME_WAIT.

For example, LightGBM could wait longer before raising the following exception:

https://github.com/microsoft/LightGBM/blob/d517ba12f2e7862ac533908304dddbd770655d2b/src/network/linkers_socket.cpp#L131

Alternatively, LightGBM could be more aggressive about forcibly closing sockets and not leaving them in TIME_WAIT (see, for example, this Stack Overflow answer).
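
For reference, the two socket options involved, sketched in Python (the real change would be in LightGBM's C++ networking code):

import socket
import struct

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# option 1: allow bind() to succeed while an old socket on the port is in TIME_WAIT
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# option 2: close with a reset instead of a normal shutdown, skipping TIME_WAIT
# (SO_LINGER with a zero timeout; generally discouraged outside controlled setups)
s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('ii', 1, 0))
s.bind(('127.0.0.1', 12400))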

If you're not comfortable attempting such contributions, then we could take that as a feature request.


Some things you could try that would work without any code changes in LightGBM: