microsoft / LightGBM

A fast, distributed, high performance gradient boosting (GBT, GBDT, GBRT, GBM or MART) framework based on decision tree algorithms, used for ranking, classification and many other machine learning tasks.
https://lightgbm.readthedocs.io/en/latest/
MIT License

Forwarding values to custom loss function for semiparametric estimation. #5917

Closed meh2135 closed 3 months ago

meh2135 commented 1 year ago

Summary

Forward columns of your dataset directly to a custom objective function.

Motivation

This is useful for semi-parametric models like Poisson process regression, where y | x, t ~ Poisson(mu(x) * t) and we wish to learn mu(), or variance regression where the mean is known, e.g. y | x, m ~ Logistic(m, exp(s(x))) and we wish to learn s().

It's possible that LightGBM would pick up on that model if you were to just feed it those parameters as features with no additional context, but consider the case where you're using the Poisson process regression for causal inference and the time intervals are correlated with treatment status. Including them naively as features could lead to unpredictable biases.

Description

My ideal interface would look something like this:

reg = CustomReg(objective=obj_func_with_param_named_time)
reg.fit(x, y, custom_objective_train_params={"time":t})
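For concreteness, the objective for the Poisson-process case might look roughly like this (just a sketch: the name and signature mirror the hypothetical interface above, and it assumes a log link, i.e. mu(x) = exp(f(x)) where f is the boosted score):

import numpy as np

def obj_func_with_param_named_time(y_true, y_pred, time):
    # With y | x, t ~ Poisson(exp(f(x)) * t), the per-sample negative log-likelihood
    # (dropping terms constant in f) is exp(f) * t - y * f, so
    #   grad = exp(f) * t - y
    #   hess = exp(f) * t
    rate = np.exp(y_pred) * time
    grad = rate - y_true
    hess = rate
    return grad, hess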

I recognize that this is probably not widely useful enough to add as a feature to main, and might need some custom work for each use case, but I'm really struggling to make it work for my case. I've tried modifying the Dataset, the objective function wrapper, and the eval function wrapper, and inheriting from LGBMRegressor and modifying the fit method to look for loc and log_scale parameters, as below, but something seems to be going wrong during fitting. My best guess is that there is some sorting by covariates happening at each boosting stage that doesn't apply to my hackily tacked-on dataset attributes, so they end up misaligned with the predictions and labels. I'm hoping that doesn't mean it requires digging into the C++...

I would REALLY appreciate anyone's advice on how to do this.

from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import copy
from inspect import signature
import numpy as np
import lightgbm as lgb

_choose_param_value = lgb.sklearn._choose_param_value
_ConfigAliases = lgb.sklearn._ConfigAliases
_log_warning = lgb.sklearn._log_warning

def copy_signature(base):
    def decorator(func):
        func.__signature__ = signature(base)
        return func
    return decorator

class Dataset(lgb.sklearn.Dataset):

    def __init__(self, data, label=None, reference=None,
                 weight=None, group=None, init_score=None, silent='warn',
                 loc = None, log_scale=None,
                 feature_name='auto', categorical_feature='auto', params=None,
                 free_raw_data=True):
        """Initialize Dataset.

        Parameters
        ----------
        data : str, pathlib.Path, numpy array, pandas DataFrame, H2O DataTable's Frame, scipy.sparse, Sequence, list of Sequence or list of numpy array
            Data source of Dataset.
            If str or pathlib.Path, it represents the path to a text file (CSV, TSV, or LibSVM) or a LightGBM Dataset binary file.
        label : list, numpy 1-D array, pandas Series / one-column DataFrame or None, optional (default=None)
            Label of the data.
        reference : Dataset or None, optional (default=None)
            If this is Dataset for validation, training data should be used as reference.
        weight : list, numpy 1-D array, pandas Series or None, optional (default=None)
            Weight for each instance.
        group : list, numpy 1-D array, pandas Series or None, optional (default=None)
            Group/query data.
            Only used in the learning-to-rank task.
            sum(group) = n_samples.
            For example, if you have a 100-document dataset with ``group = [10, 20, 40, 10, 10, 10]``, that means that you have 6 groups,
            where the first 10 records are in the first group, records 11-30 are in the second group, records 31-70 are in the third group, etc.
        init_score : list, list of lists (for multi-class task), numpy array, pandas Series, pandas DataFrame (for multi-class task), or None, optional (default=None)
            Init score for Dataset.
        silent : bool, optional (default=False)
            Whether to print messages during construction.
        feature_name : list of str, or 'auto', optional (default="auto")
            Feature names.
            If 'auto' and data is pandas DataFrame, data columns names are used.
        categorical_feature : list of str or int, or 'auto', optional (default="auto")
            Categorical features.
            If list of int, interpreted as indices.
            If list of str, interpreted as feature names (need to specify ``feature_name`` as well).
            If 'auto' and data is pandas DataFrame, pandas unordered categorical columns are used.
            All values in categorical features should be less than int32 max value (2147483647).
            Large values could be memory consuming. Consider using consecutive integers starting from zero.
            All negative values in categorical features will be treated as missing values.
            The output cannot be monotonically constrained with respect to a categorical feature.
        params : dict or None, optional (default=None)
            Other parameters for Dataset.
        free_raw_data : bool, optional (default=True)
            If True, raw data is freed after constructing inner Dataset.
        """
        self.handle = None
        self.data = data
        self.label = label
        self.reference = reference
        self.weight = weight
        self.loc = loc
        self.log_scale = log_scale
        self.group = group
        self.init_score = init_score
        self.silent = silent
        self.feature_name = feature_name
        self.categorical_feature = categorical_feature
        self.params = copy.deepcopy(params)
        self.free_raw_data = free_raw_data
        self.used_indices = None
        self.need_slice = True
        self._predictor = None
        self.pandas_categorical = None
        self.params_back_up = None
        self.feature_penalty = None
        self.monotone_constraints = None
        self.version = 0
        self._start_row = 0  # Used when pushing rows one by one.

    def get_loc(self) -> Optional[np.ndarray]:
        if self.loc is None:
            # self.weight = self.get_field('weight')
            raise ValueError("loc is not set, re-initialize dataset")
        return self.loc

    def get_log_scale(self) -> Optional[np.ndarray]:
        if self.log_scale is None:
            # self.weight = self.get_field('weight')
            raise ValueError("log_scale is not set, re-initialize dataset.")
        return self.log_scale

class _ObjectiveFunctionWrapper(lgb.sklearn._ObjectiveFunctionWrapper):
    """Proxy class for objective function."""

    def __call__(self, preds, dataset):
        """Call passed function with appropriate arguments.

        Parameters
        ----------
        preds : array-like of shape = [n_samples] or shape = [n_samples * n_classes] (for multi-class task)
            The predicted values.
        dataset : Dataset
            The training dataset.

        Returns
        -------
        grad : array-like of shape = [n_samples] or shape = [n_samples * n_classes] (for multi-class task)
            The value of the first order derivative (gradient) of the loss
            with respect to the elements of preds for each sample point.
        hess : array-like of shape = [n_samples] or shape = [n_samples * n_classes] (for multi-class task)
            The value of the second order derivative (Hessian) of the loss
            with respect to the elements of preds for each sample point.
        """
        labels = dataset.get_label()
        params = signature(self.func).parameters
        # print(self.func)
        # print(f"obj params {params}")
        # raise ValueError()
        argc = len(params)
        extra_params = {}
        if "loc" in params:
            extra_params["loc"] = dataset.get_loc()
        if "log_scale" in params:
            extra_params["log_scale"] = dataset.get_log_scale() 
        # print("extra_params of wrapper", extra_params)
        argc = argc - len(extra_params)
        if argc == 2:
            grad, hess = self.func(labels, preds, **extra_params)
        elif argc == 3:
            grad, hess = self.func(labels, preds, dataset.get_group(), **extra_params)
        else:
            raise TypeError(f"Self-defined objective function should have 2 or 3 arguments, got {argc}")
        """weighted for objective"""
        weight = dataset.get_weight()
        if weight is not None:
            """only one class"""
            if len(weight) == len(grad):
                grad = np.multiply(grad, weight)
                hess = np.multiply(hess, weight)
            else:
                num_data = len(weight)
                num_class = len(grad) // num_data
                if num_class * num_data != len(grad):
                    raise ValueError("Length of grad and hess should equal to num_class * num_data")
                for k in range(num_class):
                    for i in range(num_data):
                        idx = k * num_data + i
                        grad[idx] *= weight[i]
                        hess[idx] *= weight[i]
        return grad, hess

class _EvalFunctionWrapper(lgb.sklearn._EvalFunctionWrapper):
    """Proxy class for evaluation function."""

    def __call__(self, preds, dataset):
        """Call passed function with appropriate arguments.

        Parameters
        ----------
        preds : array-like of shape = [n_samples] or shape = [n_samples * n_classes] (for multi-class task)
            The predicted values.
        dataset : Dataset
            The training dataset.

        Returns
        -------
        eval_name : str
            The name of evaluation function (without whitespace).
        eval_result : float
            The eval result.
        is_higher_better : bool
            Is eval result higher better, e.g. AUC is ``is_higher_better``.
        """
        labels = dataset.get_label()
        params = signature(self.func).parameters
        argc = len(params)

        extra_params = {}
        if "loc" in params:
            extra_params["loc"] = dataset.get_loc()
        if "log_scale" in params:
            extra_params["log_scale"] = dataset.get_log_scale() 
        # print("extra_params ef wrapper", extra_params)
        argc = argc - len(extra_params)
        if argc == 2:
            return self.func(labels, preds, **extra_params)
        elif argc == 3:
            return self.func(labels, preds, dataset.get_weight(), **extra_params)
        elif argc == 4:
            return self.func(labels, preds, dataset.get_weight(), dataset.get_group(), **extra_params)
        else:
            raise TypeError(f"Self-defined eval function should have 2, 3 or 4 arguments, got {argc}")

class PassThroughLGBMRegressor(lgb.sklearn.LGBMRegressor):
    def fit(self, X, y,
            sample_weight=None, init_score=None, group=None,
            loc=None, log_scale=None,
            eval_set=None, 
            eval_loc=None, eval_log_scale=None,
            eval_names=None, eval_sample_weight=None,
            eval_class_weight=None, eval_init_score=None, eval_group=None,
            eval_metric=None, early_stopping_rounds=None, verbose='warn',
            feature_name='auto', categorical_feature='auto',
            callbacks=None, init_model=None):
        """Docstring is set after definition, using a template."""
        if self._objective is None:
            if isinstance(self, lgb.sklearn.LGBMRegressor):
                self._objective = "regression"
            elif isinstance(self, lgb.sklearn.LGBMClassifier):
                self._objective = "binary"
            elif isinstance(self, lgb.sklearn.LGBMRanker):
                self._objective = "lambdarank"
            else:
                raise ValueError("Unknown LGBMModel type.")
        if callable(self._objective):
            self._fobj = _ObjectiveFunctionWrapper(self._objective)
        else:
            self._fobj = None

        params = self.get_params()
        # user can set verbose with kwargs, it has higher priority
        if self.silent != "warn":
            _log_warning("'silent' argument is deprecated and will be removed in a future release of LightGBM. "
                         "Pass 'verbose' parameter via keyword arguments instead.")
            silent = self.silent
        else:
            silent = True
        if not any(verbose_alias in params for verbose_alias in _ConfigAliases.get("verbosity")) and silent:
            params['verbose'] = -1
        params.pop('silent', None)
        params.pop('importance_type', None)
        params.pop('n_estimators', None)
        params.pop('class_weight', None)
        if isinstance(params['random_state'], np.random.RandomState):
            params['random_state'] = params['random_state'].randint(np.iinfo(np.int32).max)
        for alias in _ConfigAliases.get('objective'):
            params.pop(alias, None)
        if self._n_classes is not None and self._n_classes > 2:
            for alias in _ConfigAliases.get('num_class'):
                params.pop(alias, None)
            params['num_class'] = self._n_classes
        if hasattr(self, '_eval_at'):
            eval_at = self._eval_at
            for alias in _ConfigAliases.get('eval_at'):
                if alias in params:
                    _log_warning(f"Found '{alias}' in params. Will use it instead of 'eval_at' argument")
                    eval_at = params.pop(alias)
            params['eval_at'] = eval_at
        params['objective'] = self._objective
        if self._fobj:
            params['objective'] = 'None'  # objective = nullptr for unknown objective

        # Do not modify original args in fit function
        # Refer to https://github.com/microsoft/LightGBM/pull/2619
        eval_metric_list = copy.deepcopy(eval_metric)
        if not isinstance(eval_metric_list, list):
            eval_metric_list = [eval_metric_list]

        # Separate built-in from callable evaluation metrics
        eval_metrics_callable = [_EvalFunctionWrapper(f) for f in eval_metric_list if callable(f)]
        eval_metrics_builtin = [m for m in eval_metric_list if isinstance(m, str)]

        # register default metric for consistency with callable eval_metric case
        original_metric = self._objective if isinstance(self._objective, str) else None
        if original_metric is None:
            # try to deduce from class instance
            if isinstance(self, lgb.sklearn.LGBMRegressor):
                original_metric = "l2"
            elif isinstance(self, lgb.sklearn.LGBMClassifier):
                original_metric = "multi_logloss" if self._n_classes > 2 else "binary_logloss"
            elif isinstance(self, lgb.sklearn.LGBMRanker):
                original_metric = "ndcg"

        # overwrite default metric by explicitly set metric
        params = _choose_param_value("metric", params, original_metric)

        # concatenate metric from params (or default if not provided in params) and eval_metric
        params['metric'] = [params['metric']] if isinstance(params['metric'], (str, type(None))) else params['metric']
        params['metric'] = [e for e in eval_metrics_builtin if e not in params['metric']] + params['metric']
        params['metric'] = [metric for metric in params['metric'] if metric is not None]

        if not isinstance(X, (lgb.sklearn.pd_DataFrame, lgb.sklearn.dt_DataTable)):
            _X, _y = lgb.sklearn._LGBMCheckXY(X, y, accept_sparse=True, force_all_finite=False, ensure_min_samples=2)
            if sample_weight is not None:
                sample_weight = lgb.sklearn._LGBMCheckSampleWeight(sample_weight, _X)
        else:
            _X, _y = X, y

        if self._class_weight is None:
            self._class_weight = self.class_weight
        if self._class_weight is not None:
            class_sample_weight = lgb.sklearn._LGBMComputeSampleWeight(self._class_weight, y)
            if sample_weight is None or len(sample_weight) == 0:
                sample_weight = class_sample_weight
            else:
                sample_weight = np.multiply(sample_weight, class_sample_weight)

        self._n_features = _X.shape[1]
        # copy for consistency
        self._n_features_in = self._n_features

        def _construct_dataset(X, y, sample_weight, init_score, group, params,
                               categorical_feature='auto', loc=loc, log_scale=log_scale):
            return Dataset(X, label=y, weight=sample_weight, group=group,
                           init_score=init_score, params=params,
                           categorical_feature=categorical_feature, loc=loc, log_scale=log_scale)

        train_set = _construct_dataset(_X, _y, sample_weight, init_score, group, params,
                                       categorical_feature=categorical_feature, loc=loc, log_scale=log_scale)

        valid_sets = []
        if eval_set is not None:
            # raise NotImplementedError("eval_set is not supported in sklearn interface.")

            def _get_meta_data(collection, name, i):
                if collection is None:
                    return None
                elif isinstance(collection, list):
                    return collection[i] if len(collection) > i else None
                elif isinstance(collection, dict):
                    return collection.get(i, None)
                else:
                    raise TypeError(f"{name} should be dict or list")

            if isinstance(eval_set, tuple):
                eval_set = [eval_set]

            print("eval_set", eval_set)
            for i, valid_data in enumerate(eval_set):
                # reduce cost for prediction training data
                if valid_data[0] is X and valid_data[1] is y:
                    valid_set = train_set
                else:
                    valid_weight = _get_meta_data(eval_sample_weight, 'eval_sample_weight', i)
                    valid_class_weight = _get_meta_data(eval_class_weight, 'eval_class_weight', i)
                    if valid_class_weight is not None:
                        if isinstance(valid_class_weight, dict) and self._class_map is not None:
                            valid_class_weight = {self._class_map[k]: v for k, v in valid_class_weight.items()}
                        valid_class_sample_weight = lgb.sklearn._LGBMComputeSampleWeight(valid_class_weight, valid_data[1])
                        if valid_weight is None or len(valid_weight) == 0:
                            valid_weight = valid_class_sample_weight
                        else:
                            valid_weight = np.multiply(valid_weight, valid_class_sample_weight)
                    valid_init_score = _get_meta_data(eval_init_score, 'eval_init_score', i)
                    valid_group = _get_meta_data(eval_group, 'eval_group', i)
                    valid_set = _construct_dataset(valid_data[0], valid_data[1],
                                                   valid_weight, valid_init_score, valid_group, params, loc=eval_loc, log_scale=eval_log_scale)
                valid_sets.append(valid_set)

        if isinstance(init_model, lgb.sklearn.LGBMModel):
            init_model = init_model.booster_

        if early_stopping_rounds is not None and early_stopping_rounds > 0:
            _log_warning("'early_stopping_rounds' argument is deprecated and will be removed in a future release of LightGBM. "
                         "Pass 'early_stopping()' callback via 'callbacks' argument instead.")
            params['early_stopping_rounds'] = early_stopping_rounds

        if callbacks is None:
            callbacks = []
        else:
            callbacks = copy.copy(callbacks)  # don't use deepcopy here to allow non-serializable objects

        if verbose != 'warn':
            _log_warning("'verbose' argument is deprecated and will be removed in a future release of LightGBM. "
                         "Pass 'log_evaluation()' callback via 'callbacks' argument instead.")
        else:
            if callbacks:  # assume user has already specified log_evaluation callback
                verbose = False
            else:
                verbose = True
        callbacks.append(lgb.sklearn.log_evaluation(int(verbose)))

        evals_result = {}
        callbacks.append(lgb.sklearn.record_evaluation(evals_result))

        self._Booster = lgb.engine.train(
            params=params,
            train_set=train_set,
            num_boost_round=self.n_estimators,
            valid_sets=valid_sets,
            valid_names=eval_names,
            fobj=self._fobj,
            feval=eval_metrics_callable,
            init_model=init_model,
            feature_name=feature_name,
            callbacks=callbacks
        )

        if evals_result:
            self._evals_result = evals_result
        else:  # reset after previous call to fit()
            self._evals_result = None

        if self._Booster.best_iteration != 0:
            self._best_iteration = self._Booster.best_iteration
        else:  # reset after previous call to fit()
            self._best_iteration = None

        self._best_score = self._Booster.best_score

        self.fitted_ = True

        # free dataset
        self._Booster.free_dataset()
        del train_set, valid_sets
        return self
jameslamb commented 1 year ago

Thanks for using LightGBM.

My best guess is that there is some sorting by covariates happening at each boosting stage that doesn't apply to my hackily tacked on dataset attributes

I'm confident that LightGBM does not shuffle the data. Row i in the Dataset you create refers to the same observation as row i in the provided raw data.

something is going wrong during fitting

I don't totally understand what you're trying to do here... this is a very large amount of provided code, and it's not clear to me which parts I should be looking at or what "something is going wrong during fitting" means specifically. If you could provide a much smaller example of what you're trying to accomplish, it'd help.

But even without that, I think I can still provide some useful resources.

First, see this example of using a custom objective function in the Python package: https://github.com/microsoft/LightGBM/blob/fe69fa9f43f6fc051730e65008184658c991106e/examples/python-guide/advanced_example.py#L139-L147. That's a classification example, but the interface is identical... you provide a function which, at each iteration, is given two things:

- the model's current predictions
- the training Dataset

and which is responsible for computing gradients and hessians.

It sounds to me like you want to be able to access, inside such a custom objective function, other data that isn't among the features in the Dataset. I can think of a few options for doing that:

# option 1: make objective a callable class, store that other data on the class
from typing import Tuple

import numpy as np
import lightgbm as lgb

class CustomObjective:
    def __init__(self, time: np.ndarray):
        self.time = time

    def __call__(self, preds: np.ndarray, train_data: lgb.Dataset) -> Tuple[np.ndarray, np.ndarray]:
        # your code that calculates the gradient and hessian
        grad, hess = your_custom_code(preds, train_data, self.time)
        return grad, hess
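Usage would then look something like this (a sketch; it assumes `X`, `y`, and an exposure array `t` already exist, and mirrors the `lgb.train()` call in option 2 below):

dtrain = lgb.Dataset(data=X, label=y)
booster = lgb.train(params={"objective": CustomObjective(time=t)}, train_set=dtrain)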

# option 2: store arbitrary data on the `Dataset` object itself
dtrain = lgb.Dataset(data=X, label=y, ...)
dtrain.time = time

def custom_objective(preds: np.ndarray, train_data: lgb.Dataset) -> Tuple[np.ndarray, np.ndarray]:
    # your code that calculates the gradient and hessian
    grad, hess = your_custom_code(preds, train_data, train_data.time)
    return grad, hess

lgb.train(train_set=dtrain, params={"objective": custom_objective}, ...)
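The same pattern works for a custom evaluation metric, since functions passed via `feval=` to `lgb.train()` also receive the Dataset (a sketch; it assumes a `time` attribute has been attached to the evaluation Dataset the same way as in option 2):

def custom_metric(preds, eval_data):
    # eval_data is a lightgbm.Dataset, so an attribute attached as in option 2 is reachable here too
    nll = np.mean(np.exp(preds) * eval_data.time - eval_data.get_label() * preds)
    return "poisson_exposure_nll", nll, False  # (name, value, is_higher_better)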

See the note on writing custom objective functions at https://lightgbm.readthedocs.io/en/latest/pythonapi/lightgbm.train.html#lightgbm.train.

meh2135 commented 1 year ago

Hi @jameslamb, thanks for the quick response, and apologies for the verbose code blob and ambiguous question... I was on a flight managing my screaming 1.5-year-old at the time, and just wasn't thinking. Very kind of you to answer in spite of that.

Your response is definitely helpful. Seems like I'm generally on the right track.

By "something is going wrong during training," I meant that when I give it the ground truth location params and a well behaved conditional scale function, it produces very poor results.

Given your input and some further investigation, my guess is that it boils down to either the Dataset.construct method somehow wiping out the .loc attribute I attached, or something squirrelly going on with the eval function.

I think my next move is to just drop the sklearn wrapper, which is only complicating things. I'll post more here once I have it either solved or when I hit another block.

In case you're interested, the relevant chunks of the code above are: the redefinition of Dataset, where I set .loc and .log_scale:


    def __init__(self, data, label=None, reference=None,
                 weight=None, group=None, init_score=None, silent='warn',
                 loc = None, log_scale=None,
                 feature_name='auto', categorical_feature='auto', params=None,
                 free_raw_data=True):
        ...
        self.loc = loc
        ...

This chunk of _ObjectiveFunctionWrapper (and the equivalent in the eval function wrapper), where I pass the particular args I'm looking for on to the objective function as kwargs:

      params = signature(self.func).parameters
      argc = len(params)
      extra_params = {}
      if "loc" in params:
          extra_params["loc"] = dataset.get_loc()
      if "log_scale" in params:
          extra_params["log_scale"] = dataset.get_log_scale() 
      # print("extra_params of wrapper", extra_params)
      argc = argc - len(extra_params)
      if argc == 2:
          grad, hess = self.func(labels, preds, **extra_params)
      elif argc == 3:
          grad, hess = self.func(labels, preds, dataset.get_group(), **extra_params)
      else:
          raise TypeError(f"Self-defined objective function should have 2 or 3 arguments, got {argc}")

And finally, the redefinition of _construct_dataset, where I include the loc and log_scale args:

def _construct_dataset(X, y, sample_weight, init_score, group, params,
                       categorical_feature='auto', loc=loc, log_scale=log_scale):
    return Dataset(X, label=y, weight=sample_weight, group=group,
                   init_score=init_score, params=params,
                   categorical_feature=categorical_feature, loc=loc, log_scale=log_scale)
jameslamb commented 1 year ago

Ha, no problem at all! I'm happy to help... although I personally don't know much about Poisson process regression, so I can only help in the "here's how LightGBM works" sense.

I definitely recommend trying to make this work using the lightgbm.train() interface (instead of the scikit-learn estimators) first, as you mentioned. That'll help you narrow things down a lot, and should make it possible to implement what you want using one of the approaches I mentioned with 0 changes to lightgbm's internals... since a custom objective function passed to lightgbm.train() will be given the Dataset object directly (instead of the extracted predictions, labels, and weights as numpy arrays).
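To make that concrete, here is a minimal sketch of what the lightgbm.train() route could look like for your loc case. Variable names like X, y, and loc are placeholders, and the gradient/hessian algebra for the logistic-scale likelihood below is only a sketch to show the shape — please re-derive it for your actual model:

import numpy as np
import lightgbm as lgb

dtrain = lgb.Dataset(data=X, label=y)
dtrain.loc = loc  # known per-row location parameter, attached as in "option 2" above

def log_scale_objective(preds, dataset):
    # model: y | x, m ~ Logistic(m, exp(s(x))); preds are the current boosted s(x) scores.
    # With u = (y - m) * exp(-s), the per-sample NLL is u + s + 2*log(1 + exp(-u)), giving
    #   grad = 1 - u * tanh(u / 2)
    #   hess = u * tanh(u / 2) + 0.5 * u**2 / cosh(u / 2)**2
    u = (dataset.get_label() - dataset.loc) * np.exp(-preds)
    grad = 1.0 - u * np.tanh(u / 2.0)
    hess = u * np.tanh(u / 2.0) + 0.5 * u ** 2 / np.cosh(u / 2.0) ** 2
    return grad, hess

booster = lgb.train(params={"objective": log_scale_objective}, train_set=dtrain)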

jameslamb commented 3 months ago

I'm going to close this as there hasn't been a new comment in the last year. Come back any time if you need more help or have other questions 👋🏻