tensorflow / recommenders

TensorFlow Recommenders is a library for building recommender system models using TensorFlow.
Apache License 2.0

InvalidArgumentError: Requires start <= limit when delta > 0 when trying to distribute training #274

Open dgoldenberg-audiomack opened 3 years ago

dgoldenberg-audiomack commented 3 years ago

Setup:

The TQDM progress bar never moves. Execution also appears to hang with the progress bar code removed.

Things attempted:

Can I get any output out of TFRS at this point to tell what might be going wrong? Any other things to try?

Output on the command line:

2021-04-12 21:44:02.417762: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-04-12 21:44:03.666561: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-04-12 21:44:03.667485: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-04-12 21:44:03.739716: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-04-12 21:44:03.739767: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (ip-10-2-249-213.awsinternal.audiomack.com): /proc/driver/nvidia/version does not exist
2021-04-12 21:44:03.740798: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-04-12 21:44:03.741015: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-04-12 21:44:03.741571: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-04-12 21:44:03.745589: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> 10.2.249.213:2121, 1 -> 10.2.252.56:2121, 2 -> 10.2.252.97:2121}
2021-04-12 21:44:03.745902: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://10.2.249.213:2121

>> 2021-04-12 16:44:07 : >> Running the prototype...

>> Initializing TfrsModelMaker...
>> items_path  : s3://my-bucket/recsys-tf/temp-data/20210411173323/items
>> users_path  : s3://my-bucket/recsys-tf/temp-data/20210411173323/users
>> events_path : s3://my-bucket/recsys-tf/temp-data/20210411173323/events
>> num_items   : 100
>> num_users   : 100
>> num_events  : 100

2021-04-12 21:44:08.693713: W tensorflow_io/core/kernels/audio_video_mp3_kernels.cc:271] libmp3lame.so.0 or lame functions are not available
2021-04-12 21:44:08.693900: I tensorflow_io/core/kernels/cpu_check.cc:128] Your CPU supports instructions that this TensorFlow IO binary was not compiled to use: AVX2 AVX512F FMA

>> Strategy: <tensorflow.python.distribute.collective_all_reduce_strategy.CollectiveAllReduceStrategy object at 0x7f8ab9d67690>
>> 2021-04-12 16:44:09 : >> Training the model...

2021-04-12 21:44:09.449931: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2021-04-12 21:44:09.467194: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2499995000 Hz
  0%|                                                                                                                            | 0/3 [00:00<?, ?epoch/s]
0.00batch [00:00, ?batch/s]
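
The GrpcChannelCache line in the log above lists three workers. MultiWorkerMirroredStrategy reads its cluster layout from the TF_CONFIG environment variable, and each worker has to run the same script with its own task index; collective operations wait for every peer, so a worker that was never started is one possible cause of a run that sits at 0 batches. The thread does not show how TF_CONFIG was set or whether all three workers were running, so the following is only a sketch of what the configuration for worker 0 might look like. The addresses are copied from the log; make_tf_config is a hypothetical helper name.

```python
import json
import os

# Worker addresses copied from the GrpcChannelCache log line above.
WORKERS = ["10.2.249.213:2121", "10.2.252.56:2121", "10.2.252.97:2121"]


def make_tf_config(task_index: int) -> str:
    """Build the TF_CONFIG JSON string for one worker (hypothetical helper)."""
    return json.dumps(
        {
            "cluster": {"worker": WORKERS},
            "task": {"type": "worker", "index": task_index},
        }
    )


# This has to be set before tf.distribute.MultiWorkerMirroredStrategy() is created.
# Each machine uses its own index (0, 1 or 2); the cluster section is identical everywhere.
os.environ["TF_CONFIG"] = make_tf_config(task_index=0)
print(os.environ["TF_CONFIG"])
```

Running the same script on the other two hosts with task_index=1 and task_index=2 would complete the cluster under this assumption.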

dgoldenberg-audiomack commented 3 years ago

The code is essentially as follows:

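from abc import ABC
from typing import Dict, Text

import tensorflow as tf
import tensorflow_recommenders as tfrs
import urllib3
from tqdm.keras import TqdmCallback


class TfrsModel(tfrs.Model, ABC):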
    def __init__(self, user_model, item_model, loss_task, cached_train_event_ds, cached_test_event_ds):
        super().__init__()
        self.item_model: tf.keras.Model = item_model
        self.user_model: tf.keras.Model = user_model
        self.task: tf.keras.layers.Layer = loss_task

        self.cached_train_event_ds = cached_train_event_ds
        self.cached_test_event_ds = cached_test_event_ds

    def compute_loss(self, features: Dict[Text, tf.Tensor], training=False) -> tf.Tensor:
        # We pick out the user features and pass them into the user model.
        user_embeddings = self.user_model(features["user_id"])
        # Pick out the item features and pass them into the item model, getting embeddings back.
        item_embeddings = self.item_model(features["item_id"])

        # The task computes the loss and the metrics.
        return self.task(user_embeddings, item_embeddings)

class TfrsModelMaker(object):
    def __init__(self, items_path, users_path, events_path, num_items, num_users, num_events):
        self.items_path = items_path
        self.users_path = users_path
        self.events_path = events_path
        self.num_items = num_items
        self.num_users = num_users
        self.num_events = num_events

        print()
        print(">> Initializing TfrsModelMaker...")
        print(">> items_path  : {}".format(items_path))
        print(">> users_path  : {}".format(users_path))
        print(">> events_path : {}".format(events_path))
        print(">> num_items   : {}".format(num_items))
        print(">> num_users   : {}".format(num_users))
        print(">> num_events  : {}".format(num_events))
        print()

        # Turn off the many Unverified HTTPS request warnings during file downloads.
        urllib3.disable_warnings()
        self.items_ds, self.events_ds = self._load_tf_datasets()
        self.test_events_ds, self.train_events_ds = self._prepare_data()

    def create_model(self):
        embedding_dimension = 32

        print(">> Strategy: {}".format(strategy))

        with strategy.scope():
            # tf.config.set_soft_device_placement(True)
            # print(">> Number of devices: {}".format(strategy.num_replicas_in_sync))

            user_ids_filepath = self.get_s3_filepaths(self.users_path, ".csv")[0]
            item_ids_filepath = self.get_s3_filepaths(self.items_path, ".csv")[0]

            # The query tower
            u_lookup = tf.keras.layers.experimental.preprocessing.IntegerLookup(vocabulary=user_ids_filepath)
            user_model = tf.keras.Sequential(
                [
                    u_lookup,
                    # We add an additional embedding to account for unknown tokens.
                    tf.keras.layers.Embedding(u_lookup.vocab_size() + 1, embedding_dimension),
                ]
            )

            # The candidate tower
            c_lookup = tf.keras.layers.experimental.preprocessing.IntegerLookup(vocabulary=item_ids_filepath)
            item_model = tf.keras.Sequential(
                [
                    c_lookup,
                    # We add an additional embedding to account for unknown tokens.
                    tf.keras.layers.Embedding(c_lookup.vocab_size() + 1, embedding_dimension),
                ]
            )

            # Metrics
            cands = self.items_ds.map(item_model)
            metrics = tfrs.metrics.FactorizedTopK(candidates=cands)

            # Loss
            task = tfrs.tasks.Retrieval(metrics=metrics)

            cached_train_event_ds = self.train_events_ds.batch(8192).cache()
            cached_test_event_ds = self.test_events_ds.batch(4096).cache()

            # per_replica_batch_size = 64
            # global_batch_size = per_replica_batch_size * strategy.num_replicas_in_sync
            #
            # print()
            # print(">> BATCH SIZE: {}".format(global_batch_size))
            # print()
            #
            # cached_train_event_ds = self.train_events_ds.batch(global_batch_size).cache()
            # cached_test_event_ds = self.test_events_ds.batch(global_batch_size).cache()

            model = TfrsModel(user_model, item_model, task, cached_train_event_ds, cached_test_event_ds)

            model.compile(optimizer=tf.keras.optimizers.Adagrad(learning_rate=0.1))
            # model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.1))

        return model

    @staticmethod
    def train_and_evaluate(model, num_epochs):
        print_with_date(">> Training the model...")

        # Train the model
        # model.fit(model.cached_train_event_ds, epochs=num_epochs)

        # turn off keras progress (verbose=0) and use tqdm instead. For the callback:
        # verbose=2 means separate progressbars for epochs and batches
        # 1 means clear batch bars when done
        # 0 means only show epochs (never show batch bars)
        model.fit(model.cached_train_event_ds, epochs=num_epochs, verbose=0, callbacks=[TqdmCallback(verbose=2)])

        print_with_date(">> Training of the model: done.")

        # Evaluate the model
        print_with_date(">> Evaluating the model...")
        eval_results = model.evaluate(model.cached_test_event_ds, return_dict=True)
        print_with_date(">> Evaluation of the model: done.")

        print()
        print(f">> Eval results (epochs={num_epochs}):")
        print(str(eval_results))
        print()

dgoldenberg-audiomack commented 3 years ago

The TFRS retrieval example appears to run fine under MultiWorkerMirroredStrategy:

from typing import Dict, Text

import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds
import tensorflow_recommenders as tfrs
from tqdm.keras import TqdmCallback

# strategy = tf.distribute.get_strategy()
strategy = tf.distribute.MultiWorkerMirroredStrategy()

class MovielensModel(tfrs.Model):
    def __init__(self, user_model, movie_model, task):
        super().__init__()
        self.movie_model: tf.keras.Model = movie_model
        self.user_model: tf.keras.Model = user_model
        self.task: tf.keras.layers.Layer = task

    def compute_loss(self, features: Dict[Text, tf.Tensor], training=False) -> tf.Tensor:
        # We pick out the user features and pass them into the user model.
        user_embeddings = self.user_model(features["user_id"])
        # And pick out the movie features and pass them into the movie model,
        # getting embeddings back.
        positive_movie_embeddings = self.movie_model(features["movie_title"])

        # The task computes the loss and the metrics.
        return self.task(user_embeddings, positive_movie_embeddings)

# Ratings data.
ratings = tfds.load("movielens/100k-ratings", split="train")
# Features of all the available movies.
movies = tfds.load("movielens/100k-movies", split="train")

ratings = ratings.map(lambda x: {"movie_title": x["movie_title"], "user_id": x["user_id"],})
movies = movies.map(lambda x: x["movie_title"])

tf.random.set_seed(42)
shuffled = ratings.shuffle(100_000, seed=42, reshuffle_each_iteration=False)

train = shuffled.take(80_000)
test = shuffled.skip(80_000).take(20_000)

movie_titles = movies.batch(1_000)
user_ids = ratings.batch(1_000_000).map(lambda x: x["user_id"])

unique_movie_titles = np.unique(np.concatenate(list(movie_titles)))
unique_user_ids = np.unique(np.concatenate(list(user_ids)))

embedding_dimension = 32

with strategy.scope():
    user_model = tf.keras.Sequential(
        [
            tf.keras.layers.experimental.preprocessing.StringLookup(vocabulary=unique_user_ids, mask_token=None),
            # We add an additional embedding to account for unknown tokens.
            tf.keras.layers.Embedding(len(unique_user_ids) + 1, embedding_dimension),
        ]
    )

    movie_model = tf.keras.Sequential(
        [
            tf.keras.layers.experimental.preprocessing.StringLookup(vocabulary=unique_movie_titles, mask_token=None),
            tf.keras.layers.Embedding(len(unique_movie_titles) + 1, embedding_dimension),
        ]
    )

    metrics = tfrs.metrics.FactorizedTopK(candidates=movies.batch(128).map(movie_model))
    task = tfrs.tasks.Retrieval(metrics=metrics)

    model = MovielensModel(user_model, movie_model, task)
    model.compile(optimizer=tf.keras.optimizers.Adagrad(learning_rate=0.1))

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA

cached_train = train.shuffle(100_000).batch(8192).cache().with_options(options)
cached_test = test.batch(4096).cache().with_options(options)

print(">> Training...")
model.fit(cached_train, epochs=3, verbose=0, callbacks=[TqdmCallback(verbose=2)])

print(">> Evaluating...")
model.evaluate(cached_test, return_dict=True)

# Create a model that takes in raw query features, and
index = tfrs.layers.factorized_top_k.BruteForce(model.user_model)
# recommends movies out of the entire movies dataset.
index.index(movies.batch(100).map(model.movie_model), movies)

# Get recommendations.
_, titles = index(tf.constant(["42"]))
print(f"Recommendations for user 42: {titles[0, :3]}")

dgoldenberg-audiomack commented 3 years ago

Might this kind of thing be related? I can't tell: https://stackoverflow.com/questions/59808666/stuck-in-the-first-epoch-when-training-the-cnn-lstm-using-keras

dgoldenberg-audiomack commented 3 years ago

Same hanging behavior with string IDs in the input data as with integer IDs.

dgoldenberg-audiomack commented 3 years ago

Similar hanging behavior on an AWS p3 machine when trying to use MirroredStrategy.

Ran with the following:

model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.1))

due to #269 -- Adagrad for embeddings isn't implemented on GPUs.

2021-04-13 15:15:10.232864: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-04-13 15:15:11.767665: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-04-13 15:15:11.768680: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-04-13 15:15:11.820154: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:941] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-04-13 15:15:11.821137: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1720] Found device 0 with properties:
pciBusID: 0000:00:1e.0 name: Tesla V100-SXM2-16GB computeCapability: 7.0
coreClock: 1.53GHz coreCount: 80 deviceMemorySize: 15.78GiB deviceMemoryBandwidth: 836.37GiB/s
2021-04-13 15:15:11.821177: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-04-13 15:15:11.824313: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcublas.so.11
2021-04-13 15:15:11.824379: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcublasLt.so.11
2021-04-13 15:15:11.826228: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcufft.so.10
2021-04-13 15:15:11.826585: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcurand.so.10
2021-04-13 15:15:11.828709: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcusolver.so.10
2021-04-13 15:15:11.829523: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcusparse.so.11
2021-04-13 15:15:11.829728: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudnn.so.8
2021-04-13 15:15:11.829840: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:941] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-04-13 15:15:11.830827: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:941] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-04-13 15:15:11.831748: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1862] Adding visible gpu devices: 0
2021-04-13 15:15:11.832142: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-04-13 15:15:11.832901: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-04-13 15:15:11.833008: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:941] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-04-13 15:15:11.833984: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1720] Found device 0 with properties:
pciBusID: 0000:00:1e.0 name: Tesla V100-SXM2-16GB computeCapability: 7.0
coreClock: 1.53GHz coreCount: 80 deviceMemorySize: 15.78GiB deviceMemoryBandwidth: 836.37GiB/s
2021-04-13 15:15:11.834023: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-04-13 15:15:11.834053: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcublas.so.11
2021-04-13 15:15:11.834080: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcublasLt.so.11
2021-04-13 15:15:11.834106: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcufft.so.10
2021-04-13 15:15:11.834127: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcurand.so.10
2021-04-13 15:15:11.834152: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcusolver.so.10
2021-04-13 15:15:11.834173: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcusparse.so.11
2021-04-13 15:15:11.834198: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudnn.so.8
2021-04-13 15:15:11.834265: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:941] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-04-13 15:15:11.835228: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:941] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-04-13 15:15:11.836132: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1862] Adding visible gpu devices: 0
2021-04-13 15:15:11.836181: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-04-13 15:15:12.466481: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1261] Device interconnect StreamExecutor with strength 1 edge matrix:
2021-04-13 15:15:12.466530: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1267]      0
2021-04-13 15:15:12.466541: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1280] 0:   N
2021-04-13 15:15:12.466781: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:941] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-04-13 15:15:12.467759: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:941] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-04-13 15:15:12.468732: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:941] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-04-13 15:15:12.469717: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1406] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 14760 MB memory) -> physical GPU (device: 0, name: Tesla V100-SXM2-16GB, pci bus id: 0000:00:1e.0, compute capability: 7.0)

>> 2021-04-13 10:15:12 : >> Running the TFRS based song recommender...

>> Initializing TfrsModelMaker...
>> num_items   : 100
>> num_users   : 100
>> num_events  : 100

>> Loading TF datasets from S3...
>> --- Loading the ITEMS dataset from s3://my-bucket/20210411173323/items...
>> --- Loading the EVENTS dataset from s3://my-bucket/20210411173323/events...
>> Loading s3://my-bucket/20210411173323/events/events-20210413-2-100.snappy.parquet for events...

2021-04-13 15:15:13.307509: W tensorflow_io/core/kernels/audio_video_mp3_kernels.cc:271] libmp3lame.so.0 or lame functions are not available
2021-04-13 15:15:13.307729: I tensorflow_io/core/kernels/cpu_check.cc:128] Your CPU supports instructions that this TensorFlow IO binary was not compiled to use: AVX2 FMA

>> Strategy: <tensorflow.python.distribute.mirrored_strategy.MirroredStrategy object at 0x7fc4721962d0>
>> Number of devices: 1
>> GPU's: [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]
>> 2021-04-13 10:15:14 : >> Training the model...
2021-04-13 15:15:14.420790: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2021-04-13 15:15:14.441513: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2300055000 Hz
Epoch 1/3
2021-04-13 15:15:18.372798: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcublas.so.11
2021-04-13 15:15:18.784890: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcublasLt.so.11

dgoldenberg-audiomack commented 3 years ago

It also hangs if I load events from CSV rather than Parquet.

dgoldenberg-audiomack commented 3 years ago

After a few hours, I do get an error, as below.

@maciejkula Could you provide any insight into what this might mean?

Epoch 1/3
Traceback (most recent call last):
  File "recsys_tfrs_songs_2.py", line 88, in <module>
    main(sys.argv)
  File "recsys_tfrs_songs_2.py", line 70, in main
    model_maker.train_and_evaluate(model, NUM_TRAIN_EPOCHS)
  File "/home/ec2-user/tfrs_proto/recommender_system/recsys_tf/recsys_tfrs_model.py", line 151, in train_and_evaluate
    model.fit(model.cached_train_event_ds, epochs=num_epochs)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py", line 1100, in fit
    tmp_logs = self.train_function(iterator)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/def_function.py", line 828, in __call__
    result = self._call(*args, **kwds)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/def_function.py", line 888, in _call
    return self._stateless_fn(*args, **kwds)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/function.py", line 2943, in __call__
    filtered_flat_args, captured_inputs=graph_function.captured_inputs)  # pylint: disable=protected-access
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/function.py", line 1919, in _call_flat
    ctx, args, cancellation_manager=cancellation_manager))
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/function.py", line 560, in call
    ctx=ctx)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/execute.py", line 60, in quick_execute
    inputs, attrs, num_outputs)
tensorflow.python.framework.errors_impl.InvalidArgumentError:  Requires start <= limit when delta > 0: 2147483000/-2147483296
     [[{{node range}}]]
     [[retrieval/streaming/ReduceDataset]] [Op:__inference_train_function_1835]

Function call stack:
train_function
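
The two numbers in the error message are consistent with 32-bit integer overflow: 2147483000 + 1000 exceeds the int32 maximum of 2147483647 and wraps around to -2147483296, which is exactly the reported limit. The check below is plain Python arithmetic, not TensorFlow code. The step of 1000 is an assumption, chosen because it reproduces the reported value; it also matches the batch_size=1000 passed to make_csv_dataset later in this thread.

```python
def wrap_int32(value: int) -> int:
    """Reduce a Python int to the value a signed 32-bit integer would hold."""
    return (value + 2**31) % 2**32 - 2**31


start = 2147483000            # first number in the error message
reported_limit = -2147483296  # second number in the error message
step = 1000                   # assumption: candidates are numbered 1000 at a time

limit = wrap_int32(start + step)
print(limit)
print(limit == reported_limit)
```

If that reading is right, the range op under retrieval/streaming/ReduceDataset was asked to number candidates past the int32 limit, which would mean the candidates dataset is far longer than the 100 items expected. This is an inference from the numbers, not something confirmed in this thread. One thing worth checking in that case: tf.data.experimental.make_csv_dataset repeats its input indefinitely unless num_epochs is set.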

I see mentions of this type of error in various TF tickets, none of which offers a resolution:

https://github.com/tensorflow/tensorflow/issues/47934 - marked closed, with no resolution
https://github.com/tensorflow/tensorflow/issues/47545 - open
https://github.com/tensorflow/tensorflow/issues/45609 - marked closed, with no resolution
https://github.com/tensorflow/tensorflow/issues/45019 - marked closed, with no resolution
https://github.com/tensorflow/tensorflow/issues/44801 - marked closed. User comment: "Fixed this issue by switching to pytorch. Closing the issue since I have no interest in finding the solution anymore."
https://github.com/tensorflow/tensorflow/issues/43604 - open
https://github.com/tensorflow/tensorflow/issues/41474 - marked closed, with no resolution
https://github.com/tensorflow/tensorflow/issues/35878
https://github.com/tensorflow/probability/issues/851
https://github.com/tensorflow/tensorflow/issues/45986
https://github.com/tensorflow/tensorflow/issues/6590

Per https://github.com/tensorflow/tensorflow/issues/35878, I moved the .with_options(options) call on the training dataset inside the with strategy.scope(): block, but that didn't seem to make a difference.

maciejkula commented 3 years ago

Could you try a couple of things:

  1. Don't save your training/test datasets as attributes on the model.
  2. Remove the FactorizedTopK metric.

This will help me narrow down the problem.

dgoldenberg-audiomack commented 3 years ago

@maciejkula Sure, let me set this up, rerun and update the ticket.

dgoldenberg-audiomack commented 3 years ago

  1. Not saving the training/test datasets on the model.
  2. Removed the FactorizedTopK metric.
  3. strategy is set to tf.distribute.get_strategy().

CODE IS ATTACHED

Execution went further. However, at the end it seems to hang in the recs generation after a WARNING. I'm not sure whether that is expected. Maybe something is off with the batch sizes?

2021-04-20 19:57:17.392641: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0

>> 2021-04-20 14:57:18 : >> Running the TFRS based song recommender...

>> Initializing TfrsModelMaker...
>> items_path  : s3://my-bucket/items
>> users_path  : s3://my-bucket/users
>> events_path : s3://my-bucket/events
>> num_items   : 100
>> num_users   : 100
>> num_events  : 100

>> Loading TF datasets from S3...

2021-04-20 19:57:19.280966: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-04-20 19:57:19.281902: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-04-20 19:57:19.357132: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-04-20 19:57:19.357185: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (ip-xx-x-xxx-xxx.awsinternal.xxxxxx): /proc/driver/nvidia/version does not exist
2021-04-20 19:57:19.357894: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-04-20 19:57:19.358160: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
>> --- ITEMS dataset: loaded.
>> --- EVENTS dataset: loaded
>> Loading TF datasets from S3: done.
>> Preparing data...
>> Data preparation: done.
>> Strategy: <tensorflow.python.distribute.distribute_lib._DefaultDistributionStrategy object at 0x7f96c8508890>
>> Number of devices: 1
>> GPU's: []
>> 2021-04-20 14:57:19 : >> Training the model...
2021-04-20 19:57:19.917819: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2021-04-20 19:57:19.918174: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2499995000 Hz
Epoch 1/3
80/80 [==============================] - 1s 965us/step - loss: 16.1486 - regularization_loss: 0.0000e+00 - total_loss: 16.1486
Epoch 2/3
80/80 [==============================] - 0s 902us/step - loss: 1.2836 - regularization_loss: 0.0000e+00 - total_loss: 1.2836  
Epoch 3/3
80/80 [==============================] - 0s 877us/step - loss: 1.4034 - regularization_loss: 0.0000e+00 - total_loss: 1.4034
>> 2021-04-20 14:57:20 : >> Training of the model: done.
>> 2021-04-20 14:57:20 : >> Evaluating the model...
20/20 [==============================] - 0s 875us/step - loss: 1.5844 - regularization_loss: 0.0000e+00 - total_loss: 1.5844
>> 2021-04-20 14:57:20 : >> Evaluation of the model: done.

>> Eval results (epochs=3):
{'loss': 2.7725982666015625, 'regularization_loss': 0, 'total_loss': 2.7725982666015625}

>> Generating recs - 1...

>> Generating recs - 2...
WARNING:tensorflow:Model was constructed with shape (None,) for input KerasTensor(type_spec=TensorSpec(shape=(None,), dtype=tf.string, name='string_lookup_1_input'), name='string_lookup_1_input', description="created by layer 'string_lookup_1_input'"), but it was called on an input with incompatible shape (None, 1000).

maciejkula commented 3 years ago

Nice!

This last error looks more like a bug in the eval code, you seem to be passing inputs with the wrong shape?

Could you re-add the FactorizedTopK metric and see if that's the issue?

dgoldenberg-audiomack commented 3 years ago

"looks more like a bug in the eval code, you seem to be passing inputs with the wrong shape?"

Meaning? How can the shape be wrong? The 1000 is used here:

        items_ds = tf.data.experimental.make_csv_dataset(
            local_file_list, column_names=["item_id"], batch_size=1000, num_parallel_reads=50, sloppy=True,
        )

Should I not be setting the batch size there?

Here's the input data: tfrs_274_invalid_arg_requires_input-data-2021-04020.zip

maciejkula commented 3 years ago

All I know is that your input is (batch_size,) in training but (batch_size, 1000) in eval. Can you check if you accidentally batch it twice?

dgoldenberg-audiomack commented 3 years ago

The mismatch, I would think, is between these:

items_ds = tf.data.experimental.make_csv_dataset(
    local_file_list, column_names=["item_id"], batch_size=1000, num_parallel_reads=50, sloppy=True,
)

and

c_lookup = tf.keras.layers.experimental.preprocessing.StringLookup(
    vocabulary=item_ids_filepath, mask_token=None
)

That's why I'm wondering whether items should be loaded from CSV with no batch size set. Or is there a way to pass the same batch size into the StringLookup?

There's also

index.index(items_ds.batch(100).map(model.item_model), items_ds)

Must this batch size be the same, 1K? Or does it not matter?

"Could you re-add the FactorizedTopK metric and see if that's the issue?"

I've re-added it and now it's just "stuck" again:

>> 2021-04-20 15:16:08 : >> Training the model...
2021-04-20 20:16:08.982627: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2021-04-20 20:16:08.982979: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2499995000 Hz
Epoch 1/3

dgoldenberg-audiomack commented 3 years ago

OK, so batch_size must be set in make_csv_dataset; otherwise:

TypeError: make_csv_dataset_v2() missing 1 required positional argument: 'batch_size'

If that's the case, how can I propagate this batch size into StringLookup? It doesn't appear to have such a parameter.

maciejkula commented 3 years ago

You're batching items_ds twice: first in make_csv_dataset, then in items_ds.batch(100).

After the first batch each element has shape (1000,); after the second it has shape (100, 1000). A third batch would make it (new_batch_size, 100, 1000).

Does that make it clearer? Remove items_ds.batch(100) and it should work.
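
The shape arithmetic above can be checked with a plain-Python stand-in for batching. This is only a sketch: `batch` and `shape` are hypothetical helpers, not tf.data calls, and it assumes 100,000 item ids so that both batch sizes divide evenly.

```python
from itertools import islice


def batch(elements, size):
    """Group consecutive elements into lists of at most `size` items."""
    iterator = iter(elements)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


def shape(element):
    """Return the nested-list shape of an element, e.g. (100, 1000)."""
    dims = []
    while isinstance(element, list):
        dims.append(len(element))
        element = element[0]
    return tuple(dims)


item_ids = range(100_000)  # assumed size, chosen so both batch sizes divide evenly

once = list(batch(item_ids, 1000))  # stands in for make_csv_dataset(batch_size=1000)
twice = list(batch(once, 100))      # stands in for a further .batch(100)

print(shape(once[0]))   # (1000,)
print(shape(twice[0]))  # (100, 1000)
```

Each extra batching step adds a leading dimension, which is why the model sees a 2-D input where it expects a 1-D one.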

dgoldenberg-audiomack commented 3 years ago

The warning seemed to refer to the StringLookup, but sure, removed the items_ds.batch(100). Execution is now sitting at the following:

>> 2021-04-20 15:38:31 : >> Training the model...
2021-04-20 20:38:31.103668: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2021-04-20 20:38:31.104020: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2499995000 Hz
Epoch 1/3

maciejkula commented 3 years ago

Barring other evidence I'd say that it's "stuck" just because the factorized evaluation is slow. Try disabling it in training and only running in test (pass compute_metrics=not training to your task).

dgoldenberg-audiomack commented 3 years ago

I've added that piece of logic:

    def compute_loss(self, features: Dict[Text, tf.Tensor], training=False) -> tf.Tensor:
        .................
        # The task computes the loss and the metrics.
        return self.task(user_embeddings, item_embeddings, compute_metrics=not training)

Can't tell exactly how many hours later (quite a few), I still got the error that is the subject of this issue:

>> Strategy: <tensorflow.python.distribute.distribute_lib._DefaultDistributionStrategy object at 0x7f2018f23b10>
>> Number of devices: 1
>> GPU's: []
>> 2021-04-20 16:05:51 : >> Training the model...
2021-04-20 21:05:51.642265: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2021-04-20 21:05:51.642600: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2499995000 Hz
Epoch 1/3
81/81 [==============================] - 1s 1ms/step - factorized_top_k/top_1_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_5_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_10_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_50_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_100_categorical_accuracy: 0.0000e+00 - loss: 16.0800 - regularization_loss: 0.0000e+00 - total_loss: 16.0800
Epoch 2/3
81/81 [==============================] - 0s 1ms/step - factorized_top_k/top_1_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_5_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_10_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_50_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_100_categorical_accuracy: 0.0000e+00 - loss: 1.6088 - regularization_loss: 0.0000e+00 - total_loss: 1.6088
Epoch 3/3
81/81 [==============================] - 0s 1ms/step - factorized_top_k/top_1_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_5_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_10_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_50_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_100_categorical_accuracy: 0.0000e+00 - loss: 1.4377 - regularization_loss: 0.0000e+00 - total_loss: 1.4377    
>> 2021-04-20 16:05:52 : >> Training of the model: done.
>> 2021-04-20 16:05:52 : >> Evaluating the model...
Traceback (most recent call last):
  File "recsys_tfrs_songs_2.py", line 87, in <module>
    main(sys.argv)
  File "recsys_tfrs_songs_2.py", line 69, in main
    model_maker.train_and_evaluate(model, NUM_TRAIN_EPOCHS, train_events_ds, test_events_ds)
  File "/home/ec2-user/tfrs_proto/recommender_system/recsys_tf/recsys_tfrs_model.py", line 160, in train_and_evaluate
    eval_results = model.evaluate(test_events_ds, return_dict=True)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py", line 1389, in evaluate
    tmp_logs = self.test_function(iterator)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/def_function.py", line 828, in __call__
    result = self._call(*args, **kwds)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/def_function.py", line 895, in _call
    filtered_flat_args, self._concrete_stateful_fn.captured_inputs)  # pylint: disable=protected-access
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/function.py", line 1919, in _call_flat
    ctx, args, cancellation_manager=cancellation_manager))
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/function.py", line 560, in call
    ctx=ctx)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/execute.py", line 60, in quick_execute
    inputs, attrs, num_outputs)
tensorflow.python.framework.errors_impl.InvalidArgumentError:  Requires start <= limit when delta > 0: 2147483000/-2147483296
     [[{{node range}}]]
     [[retrieval/streaming/ReduceDataset]] [Op:__inference_test_function_3177]

Function call stack:
test_function

maciejkula commented 3 years ago

It looks like by default make_csv_dataset creates a dataset that infinitely repeats the input data - or, in this case, it repeats it until its internal iteration counter overflows.

Can you try passing num_epochs=1 to it?
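
The two numbers in the error message are consistent with this explanation. As a sketch, assuming the counter is a signed 32-bit integer and that each step builds a range from the counter to the counter plus the batch size of 1000 (an inference from the message, not something confirmed in this thread), the limit wraps around to exactly the negative value reported:

```python
INT32_MIN = -2**31
INT32_MAX = 2**31 - 1


def wrap_int32(value):
    """Reduce a Python int to the value a signed 32-bit integer would hold."""
    return (value - INT32_MIN) % 2**32 + INT32_MIN


start = 2147483000                # first number in the error message
limit = wrap_int32(start + 1000)  # 1000 = batch_size passed to make_csv_dataset

print(start <= INT32_MAX)  # True: start still fits in 32 bits
print(limit)               # -2147483296, the second number in the error message
print(start <= limit)      # False, hence "Requires start <= limit"
```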

dgoldenberg-audiomack commented 3 years ago

Seriously??? :) wow... that sounds like a bug there. How could it possibly work for anyone? unless they all set the num epochs. Which is just, odd.

    num_epochs: An int specifying the number of times this dataset is repeated.
      If None, cycles through the dataset forever.

wow..

OK, will try your suggestion and update.

dgoldenberg-audiomack commented 3 years ago

Wow, this was definitely the problem. @maciejkula Would you agree that this seems like a bug worth filing into TF IO? I would think the default should be 1, not None?

Results from the run, which actually completed, and quickly:

Epoch 1/3
81/81 [==============================] - 1s 1ms/step - factorized_top_k/top_1_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_5_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_10_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_50_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_100_categorical_accuracy: 0.0000e+00 - loss: 15.7745 - regularization_loss: 0.0000e+00 - total_loss: 15.7745
Epoch 2/3
81/81 [==============================] - 0s 1ms/step - factorized_top_k/top_1_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_5_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_10_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_50_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_100_categorical_accuracy: 0.0000e+00 - loss: 1.3863 - regularization_loss: 0.0000e+00 - total_loss: 1.3863
Epoch 3/3
81/81 [==============================] - 0s 997us/step - factorized_top_k/top_1_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_5_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_10_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_50_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_100_categorical_accuracy: 0.0000e+00 - loss: 1.1638 - regularization_loss: 0.0000e+00 - total_loss: 1.1638
>> 2021-04-21 10:44:51 : >> Training of the model: done.
>> 2021-04-21 10:44:51 : >> Evaluating the model...
20/20 [==============================] - 1s 32ms/step - factorized_top_k/top_1_categorical_accuracy: 0.6800 - factorized_top_k/top_5_categorical_accuracy: 1.0000 - factorized_top_k/top_10_categorical_accuracy: 1.0000 - factorized_top_k/top_50_categorical_accuracy: 1.0000 - factorized_top_k/top_100_categorical_accuracy: 1.0000 - loss: 1.2543 - regularization_loss: 0.0000e+00 - total_loss: 1.2543
>> 2021-04-21 10:44:52 : >> Evaluation of the model: done.

>> Eval results (epochs=3):
{'factorized_top_k/top_1_categorical_accuracy': 0.6800000071525574, 'factorized_top_k/top_5_categorical_accuracy': 1.0, 'factorized_top_k/top_10_categorical_accuracy': 1.0, 'factorized_top_k/top_50_categorical_accuracy': 1.0, 'factorized_top_k/top_100_categorical_accuracy': 1.0, 'loss': 1.3863075971603394, 'regularization_loss': 0, 'total_loss': 1.3863075971603394}

Would I need to increase the amount of input data to see accuracy move during training (as opposed to sitting at 0), so I can settle on a better number of epochs? I presume the usual tactic applies: look for the point where accuracy is highest and loss is lowest?

maciejkula commented 3 years ago

I agree with you, this looks like a terrible default that goes against the compositional nature of tf.data APIs (where you'd normally stick a .repeat() at the end if you want an infinite dataset). Please open a bug linking this one.
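
The compositional convention described above can be illustrated without TensorFlow. In this sketch, `read_once` is a hypothetical stand-in for a dataset that is finite by default; repetition is opted into explicitly, with itertools.cycle playing the role of .repeat().

```python
from itertools import cycle, islice


def read_once(rows):
    """Hypothetical reader: yields every row exactly once, then stops."""
    yield from rows


rows = ["item_a", "item_b", "item_c"]

# Finite by default: a full pass (such as an evaluation loop) terminates.
single_pass = list(read_once(rows))

# Infinite only on request: the caller must bound the iteration explicitly.
repeated = cycle(read_once(rows))
first_seven = list(islice(repeated, 7))

print(single_pass)  # ['item_a', 'item_b', 'item_c']
print(first_seven)  # wraps around: ends with 'item_a' again
```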

Now that we've resolved the problem you can run eval in training again, that might help you gauge the convergence curves.

dgoldenberg-audiomack commented 3 years ago

Re: open an issue -- will do.

I need to understand how to make a sensible thing out of this. Does it make sense to load 1 GB of events? Say I've got 10 mln users and 500K items. I need to figure out what cluster setup would be fastest and scale the most. Any recommendation on the approach? I'd like to cut down on the number of experiments...

maciejkula commented 3 years ago

In general TensorFlow datasets scale up to huge amounts of data. Have you seen this page?

The default for tf.data is to stream from disk, and this should scale to terabyte+ datasets quite easily.

In terms of cluster size more workers should give you a speed-up, up to quite a few workers; for very large clusters, the ParameterServerStrategy is probably the way to go.

dgoldenberg-audiomack commented 3 years ago

Thanks, @maciejkula . We could probably close this issue and continue discussing in #201. Thanks for the link, I'll noodle that info.

The default for tf.data is to stream from disk, and this should scale to terabyte+ datasets quite easily.

What about scaling from other file systems? One clear example being S3. We want to stream Parquet data from there w/o having to download its massive amounts onto the TF worker machines.

maciejkula commented 3 years ago

It looks like tf.data works with S3 out of the box: link.

Not sure if it works with Parquet, but worth trying!

dgoldenberg-audiomack commented 3 years ago

@maciejkula Hi, I have filed an issue in TF IO to address the num_epochs default in make_csv_dataset, as discussed here.

Correction: the issue is this one, in core TF.

dgoldenberg-audiomack commented 3 years ago

Hi @maciejkula, I can file a separate issue for this, and it's probably a more generic TF one, but I was hoping to get your recommendations (no pun intended :))

I'm just trying to load a couple of weeks' worth of events into the model, for now just on a single box (m5.2xlarge in AWS, with 32 gig of memory). I keep getting out-of-memory errors despite adding the below code to curb the memory consumption.

I will try to distribute this next but, any suggestions as to how to curtail memory generally? I would think TF should be smart enough to keep the process within the bounds of prlimit :( Tried running with this and without.

I would think execution would just be lengthy but an OOM would not happen.

    import resource

    # Cap the address space at num_bytes (defined elsewhere in the script), soft and hard.
    lim = resource.getrlimit(resource.RLIMIT_AS)
    print(">> INITIAL LIM: " + str(lim))
    resource.setrlimit(resource.RLIMIT_AS, (num_bytes, num_bytes))
    lim = resource.getrlimit(resource.RLIMIT_AS)
    print(">> RESET LIM: " + str(lim))

Error

Epoch 1/3
2021-05-12 15:27:37.334298: W tensorflow/core/framework/cpu_allocator_impl.cc:80] Allocation of 5274514432 exceeds 10% of free system memory.
2021-05-12 15:27:39.223502: W tensorflow/core/framework/cpu_allocator_impl.cc:80] Allocation of 5274514432 exceeds 10% of free system memory.
2021-05-12 15:27:39.820697: W tensorflow/core/framework/cpu_allocator_impl.cc:80] Allocation of 5274514432 exceeds 10% of free system memory.
2021-05-12 15:27:41.147965: W tensorflow/core/framework/cpu_allocator_impl.cc:80] Allocation of 5274514432 exceeds 10% of free system memory.
2021-05-12 15:27:41.656255: W tensorflow/core/framework/cpu_allocator_impl.cc:80] Allocation of 5274514432 exceeds 10% of free system memory.
      1/Unknown - 12s 12s/step - factorized_top_k/top_1_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_5_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_10_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_50_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_100_categorical_accuracy: 0.0000e+00 - loss: 460.4915 - regularization_loss: 0.0000e+00 - total_loss: 460.4915
2021-05-12 15:27:53.015008: W tensorflow/core/framework/op_kernel.cc:1763] OP_REQUIRES failed at resource_variable_ops.cc:410 : Resource exhausted: OOM when allocating tensor with shape[41207144,32] and type float on /job:localhost/replica:0/task:0/device:CPU:0 by allocator cpu
Traceback (most recent call last):
  File "recsys_tfrs_songs_2.py", line 95, in <module>
    main(sys.argv)
  File "recsys_tfrs_songs_2.py", line 77, in main
    model_maker.train_and_evaluate(model, NUM_TRAIN_EPOCHS, train_events_ds, test_events_ds)
  File "/home/ec2-user/tfrs_proto/recommender_system/recsys_tf/recsys_tfrs_model.py", line 153, in train_and_evaluate
    model.fit(train_events_ds, epochs=num_epochs)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py", line 1100, in fit
    tmp_logs = self.train_function(iterator)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/def_function.py", line 828, in __call__
    result = self._call(*args, **kwds)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/def_function.py", line 855, in _call
    return self._stateless_fn(*args, **kwds)  # pylint: disable=not-callable
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/function.py", line 2943, in __call__
    filtered_flat_args, captured_inputs=graph_function.captured_inputs)  # pylint: disable=protected-access
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/function.py", line 1919, in _call_flat
    ctx, args, cancellation_manager=cancellation_manager))
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/function.py", line 560, in call
    ctx=ctx)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/execute.py", line 60, in quick_execute
    inputs, attrs, num_outputs)
tensorflow.python.framework.errors_impl.ResourceExhaustedError:  OOM when allocating tensor with shape[41207144,32] and type float on /job:localhost/replica:0/task:0/device:CPU:0 by allocator cpu
     [[node Adam/Adam/update_1/AssignVariableOp (defined at /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow_recommenders/models/base.py:76) ]]
Hint: If you want to see a list of allocated tensors when OOM happens, add report_tensor_allocations_upon_oom to RunOptions for current allocation info.
 [Op:__inference_train_function_27330]

maciejkula commented 3 years ago

The tf.data pipelines are designed to stream data from disk, rather than load everything in one go - your memory usage should not depend on the size of your data, but just on the size of your model.

How big is your model? What is your input pipeline?

dgoldenberg-audiomack commented 3 years ago

@maciejkula I wonder if memory usage is indeed affected in some ways by the input data amounts because I'm not seeing an OOM error when experimenting with "tiny" datasets. Now I'm loading larger datasets and it's running out of memory. I suspect the model may be significantly larger, with the larger datasets at play. (I may be wrong)

How big is your model?

That raises the question, how can I tell? If I try to do a model.save(), that has to happen after a model.fit() call but fit doesn't complete due to the OOM. And tf.size() only works with tensors, not models.

What is your input pipeline?

It's really nothing complicated right now; you've seen it but I'm attaching the code again. Basically it loads items from CSV and events from Parquet out of S3. Most of this code is similar to the sample code in TFRS.

maciejkula commented 3 years ago

The usual way would be to count the number of parameters. You can run sum([tf.size(x).numpy() for x in model.variables]) to get that number (once you've run at least one batch of inputs through your model).

Is it possible that your embedding layers are becoming huge with more data?

When sharing model code, can you put it in a repo or a Gist? Downloading zip files is a no-go.

dgoldenberg-audiomack commented 3 years ago

count the number of parameters... once you've run at least one batch of inputs through your model

How would I do that? There is just one call to .fit(). Where would the parameter counting be wired in?

Is it possible that your embedding layers are becoming huge with more data?

How could I keep embedding layers from "becoming huge", if that's something we need to curtail? Feeding smallish amounts of data is a no go; this needs to scale, with features of millions of users and millions of items. There are also a lot of events to feed; the thinking is to feed a couple of months' worth of events which starts reflecting users' patterns of interacting with the items.

Right now, I'm just trying to feed a couple of weeks' worth of events. A "tiny" run with a few dozen input elements had succeeded but it's not giving me a model to use.

When sharing model code, can you put it in a repo or a Gist?

Done. Here it is: https://github.com/dgoldenberg-audiomack/tfrs_issue_274

maciejkula commented 3 years ago

You can call fit with a single batch of data. Then count the parameters via the snippet I shared above.

dgoldenberg-audiomack commented 3 years ago

It's not working. Even with a single batch of the train events I'm getting the OOM.

        # Train the model
        model.fit(train_events_ds.take(1), epochs=num_epochs)

        sz = sum([tf.size(x).numpy() for x in model.variables])
        print(">> SZ: " + str(sz))

Error

ResourceExhaustedError: OOM when allocating tensor with shape[41207144,32] and type float on /job:localhost/replica:0/task:0/device:CPU:0 by allocator cpu [Op:Mul]

It's not liking the users dataset size. Num users is 41,207,141. But that's how many users I need to process.

Is there a limit as to how many users the framework can process? Having to batch this up and train the model on N such batches would create a significant complication ...
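
The size of the failing allocation follows directly from the shape in the error. A back-of-the-envelope sketch, assuming 4 bytes per float32 value; the Adam estimate further assumes the optimizer keeps two extra slot variables of the same shape as the table, which is how Adam normally works but was not measured in this thread:

```python
rows, dim = 41_207_144, 32  # shape[41207144,32] from the OOM message
bytes_per_float32 = 4

table_bytes = rows * dim * bytes_per_float32
print(table_bytes)                    # 5274514432, the size in the allocator warnings
print(round(table_bytes / 2**30, 2))  # 4.91 (GiB)

# Assumption: Adam holds two slot variables (m and v) shaped like the table.
with_adam_bytes = 3 * table_bytes
print(round(with_adam_bytes / 2**30, 2))  # 14.74 (GiB)
```

The user embedding table alone is about 1.3 billion parameters, so the memory pressure comes from the model's size, not from how much event data is streamed through it.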

maciejkula commented 3 years ago

This is a common problem. There are a couple of solutions for this:

  1. Do not fit one embedding vector per user. Instead, parameterize users in terms of their characteristics (metadata, past interactions, etc.). Session-based recommenders are a common pattern for this. This approach eliminates the user embedding entirely, with the secondary advantage that it can make recommendations to new users without retraining.
  2. Hashing. Only include the top X million user ids in your vocabulary; hash the rest using the hashing trick.
  3. Get a bigger machine.
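
A minimal sketch of option 2, in plain Python rather than TensorFlow. The function name `user_index`, the bucket count, and the choice of CRC32 as the hash are all illustrative assumptions; the point is only that the embedding table needs len(vocabulary) + num_buckets rows instead of one row per user.

```python
import zlib


def user_index(user_id, vocabulary, num_buckets):
    """Map a user id to an embedding row.

    Ids in `vocabulary` (e.g. the most active users) get their own row;
    all other ids share one of `num_buckets` hashed rows placed after them.
    """
    if user_id in vocabulary:
        return vocabulary[user_id]
    bucket = zlib.crc32(user_id.encode("utf-8")) % num_buckets
    return len(vocabulary) + bucket


# Hypothetical data: three frequent users keep dedicated rows.
vocabulary = {"u1": 0, "u2": 1, "u3": 2}
num_buckets = 4

print(user_index("u2", vocabulary, num_buckets))  # 1
rare = user_index("some-rare-user", vocabulary, num_buckets)
print(3 <= rare < 3 + num_buckets)                # True
```

Rare users that land in the same bucket share an embedding, which trades some accuracy on the long tail for a much smaller table.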

There are of course limits to how big your model can get, but we use TensorFlow/TFRS at Google and we found it scales quite well to the types of problems we have.

dgoldenberg-audiomack commented 3 years ago

Session-based recommenders

Perhaps something to look into at a later stage. I just want to get TFRS up and running.

Hashing

So you're referring to feature hashing... Will give it a shot, though it would be good to just get the basic flow going first, with vocabs.

Get a bigger machine.

I think I'll keep plugging away at that option first. Does it make sense to plug in a distributed strategy yet or can I "feel out" the right machine size on a single box first? Or, is it clear that for 41 mln users, I absolutely do need to switch over to hashing and probably get a bigger machine?

maciejkula commented 3 years ago

I think you have to figure out what works best for you. Please let me know if you find any bugs in the library, though!

dgoldenberg-audiomack commented 3 years ago

I'm fishing for ways to cut down on trial-and-error. If we can identify easy patterns to follow while avoiding pitfalls, it'll help potentially many users. It's taken me quite a few weeks just to get to this point and I still don't have a usable model.

I think the idea is usability: being able, for the most part, to plug and play. As it stands, with TensorFlow, one has to dig through a huge pile of code and docs, still make a huge number of trial-and-error attempts, and then ponder the cryptic stack traces. IMHO, that's a usability issue.

dgoldenberg-audiomack commented 3 years ago

@maciejkula Case in point, on a larger box, the program ran for a few hours then just yielded the below. I'm wondering if it's the issue I'd reported before but still. Cryptic errors, one after another.

2021-05-13 17:39:19.463075: W tensorflow/core/framework/op_kernel.cc:1763] OP_REQUIRES failed at io_kernel.h:78 : Invalid argument: null value in column: user_id
Traceback (most recent call last):
  File "recsys_tfrs_songs_2.py", line 95, in <module>
    main(sys.argv)
  File "recsys_tfrs_songs_2.py", line 77, in main
    model_maker.train_and_evaluate(model, NUM_TRAIN_EPOCHS, train_events_ds, test_events_ds)
  File "/home/ec2-user/tfrs_proto/recommender_system/recsys_tf/recsys_tfrs_model.py", line 153, in train_and_evaluate
    model.fit(train_events_ds, epochs=num_epochs)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py", line 1100, in fit
    tmp_logs = self.train_function(iterator)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/def_function.py", line 828, in __call__
    result = self._call(*args, **kwds)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/def_function.py", line 855, in _call
    return self._stateless_fn(*args, **kwds)  # pylint: disable=not-callable
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/function.py", line 2943, in __call__
    filtered_flat_args, captured_inputs=graph_function.captured_inputs)  # pylint: disable=protected-access
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/function.py", line 1919, in _call_flat
    ctx, args, cancellation_manager=cancellation_manager))
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/function.py", line 560, in call
    ctx=ctx)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/execute.py", line 60, in quick_execute
    inputs, attrs, num_outputs)
tensorflow.python.framework.errors_impl.InvalidArgumentError:  null value in column: item_id
     [[{{node IO>ParquetReadableRead}}]]
     [[IteratorGetNext]] [Op:__inference_train_function_27330]

Function call stack:
train_function

maciejkula commented 3 years ago

This does seem to be the same problem. I'm sorry you're running into this, it does look like a pain.

dgoldenberg-audiomack commented 3 years ago

There are no nulls in the data but there was an issue in tf-io where they were treating things as null when they weren't (see here).

I'm going to try with tf-io nightly b/c it's not clear when the fix was/will be merged to mainline, what release.

All failing, will have to resort to flipping back to working with csv rather than parquet.

dgoldenberg-audiomack commented 3 years ago

@maciejkula I believe it was the same problem with parquet, or similar.

I'm now doing this kind of bootstrapping 'dance' to get a narrow path to being able to run:

pip3 install --user tensorflow_recommenders==v0.4.0
# After tfrs o/w tfrs forces tf 2.4.0; we need 2.4.1 however, for the distributed strategy classes
pip3 install --user tensorflow==2.4.1

# This rakes in TF 2.5.0
# pip3 install --user tensorflow-io==0.18.0
# pip3 install --user tensorflow-io-nightly

# https://github.com/tensorflow/io/issues/1254
pip3 install tensorflow-io-nightly==0.17.0.dev20210208174016

But with that, setting the strategy to be MultiWorkerMirroredStrategy, I now get a cryptic error as below which doesn't seem to show up in TF issues or in google search results much.

I've placed the gist of my code into this repo here: https://github.com/dgoldenberg-audiomack/tfrs_issue_274_2.

Any ideas?

>> 2021-05-15 11:55:12 : >> Training the model...
2021-05-15 16:55:15.298818: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2021-05-15 16:55:15.318021: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2499995000 Hz
Epoch 1/3
2021-05-15 16:55:31.783213: W tensorflow/core/framework/op_kernel.cc:1763] OP_REQUIRES failed at collective_ops.cc:454 : Internal: RecvBufResponse returned 0 bytes where to_tensor expected 5274514432
2021-05-15 16:55:31.783299: W tensorflow/core/common_runtime/base_collective_executor.cc:223] BaseCollectiveExecutor already aborted, ignoring StartAbort: Internal: RecvBufResponse returned 0 bytes where to_tensor expected 5274514432
2021-05-15 16:55:33.935240: W tensorflow/core/common_runtime/base_collective_executor.cc:223] BaseCollectiveExecutor already aborted, ignoring StartAbort: Internal: [_Derived_]Collective ops is aborted by: RecvBufResponse returned 0 bytes where to_tensor expected 5274514432
The error could be from a previous operation. Restart your program to reset.
Traceback (most recent call last):
  File "recsys_tfrs_songs_2.py", line 89, in <module>
    main(sys.argv)
  File "recsys_tfrs_songs_2.py", line 71, in main
    model_maker.train_and_evaluate(model, NUM_TRAIN_EPOCHS, train_events_ds, test_events_ds)
  File "/home/ec2-user/tfrs_proto/recommender_system/recsys_tf/recsys_tfrs_model.py", line 154, in train_and_evaluate
    model.fit(train_events_ds, epochs=num_epochs)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py", line 1100, in fit
    tmp_logs = self.train_function(iterator)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/def_function.py", line 828, in __call__
    result = self._call(*args, **kwds)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/def_function.py", line 871, in _call
    self._initialize(args, kwds, add_initializers_to=initializers)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/def_function.py", line 726, in _initialize
    *args, **kwds))
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/function.py", line 2969, in _get_concrete_function_internal_garbage_collected
    graph_function, _ = self._maybe_define_function(args, kwargs)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/function.py", line 3361, in _maybe_define_function
    graph_function = self._create_graph_function(args, kwargs)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/function.py", line 3206, in _create_graph_function
    capture_by_value=self._capture_by_value),
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/framework/func_graph.py", line 990, in func_graph_from_py_func
    func_outputs = python_func(*func_args, **func_kwargs)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/def_function.py", line 634, in wrapped_fn
    out = weak_wrapped_fn().__wrapped__(*args, **kwds)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/framework/func_graph.py", line 977, in wrapper
    raise e.ag_error_metadata.to_exception(e)
tensorflow.python.framework.errors_impl.InternalError: in user code:

    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py:805 train_function  *
        return step_function(self, iterator)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py:795 step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py:1259 run
        return self._extended.call_for_each_replica(fn, args=args, kwargs=kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py:2730 call_for_each_replica
        return self._call_for_each_replica(fn, args, kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/mirrored_strategy.py:629 _call_for_each_replica
        self._container_strategy(), fn, args, kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/mirrored_run.py:93 call_for_each_replica
        return _call_for_each_replica(strategy, fn, args, kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/mirrored_run.py:234 _call_for_each_replica
        coord.join(threads)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/training/coordinator.py:389 join
        six.reraise(*self._exc_info_to_raise)
    /usr/local/lib/python3.7/site-packages/six.py:703 reraise
        raise value
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/training/coordinator.py:297 stop_on_exception
        yield
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/mirrored_run.py:323 run
        self.main_result = self.main_fn(*self.main_args, **self.main_kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py:788 run_step  **
        outputs = model.train_step(data)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow_recommenders/models/base.py:68 train_step
        loss = self.compute_loss(inputs, training=True)
    /home/ec2-user/tfrs_proto/recommender_system/recsys_tf/recsys_tfrs_model.py:40 compute_loss
        user_embeddings = self.user_model(features["user_id"])
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/engine/base_layer.py:1012 __call__
        outputs = call_fn(inputs, *args, **kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/engine/sequential.py:389 call
        outputs = layer(inputs, **kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/engine/base_layer.py:1008 __call__
        self._maybe_build(inputs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/engine/base_layer.py:2710 _maybe_build
        self.build(input_shapes)  # pylint:disable=not-callable
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/utils/tf_utils.py:272 wrapper
        output_shape = fn(instance, input_shape)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/layers/embeddings.py:156 build
        experimental_autocast=False)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/engine/base_layer.py:639 add_weight
        caching_device=caching_device)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/training/tracking/base.py:810 _add_variable_with_custom_getter
        **kwargs_for_getter)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/engine/base_layer_utils.py:142 make_variable
        shape=variable_shape if variable_shape else None)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/ops/variables.py:260 __call__
        return cls._variable_v1_call(*args, **kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/ops/variables.py:221 _variable_v1_call
        shape=shape)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/ops/variables.py:67 getter
        return captured_getter(captured_previous, **kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/shared_variable_creator.py:69 create_new_variable
        v = next_creator(**kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/ops/variables.py:67 getter
        return captured_getter(captured_previous, **kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py:2083 creator_with_resource_vars
        created = self._create_variable(next_creator, **kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/mirrored_strategy.py:489 _create_variable
        distribute_utils.VARIABLE_POLICY_MAPPING, **kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_utils.py:311 create_mirrored_variable
        value_list = real_mirrored_creator(**kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/mirrored_strategy.py:481 _real_mirrored_creator
        v = next_creator(**kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/ops/variables.py:67 getter
        return captured_getter(captured_previous, **kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/def_function.py:714 variable_capturing_scope
        lifted_initializer_graph=lifted_initializer_graph, **kwds)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/ops/variables.py:264 __call__
        return super(VariableMetaclass, cls).__call__(*args, **kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/def_function.py:227 __init__
        initial_value = initial_value()
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/collective_all_reduce_strategy.py:573 initial_value_fn
        collective_instance_key)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/ops/collective_ops.py:290 broadcast_recv
        timeout_seconds=timeout)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/ops/gen_collective_ops.py:59 collective_bcast_recv
        timeout_seconds=timeout_seconds, name=name, ctx=_ctx)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/ops/gen_collective_ops.py:115 collective_bcast_recv_eager_fallback
        attrs=_attrs, ctx=ctx, name=name)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/execute.py:60 quick_execute
        inputs, attrs, num_outputs)

    InternalError: [_Derived_]Collective ops is aborted by: RecvBufResponse returned 0 bytes where to_tensor expected 5274514432
    The error could be from a previous operation. Restart your program to reset. [Op:CollectiveBcastRecv]
dgoldenberg-audiomack commented 3 years ago

@maciejkula Trying to work around the issue I posted above, I've switched from loading Parquet to loading the same event data as CSV.

Now I'm getting the error below.

tensorflow.RecvBufRespExtra exceeded maximum protobuf size of 2GB

Why is this a problem? I see the same error reported in https://github.com/onnx/tensorflow-onnx/issues/1040, which is said to have been fixed by https://github.com/onnx/tensorflow-onnx/pull/1090 back in September 2020. Why would I still see this with TF 2.4.1, and how can I work around it?

Epoch 1/3
[libprotobuf ERROR external/com_google_protobuf/src/google/protobuf/message_lite.cc:451] tensorflow.RecvBufRespExtra exceeded maximum protobuf size of 2GB: 5368538440
[libprotobuf ERROR external/com_google_protobuf/src/google/protobuf/message_lite.cc:451] tensorflow.RecvBufRespExtra exceeded maximum protobuf size of 2GB: 5368538440
WARNING:tensorflow:/job:worker/replica:0/task:1 seems down, retrying 1/3
WARNING:tensorflow:/job:worker/replica:0/task:1 seems down, retrying 2/3
ERROR:tensorflow:Cluster check alive failed, /job:worker/replica:0/task:1 is down, aborting collectives: Deadline Exceeded
Additional GRPC error information from remote target /job:worker/replica:0/task:1:
:{"created":"@1621963057.971457165","description":"Deadline Exceeded","file":"external/com_github_grpc_grpc/src/core/ext/filters/deadline/deadline_filter.cc","file_line":69,"grpc_status":4}
2021-05-25 17:17:37.971862: W tensorflow/core/common_runtime/base_collective_executor.cc:223] BaseCollectiveExecutor already aborted, ignoring StartAbort: Unavailable: [_Derived_]Collective ops is aborted by: cluster check alive failed, /job:worker/replica:0/task:1 is down
The error could be from a previous operation. Restart your program to reset.
Traceback (most recent call last):
  File "recsys_tfrs_songs_2.py", line 88, in <module>
    main(sys.argv)
  File "recsys_tfrs_songs_2.py", line 70, in main
    model_maker.train_and_evaluate(model, NUM_TRAIN_EPOCHS, train_events_ds, test_events_ds)
  File "/home/ec2-user/tfrs_proto/recommender_system/recsys_tf/recsys_tfrs_model.py", line 165, in train_and_evaluate
    model.fit(train_events_ds, epochs=num_epochs)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py", line 1100, in fit
    tmp_logs = self.train_function(iterator)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/def_function.py", line 828, in __call__
    result = self._call(*args, **kwds)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/def_function.py", line 871, in _call
    self._initialize(args, kwds, add_initializers_to=initializers)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/def_function.py", line 726, in _initialize
    *args, **kwds))
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/function.py", line 2969, in _get_concrete_function_internal_garbage_collected
    graph_function, _ = self._maybe_define_function(args, kwargs)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/function.py", line 3361, in _maybe_define_function
    graph_function = self._create_graph_function(args, kwargs)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/function.py", line 3206, in _create_graph_function
    capture_by_value=self._capture_by_value),
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/framework/func_graph.py", line 990, in func_graph_from_py_func
    func_outputs = python_func(*func_args, **func_kwargs)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/def_function.py", line 634, in wrapped_fn
    out = weak_wrapped_fn().__wrapped__(*args, **kwds)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/framework/func_graph.py", line 977, in wrapper
    raise e.ag_error_metadata.to_exception(e)
tensorflow.python.framework.errors_impl.UnavailableError: in user code:

    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py:805 train_function  *
        return step_function(self, iterator)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py:795 step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py:1259 run
        return self._extended.call_for_each_replica(fn, args=args, kwargs=kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py:2730 call_for_each_replica
        return self._call_for_each_replica(fn, args, kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/mirrored_strategy.py:629 _call_for_each_replica
        self._container_strategy(), fn, args, kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/mirrored_run.py:93 call_for_each_replica
        return _call_for_each_replica(strategy, fn, args, kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/mirrored_run.py:234 _call_for_each_replica
        coord.join(threads)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/training/coordinator.py:389 join
        six.reraise(*self._exc_info_to_raise)
    /usr/local/lib/python3.7/site-packages/six.py:703 reraise
        raise value
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/training/coordinator.py:297 stop_on_exception
        yield
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/mirrored_run.py:323 run
        self.main_result = self.main_fn(*self.main_args, **self.main_kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py:788 run_step  **
        outputs = model.train_step(data)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow_recommenders/models/base.py:76 train_step
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/optimizer_v2/optimizer_v2.py:604 apply_gradients
        self._create_all_weights(var_list)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/optimizer_v2/optimizer_v2.py:781 _create_all_weights
        _ = self.iterations
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/optimizer_v2/optimizer_v2.py:788 __getattribute__
        return super(OptimizerV2, self).__getattribute__(name)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/optimizer_v2/optimizer_v2.py:926 iterations
        aggregation=tf_variables.VariableAggregation.ONLY_FIRST_REPLICA)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/optimizer_v2/optimizer_v2.py:1132 add_weight
        aggregation=aggregation)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/training/tracking/base.py:810 _add_variable_with_custom_getter
        **kwargs_for_getter)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/engine/base_layer_utils.py:142 make_variable
        shape=variable_shape if variable_shape else None)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/ops/variables.py:260 __call__
        return cls._variable_v1_call(*args, **kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/ops/variables.py:221 _variable_v1_call
        shape=shape)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/ops/variables.py:67 getter
        return captured_getter(captured_previous, **kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/shared_variable_creator.py:69 create_new_variable
        v = next_creator(**kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/ops/variables.py:67 getter
        return captured_getter(captured_previous, **kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py:2083 creator_with_resource_vars
        created = self._create_variable(next_creator, **kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/mirrored_strategy.py:489 _create_variable
        distribute_utils.VARIABLE_POLICY_MAPPING, **kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_utils.py:311 create_mirrored_variable
        value_list = real_mirrored_creator(**kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/mirrored_strategy.py:481 _real_mirrored_creator
        v = next_creator(**kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/ops/variables.py:67 getter
        return captured_getter(captured_previous, **kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/def_function.py:714 variable_capturing_scope
        lifted_initializer_graph=lifted_initializer_graph, **kwds)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/ops/variables.py:264 __call__
        return super(VariableMetaclass, cls).__call__(*args, **kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/def_function.py:227 __init__
        initial_value = initial_value()
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/collective_all_reduce_strategy.py:566 initial_value_fn
        group_size, group_key, collective_instance_key)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/ops/collective_ops.py:247 broadcast_send
        timeout_seconds=timeout)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/ops/gen_collective_ops.py:158 collective_bcast_send
        timeout_seconds=timeout_seconds, name=name, ctx=_ctx)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/ops/gen_collective_ops.py:213 collective_bcast_send_eager_fallback
        attrs=_attrs, ctx=ctx, name=name)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/execute.py:60 quick_execute
        inputs, attrs, num_outputs)

    UnavailableError: [_Derived_]Collective ops is aborted by: cluster check alive failed, /job:worker/replica:0/task:1 is down
    The error could be from a previous operation. Restart your program to reset. [Op:CollectiveBcastSend]
dgoldenberg-audiomack commented 3 years ago

@maciejkula I've reduced the size of the input CSV events dataset by half, and now I'm back to the other error: RecvBufResponse returned 0 bytes where to_tensor expected 5364609280. What does this mean, and how can I fix it or work around it? Thanks.
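
For a sense of scale, the byte counts in these errors can be compared with protobuf's 2GB message limit. The sketch below assumes float32 values (4 bytes each), which is the Keras default for embedding weights but is not confirmed anywhere in this thread, and the helper name `describe_transfer` is made up for illustration. The first traceback above passes through `embeddings.py` `build` and `broadcast_recv`, so the tensor is presumably an embedding table being broadcast to the other workers when the variable is created.

```python
# Rough size check for the tensors named in the error messages.
PROTOBUF_LIMIT_BYTES = 2**31 - 1   # assumption: protobuf's ~2GB cap on one message
BYTES_PER_FLOAT32 = 4              # assumption: the weights are float32


def describe_transfer(num_bytes):
    """Return (element_count, ratio_to_limit) for a tensor of num_bytes."""
    elements = num_bytes // BYTES_PER_FLOAT32
    ratio = num_bytes / PROTOBUF_LIMIT_BYTES
    return elements, ratio


# Byte counts copied from the error messages in this thread.
for label, size in [
    ("first RecvBufResponse error", 5274514432),
    ("second RecvBufResponse error", 5364609280),
    ("RecvBufRespExtra message", 5368538440),
]:
    elements, ratio = describe_transfer(size)
    print(f"{label}: {size} bytes, ~{elements} float32 values, {ratio:.2f}x the limit")
```

Under these assumptions each transfer is roughly 2.5 times the limit, which would suggest the limit is hit by the size of the variable itself rather than by the input file format.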

maciejkula commented 3 years ago

Could you try reading a single CSV file of data, a very small subset?

Also, I assume that this is a cluster - anything in the logs of the worker machines? It could be that they are failing with a good error message, and your coordinator only knows that a worker has failed.

maciejkula commented 3 years ago

I'd also recommend first trying to train on a single machine - make sure it works there (even if it's slow), then scale to more machines.

dgoldenberg-audiomack commented 3 years ago

@maciejkula I've already run a small dataset on a single machine. The program is able to finish (leaving the quality of the recommendations aside for the moment).

The natural next step is to load a larger dataset and distribute the training.

With MultiWorkerMirroredStrategy, loading a single event file (csv.gz) yields tensorflow.RecvBufRespExtra exceeded maximum protobuf size of 2GB: 5368538440.

> I assume that this is a cluster

Yes, this is a cluster of 3 machines.
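
For readers reproducing this setup: `MultiWorkerMirroredStrategy` typically discovers its peers through the `TF_CONFIG` environment variable. The sketch below builds one for a three-worker cluster using the addresses that appear in the GrpcChannelCache log line further down. Whether the original scripts set `TF_CONFIG` this way is an assumption, and `make_tf_config` is a hypothetical helper.

```python
import json
import os

# Worker addresses as printed in the GrpcChannelCache log line in this thread.
WORKERS = ["10.2.249.213:2121", "10.2.252.56:2121", "10.2.252.97:2121"]


def make_tf_config(task_index):
    """Build the TF_CONFIG JSON string for the worker with the given index."""
    return json.dumps({
        "cluster": {"worker": WORKERS},
        "task": {"type": "worker", "index": task_index},
    })


# Each machine sets its own index (0, 1, or 2) before creating the strategy.
os.environ["TF_CONFIG"] = make_tf_config(0)
```

Every worker needs the same `cluster` section and a distinct `task` index; a mismatch here is one common reason for a worker to appear down to its peers.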

> anything in the logs of the worker machines?

Where specifically would I want to look? I'm looking at the stdout only for now, after having set the following:

os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
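
A note on that setting, offered as a hedged sketch rather than a diagnosis: `TF_CPP_MIN_LOG_LEVEL` filters TensorFlow's C++ log messages, where `0` shows everything and `3` hides INFO, WARNING, and ERROR. The usual advice is to set it before `tensorflow` is imported. To get more detail out of each worker, something like the following could be tried; the helper name `enable_verbose_tf_logging` is hypothetical.

```python
import os


def enable_verbose_tf_logging():
    """Ask TensorFlow's C++ runtime to emit all log levels.

    Intended to run before `import tensorflow` (the commonly advised order).
    """
    # 0 = all messages, 1 = hide INFO, 2 = also hide WARNING, 3 = also hide ERROR
    os.environ["TF_CPP_MIN_LOG_LEVEL"] = "0"


enable_verbose_tf_logging()
# import tensorflow as tf  # import only after the variable is set
```

Running this on every worker, and capturing each worker's stdout and stderr separately, should make it easier to see which machine fails first.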

Log output

2021-05-25 18:20:50.415248: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-05-25 18:20:51.608591: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-05-25 18:20:51.609425: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-05-25 18:20:51.667435: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-05-25 18:20:51.667480: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (ip-10-2-249-213.awsinternal.audiomack.com): /proc/driver/nvidia/version does not exist
2021-05-25 18:20:51.668450: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-05-25 18:20:51.669318: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-05-25 18:20:51.670502: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-05-25 18:20:51.676561: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> 10.2.249.213:2121, 1 -> 10.2.252.56:2121, 2 -> 10.2.252.97:2121}
2021-05-25 18:20:51.677551: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://10.2.249.213:2121

>> 2021-05-25 13:20:55 : >> Running the TFRS based song recommender...

>> Loading TF datasets from S3...
>> --- ITEMS dataset: loaded.
>> --- Loading the EVENTS dataset from s3
>> --- EVENTS dataset: loaded
>> Loading TF datasets from S3: done.
>> Preparing data...
>> Data preparation: done.
>> Strategy: <tensorflow.python.distribute.collective_all_reduce_strategy.CollectiveAllReduceStrategy object at 0x7f0b710e0d50>
>> Number of devices: 3
>> GPU's: []
>> 2021-05-25 13:23:54 : >> Training the model...
2021-05-25 18:23:54.040848: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2021-05-25 18:23:54.060004: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2499995000 Hz
Epoch 1/3
[libprotobuf ERROR external/com_google_protobuf/src/google/protobuf/message_lite.cc:451] tensorflow.RecvBufRespExtra exceeded maximum protobuf size of 2GB: 5368538440
[libprotobuf ERROR external/com_google_protobuf/src/google/protobuf/message_lite.cc:451] tensorflow.RecvBufRespExtra exceeded maximum protobuf size of 2GB: 5368538440
WARNING:tensorflow:/job:worker/replica:0/task:1 seems down, retrying 1/3
WARNING:tensorflow:/job:worker/replica:0/task:1 seems down, retrying 2/3
ERROR:tensorflow:Cluster check alive failed, /job:worker/replica:0/task:1 is down, aborting collectives: Deadline Exceeded
Additional GRPC error information from remote target /job:worker/replica:0/task:1:
:{"created":"@1621967097.618896060","description":"Deadline Exceeded","file":"external/com_github_grpc_grpc/src/core/ext/filters/deadline/deadline_filter.cc","file_line":69,"grpc_status":4}
2021-05-25 18:24:57.619317: W tensorflow/core/common_runtime/base_collective_executor.cc:223] BaseCollectiveExecutor already aborted, ignoring StartAbort: Unavailable: [_Derived_]Collective ops is aborted by: cluster check alive failed, /job:worker/replica:0/task:1 is down
The error could be from a previous operation. Restart your program to reset.
Traceback (most recent call last):
  File "recsys_tfrs_songs_2.py", line 88, in <module>
    main(sys.argv)
  File "recsys_tfrs_songs_2.py", line 70, in main
    model_maker.train_and_evaluate(model, NUM_TRAIN_EPOCHS, train_events_ds, test_events_ds)
  File "/home/ec2-user/tfrs_proto/recommender_system/recsys_tf/recsys_tfrs_model.py", line 165, in train_and_evaluate
    model.fit(train_events_ds, epochs=num_epochs)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py", line 1100, in fit
    tmp_logs = self.train_function(iterator)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/def_function.py", line 828, in __call__
    result = self._call(*args, **kwds)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/def_function.py", line 871, in _call
    self._initialize(args, kwds, add_initializers_to=initializers)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/def_function.py", line 726, in _initialize
    *args, **kwds))
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/function.py", line 2969, in _get_concrete_function_internal_garbage_collected
    graph_function, _ = self._maybe_define_function(args, kwargs)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/function.py", line 3361, in _maybe_define_function
    graph_function = self._create_graph_function(args, kwargs)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/function.py", line 3206, in _create_graph_function
    capture_by_value=self._capture_by_value),
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/framework/func_graph.py", line 990, in func_graph_from_py_func
    func_outputs = python_func(*func_args, **func_kwargs)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/def_function.py", line 634, in wrapped_fn
    out = weak_wrapped_fn().__wrapped__(*args, **kwds)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/framework/func_graph.py", line 977, in wrapper
    raise e.ag_error_metadata.to_exception(e)
tensorflow.python.framework.errors_impl.UnavailableError: in user code:

    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py:805 train_function  *
        return step_function(self, iterator)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py:795 step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py:1259 run
        return self._extended.call_for_each_replica(fn, args=args, kwargs=kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py:2730 call_for_each_replica
        return self._call_for_each_replica(fn, args, kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/mirrored_strategy.py:629 _call_for_each_replica
        self._container_strategy(), fn, args, kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/mirrored_run.py:93 call_for_each_replica
        return _call_for_each_replica(strategy, fn, args, kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/mirrored_run.py:234 _call_for_each_replica
        coord.join(threads)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/training/coordinator.py:389 join
        six.reraise(*self._exc_info_to_raise)
    /usr/local/lib/python3.7/site-packages/six.py:703 reraise
        raise value
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/training/coordinator.py:297 stop_on_exception
        yield
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/mirrored_run.py:323 run
        self.main_result = self.main_fn(*self.main_args, **self.main_kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py:788 run_step  **
        outputs = model.train_step(data)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow_recommenders/models/base.py:76 train_step
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/optimizer_v2/optimizer_v2.py:604 apply_gradients
        self._create_all_weights(var_list)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/optimizer_v2/optimizer_v2.py:781 _create_all_weights
        _ = self.iterations
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/optimizer_v2/optimizer_v2.py:788 __getattribute__
        return super(OptimizerV2, self).__getattribute__(name)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/optimizer_v2/optimizer_v2.py:926 iterations
        aggregation=tf_variables.VariableAggregation.ONLY_FIRST_REPLICA)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/optimizer_v2/optimizer_v2.py:1132 add_weight
        aggregation=aggregation)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/training/tracking/base.py:810 _add_variable_with_custom_getter
        **kwargs_for_getter)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/keras/engine/base_layer_utils.py:142 make_variable
        shape=variable_shape if variable_shape else None)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/ops/variables.py:260 __call__
        return cls._variable_v1_call(*args, **kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/ops/variables.py:221 _variable_v1_call
        shape=shape)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/ops/variables.py:67 getter
        return captured_getter(captured_previous, **kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/shared_variable_creator.py:69 create_new_variable
        v = next_creator(**kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/ops/variables.py:67 getter
        return captured_getter(captured_previous, **kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py:2083 creator_with_resource_vars
        created = self._create_variable(next_creator, **kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/mirrored_strategy.py:489 _create_variable
        distribute_utils.VARIABLE_POLICY_MAPPING, **kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_utils.py:311 create_mirrored_variable
        value_list = real_mirrored_creator(**kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/mirrored_strategy.py:481 _real_mirrored_creator
        v = next_creator(**kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/ops/variables.py:67 getter
        return captured_getter(captured_previous, **kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/def_function.py:714 variable_capturing_scope
        lifted_initializer_graph=lifted_initializer_graph, **kwds)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/ops/variables.py:264 __call__
        return super(VariableMetaclass, cls).__call__(*args, **kwargs)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/def_function.py:227 __init__
        initial_value = initial_value()
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/distribute/collective_all_reduce_strategy.py:566 initial_value_fn
        group_size, group_key, collective_instance_key)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/ops/collective_ops.py:247 broadcast_send
        timeout_seconds=timeout)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/ops/gen_collective_ops.py:158 collective_bcast_send
        timeout_seconds=timeout_seconds, name=name, ctx=_ctx)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/ops/gen_collective_ops.py:213 collective_bcast_send_eager_fallback
        attrs=_attrs, ctx=ctx, name=name)
    /home/ec2-user/.local/lib/python3.7/site-packages/tensorflow/python/eager/execute.py:60 quick_execute
        inputs, attrs, num_outputs)

    UnavailableError: [_Derived_]Collective ops is aborted by: cluster check alive failed, /job:worker/replica:0/task:1 is down
    The error could be from a previous operation. Restart your program to reset. [Op:CollectiveBcastSend]
maciejkula commented 3 years ago

What happens if you remove the distribution strategy entirely?

dgoldenberg-audiomack commented 3 years ago

Basically the same behavior as with the default strategy: it goes into training. It hasn't finished yet, but it doesn't look unhappy.

    # No distribution strategy: the model is built outside any strategy scope.
    strategy = None

    if strategy is not None:
        with strategy.scope():
            model, opt_test_events_ds, opt_train_events_ds = self.do_create_model(embedding_dimension)
    else:
        model, opt_test_events_ds, opt_train_events_ds = self.do_create_model(embedding_dimension)
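
The duplicated call in the two branches could be avoided with `contextlib.nullcontext`, which acts as a do-nothing context manager when no strategy is set. This is only a sketch: `strategy_scope` and `build_model` are hypothetical stand-ins (the latter for `self.do_create_model`), and `FakeStrategy` exists purely so the example runs without TensorFlow.

```python
from contextlib import contextmanager, nullcontext


def strategy_scope(strategy):
    """Return strategy.scope() if a strategy is given, else a no-op context."""
    return strategy.scope() if strategy is not None else nullcontext()


def build_model(embedding_dimension):
    # Hypothetical stand-in for self.do_create_model(embedding_dimension).
    return {"embedding_dimension": embedding_dimension}


class FakeStrategy:
    """Stand-in for a tf.distribute strategy; records whether scope() was entered."""

    def __init__(self):
        self.entered = False

    @contextmanager
    def scope(self):
        self.entered = True
        yield


# Same call site whether or not a strategy is used.
# The embedding dimension of 32 is an arbitrary illustrative value.
for strategy in (None, FakeStrategy()):
    with strategy_scope(strategy):
        model = build_model(embedding_dimension=32)
```

With a real strategy object, `strategy_scope(strategy)` would simply return `strategy.scope()`, so toggling distribution becomes a one-line change to how `strategy` is assigned.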