Closed · lxx719 closed this issue 3 years ago
Thanks for reporting. Confirmed the issue: writing is fine, but reading a compressed file does not work as expected. However, we have very limited resources right now. Contributions are welcome.
During read, the compression is not set by an "option"; it is detected automatically. The reader is created here: https://github.com/linkedin/spark-tfrecord/blob/master/src/main/scala/com/linkedin/spark/datasources/tfrecord/TFRecordFileReader.scala#L33
The compression is detected here: https://github.com/tensorflow/ecosystem/blob/master/hadoop/src/main/java/org/tensorflow/hadoop/io/TFRecordFileInputFormat.java#L58
It is not clear why the codec is not selected correctly.
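For context, here is a minimal sketch of how Hadoop's `CompressionCodecFactory` resolves a codec from the file suffix, which is what the detection code linked above relies on. The file name is just an example; only the suffix matters.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory

// Hadoop picks the codec purely from the file extension,
// e.g. ".gz" -> GzipCodec, ".bz2" -> BZip2Codec; null means "no compression".
val conf = new Configuration()
val factory = new CompressionCodecFactory(conf)
val codec = factory.getCodec(new Path("part-r-00000.tfrecord.gz")) // example file name
println(if (codec == null) "no codec detected" else codec.getClass.getName)
```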
If you use the part-file path in the load() command, something like `.....load("path-to-part-file.tfrecord.gz")`, then it loads fine, but only for that single part file.
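A quick sketch of that workaround (the part-file name below is made up; substitute an actual file from your output directory):

```scala
// Workaround: point load() at a single compressed part file instead of the directory.
// The part-file name here is hypothetical.
val onePart = spark.read
  .format("tfrecord")
  .option("recordType", "Example")
  .load("test-output.tfrecord/part-r-00000.tfrecord.gz")
onePart.show()
```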
@lxx719 v0.3.2 should fix this issue. Please give it a try.
@junshi15 Thanks, but my work environment is Spark 2.4.0 with Scala 2.11.12, which is not compatible with 0.3.x; the previous read test was with v2.11-0.2.2. On the other hand, I can read the compressed TFRecord with an explicit schema.
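A sketch of that explicit-schema workaround, reusing the `schema` and `path` values from the repro further down (the option names mirror those used elsewhere in this thread):

```scala
// Workaround on spark-tfrecord 0.2.x: skip schema inference by supplying the schema explicitly.
val explicitDf = spark.read
  .format("tfrecord")
  .option("recordType", "Example")
  .schema(schema) // the StructType used when the data was written
  .load(path)     // directory containing the gzip-compressed part files
explicitDf.show()
```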
@junshi15 It works. Thanks!
Glad it worked.
Question: For TFRecords in compressed (gzip) format, is lazy loading currently supported? Specifically:
There are n large gzip files (TFRecord) in my directory, and I want to read only 1000 rows from them, like this:
```python
df = (
    spark.read.format("tfrecord")
    .option("recordType", "Example")
    .option("compression", "gzip")
    .load(data_dir)
    .limit(1000)
    .repartition(10)
)
df.show()
```
The reading process is very slow and prints the following:
First, save a file in TFRecord format in compressed mode, then read the saved file: the schema inference fails and the data is empty.
```scala
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.{ DataFrame, Row }
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.types._

val path = "test-output.tfrecord"
val testRows: Array[Row] = Array(
  new GenericRow(Array[Any](11, 1, 23L, 10.0F, 14.0, List(1.0, 2.0), "r1")),
  new GenericRow(Array[Any](21, 2, 24L, 12.0F, 15.0, List(2.0, 2.0), "r2")))
val schema = StructType(List(
  StructField("id", IntegerType),
  StructField("IntegerCol", IntegerType),
  StructField("LongCol", LongType),
  StructField("FloatCol", FloatType),
  StructField("DoubleCol", DoubleType),
  StructField("VectorCol", ArrayType(DoubleType, true)),
  StructField("StringCol", StringType)))

val rdd = spark.sparkContext.parallelize(testRows)
val compressMode = "org.apache.hadoop.io.compress.GzipCodec"

// Save DataFrame as TFRecords
val df: DataFrame = spark.createDataFrame(rdd, schema)
df.write.format("tfrecord").option("recordType", "Example").option("codec", compressMode).save(path)

// Read TFRecords into DataFrame.
// The DataFrame schema is inferred from the TFRecords if no custom schema is provided.
val importedDf1: DataFrame = spark.read.format("tfrecord").option("recordType", "Example").option("codec", compressMode).load(path)
importedDf1.show()
```

importedDf1.show() prints only an empty frame:

```
++
```