NVIDIA-Merlin / Transformers4Rec

Transformers4Rec is a flexible and efficient library for sequential and session-based recommendation that works with PyTorch.
https://nvidia-merlin.github.io/Transformers4Rec/main
Apache License 2.0

[QST] examples/tutorial/02-ETL-with-NVTabular.ipynb #776

Open · zwei2016 opened this issue 3 months ago

zwei2016 commented 3 months ago

❓ Questions & Help

Details

I executed notebook 01 successfully, but an issue appeared in this code block under "5.4. Grouping interactions into sessions":

workflow = nvt.Workflow(filtered_sessions)
dataset = nvt.Dataset(df)
# Learn features statistics necessary of the preprocessing workflow
# The following will generate schema.pbtxt file in the provided folder and export the parquet files.
workflow.fit_transform(dataset).to_parquet(os.path.join(INPUT_DATA_DIR, "processed_nvt"))
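
For readers running this cell outside the notebook: it relies on imports and variables defined in earlier cells. A rough sketch of that context is below (names follow the notebook; the data-directory default is an assumption and may differ in your setup):

import os
import nvtabular as nvt

# Set near the top of the tutorial notebook; the default path here is an
# assumption, adjust it to wherever your data actually lives.
INPUT_DATA_DIR = os.environ.get("INPUT_DATA_DIR", "./data")

# `df` is the raw interactions DataFrame loaded earlier in the notebook, and
# `filtered_sessions` is the output node of the preceding NVTabular ops
# (grouping interactions into sessions, then filtering out short sessions).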

The error message shows that workflow.fit_transform(dataset) raised a KeyError: 'category_id_price_sum', but I couldn't find that feature anywhere in the tutorial. Maybe I missed something? Could anyone help me out?
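
For context, the column name in the KeyError looks like one of JoinGroupby's internal statistic columns: to compute a "mean" the op first builds per-group "sum" and "count" columns named "<group-col>_<cont-col>_<stat>" (joined with name_sep="_"), which matches "category_id_price_sum". Below is a minimal, self-contained sketch of that naming behaviour, assuming a JoinGroupby of "price" grouped by "category_id" as in the tutorial's feature-engineering cell; the toy DataFrame is illustrative, not the tutorial data, and the column check at the end is a suggested debugging step, not part of the notebook.

import pandas as pd
import nvtabular as nvt

# Toy data; the KeyError suggests the real DataFrame is missing (or has
# renamed) the numeric "price" column that this op needs.
toy = pd.DataFrame({"category_id": [1, 1, 2], "price": [10.0, 20.0, 5.0]})

feats = ["category_id"] >> nvt.ops.JoinGroupby(cont_cols=["price"], stats=["mean"])
toy_workflow = nvt.Workflow(feats)

out = toy_workflow.fit_transform(nvt.Dataset(toy)).to_ddf().compute()
print(out.columns)  # expect a "category_id_price_mean" column; the intermediate
                    # sum/count columns exist only inside fit()

# Quick sanity check against the real data before fitting:
# print("price" in df.columns, df.dtypes.get("price"))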

The detailed error message is as follows:

KeyError                                  Traceback (most recent call last)
Cell In[48], line 5
      2 dataset = nvt.Dataset(df)
      3 # Learn features statistics necessary of the preprocessing workflow
      4 # The following will generate schema.pbtxt file in the provided folder and export the parquet files.
----> 5 workflow.fit_transform(dataset).to_parquet(os.path.join(INPUT_DATA_DIR, "processed_nvt"))

File ~/miniconda3/lib/python3.10/site-packages/nvtabular/workflow/workflow.py:236, in Workflow.fit_transform(self, dataset)
    216 def fit_transform(self, dataset: Dataset) -> Dataset:
    217     """Convenience method to both fit the workflow and transform the dataset in a single
    218     call. Equivalent to calling workflow.fit(dataset) followed by
    219     workflow.transform(dataset)
   (...)
    234     transform
    235     """
--> 236     self.fit(dataset)
    237     return self.transform(dataset)

File ~/miniconda3/lib/python3.10/site-packages/nvtabular/workflow/workflow.py:213, in Workflow.fit(self, dataset)
    199 def fit(self, dataset: Dataset) -> "Workflow":
    200     """Calculates statistics for this workflow on the input dataset
    201
    202     Parameters
   (...)
    211         This Workflow with statistics calculated on it
    212     """
--> 213     self.executor.fit(dataset, self.graph)
    214     return self

File ~/miniconda3/lib/python3.10/site-packages/merlin/dag/executors.py:466, in DaskExecutor.fit(self, dataset, graph, refit)
    462 if not current_phase:
    463     # this shouldn't happen, but lets not infinite loop just in case
    464     raise RuntimeError("failed to find dependency-free StatOperator to fit")
--> 466 self.fit_phase(dataset, current_phase)
    468 # Remove all the operators we processed in this phase, and remove
    469 # from the dependencies of other ops too
    470 for node in current_phase:

File ~/miniconda3/lib/python3.10/site-packages/merlin/dag/executors.py:532, in DaskExecutor.fit_phase(self, dataset, nodes, strict)
    530     stats.append(node.op.fit(node.input_columns, Dataset(ddf)))
    531 else:
--> 532     stats.append(node.op.fit(node.input_columns, transformed_ddf))
    533 except Exception:
    534     LOG.exception("Failed to fit operator %s", node.op)

File ~/miniconda3/lib/python3.10/site-packages/nvtabular/ops/join_groupby.py:154, in JoinGroupby.fit(self, col_selector, ddf)
    151 # Cannot use "device" caching if the data is pandas-backed
    152 self.cat_cache = "host" if self.cat_cache == "device" else self.cat_cache
--> 154 dsk, key = nvt_cat._category_stats(
    155     ddf,
    156     nvt_cat.FitOptions(
    157         col_selector,
    158         self.cont_names,
    159         self.stats,
    160         self.out_path,
    161         0,
    162         self.split_out,
    163         self.on_host,
    164         concat_groups=False,
    165         name_sep=self.name_sep,
    166         split_every=self.split_every,
    167     ),
    168 )
    169 return Delayed(key, dsk)

File ~/miniconda3/lib/python3.10/site-packages/nvtabular/ops/categorify.py:1559, in _category_stats(ddf, options)
   1556 if options.agg_list == []:
   1557     options.agg_list = ["count"]
-> 1559 return _groupby_to_disk(ddf, None, options)

File ~/miniconda3/lib/python3.10/site-packages/nvtabular/ops/categorify.py:1485, in _groupby_to_disk(ddf, write_func, options)
   1476 dsk_split[c][(reduce_2_name, s)] = (
   1477     _bottom_level_groupby,
   1478     [(split_name, 0, c, s)],
   (...)
   1481     False,
   1482 )
   1484 # Make DataFrame collection for column-group result
-> 1485 _meta = _bottom_level_groupby(
   1486     [_grouped_meta_col[c]],
   1487     col,
   1488     options,
   1489     spill=False,
   1490 )
   1491 _divisions = (None,) * (options.split_out[col_str] + 1)
   1492 graph = HighLevelGraph.from_collections(reduce_2_name, dsk_split[c], dependencies=[grouped])

File ~/miniconda3/lib/python3.10/site-packages/nvtx/nvtx.py:116, in annotate.__call__.<locals>.inner(*args, **kwargs)
    113 @wraps(func)
    114 def inner(*args, **kwargs):
    115     libnvtx_push_range(self.attributes, self.domain.handle)
--> 116     result = func(*args, **kwargs)
    117     libnvtx_pop_range(self.domain.handle)
    118     return result

File ~/miniconda3/lib/python3.10/site-packages/nvtabular/ops/categorify.py:1097, in _bottom_level_groupby(dfs, col_selector, options, spill)
   1095     name_mean = _make_name(*(col_selector.names + [cont_col, "mean"]), sep=options.name_sep)
   1096     required.append(name_mean)
-> 1097     gb[name_mean] = gb[name_sum] / gb[name_count]
   1099 if "min" in options.agg_list:
   1100     name_min = _make_name(*(col_selector.names + [cont_col, "min"]), sep=options.name_sep)

File ~/miniconda3/lib/python3.10/site-packages/nvtx/nvtx.py:116, in annotate.__call__.<locals>.inner(*args, **kwargs)
    113 @wraps(func)
    114 def inner(*args, **kwargs):
    115     libnvtx_push_range(self.attributes, self.domain.handle)
--> 116     result = func(*args, **kwargs)
    117     libnvtx_pop_range(self.domain.handle)
    118     return result

File ~/miniconda3/lib/python3.10/site-packages/cudf/core/dataframe.py:1336, in DataFrame.__getitem__(self, arg)
   1274 """
   1275 If arg is a str or int type, return the column Series.
   1276 If arg is a slice, return a new DataFrame with all columns
   (...)
   1333 8 8 8 8
   1334 """
   1335 if _is_scalar_or_zero_d_array(arg) or isinstance(arg, tuple):
-> 1336     return self._get_columns_by_label(arg, downcast=True)
   1338 elif isinstance(arg, slice):
   1339     return self._slice(arg)

File ~/miniconda3/lib/python3.10/site-packages/nvtx/nvtx.py:116, in annotate.__call__.<locals>.inner(*args, **kwargs)
    113 @wraps(func)
    114 def inner(*args, **kwargs):
    115     libnvtx_push_range(self.attributes, self.domain.handle)
--> 116     result = func(*args, **kwargs)
    117     libnvtx_pop_range(self.domain.handle)
    118     return result

File ~/miniconda3/lib/python3.10/site-packages/cudf/core/dataframe.py:1995, in DataFrame._get_columns_by_label(self, labels, downcast)
   1986 @_cudf_nvtx_annotate
   1987 def _get_columns_by_label(
   1988     self, labels, *, downcast=False
   1989 ) -> Self | Series:
   1990     """
   1991     Return columns of dataframe by labels
   1992
   1993     If downcast is True, try and downcast from a DataFrame to a Series
   1994     """
-> 1995     ca = self._data.select_by_label(labels)
   1996     if downcast:
   1997         if is_scalar(labels):

File ~/miniconda3/lib/python3.10/site-packages/cudf/core/column_accessor.py:351, in ColumnAccessor.select_by_label(self, key)
    349     if any(isinstance(k, slice) for k in key):
    350         return self._select_by_label_with_wildcard(key)
--> 351 return self._select_by_label_grouped(key)

File ~/miniconda3/lib/python3.10/site-packages/cudf/core/column_accessor.py:496, in ColumnAccessor._select_by_label_grouped(self, key)
    495 def _select_by_label_grouped(self, key: Any) -> ColumnAccessor:
--> 496     result = self._grouped_data[key]
    497     if isinstance(result, column.ColumnBase):
    498         # self._grouped_data[key] = self._data[key] so skip validation
    499         return self.__class__(
    500             data={key: result},
    501             multiindex=self.multiindex,
    502             verify=False,
    503         )

KeyError: 'category_id_price_sum'