Query on nested struct field with PyIceberg? #953

Open cfrancois7 opened 1 month ago

cfrancois7 commented 1 month ago


I'm looking for a tutorial on querying a single subfield of a struct field. I searched all over the internet but couldn't find a simple way to do it with PyIceberg.

To make it concrete: for instance, how do I get the rows where "employment.status = 'Employed'"?

[{'id': 1,
  'name': 'Alice',
  'age': 28,
  'address': {'street': '123 Maple St',
   'city': 'Springfield',
   'postal_code': '12345'},
  'contact': {'email': 'alice@example.com', 'phone': '555-1234'},
  'employment': {'status': 'Employed',
   'position': 'Software Engineer',
   'company': {'name': 'Tech Corp', 'location': 'Silicon Valley'}},
  'preferences': {'newsletter': True,
   'notifications': {'email': True, 'sms': False}}},
 {'id': 2,
  'name': 'Bob',
  'age': 35,
  'address': {'street': '456 Oak St',
   'city': 'Metropolis',
   'postal_code': '67890'},
  'contact': {'email': 'bob@example.com', 'phone': '555-5678'},
  'employment': {'status': 'Self-employed',
   'position': 'Consultant',
   'company': {'name': 'Freelance', 'location': 'Remote'}},
  'preferences': {'newsletter': False,
   'notifications': {'email': True, 'sms': True}}}]

With the following schema:

import pyarrow as pa

schema = pa.schema([
  ('id', pa.int32()),
  ('name', pa.string()),
  ('age', pa.int32()),
  ('address', pa.struct([
      ('street', pa.string()),
      ('city', pa.string()),
      ('postal_code', pa.string())
  ])),
  ('contact', pa.struct([
      ('email', pa.string()),
      ('phone', pa.string())
  ])),
  ('employment', pa.struct([
      pa.field('status', pa.string(), nullable=True),
      pa.field('position', pa.string(), nullable=True),
      pa.field('company', pa.struct([
          ('name', pa.string()),
          ('location', pa.string())
      ]), nullable=True)
  ])),
  ('preferences', pa.struct([
      ('newsletter', pa.bool_()),
      ('notifications', pa.struct([
          ('email', pa.bool_()),
          ('sms', pa.bool_())
      ]))
  ]))
])

I tried this kind of query, but without success:

row_filter = "employment.status = 'Employed'"
table.scan(
    row_filter=row_filter,
    selected_fields=["age", "employment", 'contact.email']
).to_pandas()

The command raises the error:

ValueError: Could not find field with name status, case_sensitive=True

The catalog backend is SQLite.


$ pip list | grep 'iceberg\|arrow\|sqlite'
arrow                     1.3.0
pyarrow                   15.0.2
pyiceberg                 0.6.1
kevinjqliu commented 1 month ago

Thanks for reporting this! I was able to reproduce this on the latest main branch.


from pyiceberg.catalog.sql import SqlCatalog
import pyarrow as pa

schema = pa.schema([
  ('id', pa.int32()),
  ('name', pa.string()),
  ('age', pa.int32()),
  ('address', pa.struct([
      ('street', pa.string()),
      ('city', pa.string()),
      ('postal_code', pa.string())
  ])),
  ('contact', pa.struct([
      ('email', pa.string()),
      ('phone', pa.string())
  ])),
  ('employment', pa.struct([
      pa.field('status', pa.string(), nullable=True),
      pa.field('position', pa.string(), nullable=True),
      pa.field('company', pa.struct([
          ('name', pa.string()),
          ('location', pa.string())
      ]), nullable=True)
  ])),
  ('preferences', pa.struct([
      ('newsletter', pa.bool_()),
      ('notifications', pa.struct([
          ('email', pa.bool_()),
          ('sms', pa.bool_())
      ]))
  ]))
])

catalog = SqlCatalog("default", **{"uri": "sqlite:///:memory:", "warehouse": "."})
table = catalog.create_table("foo.bar", schema=schema)

# works for just selected_fields
table.scan(selected_fields=["age", "employment", 'contact.email', 'employment.status']).projection()
table.scan(selected_fields=["age", "employment", 'contact.email', 'employment.status']).to_pandas()

# works for regular row filter
table.scan(row_filter="age = '1'", selected_fields=["age", "employment", 'contact.email', 'employment.status']).projection()
table.scan(row_filter="age = '1'", selected_fields=["age", "employment", 'contact.email', 'employment.status']).to_pandas()

# works for projection
table.scan(row_filter="employment.status = 'Employed'", selected_fields=["age", "employment", 'contact.email', 'employment.status']).projection()
# errors for to_pandas
table.scan(row_filter="employment.status = 'Employed'", selected_fields=["age", "employment", 'contact.email', 'employment.status']).to_pandas()

Stack trace:

>>> table.scan(row_filter="employment.status = 'Employed'", selected_fields=["age", "employment", 'contact.email', 'employment.status']).to_pandas()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/kevinliu/repos/iceberg-python/pyiceberg/table/__init__.py", line 2033, in to_pandas
    return self.to_arrow().to_pandas(**kwargs)
  File "/Users/kevinliu/repos/iceberg-python/pyiceberg/table/__init__.py", line 2003, in to_arrow
    return project_table(
  File "/Users/kevinliu/repos/iceberg-python/pyiceberg/io/pyarrow.py", line 1169, in project_table
    bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
  File "/Users/kevinliu/repos/iceberg-python/pyiceberg/expressions/visitors.py", line 213, in bind
    return visit(expression, BindVisitor(schema, case_sensitive))
  File "/Users/kevinliu/.pyenv/versions/3.11.0/lib/python3.11/functools.py", line 909, in wrapper
    return dispatch(args[0].__class__)(*args, **kw)
  File "/Users/kevinliu/repos/iceberg-python/pyiceberg/expressions/visitors.py", line 185, in _
    return visitor.visit_unbound_predicate(predicate=obj)
  File "/Users/kevinliu/repos/iceberg-python/pyiceberg/expressions/visitors.py", line 250, in visit_unbound_predicate
    return predicate.bind(self.schema, case_sensitive=self.case_sensitive)
  File "/Users/kevinliu/repos/iceberg-python/pyiceberg/expressions/__init__.py", line 671, in bind
    bound_term = self.term.bind(schema, case_sensitive)
  File "/Users/kevinliu/repos/iceberg-python/pyiceberg/expressions/__init__.py", line 182, in bind
    field = schema.find_field(name_or_id=self.name, case_sensitive=case_sensitive)
  File "/Users/kevinliu/repos/iceberg-python/pyiceberg/schema.py", line 215, in find_field
    raise ValueError(f"Could not find field with name {name_or_id}, case_sensitive={case_sensitive}")
ValueError: Could not find field with name status, case_sensitive=True
kevinjqliu commented 1 month ago

The issue might be in the _parse_row_filter function:

(Pdb) _parse_row_filter("employment = 'Employed'")
EqualTo(term=Reference(name='employment'), literal=literal('Employed'))

(Pdb) _parse_row_filter("employment.status = 'Employed'")
EqualTo(term=Reference(name='status'), literal=literal('Employed'))

Notice that in the second case, the parent employment field was dropped.
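To make the failure mode concrete, here is a toy sketch in plain Python. It is not PyIceberg's actual parser; the regex and the parse_equal helper are hypothetical and only handle a single "column = 'value'" predicate. Keeping only the last dotted segment reproduces the Reference(name='status') result, while keeping the whole identifier preserves employment.status.

```python
# Toy parser for a single "column = 'value'" predicate. Hypothetical helper,
# NOT PyIceberg's real parser; it only illustrates the dropped-parent bug.
import re
from typing import Tuple

IDENT = r"[A-Za-z_][A-Za-z0-9_]*"
PREDICATE = re.compile(rf"^\s*({IDENT}(?:\.{IDENT})*)\s*=\s*'([^']*)'\s*$")


def parse_equal(expr: str, keep_full_path: bool) -> Tuple[str, str]:
    """Return (reference_name, literal) for "<column> = '<value>'"."""
    match = PREDICATE.match(expr)
    if match is None:
        raise ValueError(f"Unsupported expression: {expr!r}")
    column, value = match.groups()
    if keep_full_path:
        return column, value  # 'employment.status' stays intact
    return column.split(".")[-1], value  # buggy variant: only the leaf survives


print(parse_equal("employment.status = 'Employed'", keep_full_path=False))
# ('status', 'Employed')
print(parse_equal("employment.status = 'Employed'", keep_full_path=True))
# ('employment.status', 'Employed')
```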

kevinjqliu commented 1 month ago

Specifically, in the parsing code:

from pyiceberg.expressions.parser import parse

parse("employment.status = 'Employed'")
# > EqualTo(term=Reference(name='status'), literal=literal('Employed')) 
kevinjqliu commented 1 month ago

This is an interesting test case: https://github.com/apache/iceberg-python/blob/3966263d935b92bcf98877ce9ba8ea26c08a283f/tests/expressions/test_parser.py#L55

I'm not familiar with the parser, should this assertion be true?

kevinjqliu commented 1 month ago

Here is where employment.status becomes status


cfrancois7 commented 1 month ago

I tested many approaches to understand what the parser is expected to produce. The issue only concerns making the NestedFields of a StructType available to the row filter.

By the way, projection (i.e. select) works fine:

>>> table.scan(
...     selected_fields=['employment.status']
... ).to_pandas()

0       {'status': 'Employed'}
1  {'status': 'Self-employed'}
2     {'status': 'Unemployed'}
3       {'status': 'Employed'}
4         {'status': 'Intern'}

>>> table.scan(
...     selected_fields=['employment.company.name']
... ).to_pandas()

0       {'company': {'name': 'Tech Corp'}}
1       {'company': {'name': 'Freelance'}}
2              {'company': {'name': None}}
3  {'company': {'name': 'Data Solutions'}}
4  {'company': {'name': 'Market Masters'}}

Out of curiosity, I tried Dremio to see the SQL-like syntax it uses:

SELECT * FROM nested_sample
WHERE employment['status'] = 'Employed'

It also works when going through DuckDB:

import pandas as pd

selected_fields = ["age", "employment", 'contact.email']
table_name = 'employees'

# register the scan result in DuckDB under table_name
conn = table.scan().to_duckdb(table_name=table_name)

query = f"""
    SELECT {','.join(selected_fields)} FROM {table_name}
    WHERE employment['status'] = 'Employed'
"""
results = conn.sql(query).fetchall()
pd.DataFrame(results, columns=selected_fields)
kevinjqliu commented 1 month ago

The issue only concerns making the NestedFields of a StructType available to the row filter.

This is what I'm seeing as well.

table.scan's row_filter accepts a string expression (or a BooleanExpression). It's ultimately passed from to_arrow() to project_table here: https://github.com/apache/iceberg-python/blob/ee7e9f066df2763ce7d282e29a5b56c69c13de7e/pyiceberg/io/pyarrow.py#L1169-L1173

I think the bug here is that row_filter string expression is not properly parsed for nested structs.

cfrancois7 commented 1 month ago

I'm just exploring the code more deeply to understand how it works. As you found, the error comes from the reference: employment.status becomes status. But even when we correct the filter, there is another error at a lower level, in the pyarrow scan, which does not go deeper than the first level.

And status does not map to any field in the _name_to_id dictionary.

Next, I'll look at how the Reference is created.

>>> row_filter = "employment.status = 'Employed'"
>>> projection = table.scan(
    169 def bind(self, schema: Schema, case_sensitive: bool = True) -> BoundReference[L]:
    170     """Bind the reference to an Iceberg schema.
    172     Args:
    180         BoundReference: A reference bound to the specific field in the Iceberg schema.
    181     """
--> 182     field = schema.find_field(name_or_id=self.name, case_sensitive=case_sensitive)
    183     accessor = schema.accessor_for_field(field.field_id)
    184     return self.as_bound(field=field, accessor=accessor)
ValueError: Could not find field with name status, case_sensitive=True

I noticed that table.schema().find_field(name_or_id='status') raises the error, whereas table.schema().find_field(name_or_id=13) works properly. For reference, 13 is the ID of the status field.

find_field fails here because it looks names up in the dictionary table.schema()._name_to_id:

{'address.street': 8,
 'address.city': 9,
 'address.postal_code': 10,
 'contact.email': 11,
 'contact.phone': 12,
 'employment.status': 13,
 'employment.position': 14,
 'employment.company.name': 16,
 'employment.company.location': 17,
 'employment.company': 15,
 'preferences.newsletter': 18,
 'preferences.notifications.email': 20,
 'preferences.notifications.sms': 21,
 'preferences.notifications': 19,
 'id': 1,
 'name': 2,
 'age': 3,
 'address': 4,
 'contact': 5,
 'employment': 6,
 'preferences': 7}

And I understand that it would be dangerous to map a bare name (e.g. 'status': 13), since several subfields can share the same name. So the reference should in fact be Reference('employment.status') after parsing.
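A small sketch of why the lookup has to be keyed by the full path. It uses a hypothetical plain-dict subset of the schema ({name: (field_id, children)}) with the IDs from the _name_to_id dump above, not PyIceberg's Schema class. Indexing by full dotted path finds employment.status but not status, and grouping by leaf name shows that a bare name such as name maps to more than one field.

```python
# Hypothetical plain-dict subset of the example schema:
# {field_name: (field_id, children or None)}, IDs as in the _name_to_id dump.
from collections import defaultdict
from typing import Dict, List

SCHEMA = {
    "id": (1, None),
    "name": (2, None),
    "employment": (6, {
        "status": (13, None),
        "position": (14, None),
        "company": (15, {"name": (16, None), "location": (17, None)}),
    }),
}


def index_by_full_name(fields: dict, prefix: str = "") -> Dict[str, int]:
    """Map full dotted paths such as 'employment.status' to field IDs."""
    index: Dict[str, int] = {}
    for name, (field_id, children) in fields.items():
        path = f"{prefix}{name}"
        index[path] = field_id
        if children:
            index.update(index_by_full_name(children, prefix=f"{path}."))
    return index


def ids_by_leaf_name(fields: dict) -> Dict[str, List[int]]:
    """Group field IDs by their last path segment to expose ambiguity."""
    groups: Dict[str, List[int]] = defaultdict(list)
    for path, field_id in index_by_full_name(fields).items():
        groups[path.split(".")[-1]].append(field_id)
    return dict(groups)


full_index = index_by_full_name(SCHEMA)
print(full_index["employment.status"])   # 13
print("status" in full_index)            # False, hence "Could not find field with name status"
print(ids_by_leaf_name(SCHEMA)["name"])  # [2, 16]: a bare name is ambiguous
```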

Now, when I correct the Reference:

>>> row_filter = EqualTo(Reference('employment.status'), literal('Employed'))
>>> projection = table.scan(

File ~/.anaconda3/envs/my_proj/lib/python3.12/site-packages/pyiceberg/io/pyarrow.py:965, in _task_to_table(fs, task, bound_row_filter, projected_schema, projected_field_ids, positional_deletes, case_sensitive, limit, name_mapping)
    962 if file_schema is None:
    963     raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}")
--> 965 fragment_scanner = ds.Scanner.from_fragment(
    966     fragment=fragment,
    967     schema=physical_schema,
    968     # This will push down the query to Arrow.
    969     # But in case there are positional deletes, we have to apply them first
    970     filter=pyarrow_filter if not positional_deletes else None,
    971     columns=[col.name for col in file_project_schema.columns],
    972 )
    974 if positional_deletes:
    975     # Create the mask of indices that we're interested in
    976     indices = _combine_positional_deletes(positional_deletes, fragment.count_rows())

File ~/.anaconda3/envs/my_proj/lib/python3.12/site-packages/pyarrow/_dataset.pyx:3558, in pyarrow._dataset.Scanner.from_fragment()

File ~/.anaconda3/envs/my_proj/lib/python3.12/site-packages/pyarrow/_dataset.pyx:3327, in pyarrow._dataset._populate_builder()

File ~/.anaconda3/envs/my_proj/lib/python3.12/site-packages/pyarrow/_compute.pyx:2700, in pyarrow._compute._bind()

File ~/.anaconda3/envs/my_proj/lib/python3.12/site-packages/pyarrow/error.pxi:154, in pyarrow.lib.pyarrow_internal_check_status()

File ~/.anaconda3/envs/my_proj/lib/python3.12/site-packages/pyarrow/error.pxi:91, in pyarrow.lib.check_status()

ArrowInvalid: No match for FieldRef.Name(status) in id: int32
name: string
age: int32
address: struct<street: string, city: string, postal_code: string>
contact: struct<email: string, phone: string>
employment: struct<status: string, position: string, company: struct<name: string, location: string>>
preferences: struct<newsletter: bool, notifications: struct<email: bool, sms: bool>>
kevinjqliu commented 1 month ago

Thanks for stepping through the code. I believe fetching columns by ID is the recommended path. There seem to be two issues: one with parsing the row filter expression, and another with finding the specific column referenced in the expression.

sungwy commented 1 month ago

@cfrancois7 thank you for raising this tricky issue, and @kevinjqliu for spending the time to step through the issue as well to do the root cause investigation!

I've put up this fix in an attempt to solve this issue. @cfrancois7 would you be able to confirm if the version of code on the PR resolves this issue?


cfrancois7 commented 1 month ago

The first issue regarding the parsing is resolved by the PR. But the second issue related to the pyarrow command is still there: ArrowInvalid: No match for FieldRef.Name(status) in id: int32

File ~/projects/mpdata/my_proj/notebooks/pyiceberg/io/pyarrow.py:1195, in _task_to_record_batches(fs, task, bound_row_filter, projected_schema, projected_field_ids, positional_deletes, case_sensitive, name_mapping)
   1192 if file_schema is None:
   1193     raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}")
-> 1195 fragment_scanner = ds.Scanner.from_fragment(
   1196     fragment=fragment,
   1197     # With PyArrow 16.0.0 there is an issue with casting record-batches:
   1198     # https://github.com/apache/arrow/issues/41884
   1199     # https://github.com/apache/arrow/issues/43183
   1200     # Would be good to remove this later on
   1201     schema=_pyarrow_schema_ensure_large_types(physical_schema),
   1202     # This will push down the query to Arrow.
   1203     # But in case there are positional deletes, we have to apply them first
   1204     filter=pyarrow_filter if not positional_deletes else None,
   1205     columns=[col.name for col in file_project_schema.columns],
   1206 )
   1208 current_index = 0
   1209 batches = fragment_scanner.to_batches()

File ~/.anaconda3/envs/my_proj/lib/python3.12/site-packages/pyarrow/_dataset.pyx:3558, in pyarrow._dataset.Scanner.from_fragment()

File ~/.anaconda3/envs/my_proj/lib/python3.12/site-packages/pyarrow/_dataset.pyx:3327, in pyarrow._dataset._populate_builder()

File ~/.anaconda3/envs/my_proj/lib/python3.12/site-packages/pyarrow/_compute.pyx:2700, in pyarrow._compute._bind()

File ~/.anaconda3/envs/my_proj/lib/python3.12/site-packages/pyarrow/error.pxi:154, in pyarrow.lib.pyarrow_internal_check_status()

File ~/.anaconda3/envs/my_proj/lib/python3.12/site-packages/pyarrow/error.pxi:91, in pyarrow.lib.check_status()

ArrowInvalid: No match for FieldRef.Name(status) in id: int32
name: large_string
age: int32
address: struct<street: large_string, city: large_string, postal_code: large_string>
contact: struct<email: large_string, phone: large_string>
employment: struct<status: large_string, position: large_string, company: struct<name: large_string, location: large_string>>
preferences: struct<newsletter: bool, notifications: struct<email: bool, sms: bool>>
sungwy commented 1 month ago

The first issue regarding the parsing is resolved by the PR.

But the second issue related to the pyarrow command is still there: ArrowInvalid: No match for FieldRef.Name(status) in id: int32

Thank you for checking, @cfrancois7. I'll put a bit more time into this issue later this evening to see if we can get it fixed for this release.

sungwy commented 1 month ago

Hi @cfrancois7, I investigated this a little more, and I think there could be a limitation in pyarrow's support for predicate pushdown on nested fields. Here's a related open issue I found on this topic: https://github.com/apache/arrow/issues/20203

I'm running into the following error when I change the pyarrow expression to refer to the column name as 'employment.status':

E   pyarrow.lib.ArrowInvalid: No match for FieldRef.Name(employment.status) in id: int32
E   name: large_string
E   employment: struct<status: large_string, position: large_string>
kevinjqliu commented 1 month ago

Here's something interesting.

I think the pyarrow_filter here is wrong.

(Pdb) bound_row_filter
BoundEqualTo(term=BoundReference(field=NestedField(field_id=13, name='status', field_type=StringType(), required=False), accessor=Accessor(position=5,inner=Accessor(position=0,inner=None))), literal=literal('Employed'))
(Pdb) translated_row_filter
EqualTo(term=Reference(name='employment.status'), literal=literal('Employed'))
(Pdb) bound_file_filter
BoundEqualTo(term=BoundReference(field=NestedField(field_id=13, name='status', field_type=StringType(), required=False), accessor=Accessor(position=5,inner=Accessor(position=0,inner=None))), literal=literal('Employed'))
(Pdb) pyarrow_filter
<pyarrow.compute.Expression (status == "Employed")>

The pyarrow_filter expression dropped its parent reference.
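What the pushed-down filter needs is the full path of names leading to field 13, not just its leaf name. A minimal sketch, assuming a hypothetical {name: (field_id, children)} dict rather than PyIceberg's Schema or Accessor classes, of recovering that path from a field ID:

```python
# Hypothetical plain-dict schema (not PyIceberg's Schema/Accessor classes):
# {field_name: (field_id, children or None)}, IDs as in the example table.
from typing import Optional, Tuple

SCHEMA = {
    "age": (3, None),
    "contact": (5, {"email": (11, None), "phone": (12, None)}),
    "employment": (6, {
        "status": (13, None),
        "position": (14, None),
        "company": (15, {"name": (16, None), "location": (17, None)}),
    }),
}


def path_for_field_id(fields: dict, target_id: int,
                      parents: Tuple[str, ...] = ()) -> Optional[Tuple[str, ...]]:
    """Return the names from the root down to target_id, or None if absent."""
    for name, (field_id, children) in fields.items():
        path = parents + (name,)
        if field_id == target_id:
            return path
        if children:
            found = path_for_field_id(children, target_id, path)
            if found is not None:
                return found
    return None


# Field 13 is 'status' inside the 'employment' struct. A filter built from the
# leaf name alone loses 'employment'; the full path keeps it.
print(path_for_field_id(SCHEMA, 13))  # ('employment', 'status')
print(path_for_field_id(SCHEMA, 16))  # ('employment', 'company', 'name')
```

With pyarrow, such a tuple would be unpacked into separate names, e.g. pc.field(*path).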

Setting pyarrow_filter = None allows pyarrow to scan successfully.

(Pdb) pyarrow_filter = None
(Pdb) c
   age                         contact                                         employment
0   28  {'email': 'alice@example.com'}  {'status': 'Employed', 'position': 'Software E...
1   35    {'email': 'bob@example.com'}  {'status': 'Self-employed', 'position': 'Consu...

Code to repro

from pyiceberg.catalog.sql import SqlCatalog
import pyarrow as pa

data = [{'id': 1,
  'name': 'Alice',
  'age': 28,
  'address': {'street': '123 Maple St',
   'city': 'Springfield',
   'postal_code': '12345'},
  'contact': {'email': 'alice@example.com', 'phone': '555-1234'},
  'employment': {'status': 'Employed',
   'position': 'Software Engineer',
   'company': {'name': 'Tech Corp', 'location': 'Silicon Valley'}},
  'preferences': {'newsletter': True,
   'notifications': {'email': True, 'sms': False}}},
 {'id': 2,
  'name': 'Bob',
  'age': 35,
  'address': {'street': '456 Oak St',
   'city': 'Metropolis',
   'postal_code': '67890'},
  'contact': {'email': 'bob@example.com', 'phone': '555-5678'},
  'employment': {'status': 'Self-employed',
   'position': 'Consultant',
   'company': {'name': 'Freelance', 'location': 'Remote'}},
  'preferences': {'newsletter': False,
   'notifications': {'email': True, 'sms': True}}}]

schema = pa.schema([
  ('id', pa.int32()),
  ('name', pa.string()),
  ('age', pa.int32()),
  ('address', pa.struct([
      ('street', pa.string()),
      ('city', pa.string()),
      ('postal_code', pa.string())
  ])),
  ('contact', pa.struct([
      ('email', pa.string()),
      ('phone', pa.string())
  ])),
  ('employment', pa.struct([
      pa.field('status', pa.string(), nullable=True),
      pa.field('position', pa.string(), nullable=True),
      pa.field('company', pa.struct([
          ('name', pa.string()),
          ('location', pa.string())
      ]), nullable=True)
  ])),
  ('preferences', pa.struct([
      ('newsletter', pa.bool_()),
      ('notifications', pa.struct([
          ('email', pa.bool_()),
          ('sms', pa.bool_())
      ]))
  ]))
])

pyarrow_table = pa.Table.from_pylist(data, schema=schema)

catalog = SqlCatalog("default", **{"uri": "sqlite:///:memory:", "warehouse": "."})
table = catalog.create_table("foo.bar", schema=schema)
table.append(pyarrow_table)  # write the sample rows so the scan has data files to read
table.scan(row_filter="employment.status = 'Employed'", selected_fields=["age", "employment", 'contact.email', 'employment.status']).to_pandas()
cfrancois7 commented 1 month ago

I found the right code to make it work with pyarrow.

With the same data as previously shared:

import pandas as pd
import pyarrow as pa
import pyarrow.compute as pc

table_pa = table.scan().to_arrow()  # load the PyIceberg table into a pyarrow Table

# Reference the nested field employment.status (one name per struct level)
employment_status = pc.field('employment', 'status')

# Build the filter expression
status_employed = pc.equal(employment_status, pa.scalar('Employed'))

# Filter the table based on the employment status
filtered_table = table_pa.filter(status_employed)

# Convert the filtered table back to a list of dictionaries
filtered_data = filtered_table.to_pylist()

pd.DataFrame(filtered_data)  # success

For three levels:

import pyarrow as pa
import pyarrow.compute as pc

company_name = pc.field('employment', 'company', 'name')
is_freelance = pc.equal(company_name, pa.scalar('Freelance'))
filtered_table = table_pa.filter(is_freelance)
filtered_data = filtered_table.to_pylist()

The right pc.field instantiation is pc.field('employment', 'status') and not pc.field('employment.status') for pyarrow.
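The same distinction can be sketched without pyarrow, over plain to_pylist() rows. This is an illustrative stand-in for nested field references, not pyarrow's implementation, and get_path and filter_equal are hypothetical helpers: each name in the path descends one struct level, while a single dotted string is looked up as one top-level name and fails, much like the "No match for FieldRef.Name(employment.status)" error above.

```python
# Illustrative stand-in for nested field references on to_pylist() rows;
# this is not how pyarrow evaluates expressions internally.
from typing import Any, Iterable, List

ROWS = [
    {"id": 1, "employment": {"status": "Employed",
                             "company": {"name": "Tech Corp"}}},
    {"id": 2, "employment": {"status": "Self-employed",
                             "company": {"name": "Freelance"}}},
]


def get_path(row: dict, *path: str) -> Any:
    """Descend one struct level per name, like field('employment', 'status')."""
    value: Any = row
    for name in path:
        if value is None:
            return None  # a null parent struct yields a null result
        value = value[name]  # KeyError if no field has this name at this level
    return value


def filter_equal(rows: Iterable[dict], path: tuple, expected: Any) -> List[dict]:
    """Keep rows whose value at the given path equals expected."""
    return [row for row in rows if get_path(row, *path) == expected]


print([r["id"] for r in filter_equal(ROWS, ("employment", "status"), "Employed")])
# [1]
print([r["id"] for r in filter_equal(ROWS, ("employment", "company", "name"), "Freelance")])
# [2]

# One dotted string is looked up as a single top-level name, which does not exist:
try:
    filter_equal(ROWS, ("employment.status",), "Employed")
except KeyError as err:
    print("no field named", err)  # no field named 'employment.status'
```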

sungwy commented 1 month ago

The right pc.field instantiation is pc.field('employment', 'status') and not pc.field('employment.status') for pyarrow.

Thank you very much for the pointer @cfrancois7 👍 I was trying to fix this issue while I was a bit tight on time the last two days, and it looks like I glossed over that detail :) I'll update the PR now with this suggestion to get it to work.

kevinjqliu commented 1 month ago

The right pc.field instantiation is pc.field('employment', 'status') and not pc.field('employment.status') for pyarrow.

THANK YOU! It took me forever yesterday and I still couldn't find how to access pyarrow nested fields. It was hidden in the pyarrow.dataset.field documentation all along!! Maybe someone will find this comment via Google next time.

pyarrow.dataset.field(*name_or_index)
name_or_index : str, multiple strings, tuple or int
    The name or index of the (possibly nested) field the expression references to.

sungwy commented 1 month ago

As discussed in the PR attempting to fix this issue, we will need to propose a syntax for describing nested fields in a row_filter. This is unfortunately not an easy task with a clear answer, as any choice of delimiter may result in name collisions.
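A tiny illustration of the collision, assuming a column name may itself contain the delimiter: if nested names are flattened by joining on '.', a top-level column literally named employment.status and the status field inside the employment struct produce the same string. The flatten helper and the dict-based schema are hypothetical.

```python
# Hypothetical dict-based schema: {field_name: children or None}.
from typing import List


def flatten(fields: dict, prefix: str = "") -> List[str]:
    """Flatten nested field names by joining path segments with '.'."""
    names: List[str] = []
    for name, children in fields.items():
        full = f"{prefix}{name}"
        names.append(full)
        if children:
            names.extend(flatten(children, prefix=f"{full}."))
    return names


schema = {
    "employment": {"status": None},  # struct 'employment' with a 'status' subfield
    "employment.status": None,       # top-level column whose name contains a dot
}

names = flatten(schema)
print(names)                          # ['employment', 'employment.status', 'employment.status']
print(len(names) == len(set(names)))  # False: "employment.status" is ambiguous
```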

We’ve moved this issue to 0.8.0 milestone due to the complexity of above issue.

sungwy commented 1 week ago

Moved to 0.9.0 milestone as https://github.com/apache/iceberg-python/pull/963 is a prerequisite for fixing this issue