
[C++] Parquet StreamWriter Dictionary Encoding #43682

Closed: adampinky85 closed this issue 1 day ago

adampinky85 commented 1 month ago

Describe the usage question you have. Please include as many useful details as possible.

Hi team,

We use Arrow / Parquet files extensively for data analysis with Pandas, and it's excellent! We are attempting to use the Parquet stream writer to build Parquet files which are then consumed in Python for research.

The code below works except for dictionary field types. These are important to our work: we have billions of rows and a large number of repeating short strings. With the dictionary field type, Pandas loads those columns as the category dtype rather than object, which would otherwise result in unmanageable memory requirements.
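As a rough illustration of the memory difference, here is a minimal sketch with made-up repeated strings (exact numbers will vary with the data):

import pandas as pd

# A column of heavily repeated short strings: object dtype keeps one Python
# string object per row, while category stores small integer codes plus a
# dictionary of the unique values.
s = pd.Series(["AAPL", "MSFT", "GOOG"] * 1_000_000)
print(s.memory_usage(deep=True))                     # object dtype
print(s.astype("category").memory_usage(deep=True))  # far smaller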

Our original approach first builds Arrow tables with an Arrow schema using StringDictionaryBuilder and then writes the Parquet files. This method works with the dictionary fields but does not offer the streaming approach we require.

We would really appreciate any guidance on how to set the baz column to be a dictionary field. Many thanks!

Versions:

C++:
libarrow 16.1.0-1
libparquet 16.1.0-1

Python:
pyarrow 16.1.0
Python 3.12.3

Parquet Stream Writer

// parquet writer options
auto parquet_properties = parquet::WriterProperties::Builder()
        .compression(arrow::Compression::SNAPPY)
        ->data_page_version(parquet::ParquetDataPageVersion::V2)
        ->encoding(parquet::Encoding::DELTA_BINARY_PACKED)
        ->enable_dictionary()
        ->enable_statistics()
        ->version(parquet::ParquetVersion::PARQUET_2_6)
        ->build();

// parquet schema
auto fields = parquet::schema::NodeVector{};
fields.push_back(parquet::schema::PrimitiveNode::Make(
        "foo",
        parquet::Repetition::REQUIRED,
        parquet::Type::INT64,
        parquet::ConvertedType::INT_64)
);
fields.push_back(parquet::schema::PrimitiveNode::Make(
        "bar",
        parquet::Repetition::REQUIRED,
        parquet::LogicalType::Timestamp(false, parquet::LogicalType::TimeUnit::MILLIS, false, true),
        parquet::Type::INT64)
);
fields.push_back(parquet::schema::PrimitiveNode::Make(
        "baz",
        parquet::Repetition::REQUIRED,
        parquet::LogicalType::String(),
        parquet::Type::BYTE_ARRAY)
);
auto schema = std::static_pointer_cast<parquet::schema::GroupNode>(parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, fields));

// open filestream
auto file_system = arrow::fs::LocalFileSystem{};
auto outfile = file_system.OpenOutputStream("new.parquet").ValueOrDie();

// open parquet stream writer
auto parquet_file_writer = parquet::ParquetFileWriter::Open(outfile, schema, parquet_properties);
auto parquet_stream = parquet::StreamWriter{std::move(parquet_file_writer)};

Pandas Reading the New parquet::StreamWriter File

import pandas as pd
import pyarrow.parquet as pq

df = pd.read_parquet("new.parquet")
df.dtypes

foo             int64
bar    datetime64[ms]
baz            object <-------- should be category
dtype: object

schema = pq.read_schema("new.parquet")
schema

foo: int64 not null
bar: timestamp[ms] not null
baz: string not null <-------- string not dictionary

Pandas Reading the Original parquet::arrow::WriteTable File

import pandas as pd
import pyarrow.parquet as pq

df = pd.read_parquet("original.parquet")
df.dtypes

foo             int64
bar    datetime64[ms]
baz            category <-------- correct
dtype: object

schema = pq.read_schema("original.parquet")
schema

foo: int64 not null
bar: timestamp[ms] not null
baz: dictionary<values=string, indices=int32, ordered=0> <-------- correct

Component(s)

C++

mapleFU commented 1 month ago

read_parquet/read_table have a read_dictionary API; perhaps you could also use that.
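For example, a minimal sketch using the file and column names from the example above:

import pyarrow.parquet as pq

# Ask the reader to decode "baz" as a dictionary column even though the
# file's schema declares it as a plain string column.
table = pq.read_table("new.parquet", read_dictionary=["baz"])
df = table.to_pandas()  # "baz" comes back as the pandas category dtype
print(df.dtypes)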

adampinky85 commented 2 weeks ago

Thanks @mapleFU for the suggestion. Unfortunately it doesn't work for our use case; it would require the user to be aware of the dictionary columns. We need the Parquet file to be written with the correct schema.

mapleFU commented 2 weeks ago

Can you elaborate on a case where this doesn't work? I'll recheck it.

adampinky85 commented 2 weeks ago

Hi @mapleFU, with the example code above, using pyarrow.parquet.read_table doesn't work for our needs because: 1. users would need to use pyarrow instead of pandas; 2. users would need to be aware of the dictionary columns for each file (this should be part of the schema); and 3. the data is not stored in the format we require, i.e. dictionary-encoded integer fields rather than compressed string encoding.

We want to be able to build Parquet files with the dictionary fields (which works with the non-streaming C++ API) while using the streaming C++ writer. Thanks!

mapleFU commented 2 weeks ago

We need the Parquet file to be written with the correct schema.

I've checked this, @adampinky85:

// parquet writer options
auto parquet_properties = parquet::WriterProperties::Builder()
        .compression(arrow::Compression::SNAPPY)
        ->data_page_version(parquet::ParquetDataPageVersion::V2)
        ->encoding(parquet::Encoding::DELTA_BINARY_PACKED)
        ->enable_dictionary()
        ->enable_statistics()
+        ->store_schema()
        ->version(parquet::ParquetVersion::PARQUET_2_6)
        ->build();

Maybe you can try adding store_schema here. So sorry for the delay.

adampinky85 commented 2 weeks ago

Thanks @mapleFU. With our current non-streaming approach (see below), we build an Arrow schema which provides the string dictionary fields. These are then stored in the Parquet file using the store_schema approach, as you suggest.

With the streaming API (see the top of the issue), we're unable to set dictionary fields. I would have thought it will always store these fields as byte array strings rather than dictionary-encoded integers?

  // schema and builder
  auto schema =
      arrow::schema({
                     arrow::field("foo", arrow::timestamp(arrow::TimeUnit::MILLI)),
                     arrow::field("bar", dictionary(arrow::int8(), arrow::utf8())),
                     arrow::field("baz", arrow::float64())
     });

  auto foo_builder = arrow::TimestampBuilder(arrow::timestamp(arrow::TimeUnit::MILLI),
                                                   arrow::default_memory_pool());
  auto bar_builder = arrow::StringDictionaryBuilder{};
  auto baz_builder = arrow::DoubleBuilder();

  ...

  // parquet file output
  const auto parquet_properties = parquet::WriterProperties::Builder()
                                      .compression(arrow::Compression::SNAPPY)
                                      ->data_page_version(parquet::ParquetDataPageVersion::V2)
                                      ->enable_dictionary()
                                      ->encoding(parquet::Encoding::DELTA_BINARY_PACKED)
                                      ->version(parquet::ParquetVersion::PARQUET_2_6)
                                      ->build();

  // required to store the arrow schema for pandas to retrieve categorical types
  const auto arrow_properties = parquet::ArrowWriterProperties::Builder().store_schema()->build();

mapleFU commented 2 weeks ago

So sorry to be misleading...

void WriteParquetFile() {
  std::shared_ptr<arrow::io::FileOutputStream> outfile;

  PARQUET_ASSIGN_OR_THROW(
      outfile, arrow::io::FileOutputStream::Open("parquet-stream-api-example.parquet"));

  parquet::WriterProperties::Builder builder;

  // schema and builder
  auto arrow_schema =
      arrow::schema({
                     // matches the parquet schema below; "baz" is declared as a
                     // dictionary type so readers restore it as dictionary/category
                     arrow::field("foo", arrow::int64()),
                     arrow::field("bar", arrow::timestamp(arrow::TimeUnit::MICRO)),
                     arrow::field("baz", arrow::dictionary(arrow::int32(), arrow::utf8()))
     });

#if defined ARROW_WITH_BROTLI
  builder.compression(parquet::Compression::BROTLI);
#elif defined ARROW_WITH_ZSTD
  builder.compression(parquet::Compression::ZSTD);
#endif

  // parquet schema
  auto fields = parquet::schema::NodeVector{};
  fields.push_back(parquet::schema::PrimitiveNode::Make(
          "foo",
          parquet::Repetition::REQUIRED,
          parquet::Type::INT64,
          parquet::ConvertedType::INT_64)
  );
  fields.push_back(parquet::schema::PrimitiveNode::Make(
          "bar",
          parquet::Repetition::REQUIRED,
          parquet::LogicalType::Timestamp(false, parquet::LogicalType::TimeUnit::MICROS, false, true),
          parquet::Type::INT64)
  );
  fields.push_back(parquet::schema::PrimitiveNode::Make(
          "baz",
          parquet::Repetition::REQUIRED,
          parquet::LogicalType::String(),
          parquet::Type::BYTE_ARRAY)
  );
  auto parquet_schema = std::static_pointer_cast<parquet::schema::GroupNode>(parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, fields));
  auto parquet_writer = parquet::ParquetFileWriter::Open(outfile, parquet_schema, builder.build());
  auto serialized_schema =
                        ::arrow::ipc::SerializeSchema(*arrow_schema, ::arrow::default_memory_pool()).ValueOrDie();

  // The serialized schema is not UTF-8, which is required for Thrift
  std::string schema_as_string = serialized_schema->ToString();
  std::string schema_base64 = ::arrow::util::base64_encode(schema_as_string);
  parquet_writer->AddKeyValueMetadata(::arrow::KeyValueMetadata::Make({"ARROW:schema"}, {schema_base64}));

  parquet::StreamWriter os{std::move(parquet_writer)};

  os.SetMaxRowGroupSize(1000);

  for (auto i = 0; i < TestData::num_rows; ++i) {
    os << int64_t(1000000);
    os << UserTimestamp{std::chrono::milliseconds{1000000 * i}};
    os << "def";
    os << parquet::EndRow;

    if (i == TestData::num_rows / 2) {
      os << parquet::EndRowGroup;
    }
  }
  std::cout << "Parquet Stream Writing complete." << std::endl;
}

@adampinky85 The core point here is

  // The serialized schema is not UTF-8, which is required for Thrift
  std::string schema_as_string = serialized_schema->ToString();
  std::string schema_base64 = ::arrow::util::base64_encode(schema_as_string);
  parquet_writer->AddKeyValueMetadata(::arrow::KeyValueMetadata::Make({"ARROW:schema"}, {schema_base64}));

It's a hack, but I think it could be used.

With the streaming API (see the top of the issue), we're unable to set dictionary fields. I would have thought it will always store these fields as byte array strings rather than dictionary-encoded integers?

No, this is a hack in Arrow. Parquet dictionary encoding is unrelated to the Arrow dictionary type in the schema. The dictionary type comes from the Arrow schema that the Arrow Parquet writer stores in the file; the stream writer lacks this step, which is why the ARROW:schema metadata has to be added by hand.
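If it helps, one way to check that the hack took effect is to look for the ARROW:schema key in the file's key-value metadata (a minimal sketch, assuming the file name from the example above):

import pyarrow.parquet as pq

# The Arrow schema is stored base64-encoded under the "ARROW:schema" key of
# the Parquet file's key-value metadata.
kv = pq.ParquetFile("parquet-stream-api-example.parquet").metadata.metadata
print(b"ARROW:schema" in kv)  # True once AddKeyValueMetadata has been called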

mapleFU commented 2 weeks ago

I've verified with my file:

>>> df = pd.read_parquet("parquet-stream-api-example.parquet")
>>> df.dtypes
foo             int64
bar    datetime64[us]
baz          category
dtype: object
>>> schema = pq.read_schema("parquet-stream-api-example.parquet")
>>> schema
foo: int64 not null
bar: timestamp[us] not null
baz: dictionary<values=string, indices=int32, ordered=0> not null

So sorry for the misunderstanding.

mapleFU commented 2 weeks ago

@adampinky85 Has this solved the problem?

adampinky85 commented 2 weeks ago

Hi @mapleFU, thanks for the details. I'm on leave currently; I'll be back in a week and will confirm. Thanks!

adampinky85 commented 1 day ago

Hi @mapleFU, thank you, that works! Much appreciated :)

mapleFU commented 1 day ago

Sorry for making mistakes so many times.