uber / petastorm

The Petastorm library enables single-machine or distributed training and evaluation of deep learning models directly from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark and can be used from pure Python code.
Apache License 2.0

[ISSUE] Petastorm + TF Recommenders hangs forever #672

Open renardeinside opened 3 years ago

renardeinside commented 3 years ago

Hi team,

I've run into the following issue while using Petastorm with TensorFlow Recommenders.

Here is a quick code sample:

# imports used by this snippet
import tensorflow as tf
import tensorflow_recommenders as tfrs
from petastorm.spark import make_spark_converter

raw_data = spark.read.load("dbfs:/some/path")

ratings_df = raw_data.select("userid", "productid")
items_df = raw_data.select("productid").distinct()

ratings_converter = make_spark_converter(ratings_df)
items_converter = make_spark_converter(items_df)

with ratings_converter.make_tf_dataset(batch_size=8192) as ratings_ds, items_converter.make_tf_dataset(batch_size=128) as items_ds:
  # this structure is required by TFRS
  ratings = ratings_ds.flat_map(tf.data.Dataset.from_tensor_slices).map(lambda x: {
    "user_id": x[0],
    "item_id": x[1]
  }).batch(8192)

  items = items_ds.flat_map(tf.data.Dataset.from_tensor_slices).map(lambda x: x[0])

  user_model = get_user_model(unique_users)
  item_model = get_item_model(unique_items)

  metrics = tfrs.metrics.FactorizedTopK(
      candidates=items.batch(128).map(item_model)
  )

  task = tfrs.tasks.Retrieval(metrics=metrics)

  model = RecommendationModel(user_model, item_model, task) 
  model.compile(optimizer=tf.keras.optimizers.Adagrad(learning_rate=0.01))

  steps_per_epoch = len(ratings_converter) // 8192

  history = model.fit(
    ratings,
    epochs=3,
    verbose=True,
    steps_per_epoch=steps_per_epoch,
  )

This code hangs forever at the start of the first epoch. It works without any problems if I load the data into memory via .toPandas() and build the tf.data.Dataset from it directly.
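
Roughly what the working in-memory path looks like (just a sketch; it assumes both columns are strings, as they are in this dataset):

ratings_pdf = ratings_df.toPandas()

# Sketch of the in-memory path that does not hang: pull the DataFrame to the
# driver with toPandas() and build the tf.data.Dataset directly.
ratings = tf.data.Dataset.from_tensor_slices({
    "user_id": ratings_pdf["userid"].astype(str).values,
    "item_id": ratings_pdf["productid"].astype(str).values,
}).batch(8192)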

Versions of components:

selitvin commented 3 years ago

Can you please try reading directly from ratings_ds and items_ds, without Recommenders? This will help us isolate the source of the hang. Also, you can try passing reader_pool_type="dummy" to make_tf_dataset; this switches off the thread pool and might make debugging easier.
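
Something along these lines (just a sketch, reusing the ratings_converter from your snippet above):

# Sketch: switch off Petastorm's worker thread pool while debugging.
with ratings_converter.make_tf_dataset(
    batch_size=8192,
    reader_pool_type="dummy",  # rows are read synchronously in the main thread
) as ratings_ds:
    for batch in ratings_ds.take(2):
        print(batch)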

renardeinside commented 3 years ago

> Can you please try reading directly from the ratings_ds and items_ds, without recommenders?

Yes, if I simply do:

for p in ratings_ds.as_numpy_iterator():
    print(p)

it works fine for all iterations.

I'll try with reader_pool_type="dummy" and let you know the status.

selitvin commented 3 years ago

What are the assumptions about dbfs:/some/path? Is there anything you can do to help me recreate a small sample dataset, so I could run it locally and observe the same behavior?

renardeinside commented 3 years ago

At dbfs:/some/path there is this Kaggle dataset: https://www.kaggle.com/skillsmuggler/amazon-ratings. I've taken .limit(100_000) of it and saved it in Delta Lake format.
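
Roughly how the sample was prepared (a sketch; the CSV file name and the original column names are assumptions about the Kaggle download):

# Sketch: build the small Delta sample from the Kaggle ratings CSV.
# The path and file name "ratings_Beauty.csv" are illustrative assumptions.
sample = (
    spark.read.option("header", True)
    .csv("/path/to/ratings_Beauty.csv")
    .selectExpr(
        "UserId as userid",
        "ProductId as productid",
        "CAST(Rating AS double) as rating",
    )
    .limit(100_000)
)
sample.write.format("delta").mode("overwrite").save("dbfs:/some/path")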

I've also reproduced the same issue locally on my Mac (outside the Databricks environment). I see the following messages when I run my code in debug mode locally (not sure if it's relevant, tbh):

/Users/ivan.trusov/opt/anaconda3/envs/dbx-tf-recsys/lib/python3.7/site-packages/petastorm/arrow_reader_worker.py:294: ResourceWarning: unclosed file <_io.BufferedReader name='/tmp/petastorm/cache/20210503215814-appid-local-1620071889079-24d8b84b-ed60-459e-bf34-1745c1bbb49c/part-00000-0ac1fe16-08a2-4d49-bbe4-3bdf93f26b5c-c000.parquet'>
  table = piece.read(columns=column_names - partition_names, partitions=self._dataset.partitions)
ResourceWarning: Enable tracemalloc to get the object allocation traceback

Here are code samples to read the data:

# imports used by this snippet
from pyspark.sql import SparkSession
from petastorm.spark import SparkDatasetConverter, make_spark_converter

# prepare Spark with Delta Lake support
spark = (
    SparkSession.builder.master("local[1]")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.8.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.shuffle.partitions", 4)
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .getOrCreate()
)

# read the data
raw_data = (
    spark.read.format("delta")
    .load(SOURCE_DIR)
    .select("userid", "productid", "rating")
    .sample(0.05)
)

spark.conf.set(
    SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF,
    "file:///tmp/petastorm/cache",
)

ratings_converter: SparkDatasetConverter = make_spark_converter(raw_data)
items_converter: SparkDatasetConverter = make_spark_converter(
    raw_data.select("productid").distinct()
)

with ratings_converter.make_tf_dataset(
    batch_size=128
) as ratings_ds, items_converter.make_tf_dataset(
    batch_size=16
) as items_ds:

    train = ratings_ds.map(
        lambda x: {"user_id": x[0], "product_id": x[1]}
    )

    items = items_ds.map(
        lambda x: x[0]
    )

    # here goes code as per the TFRS basic retrieval notebook -
    # https://www.tensorflow.org/recommenders/examples/basic_retrieval

    model.fit(
        train,
        epochs=3,
        verbose=True,
        steps_per_epoch=len(ratings_converter) // 128,
    )

renardeinside commented 3 years ago

I've also added some logging to the training step, and I see the following:

21/05/03 22:24:31 INFO ModelBuilder: Training: start of batch 0; got log keys: []
21/05/03 22:24:31 INFO ModelBuilder: Starting the train step with features: {'user_id': <tf.Tensor 'IteratorGetNext:1' shape=(None,) dtype=string>, 'item_id': <tf.Tensor 'IteratorGetNext:0' shape=(None,) dtype=string>}
21/05/03 22:24:32 INFO ModelBuilder: Train step finished
21/05/03 22:24:32 INFO ModelBuilder: Starting the train step with features: {'user_id': <tf.Tensor 'IteratorGetNext:1' shape=(None,) dtype=string>, 'item_id': <tf.Tensor 'IteratorGetNext:0' shape=(None,) dtype=string>}
21/05/03 22:24:32 INFO ModelBuilder: Train step finished
/Users/ivan.trusov/opt/anaconda3/envs/dbx-tf-recsys/lib/python3.7/site-packages/petastorm/arrow_reader_worker.py:53: FutureWarning: Calling .data on ChunkedArray is provided for compatibility after Column was removed, simply drop this attribute
  column_as_pandas = column.data.chunks[0].to_pandas()

It seems like two steps were performed successfully, but then the process hung.

renardeinside commented 3 years ago

Interesting - I've found that the issue is actually with the items dataset, not with the ratings one. If I switch from the items_ds above to a simple NumPy-based dataset, everything works like a charm:

items = tf.data.Dataset.from_tensor_slices(unique_items)
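
For completeness, a sketch of how unique_items can be materialized for this workaround (collecting the distinct product ids from the items_df defined above to the driver):

import numpy as np
import tensorflow as tf

# Sketch: materialize the distinct product ids on the driver and build
# the candidate dataset in memory instead of going through Petastorm.
unique_items = np.array([row["productid"] for row in items_df.collect()])
items = tf.data.Dataset.from_tensor_slices(unique_items)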