apache / parquet-java

Apache Parquet Java
https://parquet.apache.org/
Apache License 2.0

Encrypting only one field in nested field prevents reading of other fields in nested field without keys #1649

Open asfimport opened 1 year ago

asfimport commented 1 year ago

Hi Team,

While exploring Parquet encryption, I found that if one field of a nested column is encrypted, and I then read the Parquet directory from another application that does not have the encryption keys, I cannot read the remaining (unencrypted) fields of that nested column either.

Example:

case class nestedItem(ic: Int = 0, sic : Double, pc: Int = 0)
case class SquareItem(int_column: Int, square_int_column : Double, partitionCol: Int, nestedCol : nestedItem)

In the case class SquareItem, nestedCol is a nested field, and I want to encrypt the field ic within it. I also want the footer to be unencrypted, so that legacy applications can still use the encrypted Parquet file. Encryption succeeds; however, when I query the Parquet file using Spark 3.3.0 without any Parquet encryption configuration set up, I cannot read the unencrypted field nestedCol.sic. I was expecting that only the nestedCol.ic field would not be queryable.

Reproducer (Spark 3.3.0, using spark-shell): download parquet-hadoop-1.12.0-tests.jar and add it to the spark jars folder.

Code to create the encrypted data:

sc.hadoopConfiguration.set("parquet.crypto.factory.class" ,"org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory")

sc.hadoopConfiguration.set("parquet.encryption.kms.client.class" ,"org.apache.parquet.crypto.keytools.mocks.InMemoryKMS")

sc.hadoopConfiguration.set("parquet.encryption.key.list","key1a: BAECAwQFBgcICQoLDA0ODw==, key2a: BAECAAECAAECAAECAAECAA==, keyz: BAECAAECAAECAAECAAECAA==")

sc.hadoopConfiguration.set("parquet.encryption.key.material.store.internally","false")

val encryptedParquetPath = "/tmp/par_enc_footer_non_encrypted"
val partitionCol = 1
case class nestedItem(ic: Int = 0, sic : Double, pc: Int = 0)
case class SquareItem(int_column: Int, square_int_column : Double, partitionCol: Int, nestedCol :nestedItem)
val dataRange = (1 to 100).toList
val squares = sc.parallelize(dataRange.map(i => new SquareItem(i, scala.math.pow(i,2), partitionCol,nestedItem(i,i))))
squares.toDS().show()
squares.toDS().write.partitionBy("partitionCol").mode("overwrite").option("parquet.encryption.column.keys", "key1a:square_int_column,nestedCol.ic;").option("parquet.encryption.plaintext.footer",true).option("parquet.encryption.footer.key", "keyz").parquet(encryptedParquetPath)

Code to read the data, accessing a non-encrypted nested field from a new spark-shell (without any encryption configuration):


val encryptedParquetPath = "/tmp/par_enc_footer_non_encrypted"
spark.sqlContext.read.parquet(encryptedParquetPath).createOrReplaceTempView("test")
spark.sql("select nestedCol.sic from test").show()

Since nestedCol.sic is not encrypted, I was expecting results, but instead I get the error below:


Caused by: org.apache.parquet.crypto.ParquetCryptoRuntimeException: [square_int_column]. Null File Decryptor
  at org.apache.parquet.hadoop.metadata.EncryptedColumnChunkMetaData.decryptIfNeeded(ColumnChunkMetaData.java:602)
  at org.apache.parquet.hadoop.metadata.ColumnChunkMetaData.getEncodings(ColumnChunkMetaData.java:348)
  at org.apache.parquet.hadoop.ParquetRecordReader.checkDeltaByteArrayProblem(ParquetRecordReader.java:191)
  at org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:177)
  at org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$1(ParquetFileFormat.scala:375)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:209)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:270)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:136)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
  at java.base/java.lang.Thread.run(Thread.java:833)

Reporter: Vignesh Nageswaran

Note: This issue was originally created as PARQUET-2193. Please see the migration documentation for further details.

asfimport commented 1 year ago

Gidon Gershinsky / @ggershinsky: Hmm, looks like this method runs over all columns, projected and not projected: org.apache.parquet.hadoop.ParquetRecordReader.checkDeltaByteArrayProblem(ParquetRecordReader.java:191)  

Please check if setting "parquet.split.files" to "false" solves this problem.
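
For reference, a minimal sketch of applying that workaround in the reader's spark-shell session (assuming the property is picked up from the Hadoop configuration, the same way as the other parquet.* properties used earlier in this thread):

// Workaround sketch: disable Parquet file splitting before reading the
// encrypted directory, so the per-split metadata check is avoided.
sc.hadoopConfiguration.set("parquet.split.files", "false")

val encryptedParquetPath = "/tmp/par_enc_footer_non_encrypted"
spark.sqlContext.read.parquet(encryptedParquetPath).createOrReplaceTempView("test")
spark.sql("select nestedCol.sic from test").show()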

asfimport commented 1 year ago

Vignesh Nageswaran: @ggershinsky thanks sir, it worked. Could you also please help me understand whether there are any adverse effects of setting this parameter?

asfimport commented 1 year ago

Gidon Gershinsky / @ggershinsky: Welcome.

From the sound of it, this might require each file to be processed by one thread only (instead of a single file being read by multiple threads), which should be OK in typical use cases where one thread/executor reads multiple files anyway. But I'll have a deeper look at this.

asfimport commented 1 year ago

Vignesh Nageswaran: @ggershinsky Sir, could you please let us know whether there will be a permanent fix that does not require setting the parameter parquet.split.files to false?

asfimport commented 1 year ago

Gidon Gershinsky / @ggershinsky: Yep, sorry about the delay. This turned out to be more challenging than I hoped; a fix at the encryption code level would require changes in the format specification. A rather big deal, and likely unjustified in this case. The immediate trigger is the checkDeltaByteArrayProblem verification, added 8 years ago to detect encoding irregularities in older files. For some reason this check is done only on files with nested columns, and not on files with regular columns (at least in Spark). Maybe the right thing today is to remove that verification. I'll check with the community.
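
To illustrate the failure mode (a standalone sketch, not the actual Spark read path; the file path below is a hypothetical placeholder for one of the written part files): reading the column-chunk encodings of an encrypted column forces decryption of its metadata, so any code that walks all columns of a plaintext-footer file without keys, as the checkDeltaByteArrayProblem verification does, fails even for columns the query never projects.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

// Sketch: walk every column chunk of the plaintext footer without supplying
// decryption keys. getEncodings() on an encrypted column chunk needs a file
// decryptor, so it throws ParquetCryptoRuntimeException ("Null File Decryptor"),
// mirroring what the per-column check hits for non-projected encrypted columns.
val conf = new Configuration()
val partFile = new Path("/tmp/par_enc_footer_non_encrypted/partitionCol=1/part-00000.parquet") // placeholder path
val reader = ParquetFileReader.open(HadoopInputFile.fromPath(partFile, conf))
try {
  reader.getFooter.getBlocks.forEach { block =>
    block.getColumns.forEach { col =>
      println(s"${col.getPath}: ${col.getEncodings}") // fails on encrypted columns without keys
    }
  }
} finally {
  reader.close()
}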

asfimport commented 1 year ago

Gidon Gershinsky / @ggershinsky: [~Nageswaran] A couple of updates on this.

We should be able to skip this verification for encrypted files, a pull request is sent to parquet-mr.

Also, I've tried the new Spark 3.4.0 (as is, no modifications) with the Scala test above; no exception was thrown. Probably the updated Spark code bypasses the problematic Parquet read path. Can you check whether Spark 3.4.0 works OK for your use case?

asfimport commented 1 year ago

Vignesh Nageswaran: @ggershinsky sorry for the late reply. Yes sir, Spark 3.4.0 works without setting the parameter parquet.split.files to false. Thanks for raising a PR to skip the verification for encrypted files.