NVIDIA-Merlin / NVTabular

NVTabular is a feature engineering and preprocessing library for tabular data designed to quickly and easily manipulate terabyte scale datasets used to train deep learning based recommender systems.
Apache License 2.0

[BUG] The `schema.pbtxt` file generated from NVT workflow is not as expected #1156

Closed: rnyak closed this issue 2 years ago

rnyak commented 2 years ago

Describe the bug

I am trying to generate a schema.pbtxt file from an NVTabular workflow using the following script, and the generated schema.pbtxt looks like this:

feature {
  name: "item_id-count"
  annotation {
    tag: "categorical"
  }
}
feature {
  name: "day-first"
  annotation {
    tag: "categorical"
  }
}
feature {
  name: "session_id"
  type: INT
  int_domain {
    name: "session_id"
    min: 0
    max: 10002
    is_categorical: true
  }
  annotation {
    tag: "categorical"
    extra_metadata {
      type_url: "type.googleapis.com/google.protobuf.Struct"
      value: "\n\021\n\013num_buckets\022\002\010\000\n\033\n\016freq_threshold\022\t\021\000\000\000\000\000\000\000\000\n\025\n\010max_size\022\t\021\000\000\000\000\000\000\000\000\n\030\n\013start_index\022\t\021\000\000\000\000\000\000\360?\n5\n\010cat_path\022)\032\'.//categories/unique.session_id.parquet\nG\n\017embedding_sizes\0224*2\n\030\n\013cardinality\022\t\021\000\000\000\000\000\211\303@\n\026\n\tdimension\022\t\021\000\000\000\000\000`q@"
    }
  }
}
feature {
  name: "category-list_trim"
  annotation {
    tag: "categorical"
    tag: "list"
  }
}
feature {
  name: "item_id-list_trim"
  annotation {
    tag: "categorical"
    tag: "list"
  }
}
feature {
  name: "timestamp/age_days-list_trim"
  annotation {
    tag: "categorical"
    tag: "list"
  }
}
feature {
  name: "timestamp/weekday/sin-list_trim"
  annotation {
    tag: "categorical"
    tag: "list"
  }
}

The issue with this schema.pbtxt file is that there are no min and max values for the categorical features, which are required by the Transformers4Rec model (a small check for this follows the repro script below).

Steps/Code to reproduce bug

import numpy as np
import pandas as pd
from pathlib import Path

import nvtabular as nvt
# `Schema` (used at the end to read the protobuf) is assumed to come from the
# Transformers4Rec schema utilities; its import was omitted in the original snippet.

NUM_ROWS = 1000
long_tailed_item_distribution = np.clip(np.random.lognormal(3., 1., NUM_ROWS).astype(np.int32), 1, 50000)

# generate random item interaction features 
df = pd.DataFrame(np.random.randint(70000, 80000, NUM_ROWS), columns=['session_id'])
df['item_id'] = long_tailed_item_distribution

# generate category mapping for each item-id
df['category'] = pd.cut(df['item_id'], bins=334, labels=np.arange(1, 335)).astype(np.int32)
df['timestamp/age_days'] = np.random.uniform(0, 1, NUM_ROWS)
df['timestamp/weekday/sin']= np.random.uniform(0, 1, NUM_ROWS)

# generate day mapping for each session 
map_day = dict(zip(df.session_id.unique(), np.random.randint(1, 10, size=(df.session_id.nunique()))))
df['day'] =  df.session_id.map(map_day)

# Categorify categorical features
categ_feats = ['session_id', 'item_id', 'category'] >> nvt.ops.Categorify(start_index=1)

# Define Groupby Workflow
groupby_feats = categ_feats + ['day', 'timestamp/age_days', 'timestamp/weekday/sin']

# Groups interaction features by session and sorted by timestamp
groupby_features = groupby_feats >> nvt.ops.Groupby(
    groupby_cols=["session_id"], 
    aggs={
        "item_id": ["list", "count"],
        "category": ["list"],     
        "day": ["first"],
        "timestamp/age_days": ["list"],
        'timestamp/weekday/sin': ["list"],
        },
    name_sep="-")

# Select and truncate the sequential features
sequence_features_truncated = (groupby_features['category-list', 'item_id-list', 
                                          'timestamp/age_days-list', 'timestamp/weekday/sin-list']) >> \
                            nvt.ops.ListSlice(0,20) >> nvt.ops.Rename(postfix = '_trim')

# Filter out sessions with length 1 (not valid for next-item prediction training and evaluation)
MINIMUM_SESSION_LENGTH = 2
selected_features = groupby_features['item_id-count', 'day-first', 'session_id'] + sequence_features_truncated
filtered_sessions = selected_features >> nvt.ops.Filter(f=lambda df: df["item_id-count"] >= MINIMUM_SESSION_LENGTH)

workflow = nvt.Workflow(filtered_sessions)
dataset = nvt.Dataset(df, cpu=False)
# Generating statistics for the features
workflow.fit(dataset)
workflow.transform(dataset).to_parquet(
    './schema',
    out_files_per_proc=1,
)

schema_path = Path('./schema')
proto_schema = Schema.read_protobuf(schema_path / "schema.pbtxt")
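
For context, a minimal sketch (assuming the generated schema.pbtxt is a standard TensorFlow Metadata Schema protobuf) of checking which categorical features actually carry usable int_domain bounds; the missing min/max is what makes the schema unusable for Transformers4Rec:

from google.protobuf import text_format
from tensorflow_metadata.proto.v0 import schema_pb2

# Parse the schema.pbtxt produced by the repro script above
with open("./schema/schema.pbtxt") as f:
    schema = text_format.Parse(f.read(), schema_pb2.Schema())

for feature in schema.feature:
    if "categorical" in list(feature.annotation.tag):
        if feature.HasField("int_domain"):
            # Unset min/max default to 0, so no usable cardinality can be derived
            print(feature.name, feature.int_domain.min, feature.int_domain.max)
        else:
            print(feature.name, "has no int_domain at all")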

Expected behavior

The schema.pbtxt file should look like the following. The min/max of continuous features is not important, but min/max values for categoricals are a must-have.

feature {
  name: "session_id"
  type: INT
  int_domain {
    name: "session_id"
    min: 1
    max: 100001
    is_categorical: false
  }
  annotation {
    tag: "groupby_col"
  }
}
feature {
  name: "category-list_trim"
  value_count {
    min: 2
    max: 20
  }
  type: INT
  int_domain {
    name: "category-list_trim"
    min: 1
    max: 400
    is_categorical: true
  }
  annotation {
    tag: "list"
    tag: "categorical"
    tag: "item"
  }
}
feature {
  name: "item_id-list_trim"
  value_count {
    min: 2
    max: 20
  }
  type: INT
  int_domain {
    name: "item_id/list"
    min: 1
    max: 50005
    is_categorical: true
  }
  annotation {
    tag: "item_id"
    tag: "list"
    tag: "categorical"
    tag: "item"
  }
}
feature {
  name: "timestamp/age_days-list_trim"
  value_count {
    min: 2
    max: 20
  }
  type: FLOAT
  float_domain {
    name: "timestamp/age_days-list_trim"
    min: 0.0000003
    max: 0.9999999
  }
  annotation {
    tag: "continuous"
    tag: "list"
  }
}
feature {
  name: "timestamp/weekday/sin-list_trim"
  value_count {
    min: 2
    max: 20
  }
  type: FLOAT
  float_domain {
    name: "timestamp/weekday-sin_trim"
    min: 0.0000003
    max: 0.9999999
  }
  annotation {
    tag: "time"
    tag: "list"
  }
}

Environment details (please complete the following information):

benfred commented 2 years ago

Will be fixed by https://github.com/NVIDIA-Merlin/Transformers4Rec/pull/274

samhedin commented 1 year ago

Is this solved? When I generate schema.pbtxt with NVTabular, it saves max values but not min values.

Script to reproduce

based on https://github.com/NVIDIA-Merlin/Transformers4Rec/blob/main/examples/tutorial/02-ETL-with-NVTabular.ipynb

#!/usr/bin/env python
# coding: utf-8

# This notebook is created using the latest stable [merlin-pytorch](https://catalog.ngc.nvidia.com/orgs/nvidia/teams/merlin/containers/merlin-pytorch) container.
#
# **Launch the docker container**
# ```
# docker run -it --gpus device=0 -p 8000:8000 -p 8001:8001 -p 8002:8002 -p 8888:8888 -v <path_to_data>:/workspace/data/  nvcr.io/nvidia/merlin/merlin-pytorch:22.XX
# ```
# This command mounts your local data folder, which contains your data files, to the `/workspace/data` directory in the merlin-pytorch docker container.

# ## Overview

# This notebook demonstrates how to use NVTabular to perform the feature engineering needed to model the `YOOCHOOSE` dataset, which contains a collection of sessions from a retailer. Each session encapsulates the click events that the user performed in that session.
#
# The dataset is available on [Kaggle](https://www.kaggle.com/chadgostopp/recsys-challenge-2015). You need to download it and copy it to the `DATA_FOLDER` path. Note that we are only using the `yoochoose-clicks.dat` file.
#

import os
import glob
import numpy as np
import gc

import cudf
import cupy
import nvtabular as nvt
from merlin.dag import ColumnSelector
from merlin.schema import Schema, Tags

DATA_FOLDER = "."
FILENAME_PATTERN = 'yoochoose-clicks.dat'
DATA_PATH = os.path.join(DATA_FOLDER, FILENAME_PATTERN)

OUTPUT_FOLDER = "./yoochoose_transformed"
OVERWRITE = False

interactions_df = cudf.read_csv(DATA_PATH, sep=',',
                                names=['session_id','timestamp', 'item_id', 'category'],
                                dtype=['int', 'datetime64[s]', 'int', 'int'])

print("Count with in-session repeated interactions: {}".format(len(interactions_df)))
# Sorts the dataframe by session and timestamp, to remove consecutive repetitions
interactions_df.timestamp = interactions_df.timestamp.astype(int)
interactions_df = interactions_df.sort_values(['session_id', 'timestamp'])
past_ids = interactions_df['item_id'].shift(1).fillna()
session_past_ids = interactions_df['session_id'].shift(1).fillna()
# Keep only interactions that are not consecutive repeats within a session
interactions_df = interactions_df[~((interactions_df['session_id'] == session_past_ids) & (interactions_df['item_id'] == past_ids))]
print("Count after removed in-session repeated interactions: {}".format(len(interactions_df)))

items_first_ts_df = interactions_df.groupby('item_id').agg({'timestamp': 'min'}).reset_index().rename(columns={'timestamp': 'itemid_ts_first'})
interactions_merged_df = interactions_df.merge(items_first_ts_df, on=['item_id'], how='left').head(100000)
interactions_merged_df.head()

# Let's save the interactions_merged_df to disk to be able to use it in the inference step.


interactions_merged_df.to_parquet(os.path.join(DATA_FOLDER, 'interactions_merged_df.parquet'))

# free gpu memory
del interactions_df, session_past_ids, items_first_ts_df
gc.collect()

# ##  Define a preprocessing workflow with NVTabular

# NVTabular is a feature engineering and preprocessing library for tabular data designed to quickly and easily manipulate terabyte scale datasets used to train deep learning based recommender systems. It provides a high level abstraction to simplify code and accelerates computation on the GPU using the RAPIDS cuDF library.
#
# NVTabular supports different feature engineering transformations required by deep learning (DL) models such as Categorical encoding and numerical feature normalization. It also supports feature engineering and generating sequential features.
#
# More information about the supported features can be found <a href=https://nvidia-merlin.github.io/NVTabular/main/index.html> here. </a>

# ### Feature engineering: Create and Transform items features

# In this cell, we are defining three transformation ops:
#
# - 1. Encoding categorical variables using `Categorify()` op. We set `start_index` to 1, so that encoded null values start from `1` instead of `0` because we reserve `0` for padding the sequence features.
# - 2. Deriving temporal features from timestamp and computing their cyclical representation using a custom lambda function.
# - 3. Computing the item recency in days using a custom Op. Note that item recency is defined as the time elapsed between the first occurrence of the item in the dataset and the interaction itself.
#
# For more ETL workflow examples, visit NVTabular [example notebooks](https://github.com/NVIDIA-Merlin/NVTabular/tree/main/examples).


# Encodes categorical features as contiguous integers
cat_feats = ColumnSelector(['session_id', 'category', 'item_id']) >> nvt.ops.Categorify(start_index=1)

# create time features
session_ts = ColumnSelector(['timestamp'])
session_time = (
    session_ts >>
    nvt.ops.LambdaOp(lambda col: cudf.to_datetime(col, unit='s')) >>
    nvt.ops.Rename(name = 'event_time_dt')
)
sessiontime_weekday = (
    session_time >>
    nvt.ops.LambdaOp(lambda col: col.dt.weekday) >>
    nvt.ops.Rename(name ='et_dayofweek')
)

# Derive cyclical features: Defines a custom lambda function
def get_cycled_feature_value_sin(col, max_value):
    value_scaled = (col + 0.000001) / max_value
    value_sin = np.sin(2*np.pi*value_scaled)
    return value_sin

weekday_sin = sessiontime_weekday >> (lambda col: get_cycled_feature_value_sin(col+1, 7)) >> nvt.ops.Rename(name = 'et_dayofweek_sin')

# Compute Item recency: Define a custom Op
class ItemRecency(nvt.ops.Operator):
    def transform(self, columns, gdf):
        for column in columns.names:
            col = gdf[column]
            item_first_timestamp = gdf['itemid_ts_first']
            delta_days = (col - item_first_timestamp) / (60*60*24)
            gdf[column + "_age_days"] = delta_days * (delta_days >=0)
        return gdf

    def compute_selector(
        self,
        input_schema: Schema,
        selector: ColumnSelector,
        parents_selector: ColumnSelector,
        dependencies_selector: ColumnSelector,
    ) -> ColumnSelector:
        self._validate_matching_cols(input_schema, parents_selector, "computing input selector")
        return parents_selector

    def column_mapping(self, col_selector):
        column_mapping = {}
        for col_name in col_selector.names:
            column_mapping[col_name + "_age_days"] = [col_name]
        return column_mapping

    @property
    def dependencies(self):
        return ["itemid_ts_first"]

    @property
    def output_dtype(self):
        return np.float64

recency_features = session_ts >> ItemRecency()
# Apply standardization to this continuous feature
recency_features_norm = recency_features >> nvt.ops.LogOp() >> nvt.ops.Normalize() >> nvt.ops.Rename(name='product_recency_days_log_norm')

time_features = (
    session_time +
    sessiontime_weekday +
    weekday_sin +
    recency_features_norm
)

features = ColumnSelector(['timestamp', 'session_id']) + cat_feats + time_features

# ### Define the preprocessing of sequential features

# Once the item features are generated, the objective of this cell is to group interactions at the session level, sorting the interactions by time. We additionally truncate all sessions to the first 20 interactions and filter out sessions with fewer than 2 interactions.


# Define Groupby Operator
groupby_features = features >> nvt.ops.Groupby(
    groupby_cols=["session_id"],
    sort_cols=["timestamp"],
    aggs={
        'item_id': ["list", "count"],
        'category': ["list"],
        'timestamp': ["first"],
        'event_time_dt': ["first"],
        'et_dayofweek_sin': ["list"],
        'product_recency_days_log_norm': ["list"]
        },
    name_sep="-") >> nvt.ops.AddMetadata(tags=[Tags.CATEGORICAL])

# Truncate sequence features to the first 20 interacted items
SESSIONS_MAX_LENGTH = 20

groupby_features_list = groupby_features['item_id-list', 'category-list', 'et_dayofweek_sin-list', 'product_recency_days_log_norm-list']
groupby_features_truncated = groupby_features_list >> nvt.ops.ListSlice(0, SESSIONS_MAX_LENGTH, pad=True) >> nvt.ops.Rename(postfix = '_seq')

# Calculate session day index based on 'event_time_dt-first' column
day_index = ((groupby_features['event_time_dt-first'])  >>
    nvt.ops.LambdaOp(lambda col: (col - col.min()).dt.days +1) >>
    nvt.ops.Rename(f = lambda col: "day_index")
)

# Select features for training
selected_features = groupby_features['session_id', 'item_id-count'] + groupby_features_truncated + day_index

# Filter out sessions with less than 2 interactions
MINIMUM_SESSION_LENGTH = 2
filtered_sessions = selected_features >> nvt.ops.Filter(f=lambda df: df["item_id-count"] >= MINIMUM_SESSION_LENGTH)

# - Avoid Numba low occupancy warnings


from numba import config
config.CUDA_LOW_OCCUPANCY_WARNINGS = 0

# ### Execute NVTabular workflow

# Once we have defined the general workflow (`filtered_sessions`), we provide our cudf dataset to the `nvt.Dataset` class, which is optimized to split data into chunks that fit in device memory and to handle the calculation of complex global statistics. Then, we execute the pipeline that fits and transforms the data to get the desired output features.


dataset = nvt.Dataset(interactions_merged_df)
workflow = nvt.Workflow(filtered_sessions)
# Learn the feature statistics necessary for the preprocessing workflow
workflow.fit(dataset)
workflow.transform(dataset).to_parquet("./schema_generated", out_files_per_proc=1)

Result

From ./schema_generated/schema.pbtxt:

feature {
  name: "session_id"
  type: INT
  int_domain {
    name: "session_id"
    max: 30551
    is_categorical: true
  }
  annotation {
    tag: "categorical"
    extra_metadata {
      type_url: "type.googleapis.com/google.protobuf.Struct"
      value: "\n\021\n\013num_buckets\022\002\010\000\n\034\n\017dtype_item_size\022\t\021\000\000\000\000\000\000P@\n\025\n\010max_size\022\t\021\000\000\000\000\000\000\000\000\n5\n\010cat_path\022)\032\'.//categories/unique.session_id.parquet\n\033\n\016freq_threshold\022\t\021\000\000\000\000\000\000\000\000\n\017\n\tis_ragged\022\002 \000\n\030\n\013start_index\022\t\021\000\000\000\000\000\000\360?\n\r\n\007is_list\022\002 \000\nG\n\017embedding_sizes\0224*2\n\026\n\tdimension\022\t\021\000\000\000\000\000\000\200@\n\030\n\013cardinality\022\t\021\000\000\000\000\000\326\335@"
    }
  }
}
feature {
  name: "item_id-count"
  type: INT
  int_domain {
    name: "item_id"
    max: 10825
    is_categorical: true
  }
  annotation {
    tag: "categorical"
    extra_metadata {
      type_url: "type.googleapis.com/google.protobuf.Struct"
      value: "\n\017\n\tis_ragged\022\002 \000\n\r\n\007is_list\022\002 \000\n\025\n\010max_size\022\t\021\000\000\000\000\000\000\000\000\n\033\n\016freq_threshold\022\t\021\000\000\000\000\000\000\000\000\n2\n\010cat_path\022&\032$.//categories/unique.item_id.parquet\nG\n\017embedding_sizes\0224*2\n\026\n\tdimension\022\t\021\000\000\000\000\0000r@\n\030\n\013cardinality\022\t\021\000\000\000\000\000%\305@\n\030\n\013start_index\022\t\021\000\000\000\000\000\000\360?\n\021\n\013num_buckets\022\002\010\000\n\034\n\017dtype_item_size\022\t\021\000\000\000\000\000\000@@"
    }
  }
}
feature {
  name: "item_id-list_seq"
  value_count {
  }
  type: INT
  int_domain {
    name: "item_id"
    max: 10825
    is_categorical: true
  }
  annotation {
    tag: "categorical"
    tag: "list"
    extra_metadata {
      type_url: "type.googleapis.com/google.protobuf.Struct"
      value: "\n\030\n\013start_index\022\t\021\000\000\000\000\000\000\360?\n\034\n\017dtype_item_size\022\t\021\000\000\000\000\000\000P@\n\021\n\013num_buckets\022\002\010\000\n\017\n\tis_ragged\022\002 \000\n2\n\010cat_path\022&\032$.//categories/unique.item_id.parquet\n\033\n\016freq_threshold\022\t\021\000\000\000\000\000\000\000\000\n\r\n\007is_list\022\002 \001\nG\n\017embedding_sizes\0224*2\n\026\n\tdimension\022\t\021\000\000\000\000\0000r@\n\030\n\013cardinality\022\t\021\000\000\000\000\000%\305@\n\025\n\010max_size\022\t\021\000\000\000\000\000\000\000\000"
    }
  }
}
feature {
  name: "category-list_seq"
  value_count {
  }
  type: INT
  int_domain {
    name: "category"
    max: 2
    is_categorical: true
  }
  annotation {
    tag: "categorical"
    tag: "list"
    extra_metadata {
      type_url: "type.googleapis.com/google.protobuf.Struct"
      value: "\n\017\n\tis_ragged\022\002 \000\n3\n\010cat_path\022\'\032%.//categories/unique.category.parquet\n\034\n\017dtype_item_size\022\t\021\000\000\000\000\000\000P@\n\021\n\013num_buckets\022\002\010\000\n\033\n\016freq_threshold\022\t\021\000\000\000\000\000\000\000\000\n\030\n\013start_index\022\t\021\000\000\000\000\000\000\360?\nG\n\017embedding_sizes\0224*2\n\030\n\013cardinality\022\t\021\000\000\000\000\000\000\010@\n\026\n\tdimension\022\t\021\000\000\000\000\000\0000@\n\r\n\007is_list\022\002 \001\n\025\n\010max_size\022\t\021\000\000\000\000\000\000\000\000"
    }
  }
}
feature {
  name: "et_dayofweek_sin-list_seq"
  value_count {
  }
  type: FLOAT
  annotation {
    tag: "categorical"
    tag: "list"
    extra_metadata {
      type_url: "type.googleapis.com/google.protobuf.Struct"
      value: "\n\017\n\tis_ragged\022\002 \000\n\034\n\017dtype_item_size\022\t\021\000\000\000\000\000\000@@\n\r\n\007is_list\022\002 \001"
    }
  }
}
feature {
  name: "product_recency_days_log_norm-list_seq"
  value_count {
  }
  type: FLOAT
  annotation {
    tag: "categorical"
    tag: "list"
    extra_metadata {
      type_url: "type.googleapis.com/google.protobuf.Struct"
      value: "\n\017\n\tis_ragged\022\002 \000\n\034\n\017dtype_item_size\022\t\021\000\000\000\000\000\000P@\n\r\n\007is_list\022\002 \001"
    }
  }
}
feature {
  name: "day_index"
  type: INT
  annotation {
    tag: "categorical"
    extra_metadata {
      type_url: "type.googleapis.com/google.protobuf.Struct"
      value: "\n\034\n\017dtype_item_size\022\t\021\000\000\000\000\000\000P@\n\017\n\tis_ragged\022\002 \000\n\r\n\007is_list\022\002 \000"
    }
  }
}

Update

I also see that product_recency_days_log_norm-list_seq gets tagged as categorical, even though it is continuous in the demo schema.
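
That tag most likely comes from the blanket AddMetadata(tags=[Tags.CATEGORICAL]) applied to every Groupby output in the script above. A minimal sketch (not a confirmed fix) of tagging the outputs selectively instead, reusing the same ops and Tags already imported in the script:

# Same Groupby as above, but without tagging every output as categorical
groupby_out = features >> nvt.ops.Groupby(
    groupby_cols=["session_id"],
    sort_cols=["timestamp"],
    aggs={
        'item_id': ["list", "count"],
        'category': ["list"],
        'timestamp': ["first"],
        'event_time_dt': ["first"],
        'et_dayofweek_sin': ["list"],
        'product_recency_days_log_norm': ["list"],
    },
    name_sep="-")

# Tag only the ID-like outputs as categorical...
cat_out = groupby_out['session_id', 'item_id-count', 'item_id-list', 'category-list'] >> \
    nvt.ops.AddMetadata(tags=[Tags.CATEGORICAL])
# ...and the numeric list features as continuous.
cont_out = groupby_out['et_dayofweek_sin-list', 'product_recency_days_log_norm-list'] >> \
    nvt.ops.AddMetadata(tags=[Tags.CONTINUOUS])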