Netflix / iceberg

Iceberg is a table format for large, slow-moving tabular data
Apache License 2.0

Cannot convert unknown primitive type: required int96 `timestamp` #103

Closed: fbocse closed this issue 5 years ago

fbocse commented 5 years ago

Writing a very basic collection to Parquet, such as:

    List<java.sql.Timestamp> data = new ArrayList<>();
    data.add(java.sql.Timestamp.valueOf("1999-07-14 20:30:00"));
    data.add(java.sql.Timestamp.valueOf("2014-11-23 20:30:00"));

and generating the Iceberg schema from the Spark schema with com.netflix.iceberg.spark.SparkSchemaUtil#convert(org.apache.spark.sql.types.StructType)

then trying to load the data from disk using the "iceberg" format, I get:

Cannot convert unknown primitive type: required int96 timestamp
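
For reference, the failing flow boils down to roughly the sketch below. It is a minimal reconstruction, not the exact code: the SparkSession `spark`, the paths, and the table wiring are assumptions.

    // Minimal repro sketch; `spark` is an assumed SparkSession and the
    // paths are placeholders, not taken from the actual setup.
    import java.util.Arrays;
    import java.util.List;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;
    import com.netflix.iceberg.Schema;
    import com.netflix.iceberg.spark.SparkSchemaUtil;

    List<java.sql.Timestamp> data = Arrays.asList(
        java.sql.Timestamp.valueOf("1999-07-14 20:30:00"),
        java.sql.Timestamp.valueOf("2014-11-23 20:30:00"));

    // plain Spark writes java.sql.Timestamp columns to Parquet as INT96 by default
    Dataset<Row> df = spark.createDataset(data, Encoders.TIMESTAMP()).toDF("timestamp");
    df.write().parquet("/tmp/ts-data");

    // converting the Spark schema to an Iceberg schema succeeds...
    Schema schema = SparkSchemaUtil.convert(df.schema());

    // ...but reading the INT96 data files back through the "iceberg" source
    // fails with: Cannot convert unknown primitive type: required int96 timestamp
    spark.read().format("iceberg").load("/tmp/table").show();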

Should I put together a more comprehensive integration test for this? I'm counting on this not being a real issue, just something that I'm missing here 👍

Stack trace

java.lang.UnsupportedOperationException: Unsupported primitive type: INT96
    at com.netflix.iceberg.parquet.ColumnIterator.newIterator(ColumnIterator.java:77)
    at com.netflix.iceberg.parquet.ParquetValueReaders$PrimitiveReader.<init>(ParquetValueReaders.java:114)
    at com.netflix.iceberg.parquet.ParquetValueReaders$UnboxedReader.<init>(ParquetValueReaders.java:136)
    at com.netflix.iceberg.spark.data.SparkParquetReaders$ReadBuilder.primitive(SparkParquetReaders.java:238)
    at com.netflix.iceberg.spark.data.SparkParquetReaders$ReadBuilder.primitive(SparkParquetReaders.java:111)
    at com.netflix.iceberg.parquet.TypeWithSchemaVisitor.visit(TypeWithSchemaVisitor.java:49)
    at com.netflix.iceberg.parquet.TypeWithSchemaVisitor.visitField(TypeWithSchemaVisitor.java:151)
    at com.netflix.iceberg.parquet.TypeWithSchemaVisitor.visitFields(TypeWithSchemaVisitor.java:165)
    at com.netflix.iceberg.parquet.TypeWithSchemaVisitor.visit(TypeWithSchemaVisitor.java:44)
    at com.netflix.iceberg.spark.data.SparkParquetReaders.buildReader(SparkParquetReaders.java:77)
    at com.netflix.iceberg.spark.source.Reader$TaskDataReader.lambda$newParquetIterable$1(Reader.java:449)
    at com.netflix.iceberg.parquet.ParquetReader$ReadConf.<init>(ParquetReader.java:88)
    at com.netflix.iceberg.parquet.ParquetReader.init(ParquetReader.java:171)
    at com.netflix.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:182)
    at com.netflix.iceberg.spark.source.Reader$TaskDataReader.open(Reader.java:429)
    at com.netflix.iceberg.spark.source.Reader$TaskDataReader.open(Reader.java:352)
    at com.netflix.iceberg.spark.source.Reader$TaskDataReader.<init>(Reader.java:288)
    at com.netflix.iceberg.spark.source.Reader$ReadTask.createDataReader(Reader.java:249)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:42)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
rdblue commented 5 years ago

@fbocse, INT96 timestamps are not supported in the Iceberg spec. Iceberg has strict requirements about how types are stored to guarantee interoperability and INT96 timestamps don't meet those standards.
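
For background, the Iceberg spec stores timestamps as INT64 microseconds from the epoch (annotated as TIMESTAMP_MICROS in Parquet). As an aside not from this reply: on Spark 2.3+ you can also steer plain Parquet output away from INT96 with a session config, sketched below.

    // Aside (not part of the original reply): on Spark 2.3+ this session
    // config makes plain Parquet writes use INT64 micros instead of INT96.
    spark.conf().set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS");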

Instead of writing data with that timestamp format, you can use the Spark integration or Iceberg helper methods to produce the data files.

In Spark, you'd do this:

    df.write.format("iceberg").save("hdfs://nn/path/to/table")
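
Reading the table back then goes through the same source; in Java form, with the same placeholder path:

    // read the table back through the Iceberg data source
    Dataset<Row> result = spark.read().format("iceberg").load("hdfs://nn/path/to/table");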

With helpers, you can create files directly like this:

    FileAppender<GenericData.Record> appender = Parquet.write(out)
        .forTable(table)
        .createWriterFunc(ParquetAvroWriter::buildWriter)
        .build();
    appender.add(record);
    appender.close();

Here's an example from tests: https://github.com/Netflix/iceberg/blob/master/spark/src/test/java/com/netflix/iceberg/spark/data/TestParquetAvroWriter.java#L80-L85
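
Fleshed out, the helper path might look like the sketch below. The package names follow the Netflix-era layout, and `Files.localOutput`, the `table`, and the Avro `record` are stand-ins modeled on that test, not code from this reply.

    import java.io.File;
    import org.apache.avro.generic.GenericData;
    import com.netflix.iceberg.Files;
    import com.netflix.iceberg.io.FileAppender;
    import com.netflix.iceberg.io.OutputFile;
    import com.netflix.iceberg.parquet.Parquet;
    import com.netflix.iceberg.parquet.ParquetAvroWriter;

    // write through Iceberg's Parquet appender so timestamps land as INT64
    // micros rather than INT96; `table` is an existing Iceberg Table and
    // `record` an Avro GenericData.Record matching its schema (assumed here)
    OutputFile out = Files.localOutput(new File("/tmp/data.parquet"));
    try (FileAppender<GenericData.Record> appender = Parquet.write(out)
             .forTable(table)
             .createWriterFunc(ParquetAvroWriter::buildWriter)
             .build()) {
      appender.add(record);
    }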

fbocse commented 5 years ago

@rdblue thank you very much for the detailed explanation. While searching the web for relevant literature on this topic, I came across this old PR https://github.com/apache/parquet-format/pull/49, where your explanations were also very insightful. Loved the ending, though :)

Long story short: This is a nightmare.