apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0
6.37k stars 2.2k forks source link

Python: Infer Iceberg schema from the Parquet file #6505

Closed Fokko closed 1 year ago

Fokko commented 1 year ago

Feature Request / Improvement

In PyIceberg we rely on fetching the schema from the Parquet metadata. If this is not available (because the parquet file is written by something else than an Iceberg writer), we want to go over the actual schema and construct the Iceberg schema from it.

Query engine

None

JonasJ-ap commented 1 year ago

I'm interested in solving this issue. Would you mind assigning it to me? Thank you so much!

Fokko commented 1 year ago

@JonasJ-ap Anything I can help with? If you don't have time, maybe @amogh-jahagirdar is interested in picking this up. I'd love to get this in 0.4.0

JonasJ-ap commented 1 year ago

Sorry that I haven't got enough time to work this out. @amogh-jahagirdar please feel free to pick this up if you are interested in.

kiran94 commented 1 year ago

Hello, I wanted to report that I've also observed this issue. Adding some details about how I got into this state in case it's helpful.

I've created an iceberg table via AWS glue:

partition_column = 'id'
partition_bucket_size = 4
udf_name = 'iceberg_bucket_long_' + str(partition_bucket_size)

spark.sparkContext._jvm.org.apache.iceberg.spark.IcebergSpark.registerBucketUDF(
        spark._jsparkSession, udf_name, spark.sparkContext._jvm.org.apache.spark.sql.types.DataTypes.LongType, partition_bucket_size)

df = df.sortWithinPartitions(F.expr(f"{udf_name}({partition_column})"))

df = df.writeTo('my_iceberg_table') \
        .partitionedBy(F.bucket(partition_bucket_size, partition_column))
        .createOrReplace()

At this point I could read the table fine via Athena and pyiceberg. However it led to many small files being which I believe was leading to poor query performance so I decided to follow https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-data-optimization.html

OPTIMIZE my_iceberg_table REWRITE DATA USING BIN_PACK

After this had completed successfully, I was able to still query the table from Athena but no longer from pyiceberg:

ValueError: Iceberg schema is not embedded into the Parquet file, see https://github.com/apache/iceberg/issues/6505

Let me know if there are any more details I can provide

Guillem96 commented 1 year ago

Just for further information I'll add here a code snippet that leads to the same error message

from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import EqualTo

# pyiceberg.yaml
# catalog:
#   default:
#     type: glue
#     py-io-impl: pyiceberg.io.pyarrow.PyArrowFileIO

catalog = load_catalog(
    "default",
    warehouse="...",
)

table = catalog.load_table(("...", "..."))

df = (
    table.scan()
    .filter(EqualTo("uuid", "..."))
    .select("rt", "cs1", "in")
    .to_arrow()
)

print(df)
JonasJ-ap commented 1 year ago

I created a draft PR #6997 containing a raw visitor to support inferring iceberg schema and verified that the new feature could solve the problem described above and in #6647. @amogh-jahagirdar Please let me know if you are working on this or still interested in picking this up. I am willing to re-pick this issue if you do not have enough time.

bigluck commented 1 year ago

Ciao @Fokko, maybe I'm facing a similar issue, but I'm a bit confused. I'm using Glue and I'm querying an iceberg table created with Dremio.

The table in question derives from the open dataset of NY taxis.

import os
from pyiceberg.catalog import load_glue

catalog = load_glue(name='biglake', conf={})
table = catalog.load_table('biglake.taxi_dremio_by_month')

print(table.identifier)
print(table.metadata)
print(table.metadata_location)

con = table.scan().to_duckdb(table_name='taxi')
print(con.execute('SELECT COUNT(*) FROM taxi').fetchall())

This is the output:

('biglake', 'taxi_dremio_by_month')

location='s3://my-s3-bucket/biglake/taxi_dremio_by_month'
table_uuid=UUID('80a4d129-4919-4b2a-8784-ec845a853130')
last_updated_ms=1677495898549
last_column_id=24
schemas=[
    Schema(
        NestedField(field_id=1, name='hvfhs_license_num', field_type=StringType(), required=False),
        NestedField(field_id=2, name='dispatching_base_num', field_type=StringType(), required=False),
        NestedField(field_id=3, name='originating_base_num', field_type=StringType(), required=False),
        NestedField(field_id=4, name='request_datetime', field_type=TimestamptzType(), required=False),
        NestedField(field_id=5, name='on_scene_datetime', field_type=TimestamptzType(), required=False),
        NestedField(field_id=6, name='pickup_datetime', field_type=TimestamptzType(), required=False),
        NestedField(field_id=7, name='dropoff_datetime', field_type=TimestamptzType(), required=False),
        NestedField(field_id=8, name='PULocationID', field_type=LongType(), required=False),
        NestedField(field_id=9, name='DOLocationID', field_type=LongType(), required=False),
        NestedField(field_id=10, name='trip_miles', field_type=DoubleType(), required=False),
        NestedField(field_id=11, name='trip_time', field_type=LongType(), required=False),
        NestedField(field_id=12, name='base_passenger_fare', field_type=DoubleType(), required=False),
        NestedField(field_id=13, name='tolls', field_type=DoubleType(), required=False),
        NestedField(field_id=14, name='bcf', field_type=DoubleType(), required=False),
        NestedField(field_id=15, name='sales_tax', field_type=DoubleType(), required=False),
        NestedField(field_id=16, name='congestion_surcharge', field_type=DoubleType(), required=False),
        NestedField(field_id=17, name='airport_fee', field_type=IntegerType(), required=False),
        NestedField(field_id=18, name='tips', field_type=DoubleType(), required=False),
        NestedField(field_id=19, name='driver_pay', field_type=DoubleType(), required=False),
        NestedField(field_id=20, name='shared_request_flag', field_type=StringType(), required=False),
        NestedField(field_id=21, name='shared_match_flag', field_type=StringType(), required=False),
        NestedField(field_id=22, name='access_a_ride_flag', field_type=StringType(), required=False),
        NestedField(field_id=23, name='wav_request_flag', field_type=StringType(), required=False),
        NestedField(field_id=24, name='wav_match_flag', field_type=StringType(), required=False),
        schema_id=0,
        identifier_field_ids=[]
    )
]
current_schema_id=0
partition_specs=[
    PartitionSpec(
        PartitionField(source_id=4, field_id=1000, transform=MonthTransform(), name='request_datetime_month'),
        spec_id=0
    )
]
default_spec_id=0
last_partition_id=1000
properties={
    'compatibility.snapshot-id-inheritance.enabled': 'true',
    'commit.manifest.target-size-bytes': '153600'
}
current_snapshot_id=666682962113515828
snapshots=[
    Snapshot(
        snapshot_id=666682962113515828,
        parent_snapshot_id=None,
        sequence_number=None,
        timestamp_ms=1677495898549,
        manifest_list='s3://my-s3-bucket/biglake/taxi_dremio_by_month/metadata/snap-666682962113515828-1-133eb191-5a2e-43da-a773-9f87eeb6b495.avro',
        summary=Summary(
            Operation.APPEND, **{
                'added-data-files': '260',
                'added-records': '745287023',
                'total-records': '745287023',
                'total-files-size': '0',
                'total-data-files': '260',
                'total-delete-files': '0',
                'total-position-deletes': '0',
                'total-equality-deletes': '0'
            }
        ),
        schema_id=0
    )
]
snapshot_log=[
    SnapshotLogEntry(
        snapshot_id='666682962113515828',
        timestamp_ms=1677495898549
    )
]
metadata_log=[]
sort_orders=[SortOrder(order_id=0)]
default_sort_order_id=0
refs={
    'main': SnapshotRef(
        snapshot_id=666682962113515828,
        snapshot_ref_type=SnapshotRefType.BRANCH,
        min_snapshots_to_keep=None,
        max_snapshot_age_ms=None,
        max_ref_age_ms=None
    )
}
format_version=1
schema_=Schema(
    NestedField(field_id=1, name='hvfhs_license_num', field_type=StringType(), required=False),
    NestedField(field_id=2, name='dispatching_base_num', field_type=StringType(), required=False),
    NestedField(field_id=3, name='originating_base_num', field_type=StringType(), required=False),
    NestedField(field_id=4, name='request_datetime', field_type=TimestamptzType(), required=False),
    NestedField(field_id=5, name='on_scene_datetime', field_type=TimestamptzType(), required=False),
    NestedField(field_id=6, name='pickup_datetime', field_type=TimestamptzType(), required=False),
    NestedField(field_id=7, name='dropoff_datetime', field_type=TimestamptzType(), required=False),
    NestedField(field_id=8, name='PULocationID', field_type=LongType(), required=False),
    NestedField(field_id=9, name='DOLocationID', field_type=LongType(), required=False),
    NestedField(field_id=10, name='trip_miles', field_type=DoubleType(), required=False),
    NestedField(field_id=11, name='trip_time', field_type=LongType(), required=False),
    NestedField(field_id=12, name='base_passenger_fare', field_type=DoubleType(), required=False),
    NestedField(field_id=13, name='tolls', field_type=DoubleType(), required=False),
    NestedField(field_id=14, name='bcf', field_type=DoubleType(), required=False),
    NestedField(field_id=15, name='sales_tax', field_type=DoubleType(), required=False),
    NestedField(field_id=16, name='congestion_surcharge', field_type=DoubleType(), required=False),
    NestedField(field_id=17, name='airport_fee', field_type=IntegerType(), required=False),
    NestedField(field_id=18, name='tips', field_type=DoubleType(), required=False),
    NestedField(field_id=19, name='driver_pay', field_type=DoubleType(), required=False),
    NestedField(field_id=20, name='shared_request_flag', field_type=StringType(), required=False),
    NestedField(field_id=21, name='shared_match_flag', field_type=StringType(), required=False),
    NestedField(field_id=22, name='access_a_ride_flag', field_type=StringType(), required=False),
    NestedField(field_id=23, name='wav_request_flag', field_type=StringType(), required=False),
    NestedField(field_id=24, name='wav_match_flag', field_type=StringType(), required=False),
    schema_id=0,
    identifier_field_ids=[]
)
partition_spec=[
    {
        'name': 'request_datetime_month',
        'transform': 'month',
        'source-id': 4,
        'field-id': 1000
    }
]

s3://my-s3-bucket/biglake/taxi_dremio_by_month/metadata/00000-2cf6fb41-1c37-4e7c-a35b-3ab1c4670b45.metadata.json

And then it crashes:

Traceback (most recent call last):
  File "/home/ubuntu/src/query.py", line 17, in <module>
    con = table.scan().to_duckdb(table_name='taxi')
  File "/home/ubuntu/src/.venv/lib/python3.10/site-packages/pyiceberg/table/__init__.py", line 360, in to_duckdb
    con.register(table_name, self.to_arrow())
  File "/home/ubuntu/src/.venv/lib/python3.10/site-packages/pyiceberg/table/__init__.py", line 349, in to_arrow
    return project_table(
  File "/home/ubuntu/src/.venv/lib/python3.10/site-packages/pyiceberg/io/pyarrow.py", line 552, in project_table
    tables = pool.starmap(
  File "/usr/lib/python3.10/multiprocessing/pool.py", line 375, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "/usr/lib/python3.10/multiprocessing/pool.py", line 774, in get
    raise self._value
  File "/usr/lib/python3.10/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.10/multiprocessing/pool.py", line 51, in starmapstar
    return list(itertools.starmap(args[0], args[1]))
  File "/home/ubuntu/src/.venv/lib/python3.10/site-packages/pyiceberg/io/pyarrow.py", line 491, in _file_to_table
    raise ValueError(
ValueError: Iceberg schema is not embedded into the Parquet file, see https://github.com/apache/iceberg/issues/6505

I'm confused because the query is a simple COUNT(*), and I thought pyiceber would read the metadata stored on the metadata folder to get the number of records.

Screenshot 2023-03-03 at 09 31 39

I've also tested PR #6997, but the python operator crashed:

s3://my-s3-bucket/biglake/taxi_dremio_by_month/metadata/00000-2cf6fb41-1c37-4e7c-a35b-3ab1c4670b45.metadata.json
Killed
Fokko commented 1 year ago

@bigluck Thanks for giving it a try.

I'm confused because the query is a simple COUNT(*), and I thought pyiceber would read the metadata stored on the metadata folder to get the number of records.

Unfortunately, with the current DuckDB implementation, it pulls in all the (relevant) data. Since there is no filter on the scan, this means the entire table.

How big is the table? Could it be that it runs out of memory? Running echo $? will tell you the exit code of the process, which might indicate an out-of-memory situation.

bigluck commented 1 year ago

Oh, I've got it, thanks @Fokko . The EC2 I'm using is a t2.medium, 2 vCPU/4GB RAM. It's not big, so it can be the root cause (I'm querying the full hvfhs dataset, ~745,287,023 records)

The exit code is 137, OOM :)

sheinbergon commented 1 year ago

@Fokko @JonasJ-ap what's the status of dealing with this issue? How can I help to have a fix for this included in version 0.4.0?

Fokko commented 1 year ago

@sheinbergon The PR had been merged and will be part of the 0.4.0 release