tensorflow / recommenders-addons

Additional utils and helpers to extend TensorFlow when build recommendation systems, contributed and maintained by SIG Recommenders.
Apache License 2.0
592 stars 135 forks source link

two features cannot share dynamic embeddings when one is a scalar id and the other is a list of ids #227

Closed cockroachzl closed 11 months ago

cockroachzl commented 2 years ago

System information

Describe the bug Suppose there are two features, one if movie_id of shape [batch_size,], the other is a list of movie_ids of shape [batch_size, length_of_list] to represent the recent history of movies watched. I would like these two features to share the same embedding table to reduce memory footprints, improve training speed and to generalize better.

The second feature of list of movie ids can be simply pooled after embedding lookup.

However, this won't work with dynamic_embedding in either eager mode or graph mode. When computing gradients grads = tape.gradient(loss, model.trainable_variables), an error is raised Inputs to operation AddN of type AddN must have the same size and shape. Input 0: [256,1,32] != input 1: [256,2,32] [Op:AddN]

Code to reproduce the issue A notebook is created to with the movielens dataset to reproduce this issue, see: https://github.com/cockroachzl/recommenders-addons/blob/master/docs/tutorials/reproduce_shared_embedding_issue.ipynb

In the notebook, the scalar id feature is movie_id, the list id feature is called second_movie_id:

movie_id = tf.reshape(batch['movie_id'], (-1, 1))
second_movie_id = tf.stack([tf.random.shuffle(batch["movie_id"]), tf.random.shuffle(batch["movie_id"])], axis=1)

Other info / logs The full stack trace is pasted below, which is also included in the notebook above.

---------------------------------------------------------------------------
InvalidArgumentError                      Traceback (most recent call last)
<ipython-input-15-b888f18a64ca> in <module>
----> 1 train(1)

<ipython-input-14-3eca2ca09eaa> in train(epoch)
      3         total_loss = np.array([])
      4         for (_, batch) in enumerate(dataset_train):
----> 5             loss = train_step(batch, model)
      6             total_loss = np.append(total_loss, loss)
      7         print("epoch:", i, "mean_squared_error:", np.mean(total_loss))

<ipython-input-13-a91e1f8a13cf> in train_step(batch, model)
      3     with tf.GradientTape() as tape:
      4         loss = model(batch)
----> 5     grads = tape.gradient(loss, model.trainable_variables)
      6     optimizer.apply_gradients(zip(grads, model.trainable_variables))
      7     return loss

/data/miniconda3/envs/env-3.8.8/lib/python3.8/site-packages/tensorflow/python/eager/backprop.py in gradient(self, target, sources, output_gradients, unconnected_gradients)
   1079                           for x in nest.flatten(output_gradients)]
   1080 
-> 1081     flat_grad = imperative_grad.imperative_grad(
   1082         self._tape,
   1083         flat_targets,

/data/miniconda3/envs/env-3.8.8/lib/python3.8/site-packages/tensorflow/python/eager/imperative_grad.py in imperative_grad(tape, target, sources, output_gradients, sources_raw, unconnected_gradients)
     65         "Unknown value for unconnected_gradients: %r" % unconnected_gradients)
     66 
---> 67   return pywrap_tfe.TFE_Py_TapeGradient(
     68       tape._tape,  # pylint: disable=protected-access
     69       target,

/data/miniconda3/envs/env-3.8.8/lib/python3.8/site-packages/tensorflow/python/eager/backprop.py in _aggregate_grads(gradients)
    644     return gradients[0]
    645   if all(isinstance(g, ops.Tensor) for g in gradients):
--> 646     return gen_math_ops.add_n(gradients)
    647   else:
    648     assert all(

/data/miniconda3/envs/env-3.8.8/lib/python3.8/site-packages/tensorflow/python/ops/gen_math_ops.py in add_n(inputs, name)
    396       return _result
    397     except _core._NotOkStatusException as e:
--> 398       _ops.raise_from_not_ok_status(e, name)
    399     except _core._FallbackException:
    400       pass

/data/miniconda3/envs/env-3.8.8/lib/python3.8/site-packages/tensorflow/python/framework/ops.py in raise_from_not_ok_status(e, name)
   7184 def raise_from_not_ok_status(e, name):
   7185   e.message += (" name: " + name if name is not None else "")
-> 7186   raise core._status_to_exception(e) from None  # pylint: disable=protected-access
   7187 
   7188 

InvalidArgumentError: Inputs to operation AddN of type AddN must have the same size and shape.  Input 0: [256,32] != input 1: [256,2,32] [Op:AddN]
Lifann commented 2 years ago

ShadowVariable will project the embedding space to local trainable scope. So every embedding lookup has its own ShadowVariable. So it should be like:

import sys,os

import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Dense

import tensorflow_datasets as tfds
import tensorflow_recommenders_addons as tfra

ratings = tfds.load("movielens/100k-ratings", split="train")

ratings = ratings.map(lambda x: {
    "movie_id": tf.strings.to_number(x["movie_id"], tf.int64),
    "user_id": tf.strings.to_number(x["user_id"], tf.int64),
    "user_rating": x["user_rating"]
})

tf.random.set_seed(2021)
shuffled = ratings.shuffle(100_000, seed=2021, reshuffle_each_iteration=False)

dataset_train = shuffled.take(100_000).batch(256)

class NCFModel(tf.keras.Model):

    def __init__(self):
        super(NCFModel, self).__init__()
        self.embedding_size = 32
        self.d0 = Dense(
            256,
            activation='relu',
            kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1),
            bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1))
        self.d1 = Dense(
            64,
            activation='relu',
            kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1),
            bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1))
        self.d2 = Dense(
            1,
            kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1),
            bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1))

        self.user_embeddings = tfra.dynamic_embedding.get_variable(
            name="user_dynamic_embeddings",
            dim=self.embedding_size,
            initializer=tf.keras.initializers.RandomNormal(-1, 1))
        self.user_embeddings_shadow = tfra.dynamic_embedding.shadow_ops.ShadowVariable(
            self.user_embeddings,
            name='user_dynamic_embeddings_shadow',
            max_norm=None,
            trainable=True)

        self.movie_embeddings = tfra.dynamic_embedding.get_variable(
            name="moive_dynamic_embeddings",
            dim=self.embedding_size,
            initializer=tf.keras.initializers.RandomNormal(-1, 1))
        self.movie_embeddings_shadow = tfra.dynamic_embedding.shadow_ops.ShadowVariable(
            self.movie_embeddings,
            name='movie_dynamic_embeddings_shadow',
            max_norm=None,
            trainable=True)
        # Another ShadowVariable on `movie_embeddings`
        self.second_movie_embeddings_shadow = tfra.dynamic_embedding.shadow_ops.ShadowVariable(
            self.movie_embeddings,
            name='second_movie_dynamic_embeddings_shadow',
            max_norm=None,
            trainable=True)

        self.loss = tf.keras.losses.MeanSquaredError()

    def call(self, batch):
        movie_id = batch["movie_id"]
        second_movie_id = tf.stack([tf.random.shuffle(batch["movie_id"]), tf.random.shuffle(batch["movie_id"])], axis=1)
        user_id = batch["user_id"]
        rating = batch["user_rating"]

        input_shape = tf.shape(user_id)
        user_id_weights = tfra.dynamic_embedding.shadow_ops.embedding_lookup(self.user_embeddings_shadow, user_id, name='e1')
        user_id_weights = tf.reshape(user_id_weights, tf.concat([input_shape, [self.embedding_size]], 0))

        input_shape = tf.shape(movie_id)
        movie_id_weights = tfra.dynamic_embedding.shadow_ops.embedding_lookup(self.movie_embeddings_shadow, movie_id, name='e2')
        movie_id_weights = tf.reshape(movie_id_weights, tf.concat([input_shape, [self.embedding_size]], 0))

        input_shape = tf.shape(second_movie_id)
        second_movie_id_weights = tfra.dynamic_embedding.shadow_ops.embedding_lookup(self.second_movie_embeddings_shadow, second_movie_id, name='e3')
        second_movie_id_weights = tf.reshape(second_movie_id_weights, tf.concat([input_shape, [self.embedding_size]], 0))
        second_movie_id_weights = tfra.dynamic_embedding.keras.layers.embedding.reduce_pooling(second_movie_id_weights)

        embeddings = tf.concat([user_id_weights, movie_id_weights, second_movie_id_weights], axis=1)

        dnn = self.d0(embeddings)
        dnn = self.d1(dnn)
        dnn = self.d2(dnn)
        out = tf.reshape(dnn, shape=[-1])
        loss = self.loss(rating, out)
        return loss

model = NCFModel()
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
optimizer = tfra.dynamic_embedding.DynamicEmbeddingOptimizer(optimizer)

def train_step(batch, model):
    with tf.GradientTape() as tape:
        loss = model(batch)
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))
    return loss

def train(epoch=1):
    for i in range(epoch):
        total_loss = np.array([])
        for (_, batch) in enumerate(dataset_train):
            loss = train_step(batch, model)
            total_loss = np.append(total_loss, loss)
        print("epoch:", i, "mean_squared_error:", np.mean(total_loss))

train(1)
sunshinenum commented 1 year ago

When I use the shadow var version, I find that the time cost increases a lot. It changes from 2s to about 12s. But, shadow var really fixed the problem of embedding sharing.

Lifann commented 1 year ago

Update: This problem can be solved by symmetric encryption with optimal performance and little space cost, especially for running on GPU. I'll update it later.

MoFHeka commented 1 year ago

Has this issue been solved? May I close it?

sunshinenum commented 1 year ago

Has this issue been solved? May I close it?

yes, thanks

univerone commented 1 year ago

Update: This problem can be solved by symmetric encryption with optimal performance and little space cost, especially for running on GPU. I'll update it later.

Looking forward for any update

MoFHeka commented 11 months ago

Update: This problem can be solved by symmetric encryption with optimal performance and little space cost, especially for running on GPU. I'll update it later.

Looking forward for any update

@univerone You can encode different feature ID inputs into int64, for example:

# Use 46 bit for expressing ID, 17 bit for distinguish between different features, and 1 bit for sign bit.
fea_0_code = 11
fea_0_input = (fea_0_code << 47) + fea_0_id
fea_1_code = 22
fea_1_input = (fea_1_code << 47) + fea_1_id
# Then concat all features input to Embedding.
all_input = concat(fea_0_input, fea_1_input)
ebb(all_input)

TFRA合并查询

MoFHeka commented 11 months ago

This problem has been solved by merging the input with the tf.concat operator and then doing embedding lookup.