apache / arrow

Apache Arrow is the universal columnar format and multi-language toolbox for fast data interchange and in-memory analytics
https://arrow.apache.org/
Apache License 2.0

parquet lz4 interop with spark appears broken #3491

Closed pwais closed 5 years ago

pwais commented 5 years ago

Looks like a bug to me. Here's a simple script:

if __name__ == '__main__':
  rows = [{'foo': 1, 'bar': 2}, {'foo': 3, 'bar': 4}]

  import pandas as pd
  import pyarrow as pa
  import pyarrow.parquet as pq
  print pa.__version__

  df = pd.DataFrame(rows)
  table = pa.Table.from_pandas(df)
  pq.write_to_dataset(
        table,
        '/tmp/rows',
        preserve_index=False,
        compression='lz4',
        flavor='spark')

  import findspark
  findspark.init()
  import pyspark
  import pyspark.sql
  print pyspark.__version__
  spark = pyspark.sql.SparkSession.builder.master('local[1]').getOrCreate()
  df = spark.read.parquet('/tmp/rows')
  df.show()

Running it gives this error:

# docker run --rm -it au2018/env:v1.2 bash

$ python script.py
0.11.1
2.4.0
( blah blah blah )
2019-01-26 05:33:14 ERROR Executor:91 - Exception in task 0.0 in stage 1.0 (TID 1)
java.io.IOException: could not decode the dictionary for [bar] optional int64 bar
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.<init>(VectorizedColumnReader.java:121)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:312)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:256)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:159)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
...

If we change lz4 to snappy, the output is as expected (the one-line change is sketched after this output):

+---+---+-----------------+
|bar|foo|__index_level_0__|
+---+---+-----------------+
|  2|  1|                0|
|  4|  3|                1|
+---+---+-----------------+
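For reference, the only difference from the script above is the compression argument; a minimal sketch of that variant (the /tmp/rows_snappy path is hypothetical):

pq.write_to_dataset(
      table,
      '/tmp/rows_snappy',   # hypothetical separate output path
      preserve_index=False,
      compression='snappy',
      flavor='spark')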

There's clearly a disagreement here of some sort. Since I believe Spark uses the official Java Parquet implementation (parquet-mr), I'm inclined to report the issue here as an Arrow bug, though I could certainly understand if by now Arrow is regarded as HEAD and Java Parquet is behind. FWIW, au2018/env:v1.2 is a public image.

wesm commented 5 years ago

Can you open a JIRA issue about this?

LZ4 in parquet-cpp is broken, and has been for a while AFAIK. There was a discussion on the mailing list or in JIRA, as I recall.

@xhochy @majetideepak we should disable LZ4 until we can run integration tests to check for compatibility. Thoughts?

xhochy commented 5 years ago

Yes, we should disable LZ4. I think the problem surfaced some time ago: one of the implementations uses the framed format and the other uses the non-framed (block) format, so the two are incompatible.
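To make the distinction concrete, here is a minimal sketch (not code from either implementation) of how the two LZ4 container formats differ at the byte level, assuming the Python lz4 package:

import lz4.block
import lz4.frame

data = b'some column data' * 100

framed = lz4.frame.compress(data)                    # framed format: begins with the
                                                     # LZ4 frame magic number 0x184D2204
block = lz4.block.compress(data, store_size=False)   # bare block: no frame header at all

print(framed[:4].hex())   # '04224d18' (frame magic number, little-endian)
print(block[:4].hex())    # opaque compressed bytes, no recognizable header

# A reader that expects one container format cannot decode the other,
# which is the incompatibility described above.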

majetideepak commented 5 years ago

+1. I also find the lack of compatibility checks among the various writers disturbing. We hit another (statistics) issue last week: a file written by newer versions of Impala was not backward compatible with a slightly older parquet-cpp reader. Some details are at the very bottom of this commit: https://github.com/apache/impala/commit/9270346825b0bbc7708d458be076d7c26038edfc

wesm commented 5 years ago

Seems there are multiple JIRA issues that need to be associated with this.

Could someone make sure these issues are opened and then we can close this issue? Thanks

majetideepak commented 5 years ago

I will take care of the issue tracking.

majetideepak commented 5 years ago

The LZ4 discussion is on this JIRA: https://issues.apache.org/jira/browse/PARQUET-1241. The JIRA to disable the LZ4 codec is https://issues.apache.org/jira/browse/PARQUET-1515.

The JIRA https://issues.apache.org/jira/browse/PARQUET-1118 aims to build a corpus of files that different Parquet implementations can validate against. I think this is the easiest way to achieve compatibility.

pwais commented 5 years ago

Thank you @majetideepak! Hope the Docker example is helpful for a unit test. FWIW, here's the Dockerfile that sets up working Spark, Hadoop, & friends: https://github.com/pwais/au2018/blob/14313dd5195a3b516d019edf0c42c672cfca0a76/docker/Dockerfile

I've also tried zstd, but I couldn't get it working even in Spark / Hadoop. I hope whatever effort fixes lz4 also takes a moment to look into zstd.

pwais commented 5 years ago

(bump) I tried pyarrow 0.14 and lz4 support still appears broken. There was a suggestion to "either add a new codec or add an option to Lz4Codec to use the framed format" (https://issues.apache.org/jira/browse/PARQUET-1241). Would that bring us closer to a fix, i.e. feature parity with pyspark / hadoop, which have "supported" lz4 for a while now?

chairmank commented 4 years ago

I believe that PARQUET-1241 ("[C++] Use LZ4 frame format") does not directly address the issue that was reported here, although there is relevant discussion in its comments.

The stack trace in the bug report shows an exception thrown by the Spark class org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader, which uses the parquet-mr class org.apache.parquet.hadoop.ParquetFileReader, which uses the Hadoop org.apache.hadoop.io.compress.Lz4Codec class.

As discussed in HADOOP-12990, the Hadoop Lz4Codec uses the lz4 block format, and it prepends 8 extra bytes before the compressed data. I believe the lz4 implementation used by pyarrow.parquet also uses the lz4 block format, but it does not prepend these 8 extra bytes. Reconciling this incompatibility does not require implementing the framed format.
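To illustrate the framing described above, here is a minimal sketch (the helper names are hypothetical, and this is not code from Hadoop or Arrow) of writing and reading the Hadoop-style 8-byte prefix with the Python lz4 package:

import struct
import lz4.block

def hadoop_lz4_compress(data):
    # Hadoop-style framing: two big-endian int32 prefixes
    # (uncompressed length, compressed length) before the raw LZ4 block.
    compressed = lz4.block.compress(data, store_size=False)
    return struct.pack('>ii', len(data), len(compressed)) + compressed

def hadoop_lz4_decompress(payload):
    uncompressed_len, compressed_len = struct.unpack('>ii', payload[:8])
    block = payload[8:8 + compressed_len]
    return lz4.block.decompress(block, uncompressed_size=uncompressed_len)

original = b'hello parquet lz4' * 10
assert hadoop_lz4_decompress(hadoop_lz4_compress(original)) == original

# A writer that emits a bare LZ4 block (no 8-byte prefix) produces data that a
# reader expecting this framing cannot decode, and vice versa.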

wesm commented 4 years ago

Can you please open a JIRA issue?

chairmank commented 4 years ago

I created https://issues.apache.org/jira/browse/PARQUET-1878.

pwais commented 4 years ago

Thanks guys for finally closing this one up! Not having proper lz4 support is the main reason I don't use pyarrow directly today.