tensorflow / recommenders

TensorFlow Recommenders is a library for building recommender system models using TensorFlow.
Apache License 2.0

Anyone able to load tfrecords into TFRS generated with the spark-to-tf-records connector? #197

Open dgoldenberg-audiomack opened 3 years ago

dgoldenberg-audiomack commented 3 years ago

Has anyone seen this kind of error when trying to load TFRecords generated from Spark, either by the Spark-to-TFRecords connector or by LinkedIn's Spark TFRecord library?

Error: Error when deserializing tfrecord's in TF 2.x: Only integers, slices (:), ellipsis (...), tf.newaxis (None) and scalar tf.int32/tf.int64 tensors are valid indices

I've filed tickets with both projects, with details.

Really just doing a simple thing, using the small movielens dataset:

    # Code for the connector
    movies_df.write.format("tfrecords").mode("overwrite").save(tf_movies_dir)
    ratings_df.write.format("tfrecords").mode("overwrite").save(tf_ratings_dir)

    # Alternatively, code for the spark to tfrecord
    movies_df.write.format("tfrecord").mode("overwrite").option("recordType", "Example").save(tf_movies_dir)
    ratings_df.write.format("tfrecord").mode("overwrite").option("recordType", "Example").save(tf_ratings_dir)

    import os

    import boto3
    import tensorflow as tf

    s3 = boto3.resource("s3", verify=False)
    bucket = s3.Bucket("mybucket")

    # Collect the part files written for the movies data
    filenames = []
    for object_summary in bucket.objects.filter(
            Prefix="emr/spark_apps/myapp/movielens-100k-conversion/movies-0001/part"
    ):
        filenames.append(os.path.join("s3://audiomack-master-airflow/", object_summary.key))
    movies_dataset = tf.data.TFRecordDataset(filenames)

    # Collect the part files written for the ratings data
    filenames = []
    for object_summary in bucket.objects.filter(
            Prefix="emr/spark_apps/myapp/movielens-100k-conversion/ratings-0001/part"
    ):
        filenames.append(os.path.join("s3://audiomack-master-airflow/", object_summary.key))
    ratings_dataset = tf.data.TFRecordDataset(filenames)

dgoldenberg-audiomack commented 3 years ago

That said, there are the CSV-consuming APIs (https://www.tensorflow.org/guide/data#consuming_csv_data); I wonder if that's just a better, more direct approach anyway. I have Parquet data to consume; in theory I should be able to load it as described here: https://github.com/tensorflow/io/issues/1121

maciejkula commented 3 years ago

It might help folks answer this if you post the entire stack trace of your error. It's hard to say what's going on based on your summary of the error message.

dgoldenberg-audiomack commented 3 years ago

Hi @maciejkula, there are two slightly different stack traces in the linked tickets I've included.

However here's one of them:

Traceback (most recent call last):
  File "/mnt/tmp/spark-9eb04118-c94f-4ed6-bb6b-75927f4d389f/recsys_tfrs_proto.py", line 304, in <module>
    main(sys.argv)
  File "/usr/local/lib64/python3.7/site-packages/tensorflow/python/autograph/impl/api.py", line 302, in wrapper
    return func(*args, **kwargs)
  File "/mnt/tmp/spark-9eb04118-c94f-4ed6-bb6b-75927f4d389f/recsys_tfrs_proto.py", line 50, in main
    movies, test, train, unique_movie_titles, unique_user_ids = prepare_data(movies, ratings)
  File "/usr/local/lib64/python3.7/site-packages/tensorflow/python/autograph/impl/api.py", line 302, in wrapper
    return func(*args, **kwargs)
  File "/mnt/tmp/spark-9eb04118-c94f-4ed6-bb6b-75927f4d389f/recsys_tfrs_proto.py", line 155, in prepare_data
    ratings = ratings.map(lambda x: {"movie_title": x["movie_title"], "user_id": x["user_id"]})
  File "/usr/local/lib64/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py", line 1695, in map
    return MapDataset(self, map_func, preserve_cardinality=True)
  File "/usr/local/lib64/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py", line 4045, in __init__
    use_legacy_function=use_legacy_function)
  File "/usr/local/lib64/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py", line 3371, in __init__
    self._function = wrapper_fn.get_concrete_function()
  File "/usr/local/lib64/python3.7/site-packages/tensorflow/python/eager/function.py", line 2939, in get_concrete_function
    *args, **kwargs)
  File "/usr/local/lib64/python3.7/site-packages/tensorflow/python/eager/function.py", line 2906, in _get_concrete_function_garbage_collected
    graph_function, args, kwargs = self._maybe_define_function(args, kwargs)
  File "/usr/local/lib64/python3.7/site-packages/tensorflow/python/eager/function.py", line 3213, in _maybe_define_function
    graph_function = self._create_graph_function(args, kwargs)
  File "/usr/local/lib64/python3.7/site-packages/tensorflow/python/eager/function.py", line 3075, in _create_graph_function
    capture_by_value=self._capture_by_value),
  File "/usr/local/lib64/python3.7/site-packages/tensorflow/python/framework/func_graph.py", line 986, in func_graph_from_py_func
    func_outputs = python_func(*func_args, **func_kwargs)
  File "/usr/local/lib64/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py", line 3364, in wrapper_fn
    ret = _wrapper_helper(*args)
  File "/usr/local/lib64/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py", line 3299, in _wrapper_helper
    ret = autograph.tf_convert(func, ag_ctx)(*nested_args)
  File "/usr/local/lib64/python3.7/site-packages/tensorflow/python/autograph/impl/api.py", line 302, in wrapper
    return func(*args, **kwargs)
  File "/mnt/tmp/spark-9eb04118-c94f-4ed6-bb6b-75927f4d389f/recsys_tfrs_proto.py", line 155, in <lambda>
    ratings = ratings.map(lambda x: {"movie_title": x["movie_title"], "user_id": x["user_id"]})
  File "/usr/local/lib64/python3.7/site-packages/tensorflow/python/util/dispatch.py", line 201, in wrapper
    return target(*args, **kwargs)
  File "/usr/local/lib64/python3.7/site-packages/tensorflow/python/ops/array_ops.py", line 986, in _slice_helper
    _check_index(s)
  File "/usr/local/lib64/python3.7/site-packages/tensorflow/python/ops/array_ops.py", line 865, in _check_index
    raise TypeError(_SLICE_TYPE_ERROR + ", got {!r}".format(idx))
TypeError: Only integers, slices (:), ellipsis (...), tf.newaxis (None) and scalar tf.int32/tf.int64 tensors are valid indices, got 'movie_title'

maciejkula commented 3 years ago

It looks like the error is in the map function:

ratings.map(lambda x: {"movie_title": x["movie_title"], "user_id": x["user_id"]})

Whatever the elements of ratings are, they are not dictionaries. What are they?

dgoldenberg-audiomack commented 3 years ago

That's true. Unlike with tfds.load, when using the connector and then feeding the files into a tf.data.TFRecordDataset, the elements are not dictionaries; they are scalar string tensors whose values are raw bytes.

The question is, how does one decode these bytes?

b'\ns\n\x11\n\x08movie_id\x12\x05\x1a\x03\n\x01\x01\n#\n\x0bmovie_title\x12\x14\n\x12\n\x10Toy Story (1995)\n9\n\x06genres\x12/\n-\n+Adventure|Animation|Children|Comedy|Fantasy'

Clearly this has movie_id, movie_title, and genres. .decode("utf-8") doesn't work and this doesn't cleanly un-hexify, either.

Similarly, for ratings b'\n|\n\x10\n\x07user_id\x12\x05\x1a\x03\n\x01\x01\n\x11\n\x08movie_id\x12\x05\x1a\x03\n\x01\x1f\n)\n\x0bmovie_title\x12\x1a\n\x18\n\x16Dangerous Minds (1995)\n\x12\n\x06rating\x12\x08\x12\x06\n\x04\x00\x00 @\n\x16\n\ttimestamp\x12\t\x1a\x07\n\x05\xe8\xd0\x96\xd9\x04'

Wonder if there's a utility method in tf somewhere..
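
For what it's worth, the blob looks like a serialized tf.train.Example protocol buffer, which is a binary format rather than UTF-8 text, so .decode("utf-8") is not expected to work on the whole record. A minimal sketch for inspecting one record, assuming the bytes below (transcribed from the movies record above) form one complete record; FromString is the standard protobuf deserializer and needs no schema:

```python
import tensorflow as tf

# One movies record, transcribed from the thread as a Python bytes literal.
raw = (
    b"\ns\n\x11\n\x08movie_id\x12\x05\x1a\x03\n\x01\x01"
    b"\n#\n\x0bmovie_title\x12\x14\n\x12\n\x10Toy Story (1995)"
    b"\n9\n\x06genres\x12/\n-\n+Adventure|Animation|Children|Comedy|Fantasy"
)

# Deserialize the protobuf message; useful for debugging what was written.
example = tf.train.Example.FromString(raw)
features = example.features.feature

# Each feature holds exactly one of bytes_list, float_list, or int64_list.
movie_id = features["movie_id"].int64_list.value[0]
movie_title = features["movie_title"].bytes_list.value[0].decode("utf-8")
genres = features["genres"].bytes_list.value[0].decode("utf-8")
print(movie_id, movie_title, genres)
```

With a real dataset, the same call should work on `element.numpy()` for each element of `tf.data.TFRecordDataset(filenames).take(1)` in eager mode.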

maciejkula commented 3 years ago

I'm pretty sure the problem is on the writing side. What things do you expect Spark to be writing to these files? Is it writing what you think it's writing?

dgoldenberg-audiomack commented 3 years ago

It's writing the right keys (column names) and the right values, but I don't know enough about the internal TFRecord format to judge whether this binary blob is correct or which encoding it uses. It seems that there's some kind of disconnect between what's being written and how it's being read into the dataset.

Maybe it's a TF 1.x vs. TF 2.x issue. Or maybe when reading into a TF dataset, tfrecords need to pass through another conversion layer.

All in all, not a critical issue for me because now I want to load Parquet into TF datasets directly; experimenting with that now. I had thought I'd have to write Spark dataframes into intermediary tfrecord files first then load them into tf datasets. But there seems to be a more direct way to just go Parquet -> DS.

That said, I think the load of tfrecord files into a TF DS "should work".

maciejkula commented 3 years ago

Have you looked at the docs for reading TFRecord files containing tf.train.Examples?

It looks like you're skipping the deserialization step (converting the serialized tf.train.Example protos to dictionaries of tensors).
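
The deserialization step could look roughly like the sketch below. The feature names and types are assumptions inferred from the ratings bytes printed earlier in the thread, and `parse_rating` is a hypothetical helper name. The sample record is built in memory so the snippet runs without the Spark output; in practice `raw_dataset` would be the `tf.data.TFRecordDataset(filenames)` from the original post.

```python
import tensorflow as tf

# Assumed schema, inferred from the ratings record shown in the thread.
feature_description = {
    "user_id": tf.io.FixedLenFeature([], tf.int64),
    "movie_id": tf.io.FixedLenFeature([], tf.int64),
    "movie_title": tf.io.FixedLenFeature([], tf.string),
    "rating": tf.io.FixedLenFeature([], tf.float32),
    "timestamp": tf.io.FixedLenFeature([], tf.int64),
}


def parse_rating(serialized):
    """Turn one serialized tf.train.Example into a dict of tensors."""
    return tf.io.parse_single_example(serialized, feature_description)


def int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))


# Stand-in for one record from the TFRecord files.
sample = tf.train.Example(features=tf.train.Features(feature={
    "user_id": int64_feature(1),
    "movie_id": int64_feature(31),
    "movie_title": tf.train.Feature(
        bytes_list=tf.train.BytesList(value=[b"Dangerous Minds (1995)"])),
    "rating": tf.train.Feature(float_list=tf.train.FloatList(value=[2.5])),
    "timestamp": int64_feature(1260759144),
})).SerializeToString()

raw_dataset = tf.data.Dataset.from_tensor_slices([sample])

# Parse first; after that the elements are dicts and string keys work.
ratings = raw_dataset.map(parse_rating)
ratings = ratings.map(
    lambda x: {"movie_title": x["movie_title"], "user_id": x["user_id"]})

first = next(iter(ratings))
print(first)
```

The second map is the one from the stack trace; it fails on the raw dataset because each element there is a scalar string tensor, which cannot be indexed by "movie_title".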

dgoldenberg-audiomack commented 3 years ago

Oic, you mean this? -

parsed_dataset = raw_dataset.map(_parse_function)

It seems odd that one has to load the tfrecords into a dataset, then convert it while instructing it what's in the files :)

I would think that the dataset should be able to do the parsing itself, encapsulating the knowledge of how to parse these files...

I'll try this out a little later

dgoldenberg-audiomack commented 3 years ago

We can probably close the ticket; it may be worth mentioning in the docs that the 'parse' type of transformation is required; it's not obvious especially to a noob :)

Data-Jack commented 3 years ago

@dgoldenberg-audiomack Did you ever find a solution for this? I think I am experiencing a similar issue when writing array-of-arrays column types.

I get the error "InvalidArgumentError: Feature: cold (data type: float) is required but could not be found. [Op:ParseSingleExample]" even though "cold" is clearly in the message byte string.

from pyspark.sql.types import (
    ArrayType, FloatType, IntegerType, StringType, StructField, StructType,
)

data = [
    ("A", 1, [1.1, 2.0, 3.0], [[0.0, 1.0], [1.0, 0.0]]),
    ("B", 2, [11.0, 12.3, 13.0], [[0.0, 1.0], [1.0, 0.0]]),
    ("C", 3, [21.0, 22.0, 23.5], [[1.0, 0.0], [1.0, 0.0]]),
]

schema = StructType([
    StructField("colA", StringType(), True),
    StructField("colb", IntegerType(), True),
    StructField("colc", ArrayType(FloatType()), True),
    StructField("cold", ArrayType(ArrayType(FloatType())), True),
])

# Assumes an active SparkSession named `spark`
df = spark.createDataFrame(data=data, schema=schema)

# Write
write_path = "/home/pathtowrite"
df.write.format("tfrecords").option("recordType", "SequenceExample").mode("overwrite").save(write_path)

# Read in with TensorFlow
import os

import tensorflow as tf

files = [f"{write_path}/{x}" for x in os.listdir(write_path) if x.startswith("part")]
dataset = tf.data.TFRecordDataset(files)

# Create a dictionary describing the features.
feature_description = {
    'colA': tf.io.FixedLenFeature([], tf.string),
    'colb': tf.io.FixedLenFeature([], tf.int64),
    'colc': tf.io.FixedLenFeature([3], tf.float32),
    'cold': tf.io.FixedLenFeature([2, 2], tf.float32),
}

# Print the raw bytes of the first record, then try to parse it.
for i in dataset.take(1):
    print(repr(i))
    example = tf.io.parse_single_example(i, feature_description)
    print(example)

dgoldenberg-audiomack commented 3 years ago

@Data-Jack In my particular case, the solution was to make sure I use the map:

parsed_dataset = raw_dataset.map(_parse_function)

However generally I've moved away from using tf-records. At least, my current thinking is why bother :) TF has ways of loading CSV and Parquet data into its datasets. What I do is wrangle all the input data to CSV/Parquet using Spark, then load it into TF datasets.
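
A minimal sketch of the CSV route, using tf.data.experimental.make_csv_dataset. The file contents and column names here are made up for illustration; in the workflow described above the CSV would come from Spark instead.

```python
import csv
import os
import tempfile

import tensorflow as tf

# Write a tiny illustrative CSV; in practice Spark would produce this file.
csv_path = os.path.join(tempfile.mkdtemp(), "movies.csv")
with open(csv_path, "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["movie_id", "movie_title"])
    writer.writerow([1, "Toy Story (1995)"])
    writer.writerow([2, "Jumanji (1995)"])

# Column names come from the header row and column types are inferred,
# so each element is already a dict of batched tensors.
movies = tf.data.experimental.make_csv_dataset(
    csv_path, batch_size=2, num_epochs=1, shuffle=False)

batch = next(iter(movies))
titles = [t.decode("utf-8") for t in batch["movie_title"].numpy()]
ids = batch["movie_id"].numpy().tolist()
print(ids, titles)
```

Unlike the TFRecord path, no separate parse step is needed here, because the header row carries the column names. The elements are batched, so a call to unbatch() may be needed before feeding code that expects one example per element.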

I have filed this issue in TF for better interoperability with Spark.

However, there's also the BigDL project, which presumably allows one to distribute TF training using Spark.

Data-Jack commented 3 years ago

@dgoldenberg-audiomack Unfortunately my data contains array-of-arrays fields, and map still doesn't work. Chucking the parsing function below into my example still produces the same error for me.

def _parse_function(example_proto):
  # Parse the input tf.train.Example proto using the dictionary above.
  return tf.io.parse_single_example(example_proto, feature_description)

parsed_dataset = dataset.map(_parse_function)
for i in parsed_dataset.take(1):
    print(i)

Thanks for the tip, I will look into a pipeline that reads from Parquet. I was really hoping to just be able to write all my nested array fields to TFRecord and read them in. I am going to settle for flattening all the arrays into one array for now whilst I research other options.
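
One thing that may be worth checking, offered as an unconfirmed guess: the records were written with recordType SequenceExample, while tf.io.parse_single_example expects tf.train.Example protos. In a tf.train.SequenceExample, flat columns live in `context` and nested sequences live in `feature_lists`, and an Example parser only looks at the former. The sketch below assumes the connector stores the array-of-arrays column `cold` as a feature list; the record is built in memory to mimic that layout rather than read from the Spark output.

```python
import tensorflow as tf


def floats(values):
    """Wrap a list of Python floats in a tf.train.Feature."""
    return tf.train.Feature(float_list=tf.train.FloatList(value=values))


# Stand-in for one written record (assumed layout, see the note above).
record = tf.train.SequenceExample(
    context=tf.train.Features(feature={
        "colA": tf.train.Feature(bytes_list=tf.train.BytesList(value=[b"A"])),
        "colb": tf.train.Feature(int64_list=tf.train.Int64List(value=[1])),
        "colc": floats([1.1, 2.0, 3.0]),
    }),
    feature_lists=tf.train.FeatureLists(feature_list={
        # One inner array per step of the sequence.
        "cold": tf.train.FeatureList(
            feature=[floats([0.0, 1.0]), floats([1.0, 0.0])]),
    }),
).SerializeToString()

context_features = {
    "colA": tf.io.FixedLenFeature([], tf.string),
    "colb": tf.io.FixedLenFeature([], tf.int64),
    "colc": tf.io.FixedLenFeature([3], tf.float32),
}
sequence_features = {
    # Shape [2] is the length of each inner array; the outer length is
    # taken from the record itself.
    "cold": tf.io.FixedLenSequenceFeature([2], tf.float32),
}

context, sequences = tf.io.parse_single_sequence_example(
    record,
    context_features=context_features,
    sequence_features=sequence_features,
)
print(context["colA"].numpy(), sequences["cold"].numpy())
```

If the assumption holds, mapping a function like this over the TFRecordDataset, in place of the parse_single_example call, would avoid having to flatten the nested arrays.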

dgoldenberg-audiomack commented 3 years ago

@Data-Jack Have you considered filing an issue with tensorflow-io or core TF? They might suggest something.

Data-Jack commented 3 years ago

@dgoldenberg-audiomack Yeah, I will do. My first guess was that the problem was in how the data was being written.