apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

[C++] How to concatenate multiple tables in one parquet? #41858

Open zliucd opened 4 months ago

zliucd commented 4 months ago

Hi,

Is it possible to write multiple tables into a single Parquet file by appending the rows from each individual Parquet file? All of the tables read from the Parquet files have the same columns. This functionality is similar to pandas.concat([df1, df2]) in Python.

For example:

table1
Name   Age
Jim    36
Bill   30

table2
Name   Age
Sam    28
Joe    30

The concatenated table and parquet file should be:

Name   Age
Jim    36
Bill   30
Sam    28
Joe    30

We can concatenate tables with auto con_tables = arrow::ConcatenateTables(...), but it does not seem possible to write con_tables out with parquet::arrow::WriteTable(): the first parameter of WriteTable() is a single arrow::Table.
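To make this concrete, here is roughly what I am trying (simplified sketch; "merged.parquet" and the chunk size are just placeholders):

// table1 and table2 have already been read from the individual Parquet files and share a schema.
arrow::Result<std::shared_ptr<arrow::Table>> con_tables =
    arrow::ConcatenateTables({table1, table2});

std::shared_ptr<arrow::io::FileOutputStream> outfile;
PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open("merged.parquet"));

// WriteTable() wants a single arrow::Table as its first argument -- how do I get there
// from con_tables?
// parquet::arrow::WriteTable(/* ? */, arrow::default_memory_pool(), outfile, /*chunk_size=*/65536);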

This post shows how to merge tables by appending columns, but my case is appending rows: https://stackoverflow.com/questions/71183352/merging-tables-in-apache-arrow

The following code is from Apache Arrow's arrow_reader_writer_test.cc. We could probably read the rows and columns from the Parquet files (tables) manually like this, but is there an easier way to concatenate?

::arrow::Result<std::shared_ptr<Table>> ReadTableManually(FileReader* reader) {
  std::vector<std::shared_ptr<Table>> tables;

  std::shared_ptr<::arrow::Schema> schema;
  RETURN_NOT_OK(reader->GetSchema(&schema));

  int n_row_groups = reader->num_row_groups();
  int n_columns = schema->num_fields();
  for (int i = 0; i < n_row_groups; i++) {
    std::vector<std::shared_ptr<ChunkedArray>> columns{static_cast<size_t>(n_columns)};

    for (int j = 0; j < n_columns; j++) {
      RETURN_NOT_OK(reader->RowGroup(i)->Column(j)->Read(&columns[j]));
    }

    tables.push_back(Table::Make(schema, columns));
  }

  return ConcatenateTables(tables);
}

Thanks.


Note: I have implemented a simple version that reads each column from every table and concatenates them, but the performance is not good.

Component(s)

C++

davlee1972 commented 4 months ago

In Python I would normally just concatenate entire Parquet files as Parquet row groups into a single file. There is no need to dig into individual rows.


davlee1972 commented 4 months ago

To write out data batch-by-batch, use parquet::arrow::FileWriter:

https://arrow.apache.org/docs/cpp/parquet.html

// #include "parquet/arrow/writer.h"
// #include "arrow/util/type_fwd.h"
using parquet::ArrowWriterProperties;
using parquet::WriterProperties;

// Data is in a RecordBatchReader (RBR)
std::shared_ptr<arrow::RecordBatchReader> batch_stream;
ARROW_ASSIGN_OR_RAISE(batch_stream, GetRBR());

// Choose compression
std::shared_ptr<WriterProperties> props =
    WriterProperties::Builder().compression(arrow::Compression::SNAPPY)->build();

// Opt to store Arrow schema for easier reads back into Arrow
std::shared_ptr<ArrowWriterProperties> arrow_props =
    ArrowWriterProperties::Builder().store_schema()->build();

// Create a writer
std::shared_ptr<arrow::io::FileOutputStream> outfile;
ARROW_ASSIGN_OR_RAISE(outfile, arrow::io::FileOutputStream::Open(path_to_file));
std::unique_ptr<parquet::arrow::FileWriter> writer;
ARROW_ASSIGN_OR_RAISE(
    writer, parquet::arrow::FileWriter::Open(*batch_stream->schema().get(),
                                             arrow::default_memory_pool(), outfile,
                                             props, arrow_props));

// Write each batch as a row_group
for (arrow::Result<std::shared_ptr<arrow::RecordBatch>> maybe_batch : *batch_stream) {
  ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
  ARROW_ASSIGN_OR_RAISE(auto table,
                        arrow::Table::FromRecordBatches(batch->schema(), {batch}));
  ARROW_RETURN_NOT_OK(writer->WriteTable(*table.get(), batch->num_rows()));
}

// Write file footer and close
ARROW_RETURN_NOT_OK(writer->Close());
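
Putting this together for the original question, a rough, untested sketch (all input files are assumed to share the same schema; ConcatenateParquetFiles is just an illustrative name) that appends every row group of every input file into one output file:

// #include "arrow/api.h"
// #include "arrow/io/api.h"
// #include "parquet/arrow/reader.h"
// #include "parquet/arrow/writer.h"

arrow::Status ConcatenateParquetFiles(const std::vector<std::string>& inputs,
                                      const std::string& output_path) {
  auto* pool = arrow::default_memory_pool();

  // Take the output schema from the first input file.
  std::shared_ptr<arrow::io::ReadableFile> first;
  ARROW_ASSIGN_OR_RAISE(first, arrow::io::ReadableFile::Open(inputs[0]));
  std::unique_ptr<parquet::arrow::FileReader> first_reader;
  ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(first, pool, &first_reader));
  std::shared_ptr<arrow::Schema> schema;
  ARROW_RETURN_NOT_OK(first_reader->GetSchema(&schema));

  // One writer for the merged file.
  std::shared_ptr<arrow::io::FileOutputStream> outfile;
  ARROW_ASSIGN_OR_RAISE(outfile, arrow::io::FileOutputStream::Open(output_path));
  std::unique_ptr<parquet::arrow::FileWriter> writer;
  ARROW_ASSIGN_OR_RAISE(writer, parquet::arrow::FileWriter::Open(*schema, pool, outfile));

  for (const auto& path : inputs) {
    std::shared_ptr<arrow::io::ReadableFile> infile;
    ARROW_ASSIGN_OR_RAISE(infile, arrow::io::ReadableFile::Open(path));
    std::unique_ptr<parquet::arrow::FileReader> reader;
    ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(infile, pool, &reader));

    // Copy one row group at a time so a whole input never has to sit in memory at once.
    for (int i = 0; i < reader->num_row_groups(); ++i) {
      std::shared_ptr<arrow::Table> table;
      ARROW_RETURN_NOT_OK(reader->ReadRowGroup(i, &table));
      ARROW_RETURN_NOT_OK(writer->WriteTable(*table, table->num_rows()));
    }
  }

  // Write the file footer and close.
  return writer->Close();
}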