mjakubowski84 / parquet4s

Read and write Parquet in Scala. Use Scala classes as schema. No need to start a cluster.
https://mjakubowski84.github.io/parquet4s/
MIT License

ProjectionSchema self-inconsistency with partitioned source #354

Open dpogibelskiy opened 1 month ago

dpogibelskiy commented 1 month ago

Hi,

I have come across a self-inconsistency in how the projection schema is handled.

When reading partitioned Parquet using ParquetReader.projectedGeneric(expectedSchema).options(...).read, the output rows contain the partitioning columns even though expectedSchema does not include them.

When reading non-partitioned Parquet, the output rows contain only the columns listed in expectedSchema.

I am not sure what the "proper" behavior is, but the observed one does not look self-consistent.

Parquet4s version: 2.18.0. Here is a test case:

import com.github.mjakubowski84.parquet4s.{ParquetReader, Path}
import org.apache.hadoop.fs.{Path => HPath}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Type.Repetition
import org.apache.parquet.schema.{LogicalTypeAnnotation, MessageType, Type, Types}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

import scala.jdk.CollectionConverters._
import scala.util.Using

// `spark` is a SparkSession and `opt` is a ParquetReader.Options instance,
// both configured elsewhere (e.g. with the S3 settings for sample-bucket).

// Write the same data twice: flat and partitioned by column "a".
val rows = List(
  Row("a_value", "b_value1", "c_value1"),
  Row("a_value", "b_value1", null)
)
val df = spark.createDataFrame(
  rows.asJava,
  StructType(
    Array(
      StructField("a", StringType),
      StructField("b", StringType),
      StructField("c", StringType)
    )
  )
)
df.write.parquet("s3a://sample-bucket/p0")
df.write.partitionBy("a").parquet("s3a://sample-bucket/p1")

// Projection schema containing only column "c".
val expectedSchema = new MessageType(
  "root",
  List[Type](
    Types
      .primitive(PrimitiveTypeName.BINARY, Repetition.OPTIONAL)
      .as(LogicalTypeAnnotation.stringType())
      .named("c")
  ).asJava
)

// Read with the projection and render each record as "name = value, " pairs.
def showP4S(path: String): List[String] = {
  Using(
    ParquetReader
      .projectedGeneric(expectedSchema)
      .options(opt)
      .read(Path(new HPath(path)))
  )(_.iterator.map { row =>
    row.foldLeft("") { case (str, (k, v)) => str + s"$k = $v, " }
  }.toList).get
}

println("p4s not-partitioned")
showP4S("s3a://sample-bucket/p0").foreach(println)
println("p4s partitioned")
showP4S("s3a://sample-bucket/p1").foreach(println)

The output is:

p4s not-partitioned
c = BinaryValue(Binary{8 constant bytes, [99, 95, 118, 97, 108, 117, 101, 49]}), 
c = NullValue, 
p4s partitioned
c = BinaryValue(Binary{8 constant bytes, [99, 95, 118, 97, 108, 117, 101, 49]}), a = BinaryValue(Binary{"a_value"}), 
c = NullValue, a = BinaryValue(Binary{"a_value"}), 
mjakubowski84 commented 1 month ago

That's pretty much expected behaviour: the expectedSchema is pushed down and used for reading the files. The partition value comes automatically from the partition path, not from a file.
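
For context, df.write.partitionBy("a") produces a Hive-style layout roughly like the one below (file names are placeholders); column "a" exists only in the directory name, so its value can only come from the path:

    s3a://sample-bucket/p1/
      a=a_value/
        part-00000-<placeholder>.snappy.parquet   <- contains only columns "b" and "c"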

dpogibelskiy commented 1 month ago

Thank you for the answer and explanation.

I have looked through the code and I understand how this "unexpected" value ends up in the result row. My question is: wouldn't it be better to consult the projection schema (if any) in com.github.mjakubowski84.parquet4s.ParquetReader.BuilderImpl#setPartitionValues? Since parquet4s deals with partitioning silently, the caller sometimes has no idea whether the data comes from a single file, a flat-partitioned folder, a tree-like-partitioned folder, or whatever else. Yes, it is easy enough to filter out the unneeded columns afterwards (see the sketch below), but that approach feels like a workaround.
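
For the record, the workaround I mean is roughly the following sketch (reusing expectedSchema, opt and the imports from the test case above): derive the projected field names from the schema and drop every other field from each record:

    // Field names that are actually part of the projection schema.
    val projectedNames: Set[String] =
      expectedSchema.getFields.asScala.map(_.getName).toSet

    def showProjectedOnly(path: String): List[String] = {
      Using(
        ParquetReader
          .projectedGeneric(expectedSchema)
          .options(opt)
          .read(Path(new HPath(path)))
      )(_.iterator.map { row =>
        // Keep only the projected fields; skip partition columns added by the reader.
        row.foldLeft("") {
          case (str, (k, v)) if projectedNames.contains(k) => str + s"$k = $v, "
          case (str, _)                                     => str
        }
      }.toList).get
    }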

mjakubowski84 commented 1 month ago

The issue is more the other way around: if the user wants to see partition values in the output, they have to specify them in the schema, and then Parquet4s has to remove those columns from the projection when reading the files. This removal mechanism is already implemented, IIRC, so it should be feasible. However, I am more worried about backwards compatibility. Still, providing a schema explicitly for generic records is quite an esoteric use case for Parquet4s, so we can consider changing this behaviour.
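
For example, under such a change, a caller who wants the partition column in the output would request it explicitly in the projection. A hypothetical sketch, extending expectedSchema from the reproduction above:

    // Hypothetical projection that also requests the partition column "a".
    // Parquet4s would have to drop "a" from the projection pushed down to the
    // files (where the column does not exist) and fill it from the partition path.
    val expectedSchemaWithPartition = new MessageType(
      "root",
      List[Type](
        Types
          .primitive(PrimitiveTypeName.BINARY, Repetition.OPTIONAL)
          .as(LogicalTypeAnnotation.stringType())
          .named("a"),
        Types
          .primitive(PrimitiveTypeName.BINARY, Repetition.OPTIONAL)
          .as(LogicalTypeAnnotation.stringType())
          .named("c")
      ).asJava
    )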