tensorflow / recommenders-addons

Additional utils and helpers to extend TensorFlow when building recommendation systems, contributed and maintained by SIG Recommenders.
Apache License 2.0

Can't run with ParameterServerStrategy #272

Closed songyaheng closed 1 year ago

songyaheng commented 2 years ago

System information

Describe the bug

```
ValueError: Trying to create optimizer slot variable under the scope for tf.distribute.Strategy (<tensorflow.python.distribute.parameter_server_strategy_v2.ParameterServerStrategyV2 object at 0x7ff12fdb35b0>), which is different from the scope used for the original variable (<tf.Variable 'TrainableWrapperHandle:0' shape=(0, 32) dtype=float32, numpy=array([], shape=(0, 32), dtype=float32)>). Make sure the slot variables are created under the same strategy scope. This may happen if you're restoring from a checkpoint outside the scope
```

A clear and concise description of what the bug is.

Code to reproduce the issue

```python
with self._distribution_strategy_scope():
    strategy = distribute_ctx.get_strategy()
    if not strategy.extended.variable_created_in_scope(var):
        raise ValueError(
            "Trying to create optimizer slot variable under the scope for "
            "tf.distribute.Strategy ({}), which is different from the scope "
            "used for the original variable ({}). Make sure the slot "
            "variables are created under the same strategy scope. This may "
            "happen if you're restoring from a checkpoint outside the scope"
            .format(strategy, var))
```
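For context, this check lives in the Keras optimizer's slot-variable creation path: it requires that slot variables be created under the same `tf.distribute.Strategy` scope as the variables they track. Below is a minimal sketch of the pattern the check expects, using only plain Keras variables (no TFRA involved; the strategy choice is purely illustrative):

```python
import tensorflow as tf

# Illustrative only: any strategy demonstrates the scoping rule.
strategy = tf.distribute.MirroredStrategy()

with strategy.scope():
    # Both the model variables and the optimizer are created inside the scope,
    # so Adam's slot variables end up under the same strategy as the weights.
    model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(4,))])
    optimizer = tf.keras.optimizers.Adam(1e-3)
    model.compile(optimizer=optimizer, loss="mse")

# The ValueError reported in this issue fires because TFRA's TrainableWrapper
# variable is not recognized as having been created inside the
# ParameterServerStrategy scope when the optimizer later builds its slots.
```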

Provide a reproducible test case that is the bare minimum necessary to generate the problem.

Other info / logs

Include any logs or source code that would be helpful to diagnose the problem. If including tracebacks, please include the full traceback. Large logs and files should be attached.

songyaheng commented 2 years ago

```python
with strategy.scope():
    model = DualChannelsDeepModel(strategy, embedding_size, embedding_size,
                                  tf.keras.initializers.RandomNormal(0.0, 0.5))
    optimizer = tf.keras.optimizers.Adam(1E-3)
    optimizer = de.DynamicEmbeddingOptimizer(optimizer)

    auc = tf.keras.metrics.AUC(num_thresholds=1000)
    model.compile(optimizer=optimizer,
                  loss=tf.keras.losses.MeanSquaredError(),
                  metrics=[
                      auc,
                  ])
```

```python
self.user_embedding = de.keras.layers.SquashedEmbedding(
    embedding_size=user_embedding_size,
    initializer=embedding_initializer,
    name='user_embedding',
    distribute_strategy=strategy,
    keep_distribution=True
)
```

and it works!

rhdong commented 2 years ago

Hi @songyaheng, thank you for your feedback. I'm sorry, I'm a little confused about your issue; do you mean you resolved it by yourself? Could you paste the whole script? Thank you!

songyaheng commented 2 years ago


> Hi @songyaheng, thank you for your feedback. I'm sorry, I'm a little confused about your issue; do you mean you resolved it by yourself? Could you paste the whole script? Thank you!

The whole script looks like this:

```python
import tensorflow as tf
import tensorflow_datasets as tfds
import argparse
import os
from tensorflow_recommenders_addons import dynamic_embedding as de

parser = argparse.ArgumentParser()
parser.add_argument("--worker", type=str, help="like: localhost:2231,localhost:2232")
parser.add_argument("--ps", type=str, help="like: localhost:2221,localhost:2222")
parser.add_argument("--task_type", type=str, help="must in [ps, worker]")
parser.add_argument("--task_index", type=int, help="like: 0")

arguments = parser.parse_args()


class DualChannelsDeepModel(tf.keras.Model):

    def __init__(self,
                 strategy,
                 user_embedding_size=1,
                 movie_embedding_size=1,
                 embedding_initializer=None):
        super(DualChannelsDeepModel, self).__init__()
        self.user_embedding_size = user_embedding_size
        self.movie_embedding_size = movie_embedding_size

        if embedding_initializer is None:
            embedding_initializer = tf.keras.initializers.Zeros()

        self.user_embedding = de.keras.layers.SquashedEmbedding(
            embedding_size=user_embedding_size,
            initializer=embedding_initializer,
            name='user_embedding',
            distribute_strategy=strategy
        )
        self.movie_embedding = de.keras.layers.SquashedEmbedding(
            embedding_size=movie_embedding_size,
            initializer=embedding_initializer,
            name='movie_embedding',
            distribute_strategy=strategy
        )

        self.dnn1 = tf.keras.layers.Dense(
            64,
            activation='relu',
            kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1),
            bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1))
        self.dnn2 = tf.keras.layers.Dense(
            16,
            activation='relu',
            kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1),
            bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1))
        self.dnn3 = tf.keras.layers.Dense(
            5,
            activation='softmax',
            kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1),
            bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1))
        self.bias_net = tf.keras.layers.Dense(
            5,
            activation='softmax',
            kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1),
            bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1))

    def call(self, features):
        user_id = tf.reshape(features['user_id'], (-1, 1))
        movie_id = tf.reshape(features['movie_id'], (-1, 1))
        user_latent = self.user_embedding(user_id)
        movie_latent = self.movie_embedding(movie_id)
        latent = tf.concat([user_latent, movie_latent], axis=1)

        x = self.dnn1(latent)
        x = self.dnn2(x)
        x = self.dnn3(x)

        bias = self.bias_net(latent)
        x = 0.2 * x + 0.8 * bias
        return x


def get_dataset(batch_size=1):
    dataset = tfds.load('movielens/1m-ratings', split='train')
    features = dataset.map(
        lambda x: {
            "movie_id": tf.strings.to_number(x["movie_id"], tf.int64),
            "user_id": tf.strings.to_number(x["user_id"], tf.int64),
        })
    ratings = dataset.map(
        lambda x: tf.one_hot(tf.cast(x['user_rating'] - 1, dtype=tf.int64), 5))
    dataset = dataset.zip((features, ratings))
    dataset = dataset.shuffle(4096, reshuffle_each_iteration=False)
    if batch_size > 1:
        dataset = dataset.batch(batch_size)
    return dataset


def create_strategy(args):
    ps_hosts = args.ps.split(",")
    worker_hosts = args.worker.split(",")
    env = {
        "cluster": {"ps": ps_hosts, "worker": worker_hosts},
        "task": {"type": args.task_type, "index": args.task_index}
    }
    import json
    os.environ['TF_CONFIG'] = json.dumps(env)
    cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
    os.environ["GRPC_FAIL_FAST"] = "use_caller"
    server = tf.distribute.Server(
        cluster_resolver.cluster_spec(),
        job_name=cluster_resolver.task_type,
        task_index=cluster_resolver.task_id,
        protocol=cluster_resolver.rpc_layer or "grpc",
        start=True)
    if cluster_resolver.task_type == "ps":
        print("parameter server starting...")
        server.join()
    else:
        print("worker training starting...")
        variable_partitioner = (
            tf.distribute.experimental.partitioners.MinSizePartitioner(
                min_shard_bytes=256 << 10, max_shards=len(ps_hosts)))
        return tf.distribute.experimental.ParameterServerStrategy(
            cluster_resolver, variable_partitioner=variable_partitioner)


strategy = create_strategy(args=arguments)

embedding_size = 32
epochs = 10
steps_per_epoch = 20000
model_dir = "./model"
test_batch = 1024
test_steps = 128

with strategy.scope():
    model = DualChannelsDeepModel(strategy, embedding_size, embedding_size,
                                  tf.keras.initializers.RandomNormal(0.0, 0.5))
    optimizer = tf.keras.optimizers.Adam(1E-3)
    optimizer = de.DynamicEmbeddingOptimizer(optimizer)

    auc = tf.keras.metrics.AUC(num_thresholds=1000)
    model.compile(optimizer=optimizer,
                  loss=tf.keras.losses.MeanSquaredError(),
                  metrics=[
                      auc,
                  ])

dataset = get_dataset(batch_size=32)
model.fit(dataset, epochs=epochs, steps_per_epoch=steps_per_epoch, verbose=2)
```

and run it with:

```bash
python tf_dist_de_keras.py --ps="localhost:2220,localhost:2221" --worker="localhost:2230,localhost:2231" --task_type="ps" --task_index=0

python tf_dist_de_keras.py --ps="localhost:2220,localhost:2221" --worker="localhost:2230,localhost:2231" --task_type="ps" --task_index=1

python tf_dist_de_keras.py --ps="localhost:2220,localhost:2221" --worker="localhost:2230,localhost:2231" --task_type="worker" --task_index=0

python tf_dist_de_keras.py --ps="localhost:2220,localhost:2221" --worker="localhost:2230,localhost:2231" --task_type="worker" --task_index=1
```

rhdong commented 1 year ago

Hi @songyaheng, thank you for your feedback. This is a known issue for TFRA and will be hard to resolve in the short term, but you can still build the distributed pipeline with the original API; please refer to the demo at https://github.com/tensorflow/recommenders-addons/tree/master/demo/dynamic_embedding/movielens-100k-estimator. BTW, in your message some of the code is outside the code block and loses its alignment; could you fix that so I can try to reproduce the issue with your script? Thank you!
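For readers landing here: the linked demo builds the embedding through the lower-level `dynamic_embedding` API inside an Estimator `model_fn` rather than through the Keras layers. The following is a heavily condensed, hypothetical sketch of that pattern; the wiring (single embedding table, TRAIN-mode-only spec, table name and dimension) is illustrative and may differ from the demo, which should be treated as authoritative:

```python
import tensorflow as tf
from tensorflow_recommenders_addons import dynamic_embedding as de


def model_fn(features, labels, mode, params):
    # Dynamic embedding table created via the original (non-Keras) API.
    embeddings = de.get_variable(
        name="user_embedding",
        key_dtype=tf.int64,
        value_dtype=tf.float32,
        dim=32,
        initializer=tf.keras.initializers.RandomNormal(0.0, 0.5))

    ids = tf.reshape(features["user_id"], (-1,))
    emb = de.embedding_lookup(embeddings, ids, name="user_embedding_lookup")

    logits = tf.keras.layers.Dense(1)(emb)
    loss = tf.reduce_mean(tf.keras.losses.MSE(labels, logits))

    # Wrapping the optimizer lets it update the dynamic embedding table.
    optimizer = de.DynamicEmbeddingOptimizer(
        tf.compat.v1.train.AdamOptimizer(1e-3))
    train_op = optimizer.minimize(
        loss, global_step=tf.compat.v1.train.get_or_create_global_step())

    # Sketch only handles training; EVAL/PREDICT branches are omitted.
    return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op)


# With TF_CONFIG set per task, tf.estimator handles the ps/worker roles itself:
# estimator = tf.estimator.Estimator(model_fn=model_fn, model_dir="./model")
# tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
```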

rhdong commented 1 year ago

Closing as resolved.