mjakubowski84 / parquet4s

Read and write Parquet in Scala. Use Scala classes as schema. No need to start a cluster.
https://mjakubowski84.github.io/parquet4s/
MIT License

`ParquetReader.projectedGeneric` does not work when selecting more than one column from the same group #319

Closed · tyoras closed this 1 year ago

tyoras commented 1 year ago

Hello, reading a Parquet file containing a group fails when we try to select more than one column from the same group using `ParquetReader.projectedGeneric`. In this example, we try to read only the `id`, `bar.a`, and `bar.c` fields:

```scala
import com.github.mjakubowski84.parquet4s.*
import org.apache.parquet.hadoop.ParquetFileWriter
import java.nio.file.Path as NioPath

case class Foo(id: Int, bar: Bar)
case class Bar(a: String, b: String, c: String)
case class Baz(a: String, c: String)

val foos = List(
  Foo(1, Bar("a1", "b1", "c1")),
  Foo(2, Bar("a2", "b2", "c2"))
)

val path = "test.parquet"

val options = ParquetWriter.Options(
  writeMode = ParquetFileWriter.Mode.OVERWRITE
)
val writer = IncrementalParquetWriter[Foo](path, options)

def readBaz(path: NioPath): Iterable[Baz] =
  ParquetReader
    .projectedGeneric(
      Col("id").as[Int].alias("id"),
      Col("bar.a").as[String].alias("a"),
      Col("bar.c").as[String].alias("c")
    )
    .read(Path(path))
    .map(_.as[Baz](ValueCodecConfiguration.Default))

@main
def main(): Unit =
  writer.write(foos)
  writer.close()
  readBaz(NioPath.of(path)).foreach(println)
```

And it fails to read with this error:

```
Exception in thread "main" org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file file:/Users/y.alvarez/Prog/repos/test/test.parquet
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:264)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
    at com.github.mjakubowski84.parquet4s.ParquetIterator.preRead(ParquetIterator.scala:29)
    at com.github.mjakubowski84.parquet4s.ParquetIterator.hasNext(ParquetIterator.scala:34)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)
    at scala.collection.Iterator$ConcatIterator.advance$1(Iterator.scala:1182)
    at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:1188)
    at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
    at scala.collection.immutable.List.prependedAll(List.scala:152)
    at scala.collection.immutable.List$.from(List.scala:684)
    at scala.collection.immutable.List$.from(List.scala:681)
    at scala.collection.IterableFactory$Delegate.from(Factory.scala:288)
    at scala.collection.immutable.Iterable$.from(Iterable.scala:35)
    at scala.collection.immutable.Iterable$.from(Iterable.scala:32)
    at scala.collection.IterableFactory$Delegate.from(Factory.scala:288)
    at scala.collection.IterableOps.map(Iterable.scala:682)
    at scala.collection.IterableOps.map$(Iterable.scala:682)
    at com.github.mjakubowski84.parquet4s.etl.CompoundParquetIterable.map(CompoundParquetIterable.scala:6)
    at test.Main$package$.readBaz(Main.scala:31)
    at test.Main$package$.main(Main.scala:37)
    at test.main.main(Main.scala:33)
Caused by: java.lang.IllegalArgumentException: Invalid column projection: "bar.c".
    at com.github.mjakubowski84.parquet4s.RootRowParquetRecordConverter.end$$anonfun$1(ParquetReadSupport.scala:165)
    at scala.runtime.function.JProcedure1.apply(JProcedure1.java:15)
    at scala.runtime.function.JProcedure1.apply(JProcedure1.java:10)
    at scala.collection.immutable.Vector.foreach(Vector.scala:2122)
    at com.github.mjakubowski84.parquet4s.RootRowParquetRecordConverter.end(ParquetReadSupport.scala:167)
    at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:419)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:234)
    ... 21 more
```
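Until this is resolved, one possible workaround is to project the nested fields with a typed projection instead of column-by-column generic projection. The sketch below assumes the Parquet4s 2.x `ParquetReader.projectedAs` API and that typed projection does not hit the same code path; `FooProjection` and `BarProjection` are hypothetical names introduced here for illustration:

```scala
import com.github.mjakubowski84.parquet4s.*

// Hypothetical projection classes mirroring only the fields we want to read.
case class BarProjection(a: String, c: String)
case class FooProjection(id: Int, bar: BarProjection)

// Typed schema projection: Parquet4s derives the read schema from the case class,
// so both bar.a and bar.c are requested via a single nested structure.
def readProjected(path: Path): Iterable[FooProjection] =
  ParquetReader.projectedAs[FooProjection].read(path)
```

Whether this avoids the bug is untested here; it is offered only as an alternative way to express the same projection.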
mjakubowski84 commented 1 year ago

Thank you for reporting the issue. We can add this particular use case to the tests if it doesn't exist yet. However, what troubles me is `IncrementalParquetWriter`... Which version of Parquet4s are you using? We have not had anything like `IncrementalParquetWriter` here for at least a couple of years.

tyoras commented 1 year ago

We are on version 2.13.0, and indeed `IncrementalParquetWriter` is not part of Parquet4s but a utility that we implemented; it is only used to create the Parquet file that is read in the example. Sorry for the confusion.

tyoras commented 1 year ago

I've simplified the example and removed our custom `IncrementalParquetWriter`:

```scala
import com.github.mjakubowski84.parquet4s.*
import org.apache.parquet.hadoop.ParquetFileWriter

case class Foo(id: Int, bar: Bar)
case class Bar(a: String, b: String, c: String)
case class Baz(a: String, c: String)

val foos = List(
  Foo(1, Bar("a1", "b1", "c1")),
  Foo(2, Bar("a2", "b2", "c2"))
)

val path = Path("test.parquet")

val options = ParquetWriter.Options(
  writeMode = ParquetFileWriter.Mode.OVERWRITE
)

def readBaz(path: Path): Iterable[Baz] =
  ParquetReader
    .projectedGeneric(
      Col("id").as[Int].alias("id"),
      Col("bar.a").as[String].alias("a"),
      Col("bar.c").as[String].alias("c")
    )
    .read(path)
    .map(_.as[Baz](ValueCodecConfiguration.Default))

@main
def main(): Unit =
  ParquetWriter.of[Foo].options(options).writeAndClose(path, foos)
  readBaz(path).foreach(println)
```
mjakubowski84 commented 1 year ago

Thanks! All is clear now. You've spotted a nasty bug. The culprit is a mutation in the `end()` function of `RootRowParquetRecordConverter`. It should be quite easy to fix. However, I won't be able to look at it before the end of this week.
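For readers curious how a mutation during record completion can produce an `Invalid column projection` error for only the *second* column of a group, here is a simplified, hypothetical sketch in plain Scala (not the actual Parquet4s code): each projection moves its value out of a mutable record, so the group is already gone when the second column from it is looked up:

```scala
import scala.collection.mutable

object ProjectionMutationDemo:
  def run(): Map[String, String] =
    // A "record": a mutable map of group name to its fields.
    val record = mutable.Map(
      "bar" -> mutable.Map("a" -> "a1", "b" -> "b1", "c" -> "c1")
    )
    val projections = List("bar.a", "bar.c")
    val result = mutable.Map.empty[String, String]
    for path <- projections do
      path.split('.') match
        case Array(group, field) =>
          // The mutation: the whole group is *removed* from the record
          // the first time any of its columns is projected...
          record.remove(group) match
            case Some(g) => result(path) = g(field)
            // ...so the second column from the same group cannot be resolved.
            case None =>
              throw new IllegalArgumentException(s"Invalid column projection: \"$path\".")
        case _ => ()
    result.toMap

  @main def demo(): Unit = println(run())
```

Running this fails on `bar.c` after `bar.a` succeeds, mirroring the reported error. Again, this is only an illustration of the failure mode, not the library's implementation.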

tyoras commented 1 year ago

Thanks a lot for your quick response 👍