NVIDIA / spark-rapids

Spark RAPIDS plugin - accelerate Apache Spark with GPUs
https://nvidia.github.io/spark-rapids
Apache License 2.0

[BUG] Create parquet table with compression #11416

Status: Open. Feng-Jiang28 opened this issue 2 months ago

Feng-Jiang28 commented 2 months ago

When the RAPIDS plugin creates a table with 'SNAPPY' compression, the written file reports an UNCOMPRESSED ColumnMetaData.
You can replace SNAPPY with GZIP or ZSTD and still get an UNCOMPRESSED ColumnMetaData. Reproduce:

import org.apache.spark.sql.execution.datasources.parquet
import org.apache.parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter, ParquetOutputFormat}
import org.apache.hadoop.fs.{FileSystem, Path}

val dirPath = "/home/fejiang/Downloads/compressionTmp"
val hadoopConf = spark.sessionState.newHadoopConf()

val fs = FileSystem.get(hadoopConf)
val path = new Path(dirPath)
fs.mkdirs(path)

spark.sql(
  s"""
     |CREATE TABLE tableName USING Parquet 
     |OPTIONS('path'='/home/fejiang/Downloads/compressionTmp', 'parquet.compression'='SNAPPY') 
     |PARTITIONED BY (p) 
     |AS SELECT 1 AS col1, 2 AS p
   """.stripMargin)

val path2 = new Path("/home/fejiang/Downloads/compressionTmp/p=2")
ParquetFileReader.readAllFootersInParallel(hadoopConf, fs.getFileStatus(path2))

CPU:

scala> import org.apache.spark.sql.execution.datasources.parquet
import org.apache.spark.sql.execution.datasources.parquet

scala> import org.apache.parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter, ParquetOutputFormat}
import org.apache.parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter, ParquetOutputFormat}

scala> import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.{FileSystem, Path}

scala> val dirPath = "/home/fejiang/Downloads/compressionTmp"
dirPath: String = /home/fejiang/Downloads/compressionTmp

scala> val hadoopConf = spark.sessionState.newHadoopConf()
hadoopConf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-rbf-default.xml, hdfs-site.xml, hdfs-rbf-site.xml, __spark_hadoop_conf__.xml

scala> val fs = FileSystem.get(hadoopConf)
fs: org.apache.hadoop.fs.FileSystem = org.apache.hadoop.fs.LocalFileSystem@69c071db

scala> val path = new Path(dirPath)
path: org.apache.hadoop.fs.Path = /home/fejiang/Downloads/compressionTmp

scala> fs.mkdirs(path)
res0: Boolean = true

scala> spark.sql(
     |   s"""
     |      |CREATE TABLE tableName USING Parquet 
     |      |OPTIONS('path'='/home/fejiang/Downloads/compressionTmp', 'parquet.compression'='SNAPPY') 
     |      |PARTITIONED BY (p) 
     |      |AS SELECT 1 AS col1, 2 AS p
     |    """.stripMargin)
res1: org.apache.spark.sql.DataFrame = []                                       

scala> val path2 = new Path("/home/fejiang/Downloads/compressionTmp/p=2")
path2: org.apache.hadoop.fs.Path = /home/fejiang/Downloads/compressionTmp/p=2

scala> ParquetFileReader.readAllFootersInParallel(hadoopConf, fs.getFileStatus(path2))
warning: one deprecation; for details, enable `:setting -deprecation' or `:replay -deprecation'
res2: java.util.List[org.apache.parquet.hadoop.Footer] =
[Footer{file:/home/fejiang/Downloads/compressionTmp/p=2/part-00000-4299eb68-0f76-4c73-b1ad-05492734f294.c000.snappy.parquet, ParquetMetaData{FileMetaData{schema: message spark_schema {
  required int32 col1;
}
, metadata: {org.apache.spark.version=3.3.0, org.apache.spark.sql.parquet.row.metadata={"type":"struct","fields":[{"name":"col1","type":"integer","nullable":false,"metadata":{}}]}}}, blocks: [BlockMetaData{1, 27 [ColumnMetaData{SNAPPY [col1] required int32 col1  [PLAIN, BIT_PACKED], 4}]}]}}]

GPU:

scala> import org.apache.spark.sql.execution.datasources.parquet
import org.apache.spark.sql.execution.datasources.parquet

scala> import org.apache.parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter, ParquetOutputFormat}
import org.apache.parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter, ParquetOutputFormat}

scala> import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.{FileSystem, Path}

scala> val dirPath = "/home/fejiang/Downloads/compressionTmp"
dirPath: String = /home/fejiang/Downloads/compressionTmp

scala> val hadoopConf = spark.sessionState.newHadoopConf()
hadoopConf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-rbf-default.xml, hdfs-site.xml, hdfs-rbf-site.xml, __spark_hadoop_conf__.xml

scala> val fs = FileSystem.get(hadoopConf)
fs: org.apache.hadoop.fs.FileSystem = org.apache.hadoop.fs.LocalFileSystem@2375cb0e

scala> val path = new Path(dirPath)
path: org.apache.hadoop.fs.Path = /home/fejiang/Downloads/compressionTmp

scala> fs.mkdirs(path)
res0: Boolean = true

scala> spark.sql(
     |   s"""
     |      |CREATE TABLE tableName USING Parquet 
     |      |OPTIONS('path'='/home/fejiang/Downloads/compressionTmp', 'parquet.compression'='SNAPPY') 
     |      |PARTITIONED BY (p) 
     |      |AS SELECT 1 AS col1, 2 AS p
     |    """.stripMargin)
24/09/02 11:42:17 WARN GpuOverrides: 
    ! <RDDScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.RDDScanExec

24/09/02 11:42:18 WARN GpuOverrides:                                            
! <CommandResultExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.CommandResultExec

res1: org.apache.spark.sql.DataFrame = []

scala> val path2 = new Path("/home/fejiang/Downloads/compressionTmp/p=2")
path2: org.apache.hadoop.fs.Path = /home/fejiang/Downloads/compressionTmp/p=2

scala> ParquetFileReader.readAllFootersInParallel(hadoopConf, fs.getFileStatus(path2))
warning: one deprecation; for details, enable `:setting -deprecation' or `:replay -deprecation'
res2: java.util.List[org.apache.parquet.hadoop.Footer] =
[Footer{file:/home/fejiang/Downloads/compressionTmp/p=2/part-00000-156af91b-b43e-4b45-ae0b-5f4f950af591.c000.snappy.parquet, ParquetMetaData{FileMetaData{schema: message schema {
  required int32 col1;
}
, metadata: {org.apache.spark.version=3.3.0, org.apache.spark.sql.parquet.row.metadata={"type":"struct","fields":[{"name":"col1","type":"integer","nullable":false,"metadata":{}}]}}}, blocks: [BlockMetaData{1, 21 [ColumnMetaData{UNCOMPRESSED [col1] required int32 col1  [PLAIN], 4}]}]}}]
mattahrens commented 2 months ago

Note that cudf is capable of writing uncompressed data when the compressed version would be larger, which can happen in exceptional cases such as random data.

jlowe commented 2 months ago

It looks like the issue is with the test. The data being compressed is a single value in each column, and that value will not compress well. The compressed version will be larger than the uncompressed version. libcudf's write code on the GPU will not use compression when the compressed data is larger than the uncompressed data.

I recommend updating the test to use data that is expected to produce a smaller size when compressed, and see if the issue persists afterwards.
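
For reference, a minimal sketch along the lines of that recommendation (assuming the same spark-shell session as the reproduction above, with hadoopConf, fs, and the Parquet imports still in scope; the output path /tmp/compressionTmp2, the row count, and the repeated string value are placeholders, not from the original report):

// Sketch only: write repetitive data that should clearly benefit from SNAPPY,
// then re-read the footers the same way as in the reproduction above.
val bigDf = spark.range(0, 100000)
  .selectExpr("repeat('spark-rapids', 10) AS col1", "CAST(id % 2 AS INT) AS p")

bigDf.write
  .mode("overwrite")
  .option("parquet.compression", "SNAPPY")
  .partitionBy("p")
  .parquet("/tmp/compressionTmp2")

val checkPath = new Path("/tmp/compressionTmp2/p=0")
ParquetFileReader.readAllFootersInParallel(hadoopConf, fs.getFileStatus(checkPath))

With data like this, the column chunks should come back reporting SNAPPY on both CPU and GPU if the write path honors the requested codec.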

Feng-Jiang28 commented 2 months ago

I increased the number of rows in the table and then wrote it with the "SNAPPY" option; the metadata now shows SNAPPY.

import csv

# Define the data
data = [{"col1": i, "p": i % 2} for i in range(1, 1001)]  # Creates 1000 rows

# Specify the file name
csv_file = "data.csv"

# Write to the CSV file
with open(csv_file, mode='w', newline='') as file:
    writer = csv.DictWriter(file, fieldnames=["col1", "p"])
    writer.writeheader()
    writer.writerows(data)

print(f"CSV file '{csv_file}' created successfully.")
import org.apache.spark.sql.execution.datasources.parquet
import org.apache.parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter, ParquetOutputFormat}
import org.apache.hadoop.fs.{FileSystem, Path}

// Same session setup as in the original reproduction
val hadoopConf = spark.sessionState.newHadoopConf()
val fs = FileSystem.get(hadoopConf)

val df = spark.read.option("header", "true").csv("/home/fejiang/Downloads/data.csv")

(df.write
  .mode("overwrite")
  .format("parquet")
  .option("path", "/home/fejiang/Downloads/compressionTmp")
  .option("parquet.compression", "SNAPPY")
  .partitionBy("p")
  .saveAsTable("tableName")
)

val path2 = new Path("/home/fejiang/Downloads/compressionTmp/p=1")
ParquetFileReader.readAllFootersInParallel(hadoopConf, fs.getFileStatus(path2))

The footer now reports SNAPPY (output abridged):

metadata: {org.apache.spark.version=3.3.0, org.apache.spark.sql.parquet.row.metadata={"type":"struct","fields":[{"name":"col1","type":"string","nullable":true,"metadata":{}}]}}}, blocks: [BlockMetaData{500, 3472 [ColumnMetaData{SNAPPY [col1] optional binary col1 (STRING)  [PLAIN, RLE], 4}]}]}}]
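
As a side note, rather than scanning the Footer.toString output by eye, a small sketch like the following (reusing hadoopConf, fs, and path2 from the snippet above) can print the codec of each column chunk via the parquet-hadoop metadata API:

import scala.collection.JavaConverters._

// List the compression codec recorded for every column chunk under the partition directory.
val footers = ParquetFileReader.readAllFootersInParallel(hadoopConf, fs.getFileStatus(path2))
footers.asScala.foreach { footer =>
  footer.getParquetMetadata.getBlocks.asScala.foreach { block =>
    block.getColumns.asScala.foreach { col =>
      println(s"${footer.getFile} ${col.getPath} -> ${col.getCodec}")
    }
  }
}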
GaryShen2008 commented 2 months ago

So let's adjust the test case to use more data to verify the SNAPPY compression (that is, exclude the original test case and create a new one of our own).
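
One way the adjusted check could be expressed (a sketch only, not the plugin's actual test code, and reusing the footers value from the sketch above): assert that every column chunk reports SNAPPY instead of eyeballing the footer dump.

import scala.collection.JavaConverters._
import org.apache.parquet.hadoop.metadata.CompressionCodecName

// Fail if any column chunk in the re-read footers is not SNAPPY-compressed.
val codecs = footers.asScala
  .flatMap(_.getParquetMetadata.getBlocks.asScala)
  .flatMap(_.getColumns.asScala)
  .map(_.getCodec)
assert(codecs.nonEmpty && codecs.forall(_ == CompressionCodecName.SNAPPY))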