dmlc / xgboost

Scalable, Portable and Distributed Gradient Boosting (GBDT, GBRT or GBM) Library, for Python, R, Java, Scala, C++ and more. Runs on single machine, Hadoop, Spark, Dask, Flink and DataFlow
https://xgboost.readthedocs.io/en/stable/
Apache License 2.0

[pyspark] SparkXGBRanker inaccurate eval metric output given different number of workers & can not pass multiple eval metrics #8608

Open tracykyle93 opened 1 year ago

tracykyle93 commented 1 year ago

Hi! Thanks a lot for the quick 1.7.2 release to hotfix #8491; SparkXGBRanker now works much more reliably. On 1.7.2 I have run into two further issues with SparkXGBRanker's evaluation metrics:

A sample dataset to reproduce the results is available at https://github.com/lezzhov/learning_to_rank/tree/main/learning_to_rank/data/train.txt

1) Inaccurate eval metric output given different numbers of workers. Training with the same parameter set but with num_workers=1 versus num_workers=4 gives very different values of model._xgb_sklearn_model.best_score. I also wrote a manual eval function, and its results do not align with model._xgb_sklearn_model.best_score in either case. The eval metric used for early stopping is "ndcg@10".

(Below, "internal ndcg@10 on validation data" refers to model._xgb_sklearn_model.best_score.)

num_workers = 1 manual train ndcg@10 -- 0.5931561486501443, internal ndcg@10 on validation data -- 0.46678797961901586, manual valid ndcg@10 -- 0.43402427434921265

num_workers = 4 manual train ndcg@10 -- 0.5499394648169217, internal ndcg@10 on validation data -- 0.6978460945534216, manual valid ndcg@10 -- 0.43996799528598785
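
One factor that might be worth ruling out (this is only an assumption on my side, I do not know whether the Spark estimator already guarantees it): when num_workers > 1, rows sharing a qid could end up on different partitions, so the group boundaries each worker sees would differ from the logical query groups, which would change any group-wise metric such as ndcg@10. A quick diagnostic sketch against the df_train built in the reproduction below:

# Diagnostic only: how many partitions does each query group span?
# qid and df_train refer to the reproduction script below.
from pyspark.sql import functions as F

qid_spread = (df_train
              .withColumn("pid", F.spark_partition_id())
              .groupBy("qid")
              .agg(F.countDistinct("pid").alias("n_partitions")))
print(qid_spread.filter(F.col("n_partitions") > 1).count())

# If that count is non-zero, forcing each group onto a single partition before
# fit() would show whether partitioning explains the num_workers difference.
# 4 matches the num_workers=4 run in the reproduction.
df_train_grouped = df_train.repartition(4, "qid").sortWithinPartitions("qid")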

Could you take a look at the eval metric logic and fix it? Here is the code to reproduce:

from pyspark.sql import SparkSession, SQLContext, Row
from pyspark.sql.types import *
from pyspark.mllib.linalg import Vectors,VectorUDT, SparseVector
from pyspark.sql.functions import monotonically_increasing_id 
import pyspark.sql.functions as F
from pyspark.ml.functions import vector_to_array
from xgboost.spark import SparkXGBRanker
import numpy as np
import random

# Parse a libsvm/LETOR-format ranking file into a Spark DataFrame with
# label, group_id (query id) and a sparse feature vector per row.
def read_libsvm(filepath, spark_session):
    with open(filepath, 'r') as f:
        raw_data = [x.split() for x in f.readlines()]
    outcome = [int(x[0]) for x in raw_data]
    qid = [int(x[1].split(':')[1]) for x in raw_data]
    index_value_dict = list()
    for row in raw_data:
        index_value_dict.append(dict([(int(x.split(':')[0]), float(x.split(':')[1]))
                                       for x in row[2:]]))
    max_idx = max([max(x.keys()) for x in index_value_dict])
    rows = [
        Row(
            label=outcome[i],
            group_id = qid[i],
            feat_vector=SparseVector(max_idx + 1, index_value_dict[i])
        )
        for i in range(len(index_value_dict))
    ]
    df = spark_session.createDataFrame(rows)
    return df

def map_column_value_to_rn(data, input_column, rn_column):
    distinct_column_values = data.select(input_column).distinct()
    map_to_rn = F.udf(lambda x: random.random(), FloatType()).asNondeterministic()
    column_value_to_rn_mapping = distinct_column_values.withColumn(rn_column, map_to_rn(input_column))
    return data.join(column_value_to_rn_mapping, how='inner', on=input_column)

# Split query groups into train/validation using a per-group random number,
# and remap group_id to a dense integer qid column.
def split_data(data, split_by, valid_split=.2, test_split=.0):
    RN_COLUMN = 'rn'
    data = map_column_value_to_rn(data, split_by, RN_COLUMN)
    data.cache()
    train_data = data.filter(data[RN_COLUMN] >= test_split)
    train_data = train_data.withColumn('validationCol', F.when(F.col(RN_COLUMN) >= test_split + valid_split, F.lit(False)).otherwise(F.lit(True)))
    train_data = train_data.drop(RN_COLUMN)
    train_data_group = train_data.select('group_id').distinct()
    train_data_group = train_data_group.coalesce(1).withColumn("qid", monotonically_increasing_id())
    train_data = train_data.join(F.broadcast(train_data_group), ['group_id'])
    return train_data

# DCG@k; the 'alternative' formula uses the exponential gain 2**rel - 1
# (the form XGBoost's ndcg uses by default), 'traditional' uses the raw relevance.
def dcg(y_true, y_score, k, formula='traditional'):
    order = np.argsort(y_score)[::-1]
    y_true = np.take(y_true, order[:k])
    if formula=='alternative':
        gain = 2 ** y_true - 1
    else:
        gain = y_true 
    discounts = np.log2(np.arange(len(y_true)) + 2)
    return np.sum(gain / discounts)

# Per-group NDCG@k as a Spark UDF (exponential-gain formula).
@F.udf(returnType=FloatType())
def ndcg(y_true, y_score, k):
    formula = 'alternative'
    if not k:
        k = len(y_true)
    return float(dcg(y_true, y_score, k, formula) / dcg(y_true, y_true, k, formula))

# Score a DataFrame with the fitted model and return the mean per-group ndcg@10.
def get_ndcg(model, df):
    transformed_df = model.transform(df)
    transformed_df_group = transformed_df.groupby('group_id').agg(
        F.count('label').alias('group'),
        F.count('prediction').alias('group_pred'),
        F.collect_list('prediction').alias('pred'),
        F.collect_list('label').alias('label'),
    )
    transformed_df_group = transformed_df_group.withColumn('ndcg@10', ndcg(F.col('label'), F.col('pred'), F.lit(10)))
    ndcg10_df = transformed_df_group.where(~F.isnan(F.col('ndcg@10'))).select(F.mean(F.col('ndcg@10')).alias('ndcg@10')).cache()
    ndcg10 = ndcg10_df.collect()[0][0]
    return ndcg10

sparkSession = (SparkSession
                .builder
                .config('spark.dynamicAllocation.enabled', 'false')
                .appName('clean-up')
                .enableHiveSupport()
                .getOrCreate())

sqlContext = SQLContext(sparkSession)

df = read_libsvm("train.txt", sparkSession)

df = df.withColumn('features', vector_to_array('feat_vector', dtype="float32"))

df_train = split_data(df, 'group_id', valid_split=.2, test_split=.0)

params = {"objective": "rank:ndcg",
        "eval_metric": "ndcg@10"}

ranker1 = SparkXGBRanker(qid_col="qid", features_col="features", validation_indicator_col="validationCol", early_stopping_rounds=5, num_workers=1)
ranker1.setParams(**params)
model1 = ranker1.fit(df_train)
ndcg10_train1 = get_ndcg(model1, df_train.filter(F.col("validationCol") == False))
ndcg10_valid1 = get_ndcg(model1, df_train.filter(F.col("validationCol") == True))
print('manual train ndcg@10 -- {}, internal ndcg@10 on validation data -- {}, manual valid ndcg@10 -- {}'.format(ndcg10_train1, model1._xgb_sklearn_model.best_score, ndcg10_valid1))
ranker2 = SparkXGBRanker(qid_col="qid", features_col="features", validation_indicator_col="validationCol", early_stopping_rounds=5, num_workers=4)
ranker2.setParams(**params)
model2 = ranker2.fit(df_train)
ndcg10_train2 = get_ndcg(model2, df_train.filter(F.col("validationCol") == False))
ndcg10_valid2 = get_ndcg(model2, df_train.filter(F.col("validationCol") == True))
print('manual train ndcg@10 -- {}, internal ndcg@10 on validation data -- {}, manual valid ndcg@10 -- {}'.format(ndcg10_train2, model2._xgb_sklearn_model.best_score, ndcg10_valid2))
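
For what it's worth, the manual dcg/ndcg helpers above can be cross-checked locally against sklearn.metrics.ndcg_score, which, as far as I know, uses linear gains like the 'traditional' formula (the 'alternative' 2**rel - 1 gain, which get_ndcg actually uses, has no direct sklearn counterpart). A minimal check, assuming scikit-learn is installed:

# Local sanity check of the dcg helper (traditional/linear-gain branch only),
# on toy data, against sklearn's reference implementation.
import numpy as np
from sklearn.metrics import ndcg_score

y_true_check = np.array([3, 2, 3, 0, 1, 2], dtype=float)
y_score_check = np.array([0.1, 0.9, 0.3, 0.2, 0.5, 0.4])

manual = dcg(y_true_check, y_score_check, k=4) / dcg(y_true_check, y_true_check, k=4)
reference = ndcg_score(y_true_check[None, :], y_score_check[None, :], k=4)
print(manual, reference)  # the two values should agree to floating-point precision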

###################################################

2) SparkXGBRanker cannot accept multiple eval metrics. SparkXGBRanker does not support passing a list of eval metrics yet; could you add this feature to the roadmap?

params = {"objective": "rank:ndcg", "eval_metric": ["ndcg@5", "ndcg@10"]}

ranker3 = SparkXGBRanker(qid_col="qid", features_col="features", validation_indicator_col="validationCol", early_stopping_rounds=5, num_workers=1)

ranker3.setParams(**params)

model3 = ranker3.fit(df_train)

The error is:

ValueError                                Traceback (most recent call last)
Cell In [9], line 6
      4 ranker3 = SparkXGBRanker(qid_col="qid", features_col="features", validation_indicator_col="validationCol", early_stopping_rounds=5, num_workers=1)
      5 ranker3.setParams(**params)
----> 6 model3 = ranker3.fit(df_train)

File /usr/lib/spark/python/pyspark/ml/base.py:161, in Estimator.fit(self, dataset, params)
    159         return self.copy(params)._fit(dataset)
    160     else:
--> 161         return self._fit(dataset)
    162 else:
    163     raise ValueError("Params must be either a param map or a list/tuple of param maps, "
    164                      "but got %s." % type(params))

File /opt/conda/default/lib/python3.8/site-packages/xgboost/spark/core.py:647, in _SparkXGBEstimator._fit(self, dataset)
    645 def _fit(self, dataset):
    646     # pylint: disable=too-many-statements, too-many-locals
--> 647     self._validate_params()
    648     label_col = col(self.getOrDefault(self.labelCol)).alias(alias.label)
    650     select_cols = [label_col]

File /opt/conda/default/lib/python3.8/site-packages/xgboost/spark/estimator.py:358, in SparkXGBRanker._validate_params(self)
    357 def _validate_params(self):
--> 358     super()._validate_params()
...
    305         self.isDefined(self.validationIndicatorCol)
    306         and self.getOrDefault(self.validationIndicatorCol)
    307     ):

ValueError: Only string type 'eval_metric' param is allowed.
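
In the meantime, a possible stopgap (not a fix for the Spark estimator) is to pass a single metric string to SparkXGBRanker, or to fall back to the single-node sklearn API, where eval_metric does accept a list. A rough sketch, assuming the data fits on the driver; X_train, y_train, qid_train and the validation arrays are hypothetical numpy arrays built from the same dataset:

# Single-node workaround sketch: xgboost's sklearn XGBRanker takes a list of metrics.
# Rows are assumed to be grouped/sorted by qid.
import xgboost as xgb

ranker_local = xgb.XGBRanker(
    objective="rank:ndcg",
    eval_metric=["ndcg@5", "ndcg@10"],  # a list is accepted by the sklearn estimator
    early_stopping_rounds=5,
)
ranker_local.fit(
    X_train, y_train, qid=qid_train,
    eval_set=[(X_valid, y_valid)],
    eval_qid=[qid_valid],
    verbose=False,
)
print(ranker_local.evals_result())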
trivialfis commented 1 year ago

Let me take a deeper look into LTR in general after sorting out https://github.com/dmlc/xgboost/pull/8272.