apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

[Python] [Parquet] Compression degradation when column type changed from INT64 to INT32 #35726

Open droher opened 1 year ago

droher commented 1 year ago

Describe the bug, including details regarding any error messages, version, and platform.

Within a CSV of ~17M rows, I have a column of unique integers that are fairly uniformly distributed between 0 and 200,000,000. I am reading the CSV as follows:

from pyarrow import csv, parquet

def file_to_data_frame_to_parquet(local_file: str, parquet_file: str) -> None:
    table = csv.read_csv(local_file, convert_options=csv.ConvertOptions(strings_can_be_null=True))
    parquet.write_table(table, parquet_file, compression='zstd')

When I read the column without any type specification, the uncompressed size is 133.1 MB and the compressed size is 18.0 MB.

When I add an explicit type mapping for that column in the read_csv step, to either uint32 or int32, the total uncompressed size shrinks to 67.0 MB, but the compressed size expands to 55.8 MB. (I'm getting these statistics from the Parquet schema metadata functions in DuckDB, but I've validated that the difference is real by checking the total file size.)
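For reference, a minimal sketch of how such a type mapping can be passed to read_csv (via column_types in ConvertOptions); the column name event_key is an assumption taken from the metadata shown later in the thread:

import pyarrow as pa
from pyarrow import csv, parquet

convert_options = csv.ConvertOptions(
    strings_can_be_null=True,
    column_types={"event_key": pa.int32()},  # or pa.uint32()
)
table = csv.read_csv("local_file.csv", convert_options=convert_options)
parquet.write_table(table, "output.parquet", compression="zstd")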

This degradation stays the same with a variety of different changes to settings/envs:

Component(s)

Parquet, Python

mapleFU commented 1 year ago

Hi @droher. There are many reasons that can cause a file size to grow. Could you run the printer https://github.com/apache/parquet-cpp/blob/642da055adf009652689b20e68a198cffb857651/src/parquet/printer.cc#L30 or the reader https://github.com/apache/parquet-cpp/blob/642da055adf009652689b20e68a198cffb857651/tools/parquet_reader.cc#L66 to print the metadata of the Parquet file? That would help a lot.

jorisvandenbossche commented 1 year ago

You can also use the Python APIs to read the Parquet FileMetaData; that should be a bit easier (in case this includes all the relevant information).

Small example:

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

arr = np.random.randint(0, 200_000, size=10_000)
table1 = pa.table({"col": arr})
table2 = pa.table({"col": arr.astype("int32")})

pq.write_table(table1, "data_int64.parquet")
pq.write_table(table2, "data_int32.parquet")

gives:

In [25]: pq.read_metadata("data_int64.parquet").row_group(0).column(0)
Out[25]: 
<pyarrow._parquet.ColumnChunkMetaData object at 0x7f91d8d309a0>
  file_offset: 63767
  file_path: 
  physical_type: INT64
  num_values: 10000
  path_in_schema: col
  is_stats_set: True
  statistics:
    <pyarrow._parquet.Statistics object at 0x7f91d8406de0>
      has_min_max: True
      min: 10
      max: 199969
      null_count: 0
      distinct_count: 0
      num_values: 10000
      physical_type: INT64
      logical_type: None
      converted_type (legacy): NONE
  compression: SNAPPY
  encodings: ('RLE_DICTIONARY', 'PLAIN', 'RLE')
  has_dictionary_page: True
  dictionary_page_offset: 4
  data_page_offset: 46165
  total_compressed_size: 63763
  total_uncompressed_size: 95728

In [26]: pq.read_metadata("data_int32.parquet").row_group(0).column(0)
Out[26]: 
<pyarrow._parquet.ColumnChunkMetaData object at 0x7f91d265cc70>
  file_offset: 56672
  file_path: 
  physical_type: INT32
  num_values: 10000
  path_in_schema: col
  is_stats_set: True
  statistics:
    <pyarrow._parquet.Statistics object at 0x7f91d265d710>
      has_min_max: True
      min: 10
      max: 199969
      null_count: 0
      distinct_count: 0
      num_values: 10000
      physical_type: INT32
      logical_type: None
      converted_type (legacy): NONE
  compression: SNAPPY
  encodings: ('RLE_DICTIONARY', 'PLAIN', 'RLE')
  has_dictionary_page: True
  dictionary_page_offset: 4
  data_page_offset: 39086
  total_compressed_size: 56668
  total_uncompressed_size: 56656

For this toy example, it's also peculiar that the total compressed size is actually a tiny bit larger than the total uncompressed size in the int32 case (I used the snappy default, though; with zstd it actually does compress a bit). And int64 also compresses better, but the int32 compressed size is still a bit smaller in this case.
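A small follow-up sketch (not part of the original comment) that repeats the toy example with zstd and prints the column chunk sizes from the metadata, to make the comparison above reproducible:

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

arr = np.random.randint(0, 200_000, size=10_000)
for name, data in {"int64": arr, "int32": arr.astype("int32")}.items():
    path = f"data_{name}_zstd.parquet"
    pq.write_table(pa.table({"col": data}), path, compression="zstd")
    md = pq.read_metadata(path).row_group(0).column(0)
    # compare compressed vs. uncompressed column chunk sizes
    print(name, md.total_compressed_size, md.total_uncompressed_size)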

mapleFU commented 1 year ago

Thanks @jorisvandenbossche

Compression depends on the window size and the data distribution. Generally, int64 might compress better than int32, but maybe we can find some counter-examples.

It's possible for the compressed size to be greater than the size before compression. Parquet has two versions of data pages. With data page V1, all pages in a column chunk share the same compression and must be compressed if a compression codec is set. With data page V2, a page can choose not to be compressed if its size grows after compression.

Other factors, including the metadata size and the number of row groups, also matter, so I cannot give an answer without the metadata and more info.
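To illustrate the data page version point above, a sketch (with a placeholder table, not the reporter's data) of switching the writer to V2 data pages, where a page may be left uncompressed when compression would only make it larger:

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"col": list(range(10_000))})  # placeholder data
pq.write_table(table, "data_v2.parquet", compression="zstd", data_page_version="2.0")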

droher commented 1 year ago

Thanks for the help - here's the metadata output for INT32:

  file_offset: 58502786
  file_path: 
  physical_type: INT32
  num_values: 16602160
  path_in_schema: event_key
  is_stats_set: True
  statistics:
    <pyarrow._parquet.Statistics object at 0x102d4d440>
      has_min_max: True
      min: 1
      max: 195205394
      null_count: 0
      distinct_count: 0
      num_values: 16602160
      physical_type: INT32
      logical_type: None
      converted_type (legacy): NONE
  compression: ZSTD
  encodings: ('RLE_DICTIONARY', 'PLAIN', 'RLE', 'PLAIN')
  has_dictionary_page: True
  dictionary_page_offset: 2690531
  data_page_offset: 3599395
  total_compressed_size: 55812255
  total_uncompressed_size: 67036335

And for INT64:

  file_offset: 20698306
  file_path: 
  physical_type: INT64
  num_values: 16602160
  path_in_schema: event_key
  is_stats_set: True
  statistics:
    <pyarrow._parquet.Statistics object at 0x102d4c540>
      has_min_max: True
      min: 1
      max: 195205394
      null_count: 0
      distinct_count: 0
      num_values: 16602160
      physical_type: INT64
      logical_type: None
      converted_type (legacy): NONE
  compression: ZSTD
  encodings: ('RLE_DICTIONARY', 'PLAIN', 'RLE', 'PLAIN')
  has_dictionary_page: True
  dictionary_page_offset: 2690531
  data_page_offset: 2831269
  total_compressed_size: 18007775
  total_uncompressed_size: 133123584

For context, the reason I want the smaller data type is for an accurate schema for downstream systems - I don't need/expect significantly better compression with the different type within the file itself. If I could specify a logical uint32 with a physical int64 that would solve my problem, but I don't think that's possible from my reading of the Parquet spec.

mapleFU commented 1 year ago

I think I know what the answer is. Would you mind not using the dictionary, using "PLAIN" to encode your data, and then sharing the new metadata?

I guess you have a large dictionary, but the dictionary doesn't work well. ZSTD on int32 also doesn't work well, which makes the problem worse.
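A sketch of what the suggested write could look like (placeholder table standing in for the one read from the CSV; in pyarrow, column_encoding is combined with use_dictionary=False):

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"event_key": pa.array(range(1, 1_000_001), type=pa.int32())})  # placeholder data

pq.write_table(
    table,
    "events_plain.parquet",
    compression="zstd",
    use_dictionary=False,                    # disable dictionary encoding
    column_encoding={"event_key": "PLAIN"},  # force PLAIN for this column
)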

droher commented 1 year ago

~It worked! It was the explicit "PLAIN" setting that allowed it to work - just turning off "use_dictionary" didn't improve performance.~ Made a mistake, see comment below

<pyarrow._parquet.ColumnChunkMetaData object at 0x10748e340>
  file_offset: 38389383
  file_path: 
  physical_type: INT64
  num_values: 16602160
  path_in_schema: event_key
  is_stats_set: True
  statistics:
    <pyarrow._parquet.Statistics object at 0x10748d350>
      has_min_max: True
      min: 1
      max: 195205394
      null_count: 0
      distinct_count: 0
      num_values: 16602160
      physical_type: INT64
      logical_type: None
      converted_type (legacy): NONE
  compression: ZSTD
  encodings: ('PLAIN', 'RLE')
  has_dictionary_page: False
  dictionary_page_offset: None
  data_page_offset: 20663906
  total_compressed_size: 17725477
  total_uncompressed_size: 132826931

The encodings tuple is the same regardless of whether PLAIN is specified, but the performance is different.

mapleFU commented 1 year ago

Haha, the encodings tuple has a problem; I'm fixing it now: https://github.com/apache/arrow/pull/35758

Maybe you can give DELTA_BINARY_PACKED a try, but I'm not sure it would work; it seems to depend heavily on the data distribution.

You can also try changing the compression level; maybe that helps.
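A sketch of trying both suggestions together (placeholder table; the zstd level is arbitrary):

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"event_key": pa.array(range(1, 1_000_001), type=pa.int32())})  # placeholder data

pq.write_table(
    table,
    "events_delta.parquet",
    compression="zstd",
    compression_level=9,  # arbitrary; zstd supports higher levels
    use_dictionary=False,
    column_encoding={"event_key": "DELTA_BINARY_PACKED"},
)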

droher commented 1 year ago

You read my mind - I just tried delta with a sort on that column prior to writing, and it worked really well - 1.6 MB (compared to 18 MB for INT64).

The pyarrow doc might need updating, as it doesn't list delta as supported yet: https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html. Not sure if there's a reason for that or if it's just out of date.
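For reference, a sketch of the sort-then-write approach described above (placeholder data); sorting keeps consecutive deltas small so DELTA_BINARY_PACKED can pack them tightly:

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"event_key": pa.array([42, 7, 195_205_394, 1], type=pa.int32())})  # placeholder data
sorted_table = table.sort_by("event_key")  # sort so consecutive deltas stay small

pq.write_table(
    sorted_table,
    "events_sorted_delta.parquet",
    compression="zstd",
    use_dictionary=False,
    column_encoding={"event_key": "DELTA_BINARY_PACKED"},
)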

mapleFU commented 1 year ago

Amazing! Maybe it's out of date. @rok, mind taking a look?

droher commented 1 year ago

Sorry, but I made a mistake on the "PLAIN" encoding test - I didn't set it to INT32. Once I do that, there's no improvement over the original 😞. Delta is still valid, though.

mapleFU commented 1 year ago

Hmmm, @wgtmac, do you know why INT32 might be larger than INT64 after default ZSTD compression with PLAIN encoding?

wgtmac commented 1 year ago

I encountered a similar issue before, in which the encoded int64s had a worse compression ratio than raw int64s: https://github.com/facebook/zstd/issues/1325#issuecomment-422307423

mapleFU commented 1 year ago

It seems @droher could submit an issue or ask the zstd maintainers about data with this distribution; personally, I have no idea.

droher commented 1 year ago

The issue is more prominent when using ZSTD, but it's still there with GZIP - the GZIPped column is still larger as INT32 than it is as INT64.

wgtmac commented 1 year ago

IIUC, both the zstd and gzip algorithms belong to the LZ family, so you may observe similar behavior between them. They compress data in units of bytes rather than integers, so algorithms designed to compress integers (e.g. bit packing in Parquet or FastPFor) would perform significantly better than these generic compression algorithms. BTW, have you tried different levels? @droher

alippai commented 1 year ago

I can imagine that there are some edge cases where a smaller sliding window (in terms of values, it's similar to int32 vs int64) prevents finding rare patterns and improving the entropy coding in the next step.

droher commented 1 year ago

Increasing the compression level doesn't help - even going as high as 15 on ZSTD only brings it down from 55.8 MB to 55.1 MB.

From what I've learned here, it seems like what I observed is counterintuitive behavior, but not a bug. Fine with closing it on my end. Thanks to everyone who helped out.

mapleFU commented 1 year ago

By the way, in DeltaBinaryPacked, I guess INT64 is better because of my patch in 12.0: https://github.com/apache/arrow/pull/34632

alippai commented 1 year ago

Out of curiosity, can you try saving the parquet file using duckdb (with the same row group size and ZSTD codec)?
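For reference, a rough sketch of that comparison via DuckDB's Python API (the file names and row group size are placeholders; the COPY options follow DuckDB's documented Parquet export syntax):

import duckdb

duckdb.sql("""
    COPY (SELECT * FROM read_parquet('events_int32.parquet'))
    TO 'events_duckdb.parquet'
    (FORMAT PARQUET, COMPRESSION ZSTD, ROW_GROUP_SIZE 1048576)
""")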

mapleFU commented 10 months ago

https://github.com/apache/arrow/pull/37940

@droher This patch also fixes something related to DELTA.