NannyML / nannyml

nannyml: post-deployment data science in python
https://www.nannyml.com/
Apache License 2.0

NannyML should support Incremental Learning #125

Closed prempiyush closed 1 year ago

prempiyush commented 2 years ago

**Motivation: describe the problem to be solved**
Real-world use cases have large datasets that cannot fit in memory, and doing performance estimation on such datasets is not possible with the current implementation of NannyML. Since NannyML defines a Chunk as "a subset of data that acts as a logical unit during calculations", it would be great if these chunks were accepted instead of the complete dataframe when fitting CBPE. I am assuming that this might introduce some discrepancies in the results with respect to statistical sampling errors and such.

**Describe the solution you'd like**
For very large datasets, it would be helpful if, besides the fit method of CBPE, there were also a partial_fit method.

e.g. I am hoping to do one of the following:

  1. If there are a large number of files:

    import pandas as pd
    import nannyml as nml

    listfile = ['file1', 'file2']

    estimator = nml.CBPE(
        y_pred_proba='y_pred_proba',
        y_pred='y_pred',
        y_true='work_home_actual',
        timestamp_column_name='timestamp',
        metrics=['roc_auc', 'f1'],
        chunk_size=5000,
        problem_type='classification_binary',
    )

    for f in listfile:
        for reference_df in pd.read_csv(f, sep=',', iterator=True, chunksize=10000):
            estimator.partial_fit(reference_df)


OR

  2. If data is being read from a database:

    import sqlalchemy

    engine = sqlalchemy.create_engine('connection_string')

    estimator = nml.CBPE(
        y_pred_proba='y_pred_proba',
        y_pred='y_pred',
        y_true='work_home_actual',
        timestamp_column_name='timestamp',
        metrics=['roc_auc', 'f1'],
        chunk_size=5000,
        problem_type='classification_binary',
    )

    with engine.connect().execution_options(stream_results=True) as conn:
        query = "Select work_home_actual, y_pred, y_pred_proba from table"
        for reference_df in pd.read_sql(query, conn, chunksize=10000):
            estimator.partial_fit(reference_df)

**Describe alternatives you've considered**
I tried reading the large dataset (essentially a directory of CSV files) into a PySpark dataframe, hoping that the new Pandas API on Spark 3.2 could be my saviour (roughly the approach sketched below), but it eventually failed with the traceback that follows the sketch:
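A minimal sketch of that kind of attempt, assuming a hypothetical directory path and the same column names as above (pyspark.pandas is the Pandas API on Spark, available since Spark 3.2):

    # Hypothetical sketch: read the CSV directory with the Pandas API on Spark
    # and hand the resulting frame straight to CBPE. The path is an assumption;
    # the CBPE arguments are the same as in the snippets above.
    import pyspark.pandas as ps
    import nannyml as nml

    reference_df = ps.read_csv('/data/reference/*.csv')  # pandas-on-Spark DataFrame

    estimator = nml.CBPE(
        y_pred_proba='y_pred_proba',
        y_pred='y_pred',
        y_true='work_home_actual',
        timestamp_column_name='timestamp',
        metrics=['roc_auc', 'f1'],
        chunk_size=5000,
        problem_type='classification_binary',
    )
    estimator.fit(reference_df)  # raises the error shown below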

ValueError                                Traceback (most recent call last)
File ~/work/code/python-playground/.venv/lib/python3.9/site-packages/nannyml/base.py:210, in AbstractEstimator.fit(self, reference_data, *args, **kwargs)
    209     reference_data = reference_data.copy()
--> 210     return self._fit(reference_data, *args, **kwargs)
    211 except InvalidArgumentsException:

File ~/work/code/python-playground/.venv/lib/python3.9/site-packages/nannyml/performance_estimation/confidence_based/_cbpe_binary_classification.py:71, in _BinaryClassificationCBPE._fit(self, reference_data, *args, **kwargs)
     70 for metric in self.metrics:
---> 71     metric.fit(reference_data)
     73 # Fit calibrator if calibration is needed

File ~/work/code/python-playground/.venv/lib/python3.9/site-packages/nannyml/performance_estimation/confidence_based/metrics.py:76, in Metric.fit(self, reference_data)
     73 reference_chunks = self.estimator.chunker.split(
     74     reference_data,
     75 )
---> 76 self.lower_threshold, self.upper_threshold = self._alert_thresholds(reference_chunks)
     78 # Calculate confidence bands

File ~/work/code/python-playground/.venv/lib/python3.9/site-packages/nannyml/performance_estimation/confidence_based/metrics.py:138, in Metric._alert_thresholds(self, reference_chunks, std_num, lower_limit, upper_limit)
    135 def _alert_thresholds(
    136     self, reference_chunks: List[Chunk], std_num: int = 3, lower_limit: int = 0, upper_limit: int = 1
    137 ) -> Tuple[float, float]:
--> 138     realized_chunk_performance = [self.realized_performance(chunk.data) for chunk in reference_chunks]
    139     deviation = np.std(realized_chunk_performance) * std_num

File ~/work/code/python-playground/.venv/lib/python3.9/site-packages/nannyml/performance_estimation/confidence_based/metrics.py:138, in <listcomp>(.0)
    135 def _alert_thresholds(
    136     self, reference_chunks: List[Chunk], std_num: int = 3, lower_limit: int = 0, upper_limit: int = 1
    137 ) -> Tuple[float, float]:
--> 138     realized_chunk_performance = [self.realized_performance(chunk.data) for chunk in reference_chunks]
    139     deviation = np.std(realized_chunk_performance) * std_num

File ~/work/code/python-playground/.venv/lib/python3.9/site-packages/nannyml/performance_estimation/confidence_based/metrics.py:246, in BinaryClassificationAUROC.realized_performance(self, data)
    245 def realized_performance(self, data: pd.DataFrame) -> float:
--> 246     y_pred_proba, _, y_true = self._common_cleaning(data)
    248     if y_true is None:

File ~/work/code/python-playground/.venv/lib/python3.9/site-packages/nannyml/performance_estimation/confidence_based/metrics.py:166, in Metric._common_cleaning(self, data)
    165 y_true = data[self.estimator.y_true]
--> 166 y_true = y_true[~y_pred_proba.isna()]
    167 y_pred_proba = y_pred_proba[~y_true.isna()]

File ~/work/code/python-playground/.venv/lib/python3.9/site-packages/pyspark/pandas/series.py:6264, in Series.__getitem__(self, key)
   6263     return self.iloc[key]
-> 6264     return self.loc[key]
   6265 except SparkPandasIndexingError:

File ~/work/code/python-playground/.venv/lib/python3.9/site-packages/pyspark/pandas/indexing.py:443, in LocIndexerLike.__getitem__(self, key)
    441 temp_col = verify_temp_column_name(psdf, "__temp_col__")
--> 443 psdf[temp_col] = key
    444 return type(self)(psdf[name].rename(self._psdf_or_psser.name))[psdf[temp_col]]

File ~/work/code/python-playground/.venv/lib/python3.9/site-packages/pyspark/pandas/frame.py:11801, in DataFrame.__setitem__(self, key, value)
  11799     yield (psdf._psser_for(this_label), this_label)
> 11801 psdf = align_diff_frames(assign_columns, self, value, fillna=False, how="left")
  11802 elif isinstance(value, list):

File ~/work/code/python-playground/.venv/lib/python3.9/site-packages/pyspark/pandas/utils.py:383, in align_diff_frames(resolve_func, this, that, fillna, how, preserve_order_column)
    382 # 1. Perform the join given two dataframes.
--> 383 combined = combine_frames(this, that, how=how, preserve_order_column=preserve_order_column)
    385 # 2. Apply the given function to transform the columns in a batch and keep the new columns.

File ~/work/code/python-playground/.venv/lib/python3.9/site-packages/pyspark/pandas/utils.py:307, in combine_frames(this, *args, how, preserve_order_column)
    306 else:
--> 307     raise ValueError(ERROR_MESSAGE_CANNOT_COMBINE)

ValueError: Cannot combine the series or dataframe because it comes from a different dataframe. In order to allow this operation, enable 'compute.ops_on_diff_frames' option.

During handling of the above exception, another exception occurred:

CalculatorException                       Traceback (most recent call last)
File <...>:10, in <...>

File ~/work/code/python-playground/.venv/lib/python3.9/site-packages/nannyml/base.py:216, in AbstractEstimator.fit(self, reference_data, *args, **kwargs)
    214     raise
    215 except Exception as exc:
--> 216     raise CalculatorException(f"failed while fitting {str(self)}.\n{exc}")

CalculatorException: failed while fitting _BinaryClassificationCBPE. Cannot combine the series or dataframe because it comes from a different dataframe. In order to allow this operation, enable 'compute.ops_on_diff_frames' option.



nnansters commented 2 years ago

Hi @prempiyush ,

that is indeed an excellent suggestion. We had something like this on the radar as well.

The difficulty is indeed fitting reference data. We'll have to review all of our calculators and estimators to check if we can find a way to fit partially and then "aggregate" or combine them. You can use sampling as a workaround for now, but this is definitely not a proper solution to the problem.
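For example, a minimal sketch of that sampling workaround (illustrative only, not an official API; the file list and the 10% sample fraction are assumptions, the column names come from the snippets above):

    # Downsample each chunk of the reference CSVs so the combined sample fits
    # in memory, then fit CBPE on the sample as usual.
    import pandas as pd
    import nannyml as nml

    listfile = ['file1', 'file2']  # hypothetical reference files

    sampled_parts = []
    for f in listfile:
        for part in pd.read_csv(f, sep=',', chunksize=10000):
            sampled_parts.append(part.sample(frac=0.1, random_state=42))  # keep ~10%
    reference_sample = pd.concat(sampled_parts, ignore_index=True)

    estimator = nml.CBPE(
        y_pred_proba='y_pred_proba',
        y_pred='y_pred',
        y_true='work_home_actual',
        timestamp_column_name='timestamp',
        metrics=['roc_auc', 'f1'],
        chunk_size=5000,
        problem_type='classification_binary',
    )
    estimator.fit(reference_sample)

The trade-off is exactly the one mentioned in the issue description: a sampled reference set changes the sampling error behind the estimated thresholds.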

Performing calculations/estimations on analysis data seems simpler at first glance, since we can use the "chunks" as a means of parallelization, allowing us to distribute computations more intuitively using something like Spark or Ray (to be experimented with).
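For illustration, a rough sketch of that chunk-wise idea (not NannyML's implementation; the analysis file name and the manual slicing are assumptions, estimator.estimate is the existing CBPE call):

    # Each analysis chunk can be estimated independently once the estimator is
    # fitted, so the pieces below could be handed to separate workers
    # (Spark, Ray, multiprocessing, ...) instead of this sequential loop.
    import pandas as pd

    chunk_size = 5000  # same chunk size the estimator was configured with
    analysis_df = pd.read_csv('analysis.csv')  # hypothetical analysis set

    pieces = [
        analysis_df.iloc[start:start + chunk_size]
        for start in range(0, len(analysis_df), chunk_size)
    ]
    # 'estimator' is the fitted CBPE instance from the snippet above
    results = [estimator.estimate(piece) for piece in pieces]

Each piece yields its own result object; combining them back into a single report is the part that would still need design work.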

It looks like you were already using this in Spark?

prempiyush commented 2 years ago

@nnansters Thank you, Niels.

No, I was not successful in running this on Spark. I was hoping that the new Pandas API on Spark would abstract away most of the work for me, but it looks like the code itself needs to be enhanced.

stale[bot] commented 1 year ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
