apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0
14.04k stars 3.43k forks source link

[C++][Parquet] Fast Random Rowgroup Reads #39676

Open corwinjoy opened 6 months ago

corwinjoy commented 6 months ago

Describe the enhancement requested

And a previous discussion around this with additional benchmarks: https://github.com/apache/arrow/issues/38149

Having a fast random access reader would also be beneficial for fast reading of a file with predicate pushdowns or other applications where specific rows and columns are desired.

Component(s)

C++

emkornfield commented 6 months ago

Thousands for rowgroups is an anti-pattern for laying out data (I understand some customers do it) but it creates exactly this type of performance bottleneck (sometimes this is out of our control though) but we should audit write config parameters to make sure there isn't something that is causing this type of spilling, and yes in general, parquet is not well suited to very large column widths. I think there is a better solution here but given that this touches metadata serialization I'm not sure the appetite in there will be for trying to incorporate metadata that parses faster. In any case format changes they need to be discussed on the parquet mailing list dev@parquet.apache.org

mapleFU commented 6 months ago

reading the full file metadata is prohibitively expensive when you just want a sample from a table.

So your bottleneck is reading metadata and row-group? (Since the statistics would be huge).

Firstly I think whether we can reducing the row-groups metadata in writer side. It would be much more easier.

Actually I think the idea is great, since decoding metadata would be heavy, but I think the "decode_first_row_group" is so tricky here. If we can do it better (like decoding specific metadata), I think it would be great

emkornfield commented 6 months ago

One potential way of doing this could be to reduce the current parquet.thrift to just the metadata needed. I believe in that case it should generate code that will skip over unknown fields (would need to double check if there are specific settings to make this happen).

corwinjoy commented 6 months ago

@emkornfield @mapleFU I 100% agree that these giant tables with huge numbers of rows and columns are an anti-pattern. Yet, here we are. It all started out so well with reasonably sized tables, but then, over the years we have found many informative columns for our models and collected more data so that now we are up to tens of thousands of columns and millions of rows. Eventually, we hope to add some kind of indexing layer (e.g. Apache Iceberg, database, etc.). But that will require a significant investment in design, maintenance, and even refactoring our code away from Arrow. In the meantime, we'd really like to make arrow performance a lot better for this case and I think we are not the only users whose data has grown over the years. As for the random sampling, I think this is a pretty common usage case, many deep learning models sample random batches for training. Also, ensemble methods such as random forests will randomly sample rows and columns to create "independent" models.

So, back to the high-level design for faster reads, there are two parts:

  1. Read a "minimal" set of metadata. This acts as a kind of "prototype" for the individual rowgroups. Yes, this is slightly risky since I think it is technically possible for a parquet file to change up the set of columns inside the file, but I'd rather just throw an error for this case. (Although I'm not sure how to detect it / have not built a test case for it.). As a first attempt, what I have done here is read only the first RowGroup details in the large metadata segment.
  2. Leverage the PageIndex feature to point to the correct rowgroup data. This is designed to make "point lookups I/O efficient" so we should be able to leverage that index and this PR shows one attempt to do so.
corwinjoy commented 6 months ago

To your other comments, yes the giant metadata header is the big bottleneck for large tables. Once there are a lot of rows and columns it can take 100x longer to parse that big header than to read the data (if we are just taking a sample). I think this is because the header gets huge, and, parsing the thrift data is actually quite slow since it needs to be decoded field by field and then recopied. It is probably hard to get the format changed, so my first thought was to only read the metadata for the first rowgroup and use this as a kind of prototype. But, there may be better approaches although I'm not sure what they would be.

corwinjoy commented 6 months ago

Somewhat related, this article gives a nice overview of where this optimization fits in with ideas like predicate push-down, fast queries, etc. Querying Parquet with Millisecond Latency. @m4rs-mt @marcin-krystianc

mapleFU commented 6 months ago

I understand why don't read all row-group metadata, but why a "first RowGroup" is read in this experiment? Since we already has schema here: https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift#L1116

emkornfield commented 6 months ago

@corwinjoy I think we should likely address a issues here before proceeding to an implementation:

  1. Do you have a flame-graph or other granular statistics of where the parsing is spending time. I'd imagine a fair bit of it might be in copying unneeded string data but having data would help identify the solution space for this (again it feels like potentially maintaining a fork of parquet.thrift that removes all statistics fields and use generated code from that might help improve this if the majority of time is spent copying that data. Less so if the time is spent allocating lists/actually parsing)
  2. I think the second part of this if IIUC API that make sense for communicating that we want to avoid any metadata that doesn't help with reading data (i.e. we don't desire any sort of statistics that could help with pruning). This could maybe per a reader property? It seems the initial PR focused on the first row group which seems maybe more specific than something we would want?
  3. It sounds like some sort of pushdown sampling is desired if we can gain efficiencies by doing so in the parquet library vs one of the existing or proposed extension points. For this point are the APIs proposed in https://github.com/apache/arrow/issues/38865 sufficient?
corwinjoy commented 6 months ago

@emkornfield @mapleFU These are a lot of questions. I have started by running a more detailed performance profile using the PR posted previously + a larger data set for clarity. Details are below:

Performance profile:

Large table, all integer columns with the following properties:
parquet::WriterProperties::Builder builder;
builder.enable_write_page_index()->max_row_group_length(chunkSize)->disable_dictionary();

Built using debug settings for readability when profiling so this may slightly distort things.
Benchmark is via perf:
> /usr/bin/perf record --freq=1000 --call-graph dwarf -q -o bm_reader_perf /src/arrow/cpp/cmake-build-debug-arrow_debug/debug/parquet-internals-test --gtest_filter=PageIndexBuilderTest.BenchmarkReader:PageIndexBuilderTest/*.BenchmarkReader:PageIndexBuilderTest.BenchmarkReader/*:*/PageIndexBuilderTest.BenchmarkReader/*:*/PageIndexBuilderTest/*.BenchmarkReader --gtest_color=no

> perf report -i bm_reader_perf

Benchmark Results: 
(nColumn=6000, nRow=10000), chunk_size=10, pages=1000, time with index=BenchmarkIndexedRead=4.85429s, time with full metadata=BenchmarkRegularRead=14.172s

At a high level, we have two benchmarking routines. 
BenchmarkRegularRead - Opens the file as usual, reads full metadata, subsets rowgroups using subset method. Reads subset using FileReaderBuilder with metadata for rowgroup.
BenchmarkIndexedRead - Opens the file with "only rowgroup 0" metadata read, reads OffsetIndex goes to target rowgroups via index. Reads subset using FileReaderBuilder with metadata for rowgroup.

Benchmarks from perf:
-   63.83%     0.00%  parquet-interna  parquet-internals-test        [.] parquet::BenchmarkReadColumnsUsingOffsetIndex  
   - parquet::BenchmarkReadColumnsUsingOffsetIndex                                                                                          ▒
      + 48.77% parquet::BenchmarkRegularRead                                                                                                ▒
      + 15.07% parquet::BenchmarkIndexedRead 

      - 48.77% parquet::BenchmarkRegularRead                                                                                                ▒
         + 43.72% parquet::ReadMetaData                                                                                                     ▒
         + 3.44% std::shared_ptr<parquet::FileMetaData>::~shared_ptr                                                                        ▒
         + 1.59% parquet::ReadIndexedRow   

      - 48.77% parquet::BenchmarkRegularRead                                                                                                ▒
         - 43.72% parquet::ReadMetaData                                                                                                     ▒
              parquet::ParquetFileReader::Open                                                                                              ▒
              parquet::ParquetFileReader::Contents::Open                                                                                    ▒
            - parquet::SerializedFile::ParseMetaData                                                                                        ▒
               - 42.85% parquet::SerializedFile::ParseUnencryptedFileMetadata                                                               ▒
                    parquet::FileMetaData::Make                                                                                             ▒
                    parquet::FileMetaData::FileMetaData                                                                                     ▒
                  - parquet::FileMetaData::FileMetaDataImpl::FileMetaDataImpl                                                               ▒
                     - 42.81% parquet::ThriftDeserializer::DeserializeMessage<parquet::format::FileMetaData>                                ▒
                          parquet::ThriftDeserializer::DeserializeUnencryptedMessage<parquet::format::FileMetaData>                         ▒
                        - parquet::format::FileMetaData::read                                                                               ▒
                           - 42.60% parquet::format::RowGroup::read                                                                         ▒
                              + 38.23% parquet::format::ColumnChunk::read                                                                   ▒
                              + 3.10% std::vector<parquet::format::ColumnChunk, std::allocator<parquet::format::ColumnChunk> >::resize      ▒
               + 0.74% arrow::io::internal::RandomAccessFileConcurrencyWrapper<arrow::io::ReadableFile>::ReadAt                             ▒
         + 3.44% std::shared_ptr<parquet::FileMetaData>::~shared_ptr                                                                        ▒
         + 1.59% parquet::ReadIndexedRow                

So, essentially, the regular read is spending a huge amount of time reading all of the rowgroup metadata. In this example it 
takes 40x to read the metadata than to read the actual row data.

In contrast, the IndexedRead only reads the first rowgroup, but then spends a bit of extra time reading the OffsetIndex addresses. 
(I have optimized the OffsetIndex reader, this is the GetAllOffsets function below).

- 15.07% parquet::BenchmarkIndexedRead                                                                                                ▒
         - 11.67% parquet::ReadPageIndexesDirect                                                                                            ▒
            + 11.65% parquet::(anonymous namespace)::PageIndexReaderImpl::GetAllOffsets       # This is the offset index reader             ▒
         + 1.57% parquet::ReadIndexedRow                                                                                                    ▒
         + 1.12% parquet::ReadMetaData                                                                                                      ▒
         + 0.69% std::vector<std::vector<std::shared_ptr<parquet::OffsetIndex>, std::allocator<std::shared_ptr<parquet::OffsetIndex> > >, st▒
+   48.77%     0.00%  parquet-interna  parquet-internals-test        [.] parquet::BenchmarkRegularRead                  

Breaking down the slow metadata read further it does seem that statistics may play a big role:

                       - parquet::format::FileMetaData::read                                                                               ▒
                           - 42.60% parquet::format::RowGroup::read                                                                         ▒
                              - 38.23% parquet::format::ColumnChunk::read                                                                   ▒
                                 - 30.53% parquet::format::ColumnMetaData::read                                                             ▒
                                    + 6.04% parquet::format::Statistics::read                                                               ▒
                                    + 4.10% parquet::format::PageEncodingStats::read                                                        ▒
                                    + 3.30% apache::thrift::protocol::TProtocol::readFieldBegin                                             ▒
                                    + 2.70% std::vector<parquet::format::Encoding::type, std::allocator<parquet::format::Encoding::type> >::▒
                                    + 2.40% std::vector<parquet::format::PageEncodingStats, std::allocator<parquet::format::PageEncodingStat▒
                                    + 1.98% std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std▒
                                    + 1.55% apache::thrift::protocol::TProtocol::readI32                                                    ▒
                                    + 1.51% apache::thrift::protocol::TProtocol::readListBegin                                              ▒
                                    + 1.48% apache::thrift::protocol::TProtocol::readI64                                                    ▒
                                    + 0.59% apache::thrift::protocol::TProtocol::readString                                                 ▒
                                 + 2.22% apache::thrift::protocol::TProtocol::readFieldBegin                                                ▒
                                 + 1.63% apache::thrift::protocol::TProtocol::readI64                                                       ▒
                                 + 0.88% apache::thrift::protocol::TProtocol::readI32                                                       ▒
                              + 3.10% std::vector<parquet::format::ColumnChunk, std::allocator<parquet::format::ColumnChunk> >::resize 

Try the above test with builder.disable_statistics() to try and remove the statistics.  

Benchmark Results: 
(nColumn=6000, nRow=10000), chunk_size=10, pages=1000, time with index=4.57732s, time with full metadata=11.0561s

This is a lot better. Still, reading the metadata takes a long time:

-   62.17%     0.00%  parquet-interna  parquet-internals-test        [.] parquet::BenchmarkReadColumnsUsingOffsetIndex                      ▒
   - parquet::BenchmarkReadColumnsUsingOffsetIndex                                                                                          ▒
      - 44.23% parquet::BenchmarkRegularRead                                                                                                ▒
         + 38.88% parquet::ReadMetaData                                                                                                     ▒
         + 3.52% std::shared_ptr<parquet::FileMetaData>::~shared_ptr                                                                        ▒
         + 1.76% parquet::ReadIndexedRow                                                                                                    ▒
      + 17.94% parquet::BenchmarkIndexedRead   

The statistics are reduced but still a significant chunk:

      - 44.23% parquet::BenchmarkRegularRead                                                                                                ▒
         - 38.88% parquet::ReadMetaData                                                                                                     ▒
              parquet::ParquetFileReader::Open                                                                                              ▒
              parquet::ParquetFileReader::Contents::Open                                                                                    ▒
            - parquet::SerializedFile::ParseMetaData                                                                                        ▒
               - 38.33% parquet::SerializedFile::ParseUnencryptedFileMetadata                                                               ▒
                    parquet::FileMetaData::Make                                                                                             ▒
                    parquet::FileMetaData::FileMetaData                                                                                     ▒
                  - parquet::FileMetaData::FileMetaDataImpl::FileMetaDataImpl                                                               ▒
                     - 38.31% parquet::ThriftDeserializer::DeserializeMessage<parquet::format::FileMetaData>                                ▒
                          parquet::ThriftDeserializer::DeserializeUnencryptedMessage<parquet::format::FileMetaData>                         ▒
                        - parquet::format::FileMetaData::read                                                                               ▒
                           - 38.20% parquet::format::RowGroup::read                                                                         ▒
                              - 33.54% parquet::format::ColumnChunk::read                                                                   ▒
                                 - 26.74% parquet::format::ColumnMetaData::read                                                             ▒
                                    + 4.87% parquet::format::PageEncodingStats::read                                                        ▒
                                    + 3.44% apache::thrift::protocol::TProtocol::readFieldBegin                                             ▒
                                    + 2.80% std::vector<parquet::format::Encoding::type, std::allocator<parquet::format::Encoding::type> >::▒
                                    + 2.57% std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std▒
                                    + 2.57% std::vector<parquet::format::PageEncodingStats, std::allocator<parquet::format::PageEncodingStat▒
                                    + 1.88% apache::thrift::protocol::TProtocol::readI32                                                    ▒
                                    + 1.77% apache::thrift::protocol::TProtocol::readListBegin                                              ▒
                                    + 1.61% apache::thrift::protocol::TProtocol::readI64                                                    ▒
                                    + 0.78% apache::thrift::protocol::TProtocol::readString                                                 ▒
                                 + 1.84% apache::thrift::protocol::TProtocol::readFieldBegin                                                ▒
                                 + 1.19% apache::thrift::protocol::TProtocol::readI64                                                       ▒
                              + 3.83% std::vector<parquet::format::ColumnChunk, std::allocator<parquet::format::ColumnChunk> >::resize    
corwinjoy commented 6 months ago

Points from the profiling session:

  1. This supports my claim that the metadata read is extremely expensive (up to 40x the read time with statistics).
  2. Removing statistics helps, but there are still some left after turning them off. Overall, I believe the problem is just the large number of rowgroups and columns that need to be read for the full metadata. In the above profile, even without statistics the metadata read time still about 22x the data read time.
  3. This is why I believe it makes sense to create a method that can avoid this full metadata read. Reading only the first rowgroup as a kind of prototype is one way, there may be others.
corwinjoy commented 6 months ago

@emkornfield wrote:

@corwinjoy I think we should likely address a issues here before proceeding to an implementation:

  1. Do you have a flame-graph or other granular statistics of where the parsing is spending time. I'd imagine a fair bit of it might be in copying unneeded string data but having data would help identify the solution space for this (again it feels like potentially maintaining a fork of parquet.thrift that removes all statistics fields and use generated code from that might help improve this if the majority of time is spent copying that data. Less so if the time is spent allocating lists/actually parsing)

see above

  1. I think the second part of this if IIUC API that make sense for communicating that we want to avoid any metadata that doesn't help with reading data (i.e. we don't desire any sort of statistics that could help with pruning). This could maybe per a reader property? It seems the initial PR focused on the first row group which seems maybe more specific than something we would want?

I'm not sure how much we can reduce this without changing the parquet spec. My main argument is that I think that reading all the rowgroups (and some of the other metadata) is simply unnecessary to retrieve the data.

  1. It sounds like some sort of pushdown sampling is desired if we can gain efficiencies by doing so in the parquet library vs one of the existing or proposed extension points. For this point are the APIs proposed in [C++][Parquet] support passing a RowRange to RecordBatchReader  #38865 sufficient?

The PR listed here is fine as an interface. It suffers from the same problem as the benchmarks presented here. Opening the file still has to read the full metadata before accessing rowgroups and that can be super-expensive. The kind of optimization presented here would provide internals to avoid reading the full metadata but still be able to access rowgroup data.

corwinjoy commented 6 months ago

@mapleFU wrote:

I understand why don't read all row-group metadata, but why a "first RowGroup" is read in this experiment? Since we already has schema here: https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift#L1116

I like this idea and I think it has the potential to be even faster than what I have done in the PR. To be specific in parquet_types.cpp:8435 we have:

uint32_t FileMetaData::read(::apache::thrift::protocol::TProtocol* iprot) {

  ::apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
  uint32_t xfer = 0;
  std::string fname;
  ::apache::thrift::protocol::TType ftype;
  int16_t fid;
  bool read_only_rowgroup_0 = this->read_only_rowgroup_0;

  xfer += iprot->readStructBegin(fname);

  using ::apache::thrift::protocol::TProtocolException;

  bool isset_version = false;
  bool isset_schema = false;
  bool isset_num_rows = false;
  bool isset_row_groups = false;

  while (true)
  {
    xfer += iprot->readFieldBegin(fname, ftype, fid);
    if (ftype == ::apache::thrift::protocol::T_STOP) {
      break;
    }
    switch (fid)
    {
      case 1:
        if (ftype == ::apache::thrift::protocol::T_I32) {
          xfer += iprot->readI32(this->version);
          isset_version = true;
        } else {
          xfer += iprot->skip(ftype);
        }
        break;
      case 2:
        if (ftype == ::apache::thrift::protocol::T_LIST) {
          {
            this->schema.clear();
            uint32_t _size321;
            ::apache::thrift::protocol::TType _etype324;
            xfer += iprot->readListBegin(_etype324, _size321);
            this->schema.resize(_size321);
            uint32_t _i325;
            for (_i325 = 0; _i325 < _size321; ++_i325)
            {
              xfer += this->schema[_i325].read(iprot);
            }
            xfer += iprot->readListEnd();
          }
          isset_schema = true;
        } else {
          xfer += iprot->skip(ftype);
        }
        break;
...

So the second item read is actually this schema which doesn't even show up in the profile so I think it may be quite fast. But, there is a problem, I think we still need to construct a prototype RowGroup from it in order for the readers to work. @mapleFU - do you know how to create a RowGroup from the schema? Is there such a function?

As to why I am using RowGroups the data readers seem to be intimately tied up with using the RowGroup metadata information. For example: reader.cc: 262

  Status ReadColumn(int i, const std::vector<int>& row_groups, ColumnReader* reader,
                    std::shared_ptr<ChunkedArray>* out) {
    BEGIN_PARQUET_CATCH_EXCEPTIONS
    // TODO(wesm): This calculation doesn't make much sense when we have repeated
    // schema nodes
    int64_t records_to_read = 0;
    for (auto row_group : row_groups) {
      // Can throw exception
      records_to_read +=
          reader_->metadata()->RowGroup(row_group)->ColumnChunk(i)->num_values();
    }
    ...
    return reader->NextBatch(records_to_read, out);   # and this uses the data page records from the rowgroup
  }
dannyfriar commented 6 months ago

This would certainly benefit out use case which is a dataset consisting of many 1000s of columns and a few billion rows (with row groups of a few GB in size). When reading a specific row group, the time to read metadata can be a significant fraction of the time

mapleFU commented 6 months ago

@corwinjoy I get what you'd like to do, i'll go through this patch this week. The main issue is that hacking the thrift generated data structure is a bit hacking 🤔, especially when we only want "first row group"...

Besides, could you share a similiar file in your test case( with mocked data ), so I can trying to reproduce the problem? At least would i know the file size, row count, column count, and row-group count?

corwinjoy commented 6 months ago

@mapleFU Thanks for taking a look! In terms of reading only the first row group I can think of two ways to do this cleanly:

  1. Work the thrift compiler to create a specialized FileMetaData class (say FileMetaDataFast) that only reads the first row group. I haven't really explored this since I am unfamiliar with the thrift compiler and how it is invoked in this project.
  2. Create a derived class of FileMetaData in a new file with a specialized read method where we copy and specialize the existing read code to only read the first row. Then, the read_only_rowgroup_0 flag could invoke a read of this derived class.

In both cases, I think the function would need to return after reading the first row group since we can't safely skip bytes. (This is a bit of a problem if we want to support fields that come after row_groups such as encryption_algorithm. But, for files created by arrow there is a workaround. Since each field has a field_id we could change the order that fields are written to grab critical fields before the row group).

In terms of providing a test file, the new unit tests in src/parquet/page_index_test.cc create their own test data. In the PR this is set to a somewhat smaller size of (nColumn=6000, nRow=1000) for ease of development vs the larger file used as illustration in the perf above. Anyway, this is easily configurable as shown below:

TEST_F(PageIndexBuilderTest, BenchmarkReader) {
  std::string dir_string(parquet::test::get_data_dir());
  std::string path = dir_string + "/index_reader_bm_lg.parquet";

  int nColumn = 6000; \\ <----------------Adjust as needed. These are the sizes used in the above perf report
  int nRow = 10000; \\ Large file size. 10x what is in the PR
  int chunk_size = 10;
  WriteTableToParquet(nColumn, nRow, path.c_str(), chunk_size, false); \\ Creates file only if it does not alread exist
  ...
}

To be consistent with the other tests I am using the test data directory so you will need to set the test data environment variable, e.g. PARQUET_TEST_DATA=/src/arrow/cpp/submodules/parquet-testing/data

pitrou commented 2 months ago

Related: https://github.com/apache/arrow/pull/41761

corwinjoy commented 2 months ago

Thanks @pitrou