henrikbostrom / crepes

Python package for conformal prediction
BSD 3-Clause "New" or "Revised" License

Enhancement: Improve the scalability of crepes #31

Closed nbdangro01 closed 7 months ago

nbdangro01 commented 7 months ago

Hello, I am trying to use the crepes library at scale in a production environment (approx. 100 million rows and 75 columns), but I am struggling with the amount of time required to fit the DifficultyEstimator and calibrate the CP wrapper. See the Python code below.

```python
import logging
from typing import Tuple

import pandas as pd

from crepes import WrapRegressor
from crepes.extras import DifficultyEstimator, binning


def calibrate_cp(
    rf: WrapRegressor,
    x_tr: pd.DataFrame,
    y_tr: pd.DataFrame,
    x_ca: pd.DataFrame,
    y_ca: pd.DataFrame,
) -> Tuple[WrapRegressor, DifficultyEstimator, list]:
    """Calibrates the conformal predictive system wrapper.

    Args:
        rf (WrapRegressor): A wrapped regression model
        x_tr (pd.DataFrame): X train
        y_tr (pd.DataFrame): y train
        x_ca (pd.DataFrame): X calibration
        y_ca (pd.DataFrame): y calibration

    Returns:
        Tuple[WrapRegressor, DifficultyEstimator, list]: The calibrated
        wrapper, the DifficultyEstimator (NearestNeighbors), and the
        thresholds of the bins.
    """
    de_kkn = DifficultyEstimator()
    logging.info("Fitting the DifficultyEstimator")
    de_kkn.fit(x_tr.to_numpy(), y=y_tr.to_numpy(), scaler=True)
    logging.info("Calculating sigmas")
    sigmas_cal = de_kkn.apply(x_ca.to_numpy())
    logging.info("Calculating bins")
    bins_cal, bin_thresholds = binning(rf.predict(x_ca), bins=5)
    logging.info("Calibrating the CP wrapper")
    rf.calibrate(
        x_ca,
        y_ca.to_numpy().reshape(-1),
        sigmas=sigmas_cal,
        bins=bins_cal,
        cps=True,
    )
    return rf, de_kkn, bin_thresholds
```
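For context, a hedged sketch of how the returned objects are then used at prediction time (x_te is a hypothetical test-set DataFrame; this assumes binning accepts the previously returned thresholds and uses crepes' predict_cps API as I understand it):

```python
# Sketch only: x_te is a hypothetical test-set DataFrame.
sigmas_test = de_kkn.apply(x_te.to_numpy())
bins_test = binning(rf.predict(x_te), bins=bin_thresholds)
cps_preds = rf.predict_cps(
    x_te,
    sigmas=sigmas_test,
    bins=bins_test,
    lower_percentiles=5,
    higher_percentiles=95,
)
```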

I therefore want to improve the scalability of crepes by adding GPU support (any other suggestions for performance enhancements are also welcome). This could, for example, be achieved by allowing alternative k-NN implementations such as faiss (see https://towardsdatascience.com/make-knn-300-times-faster-than-scikit-learns-in-20-lines-5e29d74e76bb). The user would then set an optional parameter when initializing the class, or a global config variable, to decide which backend to use.
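As a rough sketch of the interface I have in mind (the knn_backend parameter is purely illustrative, not part of the current crepes API):

```python
# Hypothetical interface sketch; knn_backend is an assumed parameter name.
de = DifficultyEstimator()
de.fit(
    x_tr.to_numpy(),
    y=y_tr.to_numpy(),
    scaler=True,
    knn_backend="faiss",  # or "sklearn" (the current default behavior)
)
```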

I thought I'd see if there is any interest in having this in the repository. If not, I'll just make it in my own repo.

nbdangro01 commented 7 months ago

Here is some code that could replace sklearn's NearestNeighbors:

```python
import faiss
import numpy as np


class FaissNearestNeighbors:
    def __init__(self, n_neighbors=5):
        self.index = None
        self.y = None
        self.k = n_neighbors
        self.fitted = False

    def fit(
        self,
        X,
        n_gpus=None,
        task_type="GPU",
        co=None,
        lossy_acceleration=False,
        n_centroids=100,
    ):
        self.task_type = task_type
        self.lossy_acceleration = lossy_acceleration
        d = X.shape[1]
        X = X.astype(np.float32)

        index = faiss.IndexFlatL2(d)
        if lossy_acceleration:
            # Wrap the flat index in an IVFPQ index for approximate search
            index = faiss.IndexIVFPQ(index, d, n_centroids, 1, 8)
        if task_type == "GPU":
            if not n_gpus:
                n_gpus = -1  # use all available GPUs
            if co:
                co_settings = co
                co = faiss.GpuMultipleClonerOptions()
                co.shard = co_settings.get("shard", False)
                co.usePrecomputed = co_settings.get("usePrecomputed", False)
            index = faiss.index_cpu_to_all_gpus(index, co=co, ngpu=n_gpus)
        if lossy_acceleration:
            # IVFPQ indexes must be trained before vectors can be added
            index.train(X)

        index.add(X)
        self.fitted = True
        self.index = index
        return self  # match sklearn's fit() convention

    def convert_index_from_gpu_to_cpu(self):
        assert self.task_type == "GPU"
        assert self.fitted
        self.index = faiss.index_gpu_to_cpu(self.index)
        self.task_type = "CPU"

    def kneighbors(self, X, return_distance=True, chunk_size=None, nprobe=5):
        if self.lossy_acceleration:
            # Number of inverted lists probed at query time
            self.index.nprobe = nprobe
        X = X.astype(np.float32)  # convert once, not per chunk
        if chunk_size:
            distances = np.empty((0, self.k), dtype=np.float32)
            indices = np.empty((0, self.k), dtype=int)
            for i in range(0, len(X), chunk_size):
                chunk = X[i : i + chunk_size]
                chunk_distances, chunk_indices = self.index.search(chunk, k=self.k)
                distances = np.append(distances, chunk_distances, axis=0)
                indices = np.append(indices, chunk_indices, axis=0)
        else:
            distances, indices = self.index.search(X, k=self.k)
        if return_distance:
            return distances, indices
        return indices
```
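For illustration, a hedged usage sketch (x_tr and x_ca as in the earlier snippet; n_neighbors and chunk_size values are arbitrary):

```python
# Usage sketch: fit on the training features, query the calibration set.
nn = FaissNearestNeighbors(n_neighbors=25)
nn.fit(x_tr.to_numpy(), task_type="CPU")  # or task_type="GPU" with faiss-gpu
distances, neighbor_indexes = nn.kneighbors(
    x_ca.to_numpy(), chunk_size=1_000_000
)
```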

nbdangro01 commented 7 months ago

Meanwhile, the list comprehensions can be parallelized using multiprocessing:

```python
import numpy as np
from joblib import Parallel, delayed


def calculate_std(indexes, y):
    """Standard deviation of the targets of one instance's neighbors."""
    return np.std(y[indexes])


sigmas = Parallel(n_jobs=-1)(
    delayed(calculate_std)(indexes, y) for indexes in neighbor_indexes
)
sigmas = np.array(sigmas)
```
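For reference, the same per-row standard deviations can also be computed in a single vectorized call, which may avoid the worker-spawning overhead entirely (a sketch, assuming y and neighbor_indexes as above):

```python
import numpy as np

# Gather each row's neighbor targets into an (n_samples, k) matrix and
# take row-wise standard deviations in one vectorized call.
sigmas = np.std(y[neighbor_indexes], axis=1)
```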