apache / iceberg-python

Apache PyIceberg
https://py.iceberg.apache.org/
Apache License 2.0

table.scan queries failing sometimes when result is empty #992

Closed: jurossiar closed this issue 3 months ago

jurossiar commented 3 months ago

Apache Iceberg version

0.7.0 (latest release)

Please describe the bug 🐞

While testing the new functionality in pyiceberg 0.7.0, I found that some queries using table.scan throw an error when the result is empty.

ResolveError: Field is required, and could not be found in the file: 1: table_id: required string

This doesn't happen with pyiceberg 0.6.0 (the version that we are currently using).

I was able to reproduce the issue consistently with the attached example. I think this happens with tables that have identifier fields (those are the cases where I found the issue).

example.zip contains:
- requirements.txt: the four required dependencies.
- example_scan_error.ipynb: a Jupyter notebook where you can see and reproduce the issue.
- .env.example: properties to configure.

See the video running the same queries in conda environments with pyiceberg 0.7.0 and pyiceberg 0.6.0 (created from the same requirements.txt, just changing the pyiceberg version to 0.6.0). https://github.com/user-attachments/assets/9d21d247-e754-4830-9fe0-55ec4c960319

ndrluis commented 3 months ago

Hello @jurossiar,

I ran some tests and was unable to reproduce the error. Reading the exception, it looks like some files do not have the table_id filled in. Could you create a minimal example that reproduces the error? In your video, you are using a table that already exists. It would be good if the example includes a setup from scratch.

This is the test that I ran:

from pyiceberg.catalog import load_catalog
import pyarrow as pa
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType
from pyiceberg.expressions import EqualTo

catalog = load_catalog(
    "demo",
    **{
        "type": "rest",
        "uri": "http://localhost:8181",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
    },
)

catalog.create_namespace_if_not_exists("default")

schema = Schema(
    NestedField(field_id=1, name="table_id", field_type=StringType(), required=True),
    NestedField(field_id=2, name="name", field_type=StringType(), required=True),
    NestedField(field_id=3, name="dataset", field_type=StringType(), required=True),
    NestedField(
        field_id=4, name="description", field_type=StringType(), required=False
    ),
    identifier_field_ids=[1],
)

data = pa.Table.from_pylist(
    [
        {
            "table_id": "table1",
            "name": "table1",
            "dataset": "default",
            "description": "table1",
        },
        {
            "table_id": "table2",
            "name": "table2",
            "dataset": "default",
            "description": "table2",
        },
    ],
    schema=schema.as_arrow(),
)

try:
    catalog.purge_table("default.some_table")
except Exception:
    # ignore if the table does not exist yet
    pass

table = catalog.create_table("default.some_table", schema=schema)

table.append(data)

result = table.scan(
    selected_fields=["*"],
    row_filter=EqualTo("dataset", ""),
)

result.to_pandas()

kevinjqliu commented 3 months ago

Here's what I ran based on the video, I cannot reproduce the issue using 0.7.0.

In Jupyter,

!pip install pyiceberg==0.7.0 --force -q

With the pyiceberg integration Docker environment running (docker compose -f dev/docker-compose-integration.yml up -d):

from pyiceberg.catalog import load_catalog
import pyarrow as pa
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType
from pyiceberg.expressions import EqualTo

catalog = load_catalog(
    "demo",
    **{
        "type": "rest",
        "uri": "http://localhost:8181",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
    },
)

catalog.create_namespace_if_not_exists("default")

schema = Schema(
    NestedField(field_id=1, name="table_id", field_type=StringType(), required=True),
    NestedField(field_id=2, name="name", field_type=StringType(), required=True),
    NestedField(field_id=3, name="dataset", field_type=StringType(), required=True),
    NestedField(
        field_id=4, name="description", field_type=StringType(), required=False
    ),
    identifier_field_ids=[1],
)

try:
    catalog.purge_table("default.some_table")
except Exception:
    # ignore if the table does not exist yet
    pass

table = catalog.create_table("default.some_table", schema=schema)

schema = pa.schema([
    ("table_id", pa.string(), False),
    ("name", pa.string(), False),
    ("dataset", pa.string(), False),
    ("description", pa.string(), True)
])

# Create data arrays
data = [
    ['id_1', 'id_2', 'id_3', 'id_4'],
    ['name_1', 'name_2', 'name_3', 'name_4'],
    ['dataset_1', 'dataset_1', 'dataset_2', 'dataset_3'],
    ['desc_1', 'desc_2', 'desc_3', 'desc_4']
]

# Create PyArrow Table
table_data = pa.Table.from_arrays(data, schema=schema)
table = catalog.load_table("default.some_table")
table.append(table_data)

from pyiceberg.expressions import BooleanExpression, And, EqualTo, AlwaysTrue

def build_row_filter(equality_key_value: dict) -> BooleanExpression:
    if not equality_key_value or len(equality_key_value) == 0:
        # AlwaysTrue is an expression class; return an instance of it
        return AlwaysTrue()
    else:
        expressions: list[BooleanExpression] = []
        for key in equality_key_value:
            expressions.append(EqualTo(key, equality_key_value[key]))
        if len(expressions) == 1:
            return expressions[0]
        else:
            return And(*expressions)

requested_columns = ["*"]
scan = table.scan(
    selected_fields=(requested_columns),
    row_filter=build_row_filter({
        "dataset": "dataset_1",
    })
)
scan.to_pandas()

kevinjqliu commented 3 months ago

Reading the exception, it looks like some files do not have the table_id filled in.

That's my theory as well. Can you run

table.inspect.files().to_pandas()

This will produce a pandas DataFrame listing all the Parquet files backing this Iceberg table.

For example, see the attached screenshot of the resulting files table.
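If it helps, here is a minimal sketch of what to check in that output (assuming the standard files metadata columns; null_value_counts is keyed by field id, so field 1 is table_id):

files_df = table.inspect.files().to_pandas()
# Look for data files that report null values for field id 1 (table_id)
print(files_df[["file_path", "record_count", "null_value_counts"]])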

jurossiar commented 3 months ago

Sorry! I've just seen the messages. I attached example.zip, where you can create the example from scratch. I found the issue with existing tables, but I was able to reproduce it consistently with the example I provided. The video only shows what happens with both versions.

jurossiar commented 3 months ago

(quoting @kevinjqliu's comment above, "Here's what I ran based on the video, I cannot reproduce the issue using 0.7.0.", together with its full repro code)

This query doesn't fail. The issue happens when the result is empty, and not with every filter value. This is the query that fails:

requested_columns = ["*"]
scan = table.scan(
    selected_fields=requested_columns,
    row_filter=build_row_filter({
        "dataset": "dataset_2_xxxx",
    }),
)
scan.to_pandas()

jurossiar commented 3 months ago

Seems that @grobgl was able to reproduce it -> https://github.com/grobgl/iceberg-python/commit/9f28f5ad94fd2f6a480b4cb788e730e57cc5c94c

kevinjqliu commented 3 months ago

Thank you. I was able to reproduce the error with

requested_columns = ["*"]
scan = table.scan(
    selected_fields=requested_columns,
    row_filter=build_row_filter({
        "dataset": "dataset_2_xxxx",
    }),
)
scan.to_pandas()

Error:

ResolveError: Field is required, and could not be found in the file: 1: table_id: required string

Looks like @grobgl opened #997 as a possible fix.
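For context, a minimal sketch of the direction such a fix could take; this is my reading of the traceback and a hypothetical illustration, not the actual change in #997. If the error comes from resolving the projected schema against file schemas when zero files match, an empty scan can build its result directly from the projected Arrow schema:

import pyarrow as pa

def empty_scan_result(projected_schema: pa.Schema) -> pa.Table:
    # Build the empty result from the projected schema itself instead of
    # resolving required fields against any file schema.
    return projected_schema.empty_table()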

ndrluis commented 3 months ago

I believe the issue is not simply that it returns an empty result: if you filter for a value that does not exist, like dataset_5, it will not return an error. However, if you filter for dataset_2_xxxx, it will return an error. It seems that EqualTo is not working as intended.
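For reference, a minimal contrast of the two cases against the data appended above (both filters match no rows; my reading, not confirmed here, is that dataset_5 lies outside the column's min/max bounds so every file is pruned from metadata, while dataset_2_xxxx sorts between dataset_2 and dataset_3, so a file is actually scanned and yields zero rows):

from pyiceberg.expressions import EqualTo

# Outside the column bounds: all files pruned, empty DataFrame, no error.
table.scan(row_filter=EqualTo("dataset", "dataset_5")).to_pandas()

# Inside the column bounds but matching no rows: raises ResolveError on 0.7.0.
table.scan(row_filter=EqualTo("dataset", "dataset_2_xxxx")).to_pandas()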

cc/ @grobgl

ndrluis commented 3 months ago

@grobgl, I tested your branch with the example provided here, and your solution fixed the problem. I'll double-check why the test you wrote in your pull request is not failing on my end.
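As a side note, a minimal regression sketch for this case (hypothetical pytest-style test, assuming a table fixture populated as in the repro above):

from pyiceberg.expressions import EqualTo

def test_scan_empty_result_with_identifier_fields(table):
    # A filter value inside the column bounds that matches no rows; on 0.7.0
    # this raised ResolveError instead of returning an empty DataFrame.
    df = table.scan(row_filter=EqualTo("dataset", "dataset_2_xxxx")).to_pandas()
    assert len(df) == 0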