microsoft / LightGBM

A fast, distributed, high performance gradient boosting (GBT, GBDT, GBRT, GBM or MART) framework based on decision tree algorithms, used for ranking, classification and many other machine learning tasks.
https://lightgbm.readthedocs.io/en/latest/
MIT License

Training's binary logloss increasing in some iterations in voting-parallel setting #4414

Open freetz-tiplu opened 3 years ago

freetz-tiplu commented 3 years ago

Description

Some time ago I encountered the problem that, unless I set min_data_in_leaf to a higher value than the default, the training's binary logloss would increase in some iterations. Spikes of varying size would occur, some small but some really big, as shown in the example (see attached plot: distributed_metrics). I always used gbdt. The spikes only occurred in a distributed setting, and I could observe them for both data-parallel and voting-parallel training; I did not test the feature-parallel setting. After some testing I could see that the source of the error had to lie in LightGBM's C++ code, but I could not find the specific location. Then I saw issue #4026: in the iterations where spikes occurred, the leaf values in my trees became really high as well. Installing the fix from #4185 solved the problem for the data-parallel case and improved results for voting-parallel, but small spikes would still occur in the voting-parallel case (see attached plot: distributed_metrics_voting).

This issue is mainly there to point out that #4026 is not yet fully resolved for the voting-parallel case. Perhaps the increase in binary_logloss has another cause in the voting-parallel case as well.
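For reference, the increases can be flagged from a recorded training curve. A minimal single-process sketch (synthetic data; the problem itself only shows up in the distributed setting):

import lightgbm as lgb
import numpy as np

# Synthetic binary classification data, single process.
X = np.random.rand(1000, 20)
y = (X[:, 0] > 0.5).astype(int)
train_data = lgb.Dataset(X, label=y)

# Record the training binary_logloss every iteration.
evals_result = {}
lgb.train(
    {'objective': 'binary', 'metric': 'binary_logloss'},
    train_data,
    num_boost_round=50,
    valid_sets=[train_data],
    valid_names=['train'],
    callbacks=[lgb.record_evaluation(evals_result)],
)

# Report any iteration where the training logloss went up instead of down.
curve = evals_result['train']['binary_logloss']
for i in range(1, len(curve)):
    if curve[i] > curve[i - 1]:
        print(f"logloss increased at iteration {i}: {curve[i - 1]:.6f} -> {curve[i]:.6f}")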

Reproducible example

I did not manage to create a reproducible example because the error does not happen every time.

Environment info

LightGBM version or commit hash: first version: 3.1.1.99; version with the fix: 3.2.1.99

Command(s) you used to install LightGBM

git clone --recursive https://github.com/microsoft/LightGBM
cd LightGBM
mkdir build
cd build
cmake ..
make -j4
StrikerRUS commented 3 years ago

cc @shiyu1994

shiyu1994 commented 3 years ago

@freetz-tiplu Thanks for using LightGBM. What's the size of your data and how many features are there? Could you please increase the top_k parameter to see whether the fluctuation still occurs? Ideally, if top_k equals the total number of features, voting parallel should work the same as data parallel.
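For example, something along these lines (a sketch; X_train is just a placeholder for your actual training matrix):

import numpy as np

# Placeholder data; the point is only that top_k is set to the feature count,
# so every worker votes for every feature and voting parallel should reduce
# to data parallel.
X_train = np.random.rand(100, 50)
params = {
    'objective': 'binary',
    'metric': 'binary_logloss',
    'tree_learner': 'voting',
    'top_k': X_train.shape[1],  # total number of features
}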

freetz-tiplu commented 3 years ago

@shiyu1994 Sorry for the late reply. In one example I had 60k samples and 10k features and the problem occurred. Strangely, setting top_k to the number of features made no difference: a relatively large degradation of binary_logloss still occurred in one iteration. However, if I use the same setting again (same training config, data and features) but set tree_learner: data, the problem no longer occurs.
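To make the comparison concrete (a sketch; base_params stands for the rest of the otherwise unchanged configuration):

# Only the tree learner differs between the two runs.
base_params = {'objective': 'binary', 'metric': 'binary_logloss', 'seed': 1}
params_voting = dict(base_params, tree_learner='voting', top_k=10000)  # spikes still appear
params_data = dict(base_params, tree_learner='data')                   # no spikes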

shiyu1994 commented 3 years ago

@freetz-tiplu Could you provide an example, even if the error does not occur every time? I can run the example multiple times to catch the error. It would be very helpful for us to identify the problem. Thank you!

freetz-tiplu commented 3 years ago

I had an example, but when I converted my data to CSV files and tested it on the C++ code, the problem did not occur. I loaded the CSVs in Python and tested it again with dask and two local workers, but the spikes did not occur. When I used my normal data format, however, I could observe single spikes again. I will investigate this further and will hopefully be able to prepare an example, or maybe find the issue somewhere else. What I can say is that I could observe the spikes when I had a lot of binary features. I can't just share the data because of privacy reasons, but I will definitely report back.

freetz-tiplu commented 3 years ago

I created an example, but:

  1. the smallest example I could find where the problem occurred has 27k samples for each worker. The CSVs were too big, which is why I made them publicly available here: https://www.dropbox.com/sh/1y7x08hlskfbw4a/AADxoTx0_9Cjo352SZBSD_l0a?dl=0 There is one CSV for each worker. The first column denotes the label and the first row contains the column numbers as feature names. The example contains 9151 features, and a lot of them are binary.
  2. I could not find an example where the spikes would occur using the lightgbm executable directly. I could only recreate the spikes with a small Python example using dask and two local workers. I will provide the Python code for this test below, but I will nevertheless explain at the end how I tested it with C++ only (maybe I made a mistake in the config files).

Python example:

import lightgbm as lgb
import pandas as pd
import numpy as np
import lightgbm.callback as lgb_callback
import scipy.special
import dask.distributed

"""
How to start:
1. Start dask-scheduler:
> dask-scheduler --host 127.0.0.1          
2. Start both dask-workers:
First terminal (workerA): 
> dask-worker tcp://127.0.0.1:8786 --name workerA --nthreads 1 --memory-limit="10 GiB"              
Second terminal (workerB):
> dask-worker tcp://127.0.0.1:8786 --name workerB --nthreads 1 --memory-limit="10 GiB"              
3. Start this script with python
"""
######################################Helper####################################
def check_metrics(worker_met):
    scores = worker_met['train']['logsumexp_logloss_from_raw']
    last_value = np.inf
    for i, value in enumerate(scores):
        if value > last_value:
            print(f"Spike in iteration {i}: last_value: {last_value}, value: {value}, diff: {value - last_value}")
        last_value = value

######################################Callbacks####################################
def logsumexp_logloss(y_true, raw_score):
    # Numerically stable binary logloss from raw scores:
    # log(1 + exp(-s)) for positive labels, log(1 + exp(s)) for negative labels.
    y_true = y_true.astype(float)
    raw_score = raw_score.astype(float)
    return np.mean(np.array([
        scipy.special.logsumexp([0, -s]) if y else scipy.special.logsumexp([0, s])
        for y, s in zip(y_true, raw_score)]))

def record_evaluation_logsumexp(eval_result, evals):
    # Callback factory: after every iteration, predict raw scores on each dataset
    # in `evals` and record the custom logloss under 'logsumexp_logloss_from_raw'.
    eval_result.clear()

    def callback(env: lgb_callback.CallbackEnv):
        bst = env.model
        for lgbdata, eval_name in evals:
            raw_prediction_score = bst.predict(lgbdata.get_data(), raw_score=True)
            y_true = lgbdata.get_label().astype(bool)
            score = logsumexp_logloss(y_true, raw_prediction_score)
            eval_result.setdefault(eval_name, {})
            eval_result[eval_name].setdefault('logsumexp_logloss_from_raw', []).append(score)
    callback.order = 30
    return callback

######################################Dask distributed functions####################################
def set_data_path(p, local_listen_port):
    worker = dask.distributed.get_worker()
    worker._data_path = p
    worker._local_listen_port = local_listen_port
    print(f"Data path set: {p}")

def train_on_workers(lgb_params):
    worker = dask.distributed.get_worker()
    worker_name = worker.name
    print(f"Train on worker {worker_name}")

    lgb_params['local_listen_port'] = worker._local_listen_port

    # load from CSV
    train = pd.read_csv(worker._data_path)
    y = train.iloc[:, 0].values
    x = train.iloc[:, 1:].values
    train_data = lgb.Dataset(x, label=y)

    train_sets = [train_data]
    train_names = ['train']

    metrics = {}
    callbacks = [record_evaluation_logsumexp(metrics, list(zip(train_sets, train_names)))]

    bst = lgb.train(lgb_params, train_set=train_data, callbacks=callbacks, verbose_eval=3)

    return metrics

######################################Main####################################
train_A = "path/to/train27kA.csv"      # TOCHANGE
train_B = "path/to/train27kB.csv"      # TOCHANGE

dask_scheduler = "127.0.0.1:8786"

worker_lgb_ports = {
    'workerA': 12345,
    'workerB': 12346
}
dask_client = dask.distributed.Client(dask_scheduler)
workers_info = dask_client.scheduler_info()["workers"]
worker_addresses = {worker_details["name"]: worker_address
                    for worker_address, worker_details in workers_info.items()}

paths_set = []
for worker_name, worker_address in worker_addresses.items():
    if 'A' in worker_name:
        p = train_A
        local_listen_port = worker_lgb_ports['workerA']
    else:
        p = train_B
        local_listen_port = worker_lgb_ports['workerB']
    paths_set.append(
        dask_client.submit(set_data_path, p=p, local_listen_port=local_listen_port, workers=worker_address, pure=False)) 
dask_client.gather(paths_set)

machines = "127.0.0.1:12346,127.0.0.1:12345"

lgb_params = {
    'boosting_type': 'gbdt',
    'objective': 'binary',
    'metric': 'binary_logloss',
    'metric_freq': 1,
    'is_training_metric': True,
    # deterministic
    'deterministic': True,
    'seed': 1,
    'data_random_seed': 0,
    'force_row_wise': True,
    # data and tree
    'num_trees': 60,
    'num_leaves': 31,
    'num_threads': 1,
    'tree_learner': 'voting',
    'top_k': 20,
    # machines
    'num_machines': 2,
    'machines': machines,
    'local_listen_port': 12346,
    'device_type': 'cpu',
    'verbose': 3,
}

metrics_fut = [dask_client.submit(train_on_workers, lgb_params=lgb_params, workers=worker_address, pure=False)
               for worker_address in worker_addresses.values()]
metrics = dask_client.gather(metrics_fut)

print("Check first metrics")
check_metrics(metrics[0])
print("\nCheck second metrics")
check_metrics(metrics[1])

The result I get when I execute this example looks like this:

Check first metrics
Spike in 24: last_value: 0.1525975067299966, value: 0.15707188402542382, diff: 0.004474377295427212
Check second metrics
Spike in 47: last_value: 0.11228408356132608, value: 0.12197120057152273, diff: 0.009687117010196658


When testing the data using the lightgbm executable only, I used the following configuration files.

mlist.txt:

127.0.0.1 12345
127.0.0.1 12346

trainA/B.conf:

task = train
boosting_type = gbdt
objective = binary
metric = binary_logloss
metric_freq = 1
is_training_metric = true

deterministic = True
seed = 1
data_random_seed = 0
force_row_wise = True

data = train27kA.csv  /   train27kB.csv       #Depending on the worker
num_trees = 60
num_leaves = 31
num_threads = 1
tree_learner = voting
top_k = 20

num_machines = 2
machine_list_file = mlist.txt
local_listen_port = 12345 / 12346            #Depending on the worker
device_type = cpu
label_column = 0
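
With those files in place, the two CLI processes would be launched along these lines (a sketch, assuming the lightgbm executable built with the commands above; one terminal per worker):

./lightgbm config=trainA.conf    # terminal / worker A
./lightgbm config=trainB.conf    # terminal / worker B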

Also, the logloss generally differs slightly between the Python example and the C++ example, starting from the first iteration. Maybe this can help you as well.

shiyu1994 commented 2 years ago

Has this been fixed by #4542? If not, we should reopen this issue.

jameslamb commented 2 years ago

Oh no, I don't think so! Maybe something in the language I used in that issue led to this being closed automatically.

guolinke commented 2 years ago

@freetz-tiplu The assumption of voting parallel is that there are enough data samples per node (machine), so that the locally best features are likely to also be the globally best features. Therefore, when there are not enough local samples, it is hard to say what will happen.
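To illustrate that assumption, a toy sketch of the voting idea (not LightGBM's actual implementation):

import numpy as np

# Each worker ranks features by its locally estimated split gain and votes for
# its top_k; only voted features become candidates for the global best split.
# With few (or unrepresentative) samples per worker, local gains are noisy, so
# the truly best feature may not be voted for by anyone.
rng = np.random.default_rng(1)
num_features, top_k = 10, 3
local_gains = {
    'workerA': rng.random(num_features),
    'workerB': rng.random(num_features),
}
voted = set()
for name, gains in local_gains.items():
    local_top = np.argsort(gains)[::-1][:top_k]
    voted.update(local_top.tolist())
    print(name, 'votes for features', sorted(local_top.tolist()))
print('globally considered features:', sorted(voted))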

freetz-tiplu commented 2 years ago

@guolinke I don't think that this should be a problem in my case. Or are you suggesting that this might be the reason why the error still occurred in the example I mentioned above?

shiyu1994 commented 2 years ago

@freetz-tiplu Thanks a lot for preparing the reproducible example. I'm back to look into this.

abhineetgupta commented 7 months ago

I'm experiencing this issue with lightgbm 4.1.0.

I can't tell from the discussion whether this issue was fixed, either by https://github.com/microsoft/LightGBM/pull/4542 or by https://github.com/microsoft/LightGBM/issues/5153. I see that the issue is open, so perhaps it's unresolved.