apache / arrow

Apache Arrow is the universal columnar format and multi-language toolbox for fast data interchange and in-memory analytics
https://arrow.apache.org/
Apache License 2.0

[C++][Parquet] lz4-hadoop doesn't support block compression #43745

Open Polepoint opened 2 months ago

Polepoint commented 2 months ago

Describe the bug, including details regarding any error messages, version, and platform.

platform: Ubuntu 22.04, x86_64
arrow: release-17.0.0-rc2

According to https://github.com/apache/hadoop/blob/release-3.4.1-RC1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BlockCompressorStream.java#L82 and https://github.com/apache/hadoop/blob/release-3.4.1-RC1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java#L92, lz4-hadoop should be implemented as a block stream: the input may be split into blocks, and each block is compressed with lz4 separately. The output looks like this:

- 4-byte big endian uncompressed_size of all blocks
  - 4-byte big endian compressed_size of the following block
    < lz4 compressed block >
  - 4-byte big endian compressed_size of the following block
    < lz4 compressed block >
  - 4-byte big endian compressed_size of the following block
    < lz4 compressed block >
  - ... repeated until uncompressed_size from outer block is consumed ...
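To make the layout concrete, here is a minimal sketch that walks the framing without decompressing anything. `ReadBE32`, `HadoopFrameLayout` and `SplitHadoopFrame` are hypothetical names made up for illustration, not Arrow or Hadoop APIs, and the sketch assumes the buffer holds exactly one outer frame, so the inner blocks run to the end of the input.

```cpp
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <utility>
#include <vector>

// Read a 4-byte big-endian unsigned integer.
inline uint32_t ReadBE32(const uint8_t* p) {
  return (static_cast<uint32_t>(p[0]) << 24) | (static_cast<uint32_t>(p[1]) << 16) |
         (static_cast<uint32_t>(p[2]) << 8) | static_cast<uint32_t>(p[3]);
}

// Hypothetical description of one outer frame.
struct HadoopFrameLayout {
  uint32_t uncompressed_size = 0;                    // total size of all inner blocks once decompressed
  std::vector<std::pair<size_t, uint32_t>> blocks;   // (payload offset, compressed_size) per inner block
};

// Assumption: `data` contains exactly one outer frame and nothing else.
inline HadoopFrameLayout SplitHadoopFrame(const std::vector<uint8_t>& data) {
  if (data.size() < 4) throw std::runtime_error("truncated outer header");
  HadoopFrameLayout layout;
  layout.uncompressed_size = ReadBE32(data.data());
  size_t pos = 4;
  while (pos < data.size()) {
    if (data.size() - pos < 4) throw std::runtime_error("truncated block header");
    const uint32_t compressed_size = ReadBE32(data.data() + pos);
    pos += 4;
    if (compressed_size > data.size() - pos) throw std::runtime_error("truncated block");
    layout.blocks.emplace_back(pos, compressed_size);
    pos += compressed_size;  // skip the lz4 payload of this block
  }
  return layout;
}
```

A page written with a 64 KiB Hadoop buffer and more than 64 KiB of data would be expected to yield more than one entry in `blocks`, which is the case this issue is about.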

The lz4-hadoop decompression implemented in Arrow seems to accept only one block: it returns kNotHadoop immediately if the maybe_decompressed_size of the first block is not equal to expected_decompressed_size (which is actually the total decompressed size of all blocks). https://github.com/apache/arrow/blob/release-17.0.0-rc2/cpp/src/arrow/util/compression_lz4.cc#L509
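For comparison, a decoder that accepts several inner blocks per outer frame could look roughly like the sketch below. This is not Arrow's code: `DecompressHadoopFramed`, `BlockDecompressor` and `LoadBE32` are hypothetical names, and the per-block decompressor is passed in as a callback so the sketch does not depend on liblz4 (in practice the callback would wrap something like `LZ4_decompress_safe`). It does no sanity check on the declared sizes beyond what is needed to stay inside the input.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <stdexcept>
#include <vector>

// Decompresses one block into dst; returns bytes written, or a negative value on error.
using BlockDecompressor = std::function<int64_t(const uint8_t* src, size_t src_len,
                                                uint8_t* dst, size_t dst_capacity)>;

inline uint32_t LoadBE32(const uint8_t* p) {
  return (static_cast<uint32_t>(p[0]) << 24) | (static_cast<uint32_t>(p[1]) << 16) |
         (static_cast<uint32_t>(p[2]) << 8) | static_cast<uint32_t>(p[3]);
}

inline std::vector<uint8_t> DecompressHadoopFramed(const std::vector<uint8_t>& input,
                                                   const BlockDecompressor& decompress_block) {
  std::vector<uint8_t> output;
  size_t pos = 0;
  while (pos < input.size()) {  // one iteration per outer frame
    if (input.size() - pos < 4) throw std::runtime_error("truncated outer header");
    const uint32_t frame_size = LoadBE32(input.data() + pos);  // size of ALL inner blocks
    pos += 4;
    const size_t frame_start = output.size();
    output.resize(frame_start + frame_size);
    size_t produced = 0;
    // Keep reading inner blocks until the outer uncompressed size is consumed,
    // instead of requiring the first block to produce all of it.
    while (produced < frame_size) {
      if (input.size() - pos < 4) throw std::runtime_error("truncated block header");
      const uint32_t compressed_size = LoadBE32(input.data() + pos);
      pos += 4;
      if (compressed_size > input.size() - pos) throw std::runtime_error("truncated block");
      const int64_t n = decompress_block(input.data() + pos, compressed_size,
                                         output.data() + frame_start + produced,
                                         frame_size - produced);
      if (n < 0 || static_cast<size_t>(n) > frame_size - produced) {
        throw std::runtime_error("corrupt block");
      }
      pos += compressed_size;
      produced += static_cast<size_t>(n);
    }
  }
  return output;
}
```

The only difference from a single-block decoder is the inner `while (produced < frame_size)` loop; with exactly one inner block per frame it runs once and behaves the same.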

Code Example

Java write

import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.GroupFactory;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

import java.io.IOException;

public class WriteParquetFileExample {
    public static void main(String[] args) {
        MessageType schema = MessageTypeParser.parseMessageType(
                "message Pair {\n" +
                        "  required binary field1;\n" +
                        "  required int32 field2;\n" +
                        "}"
        );

        GroupFactory factory = new SimpleGroupFactory(schema);
        ParquetWriter<Group> writer;
        try {
            writer = ExampleParquetWriter.builder(new Path("/path/to/example.parquet"))
                    .withWriteMode(ParquetFileWriter.Mode.CREATE)
                    .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
                    .withCompressionCodec(CompressionCodecName.LZ4)
                    .config("io.compression.codec.lz4.buffersize", "65536")
                    .withType(schema)
                    .build();
            for (int i = 0; i < 65536; i++) {
                Group group = factory.newGroup();
                group.add("field1", "abcdefghijklmn___________" + i);
                group.add("field2", i);
                writer.write(group);
            }
            writer.close();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

C++ read

#include <arrow/io/file.h>
#include <arrow/pretty_print.h>
#include <arrow/record_batch.h>
#include <parquet/arrow/reader.h>

#include <cassert>
#include <iostream>
#include <memory>
#include <string>

int main(int argc, char** argv) {
    std::string file_path = "/path/to/example.parquet";
    // Open the file once; the Arrow FileReader below takes ownership of this reader.
    std::unique_ptr<::parquet::ParquetFileReader> reader = ::parquet::ParquetFileReader::OpenFile(file_path);
    std::unique_ptr<::parquet::arrow::FileReader> out;
    auto st = parquet::arrow::FileReader::Make(arrow::default_memory_pool(), std::move(reader), &out);
    assert(st.ok());

    for (int i = 0; i < out->num_row_groups(); ++i) {
        // Read the first batch of row group i only.
        std::unique_ptr<::arrow::RecordBatchReader> batch_reader;
        auto status = out->GetRecordBatchReader({i}, &batch_reader);
        std::cout << status.message() << std::endl;
        assert(status.ok());
        std::shared_ptr<arrow::RecordBatch> _batch;
        status = batch_reader->ReadNext(&_batch);
        std::cout << status.message() << std::endl;
        assert(status.ok());
        arrow::PrettyPrint(*_batch, 0, &std::cout);
    }

    return 0;
}

Creating the Parquet file with Java and then reading it with C++ produces the error "Corrupt Lz4 compressed data." in the Arrow status message.

Component(s)

C++

mapleFU commented 2 months ago

cc @pitrou @wgtmac is this in the spec, or have we had a similar problem before? 🤔

pitrou commented 2 months ago

We see here the problem with the "lz4-hadoop" codec: its spec only exists as Java source code. That's why it's superseded by LZ4_RAW.

wgtmac commented 2 months ago

IIRC, LZ4_HADOOP has been deprecated in the Parquet spec. We should use LZ4_RAW in favor of LZ4_HADOOP at all times.

mapleFU commented 2 months ago

This can be handled the way you describe, if you don't mind. If you want this, you could add it here and add a test file in parquet-testing? @Polepoint

Polepoint commented 2 months ago

:ok_hand: I will try.