pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Errors in scan_delta and write_delta with nested struct schema evolution (aka adding new field) #19915

Open TinoSM opened 6 days ago


Checks

Reproducible example


import polars as pl

# Make sure you extract the delta_table_with_nested.zip attachment below
delta_table_path = "./delta_table_with_nested"

# Scan fails with the SchemaError shown in the log output below
print(pl.scan_delta(delta_table_path).collect())

df = pl.DataFrame({
    "id": [1, 2],
    "field1": ["value1", "value2"],
}).with_columns(pl.lit(1).alias("field2").cast(pl.Int32), pl.col("id").cast(pl.Int32))

df = df.with_columns(
    pl.struct(["field1", "field2", pl.lit("x").alias("newcol")]).alias("X")
).select(["id", "X"])

# These writes fail as well. They work fine without the rust engine, but the pyarrow
# engine is being deprecated by deltalake and has other issues with nested writes
# ("fake" nulls in non-null columns) that I'm not covering here.
df.write_delta(delta_table_path, delta_write_options={"engine": "rust"}, mode="append")
df.write_delta(delta_table_path, delta_write_options={"engine": "rust"}, mode="overwrite")

delta_table_with_nested.zip

Make sure you extract it before running the example above.

This is how I generated the table, using Spark 3.4.1:

import shutil
from pyspark.sql import SparkSession
import polars as pl
# Initialize Spark session with Delta Lake
spark = SparkSession.builder \
    .appName("DeltaNestedStructure") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Define Delta table path
delta_table_path = "/tmp/delta_table_with_nested"
shutil.rmtree(delta_table_path, ignore_errors=True)

# Step 1: Create Delta table with a nested structure X
create_table_query = """
CREATE TABLE delta_table_with_nested (
    id INT,
    X STRUCT<field1: STRING, field2: INT>
)
USING DELTA
LOCATION '{}'
""".format(delta_table_path)

spark.sql(create_table_query)

# Step 2: Insert some data into the table
insert_data_query = """
INSERT INTO delta_table_with_nested VALUES
(1, STRUCT('value1', 10)),
(2, STRUCT('value2', 20))
"""
spark.sql(insert_data_query)

# Step 3: Add a new column `new_col` to the nested structure X using SQL
# This operation does not write new data, just updates the schema.
add_column_query = """
ALTER TABLE delta_table_with_nested
ADD COLUMNS (X.newcol STRING)
"""
spark.sql(add_column_query)

# Verify the schema update
df = spark.read.format("delta").load(delta_table_path)
df.show()

print(pl.scan_delta(delta_table_path).collect())
df = pl.DataFrame({
    "id": [1, 2],
    "field1": ["value1", "value2"],
}).with_columns(pl.lit(1).alias("field2").cast(pl.Int32), pl.col("id").cast(pl.Int32))

df = df.with_columns(
    pl.struct(["field1", "field2", pl.lit("x").alias("newcol")]).alias("X")
).select(["id", "X"])

# This will not work either
df.write_delta(delta_table_path, delta_write_options={"engine": "rust"}, mode="append")

spark.stop()
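To confirm the evolved schema from Python without Spark, the deltalake package can be queried directly (a quick check, assuming the same local table path):

```python
from deltalake import DeltaTable

# After the ALTER TABLE, the Delta schema lists X as
# struct<field1: string, field2: int, newcol: string>,
# even though no parquet file contains newcol yet.
print(DeltaTable(delta_table_path).schema())
```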

Log output

Traceback (most recent call last):
  File "/Users/FlorentinoSainz/Projects/gdt-opendata/fsainz_test_polars_list.py", line 49, in <module>
    print(pl.scan_delta(delta_table_path).collect())
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/FlorentinoSainz/Projects/gdt-opendata/.venv/lib/python3.11/site-packages/polars/lazyframe/frame.py", line 2029, in collect
    return wrap_df(ldf.collect(callback))
                   ^^^^^^^^^^^^^^^^^^^^^
polars.exceptions.SchemaError: dtypes differ for column X: Struct([Field { name: "field1", dtype: Utf8View, is_nullable: true, metadata: {} }, Field { name: "field2", dtype: Int32, is_nullable: true, metadata: {} }]) != Struct([Field { name: "field1", dtype: Utf8View, is_nullable: true, metadata: {} }, Field { name: "field2", dtype: Int32, is_nullable: true, metadata: {} }, Field { name: "newcol", dtype: Utf8View, is_nullable: true, metadata: {} }])

Issue description

  1. I created the table with a nested struct X.
  2. I wrote data into it.
  3. I added a new column inside the nested struct X (the typical schema-evolution case of adding a new field).
  4. Scans in polars no longer work (they still work in Spark, and in polars with the workaround below).
  5. Writes in polars no longer work either (they still work in Spark, and in polars with the workaround below).

As a workaround I'm doing this instead of using scan_delta (and instead of scan_parquet; I tried to use polars' scan_parquet as much as possible, but I didn't manage to drop the schema when reading the parquet files from Python, as the Rust side fails if I do so):

dl_tbl = _get_delta_lake_table(
    table_path=table_path,
    version=kwargs.get("version", None),
    storage_options=storage_options,
    delta_table_options=kwargs.get("delta_table_options", None),
)
# ... some other code adapted from pl.scan_delta ...
arrow_dataset = ds.dataset(
    urls, filesystem=fs,  # type: ignore
    partitioning=part, partition_base_dir=partition_base_dir,
    format="parquet",
)
scan_df = pl.scan_pyarrow_dataset(arrow_dataset)

# The concat below fixes the case in which someone added a new column in delta
# but no parquet file contains data for it yet; it also handles reading empty
# dataframes. Similar to allow_missing_columns, but it works for nested structs too.
schema_from_parquets = scan_df.collect_schema()
aligned = align_schemas(empty_delta_schema_lf.collect_schema(), schema_from_parquets)
# Cast to correct Timestamp(tz=None) vs Timestamp(tz="UTC") mismatches
casted = scan_df.cast(aligned, strict=False)

# The diagonal_relaxed concat adds the missing nested columns to the data
return pl.concat([empty_delta_schema_lf, casted], how="diagonal_relaxed")
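The issue doesn't include align_schemas; as a purely illustrative sketch (helper name and behavior assumed from the comments above), it could pick, for every column the parquet files actually contain, the dtype declared by the Delta schema, so the cast normalizes timestamps and nested layouts to the table's types:

```python
import polars as pl

def align_schemas(
    delta_schema: pl.Schema, parquet_schema: pl.Schema
) -> dict[str, pl.DataType]:
    # Hypothetical helper: for each column present in the parquet files,
    # prefer the dtype declared in the Delta table schema (e.g. a UTC
    # Datetime instead of a tz-naive one), so the later
    # cast(..., strict=False) lines the scan up with the table schema.
    return {
        name: delta_schema[name]
        for name in parquet_schema.names()
        if name in delta_schema
    }
```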

I also have this code for the writing bug:

try:
    batch.write_delta(
        target=table_path, mode=mode_batch, storage_options=storage_options,
        delta_write_options=delta_write_options, **kwargs
    )
except Exception as e:
    if isinstance(e, SchemaMismatchError) and table.schema == batch.schema:
        logging.info(
            "Retrying with schema override. "
            f"Got SchemaMismatchError {str(e)} and the schemas are the same. "
            "This usually happens when a new field is added to the table and "
            "no parquet file contains it"
        )
        batch.write_delta(
            target=table_path, mode=mode_batch, storage_options=storage_options,
            delta_write_options={**delta_write_options, "schema_mode": "merge"}, **kwargs
        )
    else:
        raise e
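When the target table is known to have just evolved, the retry can be avoided by passing the same option up front (this mirrors the schema_mode used in the retry above):

```python
# Ask the rust writer to merge the (identical-up-to-new-field) schemas directly
batch.write_delta(
    target=table_path,
    mode="append",
    storage_options=storage_options,
    delta_write_options={"engine": "rust", "schema_mode": "merge"},
)
```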

Expected behavior

Polars should read the data, returning NULL for the new field in parquet files that don't contain it.
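For the attached table, that would look roughly like this (a hand-written illustration built with plain polars, not actual scan output; dtypes are approximate):

```python
import polars as pl

# What scan_delta(...).collect() should return after the schema evolution:
# the rows written before the ALTER TABLE, with newcol filled with nulls.
expected = pl.DataFrame(
    {
        "id": [1, 2],
        "X": [
            {"field1": "value1", "field2": 10, "newcol": None},
            {"field1": "value2", "field2": 20, "newcol": None},
        ],
    }
)
print(expected)
```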

Installed versions

```
--------Version info---------
Polars:              1.14.0
Index type:          UInt32
Platform:            macOS-14.7.1-arm64-arm-64bit
Python:              3.11.10 (main, Sep  7 2024, 01:03:31) [Clang 15.0.0 (clang-1500.3.9.4)]
LTS CPU:             False

----Optional dependencies----
adbc_driver_manager  <not installed>
altair               5.4.1
boto3                1.35.36
cloudpickle          <not installed>
connectorx           <not installed>
deltalake            0.17.0
fastexcel            <not installed>
fsspec               2024.10.0
gevent               <not installed>
google.auth          2.36.0
great_tables         <not installed>
matplotlib           <not installed>
nest_asyncio         <not installed>
numpy                1.26.4
openpyxl             <not installed>
pandas               2.2.3
pyarrow              18.0.0
pydantic             2.9.2
pyiceberg            <not installed>
sqlalchemy           <not installed>
torch                <not installed>
xlsx2csv             <not installed>
xlsxwriter           <not installed>
```