apache / parquet-java

Apache Parquet Java
https://parquet.apache.org/
Apache License 2.0

Parquet-avro cannot decode Avro/Thrift array of primitive array (e.g. array<array<int>>) #1880

Closed asfimport closed 9 years ago

asfimport commented 9 years ago

The problematic Avro and Thrift schemas are:

record AvroArrayOfArray {
  array<array<int>> int_arrays_column;
}

and

struct ThriftListOfList {
  1: list<list<i32>> intListsColumn;
}

They are converted to the following structurally equivalent Parquet schemas by parquet-avro 1.7.0 and parquet-thrift 1.7.0 respectively:

message AvroArrayOfArray {
  required group int_arrays_column (LIST) {
    repeated group array (LIST) {
      repeated int32 array;
    }
  }
}

and

message ParquetSchema {
  required group intListsColumn (LIST) {
    repeated group intListsColumn_tuple (LIST) {
      repeated int32 intListsColumn_tuple_tuple;
    }
  }
}
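
To make "structurally equivalent" concrete, here is a minimal sketch. It assumes a hypothetical Node class (not parquet-java's schema API) in which shape() renders repetition, primitive type and LIST annotation but deliberately drops field names. Under that model the two column layouts above render identically even though every name differs.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, minimal model of a Parquet schema node (NOT parquet-java's Type API).
final class Node {
  final String repetition;    // "required", "optional" or "repeated"
  final String name;
  final String primitiveType; // e.g. "int32"; null for groups
  final String annotation;    // e.g. "LIST"; null when absent
  final List<Node> children;

  private Node(String repetition, String name, String primitiveType,
               String annotation, Node[] children) {
    this.repetition = repetition;
    this.name = name;
    this.primitiveType = primitiveType;
    this.annotation = annotation;
    this.children = Collections.unmodifiableList(Arrays.asList(children));
  }

  static Node primitive(String repetition, String type, String name) {
    return new Node(repetition, name, type, null, new Node[0]);
  }

  static Node group(String repetition, String name, String annotation, Node... children) {
    return new Node(repetition, name, null, annotation, children);
  }

  // Renders repetition, type and annotation, but not the field name.
  String shape() {
    if (primitiveType != null) {
      return repetition + " " + primitiveType;
    }
    String inner = children.stream().map(Node::shape).collect(Collectors.joining(", "));
    String logical = annotation == null ? "" : " (" + annotation + ")";
    return repetition + " group" + logical + " { " + inner + " }";
  }
}

final class ArrayOfArraySchemas {
  // int_arrays_column as laid out by parquet-avro 1.7.0
  static Node avro() {
    return Node.group("required", "int_arrays_column", "LIST",
        Node.group("repeated", "array", "LIST",
            Node.primitive("repeated", "int32", "array")));
  }

  // intListsColumn as laid out by parquet-thrift 1.7.0
  static Node thrift() {
    return Node.group("required", "intListsColumn", "LIST",
        Node.group("repeated", "intListsColumn_tuple", "LIST",
            Node.primitive("repeated", "int32", "intListsColumn_tuple_tuple")));
  }
}
```

Both calls to shape() yield "required group (LIST) { repeated group (LIST) { repeated int32 } }". A reader that only recognizes one writer's naming convention therefore sees two different schemas, although nesting, repetition and annotations are identical.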

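AvroIndexedRecordConverter cannot decode such records correctly. The reason is that the second-level repeated group "array" doesn't pass the AvroIndexedRecordConverter.isElementType() check, so the converter treats it as a synthetic single-field wrapper and tries to build the inner array converter from its only field, the primitive "repeated int32 array", which fails. We should check for the field name "array" (written by parquet-avro) and the field name suffix "_tuple" (written by parquet-thrift) in isElementType() to fix this issue.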
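
A minimal sketch of the suggested check, assuming a hypothetical Field model instead of parquet-java's Type and Avro's Schema classes, and a hypothetical LegacyListRules.isElementType(repeated, listName) signature. Besides the "array" / "_tuple" name checks proposed here, it includes structural checks modeled on the LIST backward-compatibility rules in the Parquet format specification; the rule that a wrapper's only field cannot itself be repeated is an assumption added for illustration. This is not the patch that was merged.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, minimal model of a Parquet field (NOT parquet-java's Type API).
final class Field {
  final String name;
  final boolean repeated;
  final boolean primitive;
  final List<Field> children;

  private Field(String name, boolean repeated, boolean primitive, Field[] children) {
    this.name = name;
    this.repeated = repeated;
    this.primitive = primitive;
    this.children = Collections.unmodifiableList(Arrays.asList(children));
  }

  static Field leaf(String name, boolean repeated) {
    return new Field(name, repeated, true, new Field[0]);
  }

  static Field group(String name, boolean repeated, Field... children) {
    return new Field(name, repeated, false, children);
  }
}

final class LegacyListRules {
  // Given the repeated field directly inside a LIST group named listName, decide whether
  // that field IS the element (legacy 2-level layout) rather than a synthetic wrapper
  // around the element (3-level layout).
  static boolean isElementType(Field repeated, String listName) {
    if (repeated.primitive) {
      return true; // e.g. "repeated int32 array": a primitive cannot wrap anything
    }
    if (repeated.children.size() != 1) {
      return true; // a synthetic wrapper has exactly one field
    }
    if (repeated.children.get(0).repeated) {
      return true; // assumption: a wrapper's only field is required/optional, never repeated
    }
    // Names used by legacy writers: parquet-avro ("array") and parquet-thrift ("<list>_tuple").
    return repeated.name.equals("array") || repeated.name.equals(listName + "_tuple");
  }
}
```

For example, isElementType(Field.group("intListsColumn_tuple", true, Field.leaf("intListsColumn_tuple_tuple", true)), "intListsColumn") returns true, while a standard wrapper such as Field.group("list", true, Field.leaf("element", false)) under "my_list" returns false. In this sketch the name checks only matter when the structural checks are inconclusive, e.g. a repeated group named "array" whose single field is required or optional.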

Reporter: Cheng Lian / @liancheng Assignee: Ryan Blue / @rdblue

Note: This issue was originally created as PARQUET-364. Please see the migration documentation for further details.

asfimport commented 9 years ago

Cheng Lian / @liancheng: I tried to write a test case in parquet-mr, but failed to build parquet-mr locally on OS X 10.10 because of an environment issue. I verified this bug while fixing SPARK-10136, which is the Spark counterpart of this bug. Here is a Spark SQL ParquetAvroCompatibilitySuite test case that reproduces the issue:

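  import scala.collection.JavaConverters._
  import org.apache.hadoop.fs.Path
  import org.apache.parquet.avro.{AvroParquetReader, AvroParquetWriter}

  // AvroArrayOfArray is the class generated from the Avro record above.
  test("PARQUET-364 avro array of primitive array") {
    withTempPath { dir =>
      val path = dir.getCanonicalPath

      // Three records, each holding a 3x3 nested list of ints.
      val records = (0 until 3).map { i =>
        AvroArrayOfArray.newBuilder()
          .setIntArraysColumn(
            Seq.tabulate(3, 3)((j, k) => i + j * 3 + k: Integer).map(_.asJava).asJava)
          .build()
      }

      val writer = new AvroParquetWriter[AvroArrayOfArray](
        new Path(path), AvroArrayOfArray.getClassSchema)
      records.foreach(writer.write)
      writer.close()

      // Read back exactly as many records as were written, then close the reader.
      val reader = AvroParquetReader.builder[AvroArrayOfArray](new Path(path)).build()
      val readBack = try records.indices.map(_ => reader.read()) finally reader.close()
      assert(readBack === records)
    }
  }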
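
For reference, the data written by the test can be computed by hand. The sketch below uses a hypothetical TestData helper (not part of the suite) that mirrors the Seq.tabulate expression: record i holds three rows, and row j contains i + 3j, i + 3j + 1 and i + 3j + 2. Only three records are written, so only three can be read back.

```java
import java.util.ArrayList;
import java.util.List;

final class TestData {
  // Mirrors Seq.tabulate(3, 3)((j, k) => i + j * 3 + k) for record i.
  static List<List<Integer>> intArrays(int i) {
    List<List<Integer>> rows = new ArrayList<>();
    for (int j = 0; j < 3; j++) {
      List<Integer> row = new ArrayList<>();
      for (int k = 0; k < 3; k++) {
        row.add(i + j * 3 + k);
      }
      rows.add(row);
    }
    return rows;
  }

  // Mirrors (0 until 3).map(...): one nested list per written record.
  static List<List<List<Integer>>> allRecords() {
    List<List<List<Integer>>> records = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
      records.add(intArrays(i));
    }
    return records;
  }
}
```

Record 0 is [[0, 1, 2], [3, 4, 5], [6, 7, 8]] and record 2 is [[2, 3, 4], [5, 6, 7], [8, 9, 10]].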

Exception:

[info] - PARQUET-364 avro array of primitive array *** FAILED *** (428 milliseconds)
[info]   java.lang.ClassCastException: repeated int32 array is not a group
[info]   at org.apache.parquet.schema.Type.asGroupType(Type.java:202)
[info]   at org.apache.parquet.avro.AvroIndexedRecordConverter.newConverter(AvroIndexedRecordConverter.java:144)
[info]   at org.apache.parquet.avro.AvroIndexedRecordConverter.access$200(AvroIndexedRecordConverter.java:42)
[info]   at org.apache.parquet.avro.AvroIndexedRecordConverter$AvroArrayConverter$ElementConverter.<init>(AvroIndexedRecordConverter.java:548)
[info]   at org.apache.parquet.avro.AvroIndexedRecordConverter$AvroArrayConverter.<init>(AvroIndexedRecordConverter.java:480)
[info]   at org.apache.parquet.avro.AvroIndexedRecordConverter.newConverter(AvroIndexedRecordConverter.java:144)
[info]   at org.apache.parquet.avro.AvroIndexedRecordConverter.<init>(AvroIndexedRecordConverter.java:89)
[info]   at org.apache.parquet.avro.AvroIndexedRecordConverter.<init>(AvroIndexedRecordConverter.java:60)
[info]   at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:34)
[info]   at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:111)
[info]   at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:174)
[info]   at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:151)
[info]   at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:127)
[info]   at org.apache.spark.sql.execution.datasources.parquet.ParquetAvroCompatibilitySuite$$anonfun$5$$anonfun$apply$mcV$sp$4$$anonfun$13.apply(ParquetAvroCompatibilitySuite.scala:186)
[info]   at org.apache.spark.sql.execution.datasources.parquet.ParquetAvroCompatibilitySuite$$anonfun$5$$anonfun$apply$mcV$sp$4$$anonfun$13.apply(ParquetAvroCompatibilitySuite.scala:186)
[info]   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
[info]   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
[info]   at scala.collection.immutable.Range.foreach(Range.scala:141)
[info]   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
[info]   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
[info]   at org.apache.spark.sql.execution.datasources.parquet.ParquetAvroCompatibilitySuite$$anonfun$5$$anonfun$apply$mcV$sp$4.apply(ParquetAvroCompatibilitySuite.scala:186)
[info]   at org.apache.spark.sql.execution.datasources.parquet.ParquetAvroCompatibilitySuite$$anonfun$5$$anonfun$apply$mcV$sp$4.apply(ParquetAvroCompatibilitySuite.scala:170)
[info]   at org.apache.spark.sql.test.SQLTestUtils$class.withTempPath(SQLTestUtils.scala:117)
[info]   at org.apache.spark.sql.execution.datasources.parquet.ParquetCompatibilityTest.withTempPath(ParquetCompatibilityTest.scala:31)
[info]   at org.apache.spark.sql.execution.datasources.parquet.ParquetAvroCompatibilitySuite$$anonfun$5.apply$mcV$sp(ParquetAvroCompatibilitySuite.scala:170)
[info]   at org.apache.spark.sql.execution.datasources.parquet.ParquetAvroCompatibilitySuite$$anonfun$5.apply(ParquetAvroCompatibilitySuite.scala:170)
[info]   at org.apache.spark.sql.execution.datasources.parquet.ParquetAvroCompatibilitySuite$$anonfun$5.apply(ParquetAvroCompatibilitySuite.scala:170)
[info]   at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:42)
[info]   at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
[info]   at org.scalatest.FunSuite.runTest(FunSuite.scala:1555)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
[info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
[info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
[info]   at scala.collection.immutable.List.foreach(List.scala:318)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
[info]   at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
[info]   at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
[info]   at org.scalatest.Suite$class.run(Suite.scala:1424)
[info]   at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
[info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
[info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
[info]   at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
[info]   at org.apache.spark.sql.execution.datasources.parquet.ParquetCompatibilityTest.org$scalatest$BeforeAndAfterAll$$super$run(ParquetCompatibilityTest.scala:31)
[info]   at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
[info]   at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
[info]   at org.apache.spark.sql.execution.datasources.parquet.ParquetCompatibilityTest.run(ParquetCompatibilityTest.scala:31)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:462)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:671)
[info]   at sbt.ForkMain$Run$2.call(ForkMain.java:294)
[info]   at sbt.ForkMain$Run$2.call(ForkMain.java:284)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:262)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[info]   at java.lang.Thread.run(Thread.java:745)
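
The trace can be replayed with a small self-contained model. This sketch assumes a hypothetical PType class and a simplified version of the pre-fix check (the real method, as far as can be told, also matches single-field Avro record schemas, which does not apply to array<array<int>>). It only illustrates why the converter ends up calling asGroupType() on the primitive "repeated int32 array".

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a Parquet type (NOT parquet-java's Type API).
final class PType {
  final String repetition;
  final String name;
  final String primitiveType; // null for groups
  final List<PType> fields;

  private PType(String repetition, String name, String primitiveType, PType[] fields) {
    this.repetition = repetition;
    this.name = name;
    this.primitiveType = primitiveType;
    this.fields = Collections.unmodifiableList(Arrays.asList(fields));
  }

  static PType leaf(String repetition, String primitiveType, String name) {
    return new PType(repetition, name, primitiveType, new PType[0]);
  }

  static PType group(String repetition, String name, PType... fields) {
    return new PType(repetition, name, null, fields);
  }

  PType asGroup() {
    if (primitiveType != null) {
      // Same message shape as the ClassCastException in the trace above.
      throw new ClassCastException(repetition + " " + primitiveType + " " + name + " is not a group");
    }
    return this;
  }
}

final class OldListResolution {
  // Simplified pre-fix rule: only a primitive or a multi-field group is the element itself.
  static boolean oldIsElementType(PType repeated) {
    return repeated.primitiveType != null || repeated.fields.size() > 1;
  }

  // Picks the Parquet type the element converter is built from.
  static PType elementType(PType listGroup) {
    PType repeated = listGroup.fields.get(0);
    return oldIsElementType(repeated) ? repeated : repeated.fields.get(0);
  }

  // The Avro element is itself an array, so its converter needs a group type.
  static PType innerListGroup(PType listGroup) {
    return elementType(listGroup).asGroup();
  }
}
```

Calling innerListGroup on the int_arrays_column layout (a required group containing the repeated group "array", which contains the primitive "repeated int32 array") throws a ClassCastException whose message is "repeated int32 array is not a group", matching the first line of the trace.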

asfimport commented 9 years ago

Cheng Lian / @liancheng: Although I haven't verified it yet, I suspect parquet-thrift suffers from a similar issue, i.e. it cannot decode Parquet records converted from a Thrift structure such as list<list<i32>>.

asfimport commented 9 years ago

Cheng Lian / @liancheng: Verified that parquet-avro doesn't correctly decode Parquet records generated by parquet-thrift with Thrift type list<list<i32>> either.

asfimport commented 9 years ago

Cheng Lian / @liancheng: I tested the Thrift case with Thrift 0.9.2 because I couldn't compile Thrift 0.7.0 on Mac OS X 10.10 due to missing C++ header files. I assume this doesn't change the essence of the issue.

(BTW, any plan to upgrade to Thrift 0.9.2?)

asfimport commented 9 years ago

Cheng Lian / @liancheng: Attached Parquet files generated by parquet-avro and parquet-thrift 1.7.0 with the described schemas.

asfimport commented 9 years ago

Cheng Lian / @liancheng: @rdblue The suggested fix has been verified by Spark PR #8361. I'd like to submit a PR for parquet-mr, but I've hit some local build issues. Please feel free to assign this issue to others.

asfimport commented 9 years ago

Ryan Blue / @rdblue: Thanks, @liancheng! I'll have a look at this when I get a chance.

asfimport commented 9 years ago

Cheng Lian / @liancheng: Sent out a PR https://github.com/apache/parquet-mr/pull/264

asfimport commented 9 years ago

Ryan Blue / @rdblue: Linking to PARQUET-212, which should ensure that these cases are tested.

asfimport commented 9 years ago

Ryan Blue / @rdblue: This is now fixed in master.