linkedin / spark-tfrecord

Read and write Tensorflow TFRecord data from Apache Spark.
BSD 2-Clause "Simplified" License

Error while reading TFRecords - buildReader is not supported for TFRECORD #32

Closed · GaganSD closed this issue 3 years ago

GaganSD commented 3 years ago

Hi @junshi15

I have the same problem as #15. I'm trying to replicate the test examples shown in the README, but I'm unable to because of this error. I'm using Scala 2.12 and Spark 3.0 with version 0.3.2 of spark-tfrecord.

With this installed, Spark can write TFRecords as expected; however, it can't read back the same TFRecords it just created.

I get this error message:

Py4JJavaError: An error occurred while calling o183.showString.
: java.lang.UnsupportedOperationException: buildReader is not supported for TFRECORD

Do you know of a fix for this?

I'm using Python 3.7. Here's my code (it's mostly from this repo's README):

import os
import sys
from pyspark.sql.types import *
from pyspark.sql import SparkSession

def build_session():
    sess_builder = SparkSession\
        .builder\
        .appName('tfrecordTest')

    # just needed to run locally
    os.environ['PYSPARK_PYTHON'] = sys.executable
    os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

    config = {
        'spark.jars': '/data04/home/ben.davidson/code/ByteSpark/spark-tfrecord_2.12-0.3.2.jar',
    }
    for key in config:
        sess_builder.config(key, config[key])

    spark = sess_builder.getOrCreate()
    return spark

def get_data(spark):
    fields = [StructField("id", IntegerType()), StructField("IntegerCol", IntegerType()),
            StructField("LongCol", LongType()), StructField("FloatCol", FloatType()),
            StructField("DoubleCol", DoubleType()), StructField("VectorCol", ArrayType(DoubleType(), True)),
            StructField("StringCol", StringType())]
    schema = StructType(fields)
    test_rows = [[11, 1, 23, 10.0, 14.0, [1.0, 2.0], "r1"], [21, 2, 24, 12.0, 15.0, [2.0, 2.0], "r2"]]
    rdd = spark.sparkContext.parallelize(test_rows)
    df = spark.createDataFrame(rdd, schema)
    return df

if __name__ == '__main__':
    spark = build_session()
    df = get_data(spark)
    path = 'hdfs://harunava/user/ben.davidson/test/data'

    # works
    df.write.mode("overwrite").format("tfrecord").option("recordType", "Example").save(path)

    # breaks
    df = spark.read.format("tfrecord").option("recordType", "Example").load(path)
    df.show()

Here is the full traceback:


---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-7-31dc9d3fa2b3> in <module>
      1 df2 = spark.read.format("tfrecord").option("recordType", "Example").load(path)
----> 2 df2.show()

~/miniconda3/envs/ByteSpark/lib/python3.7/site-packages/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
    438         """
    439         if isinstance(truncate, bool) and truncate:
--> 440             print(self._jdf.showString(n, 20, vertical))
    441         else:
    442             print(self._jdf.showString(n, int(truncate), vertical))

~/miniconda3/envs/ByteSpark/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

~/miniconda3/envs/ByteSpark/lib/python3.7/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
    129     def deco(*a, **kw):
    130         try:
--> 131             return f(*a, **kw)
    132         except py4j.protocol.Py4JJavaError as e:
    133             converted = convert_exception(e.java_exception)

~/miniconda3/envs/ByteSpark/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o183.showString.
: java.lang.UnsupportedOperationException: buildReader is not supported for TFRECORD
    at org.apache.spark.sql.execution.datasources.FileFormat.buildReader(FileFormat.scala:116)
    at org.apache.spark.sql.execution.datasources.FileFormat.buildReaderWithPartitionValues(FileFormat.scala:137)
    at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:535)
    at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:525)
    at org.apache.spark.sql.execution.FileSourceScanExec.doExecute(DataSourceScanExec.scala:610)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
    at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:526)
...
...
...

Let me know if you know of a fix for this, thanks! :)

junshi15 commented 3 years ago

Thanks for your question, @GaganSD .

I tested your script in the pyspark shell, launching it with bin/pyspark --packages com.linkedin.sparktfrecord:spark-tfrecord_2.12:0.3.2, and then copied your code into the pyspark REPL. It worked for me.

I don't know why you are seeing the error. My guess is that it has something to do with your environment settings.
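
For anyone comparing the two setups: the --packages flag resolves the connector from Maven, whereas the script above points spark.jars at a locally built jar. A minimal sketch of doing the same Maven-style resolution from inside the script itself, assuming the coordinates junshi15 used and the same app name as above (not something tested in this thread):

from pyspark.sql import SparkSession

# Sketch: resolve spark-tfrecord from Maven at session start instead of
# shipping a locally built jar via spark.jars.
spark = (
    SparkSession.builder
    .appName("tfrecordTest")
    .config("spark.jars.packages",
            "com.linkedin.sparktfrecord:spark-tfrecord_2.12:0.3.2")
    .getOrCreate()
)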

GaganSD commented 3 years ago

Right, I managed to find the bug, and it was indeed an issue with the environment settings.

We were actually using Spark 3.0.1, not Spark 3.0.0 (contrary to what I mentioned above). I think this suggests the library isn't compatible with the newer releases of Spark.

From searching online, I found that this error also shows up when the pyspark version doesn't match the installed Spark version, or when Java 9+ is used, since Spark doesn't seem to work well with newer Java releases. Hope this helps anyone who runs into this error.
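
If it helps anyone debugging a similar mismatch, here is a rough way to check these versions from a live session, assuming a SparkSession named spark as in the script above (the _jvm handle is an internal PySpark attribute, used here only for a quick check):

import pyspark

# The pyspark package version and the Spark runtime version should match
# (e.g. both 3.0.1); a mismatch is one of the causes mentioned above.
print(pyspark.__version__)   # version of the pyspark Python package
print(spark.version)         # version of the Spark runtime behind the session

# Version of the JVM the driver is running on (the thread above mentions
# problems with newer Java versions).
print(spark.sparkContext._jvm.java.lang.System.getProperty("java.version"))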

Thanks for the reply! :)

danjwilks commented 3 years ago
[screenshot attached]

Congrats

junshi15 commented 3 years ago

@GaganSD Glad you found the root cause. In fact, I tested with Spark 3.1.2 and it worked for me.

bytekongfrombupt commented 3 years ago

@junshi15 Well, I tested with Spark 3.0.1 and got the same error message as @GaganSD. It seems it is indeed an issue with the environment settings.