NVIDIA-Merlin / Transformers4Rec

Transformers4Rec is a flexible and efficient library for sequential and session-based recommendation and works with PyTorch.
https://nvidia-merlin.github.io/Transformers4Rec/main
Apache License 2.0
1.08k stars 142 forks

Add pre-trained embeddings support to T4Rec input blocks #690

Closed sararb closed 1 year ago

sararb commented 1 year ago

Fixes #683 Fixes #684 Fixes #685 Fixes #485

Goals :soccer:

github-actions[bot] commented 1 year ago

Documentation preview

https://nvidia-merlin.github.io/Transformers4Rec/review/pr-690

gaceladri commented 1 year ago

Hi, nice feature! Do you anticipate this PR to be merged and available for use with your docker files soon? I would like to test this feature within T4R for this competition. I'm not sure if NVIDIA's leadership team is already using the text embeds. 😄

Thanks and good job!

gaceladri commented 1 year ago

I have managed to pass the embeddings to the trainer, but I'm not sure how to map the embedding IDs to the article_ids, since they have gone through the Categorify operation and it's not a 1-1 mapping. Any ideas or help? Since I am not very familiar with NVTabular, I could use an OrdinalEncoder outside of it and avoid the Categorify operation altogether.

Here's the code I used to pass the embeddings to the trainer:

```python
import numpy as np
import pandas as pd
import os
import sys
import warnings
warnings.filterwarnings('ignore')

import cudf
import nvtabular as nvt
from nvtabular.ops import *
from merlin.schema.tags import Tags
import merlin.models.torch as mm
from merlin.io.dataset import Dataset

data_path = './'
train_dataset = nvt.Dataset(os.path.join(data_path, 'train.parquet'))
valid_dataset = nvt.Dataset(os.path.join(data_path, 'val.parquet'))

item_id = ["article_id"] >> Categorify(dtype='int32') >> TagAsItemID()
item_features = (
    ["brand", "color", "size", "model", "material"]
    >> FillMissing(0)
    >> Categorify(dtype='int32')
    >> TagAsItemFeatures()
)
timestamp = ['session_step'] >> nvt.ops.AddMetadata(tags=[Tags.TIME])

def relative_price_to_avg_pr(col, gdf):
    epsilon = 1e-5
    col = ((gdf['price'] - col) / (col + epsilon)) * (col > 0).astype(int)
    return col

price = (
    ['price']
    >> FillMissing(0)
    >> LogOp()
    >> Normalize()
    >> LambdaOp(lambda col: col.astype("float32"))
    >> TagAsItemFeatures()
)

avg_price_product = ['article_id'] >> JoinGroupby(cont_cols=['price'], stats=["mean"])
relative_price_to_avg = (
    avg_price_product
    >> LambdaOp(relative_price_to_avg_pr, dependency=['price'])
    >> LambdaOp(lambda col: col.astype("float32"))
    >> Rename(name='relative_price')
    >> AddMetadata(tags=["item", Tags.CONTINUOUS])
)

feats = item_id + item_features + timestamp + price + relative_price_to_avg

groupby_feats = (feats >> Groupby(
    groupby_cols=['customer_id'],
    aggs={
        'brand': ['list'],
        'article_id': ['list'],
        'color': ['list'],
        'size': ['list'],
        'model': ['list'],
        'material': ['list'],
        # 'relative_price': ['list'],
        'price': ['list']
    },
    sort_cols=['session_step']
))

session_max_lenght = 20
groupby_feats_trunc = groupby_feats >> ListSlice(10)

workflow = nvt.Workflow(groupby_feats_trunc)
workflow.fit(train_dataset)
workflow.transform(train_dataset).to_parquet(os.path.join(data_path, "train/"))
workflow.transform(valid_dataset).to_parquet(os.path.join(data_path, "test/"))

# --------------------------------------------------------------------------

import os
import numpy as np
from merlin.io import Dataset
from transformers4rec.torch.utils.data_utils import MerlinDataLoader

data_path = './'
train = Dataset(os.path.join(data_path, 'train/part_0.parquet'))
valid = Dataset(os.path.join(data_path, 'test/part_0.parquet'), part_mem_fraction=0.1)
train_schema = train.schema
val_schema = train.schema

from merlin.dataloader.ops.embeddings import EmbeddingOperator
from merlin.dataloader.torch import Loader

np_emb_item_id = np.random.rand(1326769, 16)

train_loader = Loader(train, batch_size=32, transforms=[
    EmbeddingOperator(
        np_emb_item_id,
        lookup_key='article_id_list',
        embedding_name='pretrained_item_id_embeddings')
])  # ,
    # device=0)

val_loader = Loader(valid, batch_size=32, transforms=[
    EmbeddingOperator(
        np_emb_item_id,
        lookup_key='article_id_list',
        embedding_name='pretrained_item_id_embeddings')
])  # ,
    # device=0)

from transformers4rec import torch as tr
from transformers4rec.torch.ranking_metric import NDCGAt, RecallAt, PrecisionAt, AvgPrecisionAt

max_sequence_length, d_model = 10, 64

input_module = tr.TabularSequenceFeatures.from_schema(
    schema=train_schema,
    max_sequence_length=max_sequence_length,
    continuous_projection=64,
    aggregation='concat',
    d_output=d_model,
    masking="mlm",
)

prediction_task = tr.NextItemPredictionTask(weight_tying=True, metrics=[
    NDCGAt(top_ks=[10, 20], labels_onehot=True),
    RecallAt(top_ks=[10, 20], labels_onehot=True),
    PrecisionAt(top_ks=[10, 20], labels_onehot=True),
    AvgPrecisionAt(top_ks=[10, 20], labels_onehot=True)
])

transformer_config = tr.XLNetConfig.build(d_model=d_model, n_head=8, n_layer=2,
                                          total_seq_length=max_sequence_length)
model = transformer_config.to_torch_model(input_module, prediction_task)

training_args = tr.trainer.T4RecTrainingArguments(
    output_dir='./tmp',
    max_sequence_length=10,
    data_loader_engine='merlin',
    num_train_epochs=10,
    dataloader_drop_last=False,
    per_device_train_batch_size=256,
    per_device_eval_batch_size=128,
    learning_rate=0.0005,
    fp16=True,
    report_to=[],
    logging_steps=500,
    eval_steps=500,
    evaluation_strategy='steps'
)

recsys_trainer = tr.Trainer(
    model=model,
    args=training_args,
    train_dataloader=train_loader,
    eval_dataloader=val_loader,
    schema=train_schema,
    compute_metrics=True)

recsys_trainer.train()
```

I also had to modify some internals in the Transformers4Rec trainer:

```diff
- batch_size = dataloader._batch_size
+ batch_size = dataloader.batch_size

- if not isinstance(dataloader.dataset, collections.abc.Sized):
-     raise ValueError("dataset must implement __len__")
+ # if not isinstance(dataloader.dataset, collections.abc.Sized):
+ #     raise ValueError("dataset must implement __len__")
```

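
The first change in the diff swaps one attribute name for another, which suggests the two loader types expose the batch size differently. A less invasive alternative might be a small lookup that accepts either name. This is only a sketch: `resolve_batch_size` is a hypothetical helper, not part of Transformers4Rec, and it assumes the loader stores the value under `batch_size` or `_batch_size`.

```python
def resolve_batch_size(dataloader):
    """Return the loader's batch size, whichever attribute name it uses.

    Hypothetical helper: assumes the value lives under `batch_size`
    (public) or `_batch_size` (private), as in the diff above.
    """
    for attr in ("batch_size", "_batch_size"):
        value = getattr(dataloader, attr, None)
        if value is not None:
            return value
    raise AttributeError("dataloader exposes neither batch_size nor _batch_size")


# Stand-in loader classes, used only to exercise the helper.
class PublicLoader:
    batch_size = 32


class PrivateLoader:
    _batch_size = 64


public_size = resolve_batch_size(PublicLoader())
private_size = resolve_batch_size(PrivateLoader())
```

With a helper like this, the trainer would not need to be edited each time the loader type changes.
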
sararb commented 1 year ago

@gaceladri I'm glad to hear that you're interested in using the new feature in the Transformers4Rec (T4R) library for the KDD competition. Please note that this feature is currently a work in progress, but we plan to include it in the upcoming release.

Regarding your question on how to map embedding IDs to article IDs after the Categorify operation, we are currently working on aligning the output of Categorify with the pre-trained embeddings matrix.

In the meantime, I prepared a workaround solution based on your code:

Once you fit your workflow, you can save it to disk using the following code:

```
workflow.save(os.path.join(DATA_FOLDER, "workflow_etl"))
```

This will create a `workflow_etl` directory, which contains a `categories` subdirectory with parquet files that list the unique values of each categorical feature. You can use the parquet file to map original IDs to encoded ones using the following function:

```
import os
import pandas as pd

def categorical_mapping(categories_dir: str, col_name: str = "item_id"):
    """
    Loads the categories file and gets the mapping from the original item ID to the index value.
    By convention, the "merlin IDs" are the row number for each item_id in this DataFrame.
    """
    filename = os.path.join(categories_dir, f"unique.{col_name}.parquet")
    df = pd.read_parquet(filename)
    return dict(zip(df[col_name], df.index))
```

Assuming you have a DataFrame of pre-trained embeddings associated with the original IDs, you can create the numpy file required by the PretrainedEmbeddingsOp as follows:

```
import numpy as np

# Reorganize pre-trained embeddings based on the encoded ID.
mapping = categorical_mapping("/workspace/data/yoochoose/workflow_etl/categories/")
pre_trained_df['encoded_id'] = pre_trained_df.original_ids.map(mapping)
pre_trained_df = pre_trained_df.sort_values('encoded_id')
pre_trained_df = pre_trained_df.set_index('encoded_id')

# Complete the special item IDs (like padding) and fill their embeddings with 0s.
pre_trained_df = pre_trained_df.reindex(np.arange(0, len(mapping), dtype=pre_trained_df.index.dtype)).fillna(0)

# Save the result to a .npy file.
np.save('pre-trained-item-id.npy', pre_trained_df.values)
```

It's worth noting that the NVT Categorify reserves some of the first positions (based on the `start_index` parameter) to account for special values used in modeling libraries, such as padding. Therefore, we recommend using `start_index=1` for T4R to account for padding (index 0) via: `nvt.ops.Categorify(start_index=1)`.
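
To make the alignment step concrete, here is a toy sketch of the same reordering using only pandas and NumPy, with no NVTabular involved. Everything in it is illustrative: `align_embeddings`, the three-item `mapping`, and the `e0`/`e1` columns are made up for the example, and it assumes encoded ID 0 is reserved for padding and that each original ID appears at most once in the embeddings table.

```python
import numpy as np
import pandas as pd


def align_embeddings(pre_trained_df, mapping, id_col="original_ids", num_rows=None):
    """Return a (num_rows, dim) matrix whose row i holds the embedding of encoded ID i.

    Hypothetical helper; assumes every column except `id_col` is an embedding
    dimension and that original IDs are unique in `pre_trained_df`.
    """
    emb_cols = [c for c in pre_trained_df.columns if c != id_col]
    encoded = pre_trained_df[id_col].map(mapping)
    known = encoded.notna()  # skip embeddings whose ID is absent from the mapping
    aligned = pre_trained_df.loc[known, emb_cols].copy()
    aligned.index = encoded[known].astype("int64").to_numpy()
    if num_rows is None:
        num_rows = max(mapping.values()) + 1
    # Encoded IDs without an embedding (e.g. padding at 0) become all-zero rows.
    aligned = aligned.reindex(np.arange(num_rows)).fillna(0.0)
    return aligned.to_numpy(dtype="float32")


# Toy stand-in for the categories mapping: encoded ID 0 is left for padding.
mapping = {"shoe": 1, "hat": 2, "scarf": 3}

# Toy pre-trained table: "unseen" is not in the mapping, "scarf" has no embedding.
pre_trained_df = pd.DataFrame({
    "original_ids": ["hat", "shoe", "unseen"],
    "e0": [0.2, 0.1, 9.0],
    "e1": [0.4, 0.3, 9.0],
})

matrix = align_embeddings(pre_trained_df, mapping)
```

A quick sanity check before saving the `.npy` file is to pick a few original IDs, look up their encoded ID in the mapping, and confirm that the matrix row at that position equals the original embedding.
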

I hope this helps! If you have any further questions, feel free to ask.

dcy0577 commented 7 months ago

Hi, cool feature indeed. Is there an example of how to serve the PyTorch model with pretrained embeddings on Triton server? I tried to follow the example from your TF repo. Here is my code:

```python
model_input_dict_from_dataloaer = next(iter(dataloader))[0]

# changes output schema from "next items" to "items scores" and "items ids"
topk = 5
model.top_k = topk

model.eval()
print(model.input_schema)
print(model.output_schema)
traced_model = torch.jit.trace(model, model_input_dict_from_dataloaer, strict=True)

ens_model_path = os.environ.get("ens_model_path", "ens_models_standard_workflow_with_pretrained_embeddings")
os.mkdir(ens_model_path)

workflow = nvt.Workflow.load(workflow_path)

torch_op = (
    workflow.input_schema.column_names
    >> TransformWorkflow(workflow)
    >> embeddings_op
    >> PredictPyTorch(traced_model, model.input_schema, model.output_schema)
)

ensemble = Ensemble(torch_op, workflow.input_schema)
ens_config, node_configs = ensemble.export(ens_model_path)
print(ens_config)
print(node_configs)
```

However, I got errors when I started inference with the exported graph. Any hints? Thanks!

ArthurHochedez commented 2 months ago

Hi, I'm new to T4Rec and I am trying to use pretrained embeddings for this competition: https://recsys.eb.dk/. Is there a notebook on how to do that?

MitchDonley commented 1 month ago

Would also like a notebook on how to do this. Would be super helpful.

rnyak commented 1 month ago

@MitchDonley @ArthurHochedez Hello. We do not have enough bandwidth to add new notebooks or new features at the moment. If you would like to contribute, you are more than welcome to open a PR with an example notebook.