NVIDIA-Merlin / Merlin

NVIDIA Merlin is an open source library providing end-to-end GPU-accelerated recommender systems, from feature engineering and preprocessing to training deep learning models and running inference in production.
Apache License 2.0

[QST] How to do normal retrieval of candidates without starting a server #1095

Closed Anindyadeep closed 2 months ago

Anindyadeep commented 3 months ago

Hello, first of all, thanks for creating this library. The way everything is crafted makes it feel production grade.

However, I am still in my learning phase. I have gone through almost all the tutorials, and I am currently following this tutorial.

I am in one of the final phases of this tutorial, where I only want to retrieve the candidate set (with item features) and skip ranking. I would also prefer not to start a server for that, because I am getting Triton logs that say:

I0405 04:38:53.051978 64840 server.cc:673] 
+---------------------------+---------+------------------------------------------------------------+
| Model                     | Version | Status                                                     |
+---------------------------+---------+------------------------------------------------------------+
| 0_transformworkflowtriton | 1       | UNAVAILABLE: Internal: Unable to initialize shared memory  |
|                           |         | key 'triton_python_backend_shm_region_3' to requested size |
|                           |         |  (67108864 bytes). If you are running Triton inside docker |
|                           |         | , use '--shm-size' flag to control the shared memory regio |
|                           |         | n size. Each Python backend model instance requires at lea |
|                           |         | st 64MBs of shared memory. Error: No space left on device  |
| 1_predicttensorflowtriton | 1       | READY                                                      |
| 2_transformworkflowtriton | 1       | UNAVAILABLE: Internal: Unable to initialize shared memory  |
|                           |         | key 'triton_python_backend_shm_region_1' to requested size |
|                           |         |  (67108864 bytes). If you are running Triton inside docker |
|                           |         | , use '--shm-size' flag to control the shared memory regio |
|                           |         | n size. Each Python backend model instance requires at lea |
|                           |         | st 64MBs of shared memory. Error: No space left on device  |
| executor_model            | 1       | UNAVAILABLE: Internal: Unable to initialize shared memory  |
|                           |         | key 'triton_python_backend_shm_region_2' to requested size |
|                           |         |  (67108864 bytes). If you are running Triton inside docker |
|                           |         | , use '--shm-size' flag to control the shared memory regio |
|                           |         | n size. Each Python backend model instance requires at lea |
|                           |         | st 64MBs of shared memory. Error: No space left on device  |
+---------------------------+---------+------------------------------------------------------------+

I have an NVIDIA RTX 3060 with 6 GB of memory.

Most of the workflows and the model training went very smoothly, but I am stuck on a resource problem at the Triton inference stage. Since Merlin's documentation is somewhat sparse, I am unsure how to execute a workflow for a user id that does the following:

user id -> goes to feast for user feature -> retrieval model -> ANN in Faiss -> retrieve candidate set -> feast item feature -> return the result in a df 
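
The flow above can be sketched without any server as a chain of plain function calls. The following is only a minimal illustration, not Merlin's API: the dictionaries stand in for the Feast feature views, `query_tower` is a toy stand-in for the trained retrieval model, and a brute-force inner-product search stands in for the Faiss ANN index. All names and values here are hypothetical.

```python
# Hypothetical in-memory stand-ins for the Feast user and item feature views.
USER_FEATURES = {7: {"user_age": 3, "user_gender": 1}}
ITEM_FEATURES = {
    101: {"item_category": 5},
    102: {"item_category": 2},
    103: {"item_category": 5},
}
# Hypothetical item embeddings (in the tutorial these come from the item tower).
ITEM_EMBEDDINGS = {101: [1.0, 0.0], 102: [0.0, 1.0], 103: [0.6, 0.8]}


def query_tower(features):
    """Toy stand-in for the retrieval model: maps user features to a vector."""
    return [float(features["user_age"]), float(features["user_gender"])]


def top_k(query, embeddings, k):
    """Brute-force inner-product search, standing in for the Faiss index."""
    def score(vector):
        return sum(q * v for q, v in zip(query, vector))

    ranked = sorted(embeddings, key=lambda item_id: score(embeddings[item_id]), reverse=True)
    return ranked[:k]


def retrieve_candidates(user_id, k=2):
    """user id -> user features -> query vector -> top-k ids -> item features."""
    user = USER_FEATURES[user_id]
    query = query_tower(user)
    candidate_ids = top_k(query, ITEM_EMBEDDINGS, k)
    return [{"user_id": user_id, "item_id": i, **ITEM_FEATURES[i]} for i in candidate_ids]


rows = retrieve_candidates(7)
print(rows)
```

The resulting list of row dictionaries can be passed to `pandas.DataFrame(rows)` if a DataFrame is wanted.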

This is the example code, taken from the same tutorial. I need help executing the above workflow without setting up a Triton server.

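import os

import feast
import pandas as pd
from nvtabular import Workflow

from merlin.systems.dag.ensemble import Ensemble
from merlin.systems.dag.ops.faiss import QueryFaiss, setup_faiss
from merlin.systems.dag.ops.feast import QueryFeast
from merlin.systems.dag.ops.tensorflow import PredictTensorflow
from merlin.systems.dag.ops.unroll_features import UnrollFeatures
from merlin.systems.dag.ops.workflow import TransformWorkflow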

DATA_FOLDER = "./data"
BASE_DIR = "./merlin_recsys"

os.makedirs(
    os.path.join(BASE_DIR, "faiss_index"), exist_ok=True
)
FEATURE_REPO_PATH = os.path.join(BASE_DIR, "feast_repo/feature_repo")

# Set the feature store
feature_store = feast.FeatureStore(FEATURE_REPO_PATH)

# Set up FAISS index 

faiss_index_path = os.path.join(BASE_DIR, 'faiss_index', "index.faiss")
retrieval_model_path = os.path.join(BASE_DIR, "query_tower/")

# read the embeddings

item_embeddings = pd.read_parquet(
    os.path.join(BASE_DIR, "item_embeddings.parquet")
)
setup_faiss(item_embeddings, faiss_index_path, embedding_column="output_1")

user_attributes = ["user_id"] >> QueryFeast.from_feature_view(
    store=feature_store,
    view="user_features",
    column="user_id",
    include_id=True,
)

# Get existing workflow

nvt_workflow = Workflow.load(os.path.join(DATA_FOLDER, "processed_nvt/workflow"))
user_subgraph = nvt_workflow.get_subworkflow("user")
user_features = user_attributes >> TransformWorkflow(user_subgraph)

# Retrieval workflow (but not using nvt.workflow)

topk_retrieval = 100
retrieval = (
    user_features
    >> PredictTensorflow(retrieval_model_path)
    >> QueryFaiss(faiss_index_path, topk=topk_retrieval)
)

item_attributes = retrieval["candidate_ids"] >> QueryFeast.from_feature_view(
    store=feature_store,
    view="item_features",
    column="candidate_ids",
    output_prefix="item",
    include_id=True,
)

user_features_to_unroll = [
    "user_id",
    "user_shops",
    "user_profile",
    "user_group",
    "user_gender",
    "user_age",
    "user_consumption_2",
    "user_is_occupied",
    "user_geography",
    "user_intentions",
    "user_brands",
    "user_categories",
]

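# NOTE: `item_features`, `ordering`, and `request_schema` are not defined in
# this excerpt (only `item_attributes` is); they presumably come from
# tutorial steps that were left out of this snippet.
combined_features = item_features >> UnrollFeatures(
    "item_id", user_features[user_features_to_unroll]
)
ensemble = Ensemble(ordering, request_schema)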

I then tried the following, based on some guessing:

request = make_df({"user_id": [7]})
ensemble.transform(request)

Got this error:

ERROR:merlin:Failed to transform operator <merlin.systems.dag.runtimes.triton.ops.workflow.TransformWorkflowTriton object at 0x7ed3303bb100>
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/merlin/dag/executors.py", line 237, in _run_node_transform
    transformed_data = node.op.transform(selection, input_data)
  File "/usr/local/lib/python3.10/dist-packages/merlin/systems/dag/runtimes/triton/ops/workflow.py", line 82, in transform
    inference_request = tensor_table_to_triton_request(
  File "/usr/local/lib/python3.10/dist-packages/merlin/systems/triton/conversions.py", line 282, in tensor_table_to_triton_request
    input_tensors = [_triton_tensor_from_array(name, tensor) for name, tensor in aligned.items()]
  File "/usr/local/lib/python3.10/dist-packages/merlin/systems/triton/conversions.py", line 282, in <listcomp>
    input_tensors = [_triton_tensor_from_array(name, tensor) for name, tensor in aligned.items()]
  File "/usr/local/lib/python3.10/dist-packages/merlin/systems/triton/conversions.py", line 324, in _triton_tensor_from_array
    tensor = pb_utils.Tensor(name, array)
AttributeError: 'NoneType' object has no attribute 'Tensor'
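
The last frame of the traceback indicates that `pb_utils` was `None` when `pb_utils.Tensor` was called. One common pattern that produces exactly this error is an optional import of a module that only exists inside a running Triton Python backend. Whether Merlin's `conversions.py` is written this way is an assumption; the sketch below only illustrates the pattern, and `make_tensor` is a hypothetical helper.

```python
try:
    # This module is provided by Triton's Python backend at runtime and is
    # normally not importable from an ordinary Python process.
    import triton_python_backend_utils as pb_utils
except ImportError:
    pb_utils = None


def make_tensor(name, array):
    """Hypothetical helper that fails with a clear message outside Triton."""
    if pb_utils is None:
        raise RuntimeError(
            "triton_python_backend_utils is unavailable; "
            "this code path only works inside a Triton Python backend"
        )
    return pb_utils.Tensor(name, array)
```

Without the explicit `None` check, the call would fail with the same `AttributeError: 'NoneType' object has no attribute 'Tensor'` shown above.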

rnyak commented 2 months ago

Where do you run this example? In which environment, and with which GPU? You can add the --shm-size=1g argument to your docker run command and test again.
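
Before restarting the container, the shared memory actually available can be checked against the requirement stated in the log (at least 64 MB, i.e. 67108864 bytes, per Python backend model instance). The sketch below is a rough lower-bound check only; it assumes a Linux container with `/dev/shm` mounted and one instance for each of the three Python backend models that failed to load. The function names are hypothetical. For context, Docker's default `/dev/shm` size is 64 MB unless `--shm-size` is set.

```python
import os
import shutil

# Minimum shared memory per Python backend model instance, from the Triton log.
MIN_SHM_PER_INSTANCE = 64 * 1024 * 1024  # 67108864 bytes


def free_shm_bytes(path="/dev/shm"):
    """Return the free space, in bytes, of the shared-memory mount."""
    return shutil.disk_usage(path).free


def shm_is_sufficient(free_bytes, python_backend_instances):
    """Lower-bound check: is there at least 64 MB per Python backend instance?"""
    return free_bytes >= python_backend_instances * MIN_SHM_PER_INSTANCE


if os.path.isdir("/dev/shm"):
    # Assumption: three Python backend instances, matching the three
    # UNAVAILABLE models in the log above.
    print("enough shared memory:", shm_is_sufficient(free_shm_bytes(), 3))
```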

Anindyadeep commented 2 months ago

Yes, I figured that out after some iterations, but thanks for the answer.

We can close this issue :)