apache / arrow

Apache Arrow is the universal columnar format and multi-language toolbox for fast data interchange and in-memory analytics
https://arrow.apache.org/
Apache License 2.0

[C++][Parquet] lz4 codec is not compatible with Hadoop Lz4Codec #42380

Closed. asfimport closed this issue 4 years ago

asfimport commented 4 years ago

As described in HADOOP-12990, the Hadoop Lz4Codec uses the lz4 block format, and it prepends 8 extra bytes before the compressed data. I believe that the lz4 implementation in parquet-cpp also uses the lz4 block format, but it does not prepend these 8 extra bytes.
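To make the framing difference concrete, here is a minimal sketch using the third-party lz4 Python package (not parquet-cpp or Hadoop code); the helper names are made up for illustration, and the 8-byte layout follows the HADOOP-12990 description: a 4-byte big-endian uncompressed length followed by a 4-byte big-endian compressed length.

# Minimal illustration of the framing difference (assumes `pip install lz4`).
# These helpers are hypothetical; they are not code from parquet-cpp or Hadoop.
import struct
import lz4.block

def raw_lz4_block(data: bytes) -> bytes:
    # What parquet-cpp wrote: a bare lz4 block with no length prefix.
    return lz4.block.compress(data, store_size=False)

def hadoop_lz4_frame(data: bytes) -> bytes:
    # What the Hadoop Lz4Codec writes for a single chunk: 4-byte big-endian
    # uncompressed length, 4-byte big-endian compressed length (the 8 extra
    # bytes), then the same lz4 block.
    block = lz4.block.compress(data, store_size=False)
    return struct.pack(">II", len(data), len(block)) + block

if __name__ == "__main__":
    payload = b"hello parquet " * 100
    # The Hadoop framing is just the raw block preceded by 8 prefix bytes.
    assert hadoop_lz4_frame(payload)[8:] == raw_lz4_block(payload)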

 

Using Java parquet-mr, I wrote a Parquet file with lz4 compression:


$ parquet-tools meta /tmp/f4a1c7f57cb1c98c2b9da3b25b16d027df5d2f1cf55adb79374c154fbd79011f
file:        file:/tmp/f4a1c7f57cb1c98c2b9da3b25b16d027df5d2f1cf55adb79374c154fbd79011f
creator:     parquet-mr version 1.10.1 (build a89df8f9932b6ef6633d06069e50c9b7970bebd1)

file schema:
--------------------------------------------------------------------------------
c1:          REQUIRED INT64 R:0 D:0
c0:          REQUIRED BINARY R:0 D:0
v0:          REQUIRED INT64 R:0 D:0

row group 1: RC:5007 TS:28028 OFFSET:4
--------------------------------------------------------------------------------
c1:           INT64 LZ4 DO:0 FPO:4 SZ:24797/25694/1.04 VC:5007 ENC:DELTA_BINARY_PACKED ST:[min: 1566330126476659000, max: 1571211622650188000, num_nulls: 0]
c0:           BINARY LZ4 DO:0 FPO:24801 SZ:279/260/0.93 VC:5007 ENC:PLAIN,RLE_DICTIONARY ST:[min: 0x7471732F62656566616C6F2F746F6D6163636F2D66782D6D6B74646174612D6C69766573747265616D, max: 0x7471732F62656566616C6F2F746F6D6163636F2D66782D6D6B74646174612D6C69766573747265616D, num_nulls: 0]
v0:           INT64 LZ4 DO:0 FPO:25080 SZ:1348/2074/1.54 VC:5007 ENC:PLAIN,RLE_DICTIONARY ST:[min: 0, max: 9, num_nulls: 0] 

When I attempted to read this file with parquet-cpp, I got the following error:


>>> import pyarrow.parquet as pq
>>> pq.read_table('/tmp/f4a1c7f57cb1c98c2b9da3b25b16d027df5d2f1cf55adb79374c154fbd79011f')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/skim/miniconda3/envs/arrow/lib/python3.8/site-packages/pyarrow/parquet.py", line 1536, in read_table
    return pf.read(columns=columns, use_threads=use_threads,
  File "/home/skim/miniconda3/envs/arrow/lib/python3.8/site-packages/pyarrow/parquet.py", line 1260, in read
    table = piece.read(columns=columns, use_threads=use_threads,
  File "/home/skim/miniconda3/envs/arrow/lib/python3.8/site-packages/pyarrow/parquet.py", line 707, in read
    table = reader.read(**options)
  File "/home/skim/miniconda3/envs/arrow/lib/python3.8/site-packages/pyarrow/parquet.py", line 336, in read
    return self.reader.read_all(column_indices=column_indices,
  File "pyarrow/_parquet.pyx", line 1130, in pyarrow._parquet.ParquetReader.read_all
  File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
OSError: IOError: Corrupt Lz4 compressed data. 

 

https://github.com/apache/arrow/issues/3491 reported incompatibility in the other direction, using Spark (which uses the Hadoop lz4 codec) to read a parquet file that was written with parquet-cpp.

 

Given that the Hadoop lz4 codec has long been in use, and users have accumulated Parquet files that were written with this implementation, I propose changing parquet-cpp to match the Hadoop implementation.

 


Reporter: Steve M. Kim
Assignee: Patrick Pai


Note: This issue was originally created as PARQUET-1878. Please see the migration documentation for further details.

asfimport commented 4 years ago

Wes McKinney / @wesm: [~chairmank] can you also send an e-mail to dev@parquet.apache.org about this? We've been going around in circles on this LZ4 stuff and I think it's time that we fix this up once and for all across the implementations

cc @pitrou @fsaintjacques @xhochy

asfimport commented 4 years ago

Antoine Pitrou / @pitrou: In https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc , it seems that Hadoop uses the same length prefix with Snappy. Can someone explain why the problem only occurs with LZ4?

asfimport commented 4 years ago

Antoine Pitrou / @pitrou: Responding to myself: parquet-mr doesn't use the Hadoop Snappy codec because:


 * Snappy compression codec for Parquet.  We do not use the default hadoop
 * one since that codec adds a blocking structure around the base snappy compression
 * algorithm.  This is useful for hadoop to minimize the size of compression blocks
 * for their file formats (e.g. SequenceFile) but is undesirable for Parquet since
 * we already have the data page which provides that.

but for some reason it didn't bother to do the same for LZ4. Uh oh :-/
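To make that "blocking structure" concrete for the LZ4 case, here is a hedged sketch of stripping the Hadoop framing, assuming a single compressed chunk per block; the helper name is hypothetical, and it again uses the third-party lz4 Python package rather than code from either implementation.

# Hedged sketch of undoing the Hadoop framing for LZ4, assuming a single
# compressed chunk per block (assumes `pip install lz4`).
import struct
import lz4.block

def read_hadoop_lz4_frame(buf: bytes) -> bytes:
    # 8-byte prefix: big-endian uncompressed length, then big-endian
    # compressed length of the lz4 block that follows.
    uncompressed_len, compressed_len = struct.unpack(">II", buf[:8])
    block = buf[8:8 + compressed_len]
    return lz4.block.decompress(block, uncompressed_size=uncompressed_len)

A reader that expects a bare lz4 block (as parquet-cpp did before the fix) instead feeds those 8 prefix bytes to the decompressor and fails, which is consistent with the "Corrupt Lz4 compressed data" error above.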

asfimport commented 4 years ago

Wes McKinney / @wesm: Issue resolved by pull request 7789 https://github.com/apache/arrow/pull/7789

asfimport commented 3 years ago

mario luzi: Hello, we just tried the latest apache-arrow version 3.0.0 and the writer included in the low-level API examples, but lz4 still does not seem to be compatible with Hadoop. We got this error when reading, with Hadoop, a Parquet file produced with the 3.0.0 library:

[leal@sulu parquet]$ ./hadoop-3.2.2/bin/hadoop jar apache-parquet-1.11.1/parquet-tools/target/parquet-tools-1.11.1.jar head --debug parquet_2_0_example2.parquet
2021-02-04 21:24:36,354 INFO hadoop.InternalParquetRecordReader: RecordReader initialized will read a total of 1500001 records.
2021-02-04 21:24:36,355 INFO hadoop.InternalParquetRecordReader: at row 0. reading next block
2021-02-04 21:24:36,397 INFO compress.CodecPool: Got brand-new decompressor [.lz4]
2021-02-04 21:24:36,410 INFO hadoop.InternalParquetRecordReader: block read in memory in 55 ms. row count = 434436
org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/home/leal/parquet/parquet_2_0_example2.parquet
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:255)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
    at org.apache.parquet.tools.command.HeadCommand.execute(HeadCommand.java:87)
    at org.apache.parquet.tools.Main.main(Main.java:223)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:323)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:236)
Caused by: java.lang.IllegalArgumentException
    at java.nio.Buffer.limit(Buffer.java:275)
    at org.apache.hadoop.io.compress.lz4.Lz4Decompressor.decompress(Lz4Decompressor.java:232)
    at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:88)
    at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
    at java.io.DataInputStream.readFully(DataInputStream.java:195)

Any advice? We need to write lz4 files with C++ and read them from Hadoop jobs, but we are still stuck on this problem.

asfimport commented 3 years ago

Antoine Pitrou / @pitrou: [~mario.luzi] I suggest you open a new issue for the Java implementation and attach the file there.

asfimport commented 3 years ago

mario luzi: @pitrou would you be so kind as to tell me which Java project is the right one for this? Are you sure the problem is there and no longer here? Looking at this arrow parquet code I found the codec LZ4_hadoop, so I suppose someone added it and saw it working before this ticket was marked as resolved... but I am still not able to read an LZ4 file from Hadoop.

asfimport commented 3 years ago

Antoine Pitrou / @pitrou: [~mario.luzi] Just create a new issue on this tracker, but be sure to select "parquet-mr" as component.