apache / iceberg-python

Apache PyIceberg
https://py.iceberg.apache.org/
Apache License 2.0

`parquet_path_to_id_mapping` generates incorrect path for List types #716

Open cgbur opened 5 months ago

cgbur commented 5 months ago

Apache Iceberg version

main (development)

Please describe the bug 🐞

When using the add_files table api, the parquet metadata needs to be read and a mapping of Dict[str, int] is used by data_file_statistics_from_parquet_metadata in order to link the field ID to the name in the parquet file for statistics collection. However during the mapping lookup I was receiving an error that a key was not present.

My schema contains one of the following (it's a subfield of a Details struct, which matters for the full name later):

extras: large_list<item: struct<key: string not null, value: string>> not null
  child 0, item: struct<key: string not null, value: string>
      child 0, key: string not null
      child 1, value: string

Based on the parquet schema path definition, these fields have the following paths:

Details.extras.list.item.key
Details.extras.list.item.value

The issue is that the parquet_path_to_id_mapping returns a mapping for these two fields as follows:

Details.extras.list.element.key -> 189
Details.extras.list.element.value -> 190

So, the issue appears to be that the visitor constructing the schema paths uses element where the parquet file uses item. I am not sure how this manifests yet, as I have not dug into it too closely.
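To make the failure concrete, the lookup fails roughly like this (field IDs taken from the report above; a minimal sketch, not PyIceberg code):

```python
# Sketch of the mismatch: parquet_path_to_id_mapping produces keys using
# "element" (IDs 189/190 are the ones quoted in the report above) ...
mapping = {
    "Details.extras.list.element.key": 189,
    "Details.extras.list.element.value": 190,
}

# ... but the parquet footer reports the path with "item", so the lookup fails:
path_in_schema = "Details.extras.list.item.key"
try:
    field_id = mapping[path_in_schema]
except KeyError as e:
    print(f"KeyError: {e}")
```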

Fokko commented 5 months ago

@cgbur Thanks for raising this. Could you share the stack trace that you're seeing?

I tried to reproduce it, but it works on my end:

import pyarrow as pa
import pyarrow.parquet as pq
import pytest

from pyiceberg.io.pyarrow import (
    DataFileStatistics,
    compute_statistics_plan,
    data_file_statistics_from_parquet_metadata,
    parquet_path_to_id_mapping,
)
from pyiceberg.schema import Schema
from pyiceberg.typedef import EMPTY_DICT
from pyiceberg.types import ListType, NestedField, StringType, StructType

def test_data_file_statistics_from_parquet_metadata_list(tmp_path_factory: pytest.TempPathFactory) -> None:
    pyarrow_list = pa.schema([
        pa.field("extras", pa.list_(pa.field("element", pa.struct([pa.field("key", pa.string()), pa.field("value", pa.string())]))))
    ])
    # note: 'some_list' does not match the schema's "extras" field, so the column is written as all nulls
    tbl = pa.Table.from_pylist([{'some_list': [{"key": "a", "value": "b"}]}], schema=pyarrow_list)
    file_path = tmp_path_factory.mktemp('test_statistics') / "test.parquet"
    pq.write_table(tbl, file_path)

    parquet_metadata = pq.read_metadata(file_path)

    iceberg_schema = Schema(
        NestedField(
            1,
            "extras",
            ListType(
                10,
                StructType(
                    NestedField(10, "key", StringType()),
                    NestedField(11, "value", StringType()),
                ),
                element_required=False,
            ),
        )
    )

    statistics = data_file_statistics_from_parquet_metadata(
        parquet_metadata=parquet_metadata,
        stats_columns=compute_statistics_plan(iceberg_schema, EMPTY_DICT),
        parquet_column_mapping=parquet_path_to_id_mapping(iceberg_schema),
    )

    assert statistics == DataFileStatistics(
        record_count=1,
        column_sizes={10: 51, 11: 51},
        value_counts={10: 1, 11: 1},
        null_value_counts={10: 1, 11: 1},
        nan_value_counts={},
        column_aggregates={},
        split_offsets=[4],
    )
cgbur commented 5 months ago

Here is a complete example recreating the error. I am using polars to make the table, which results in the same schema that I produce with pyarrow.

import polars as pl
from pyiceberg.catalog.sql import SqlCatalog
import pyarrow.parquet as pq
import os
import shutil

pl.DataFrame(
    {
        "a": [[{"a": 1}, {"a": 2}], [{"a": 3}]],
    }
).write_parquet("example.parquet")

warehouse_path = "/tmp/warehouse"

# wipe the warehouse
if os.path.exists(warehouse_path):
    shutil.rmtree(warehouse_path)

os.makedirs(warehouse_path)

catalog = SqlCatalog(
    "default",
    **{
        "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
        "warehouse": f"file://{warehouse_path}",
    },
)
df = pq.read_table("example.parquet")
catalog.create_namespace("default")
table = catalog.create_table(
    "default.example",
    schema=df.schema,
)
table.add_files(["example.parquet"])

And here is the error. The top two lines come from debug statements showing how the mapping contains the incorrect path.

    print(f"column mappings {len(parquet_column_mapping)}")
    print(parquet_column_mapping)
column mappings 1
{'a.list.element.a': 3}
Traceback (most recent call last):
  File "/home/cgbur/pyice-test/failure.py", line 36, in <module>
    table.add_files(["example.parquet"])
  File "/local/home/cgbur/pyice-test/iceberg-python/pyiceberg/table/__init__.py", line 1355, in add_files
    tx.add_files(file_paths=file_paths, snapshot_properties=snapshot_properties)
  File "/local/home/cgbur/pyice-test/iceberg-python/pyiceberg/table/__init__.py", line 462, in add_files
    for data_file in data_files:
  File "/local/home/cgbur/pyice-test/iceberg-python/pyiceberg/table/__init__.py", line 2737, in _parquet_files_to_data_files
    yield from parquet_files_to_data_files(io=io, table_metadata=table_metadata, file_paths=iter(file_paths))
  File "/local/home/cgbur/pyice-test/iceberg-python/pyiceberg/io/pyarrow.py", line 1869, in parquet_files_to_data_files
    statistics = data_file_statistics_from_parquet_metadata(
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/local/home/cgbur/pyice-test/iceberg-python/pyiceberg/io/pyarrow.py", line 1734, in data_file_statistics_from_parquet_metadata
    field_id = parquet_column_mapping[column.path_in_schema]
               ~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^
KeyError: 'a.list.item.a'

You can see how the parquet path_in_schema uses item instead of element.

cgbur commented 5 months ago

Ah, confusingly, there appear to be writer differences that cause the issue. My Rust implementation matches pyarrow when polars is run with use_pyarrow=True.

import polars as pl
import pyarrow.parquet as pq

df = pl.DataFrame(
    {
        "a": [[{"a": 1}, {"a": 2}], [{"a": 3}]],
    }
)

def print_schema_path(path, col_name):
    metadata = pq.read_metadata(path)
    for group_number in range(metadata.num_row_groups):
        row_group = metadata.row_group(group_number)
        for column_number in range(row_group.num_columns):
            column = row_group.column(column_number)
            if column.path_in_schema.startswith(col_name):
                print(f"path_in_schema: {column.path_in_schema}")

df.write_parquet("example.parquet", use_pyarrow=False)
print("with polars")
print(pq.read_schema("example.parquet"))
print_schema_path("example.parquet", "a")
df.write_parquet("example.parquet", use_pyarrow=True)
print("with pyarrow")
print(pq.read_schema("example.parquet"))
print_schema_path("example.parquet", "a")
with polars
a: large_list<item: struct<a: int64>>
  child 0, item: struct<a: int64>
      child 0, a: int64
path_in_schema: a.list.item.a
with pyarrow
a: large_list<element: struct<a: int64>>
  child 0, element: struct<a: int64>
      child 0, a: int64
path_in_schema: a.list.element.a

Perhaps the visitor is not respecting the name used in the schema? Or is there a mismatch between how the Iceberg side and the parquet side derive the path? When use_pyarrow=True is set, the example I pasted above does not error, which makes me think something is lost in translation.

felixscherz commented 5 months ago

Hi, I investigated this a bit further and it seems to be related to the way the visitor works as @cgbur suggested. Here is what I tried:

import pyarrow as pa

from pyiceberg.catalog import Catalog
from pyiceberg.io.pyarrow import parquet_path_to_id_mapping

def test_parquet_path_to_id_mapping():
    # set field name to "item"
    pyarrow_list = pa.schema([
            pa.field("extras", pa.list_(pa.field("item", pa.struct([pa.field("key", pa.string()), pa.field("value", pa.string())]))))
        ])

    # this is called during table creation
    schema = Catalog._convert_schema_if_needed(pyarrow_list)

    mapping = parquet_path_to_id_mapping(schema)
    assert "extras.list.item.key" in mapping  # raises error

The schema conversion in Catalog._convert_schema_if_needed overrides the field name and sets it to "element", so the resulting mapping is:

{'extras.list.element.key': -1, 'extras.list.element.value': -1}

Looking into the visitor, I found that the method dealing with list types sets a default field name of "element". https://github.com/apache/iceberg-python/blob/20c273104257f0a1ccd74a09f6d4601643115ffd/pyiceberg/io/pyarrow.py#L865-L870 https://github.com/apache/iceberg-python/blob/20c273104257f0a1ccd74a09f6d4601643115ffd/pyiceberg/io/pyarrow.py#L172

So we lose the information about the value field's name, replacing it with "element".

~Unfortunately I haven't found a way to access the field name as both pyarrow.lib.ListType and pyarrow.lib.DataType don't seem to make that available.~

We can access the name with the name attribute, maybe we can use that instead of going with a default of "element"?

sungwy commented 4 months ago

Hi @cgbur and @felixscherz thank you for raising this and taking this investigation further. I'm not a polars user myself, but the difference in the behavior is quite interesting, and I think there would be value in trying to fix this issue.

I just read the write_table API documentation in pyarrow.parquet and found something rather interesting:

https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow.parquet.write_table

If you check the documentation for the use_compliant_nested_type flag, it mentions that using element as the name of the single item field is the parquet-compliant format, as specified in the Parquet spec for nested types. PyArrow enables this flag by default and writes the list element field name as element.

For some reason it looks like polars has decided to use item - the non-parquet compliant list element name instead. While I'm curious to know why the polars community has decided to go this route and whether this is common practice in the Parquet community, here are some opinions I've formed on this issue for now:

  1. This is an issue we are running into only when we are trying to add the files of a polars produced parquet file as Iceberg data files. We do not have the same issues when writing parquet files using existing Iceberg clients.
  2. While adding support for both the element and item names in the function would be very simple, and would let us widen the scope of support for the add_files API, item is not the correct name for the single value field in the LIST field type according to the Parquet Spec. As a result, adding such a file as a data file to an Iceberg table may have unintended consequences for other parts of our infrastructure that make similar assumptions in accordance with the Parquet Spec. Hence, my hunch is that we should review the potential downstream impact of this change carefully.
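Point 2 above (accepting both names) could be sketched as a small normalization step on the parquet path before the mapping lookup. This helper is an illustration only, not PyIceberg code, and the function name is made up:

```python
def normalize_list_element_name(path: str) -> str:
    """Map the non-compliant "item" element name to the spec's "element",
    but only directly under a repeated "list" group, so struct fields that
    happen to be named "item" are left untouched."""
    parts = path.split(".")
    return ".".join(
        "element" if name == "item" and i > 0 and parts[i - 1] == "list" else name
        for i, name in enumerate(parts)
    )

print(normalize_list_element_name("a.list.item.a"))  # a.list.element.a
print(normalize_list_element_name("a.item.b"))       # a.item.b (unchanged)
```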
cgbur commented 4 months ago

Here is the code in Polars where the "item" name crops up.

https://github.com/pola-rs/polars/blob/main/crates/polars-core/src/datatypes/dtype.rs#L575

I believe this is because they are not converting directly to parquet, but to arrow first and then to parquet: a subtle difference in their internal logic. However, perhaps the confusion arises because in arrow, the conventional name for a List's single child field is item, not element.

https://arrow.apache.org/docs/format/Columnar.html#recordbatch-message

import pyarrow as pa

py_list = pa.array([[1, 2, 3], [1, 2]])
print(py_list.type)
list<item: int64>

I'll go ahead and open an issue on the Polars repo to see if they have anything to say. There are multiple ways to fix this in their package, and I agree they should likely be producing parquet files according to the spec.