pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Schema issue when writing new delta tables - parquet schema not valid delta lake schema #9795

Open philszep opened 11 months ago

philszep commented 11 months ago

Reproducible example

```python
import polars as pl
import deltalake

pl_df = pl.DataFrame([{'this': 'that'}])

# The first write creates the table; the second appends a new parquet file.
pl_df.write_delta('polars_test', mode='overwrite', overwrite_schema=True)
pl_df.write_delta('polars_test', mode='append')

# Compaction fails because the two files carry different Arrow schemas.
polars_test = deltalake.DeltaTable('polars_test')
polars_test.optimize.compact()
```

Outputs a `DeltaError`:

```
DeltaError: Data does not match the schema or partitions of the table: Unexpected Arrow schema: got: Field { name: "this", data_type: LargeUtf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, expected: Field { name: "this", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }
```

In this case, the delta table contains two parquet files: in the first, the `this` field has type `large_string`, whereas in the second it has type `string`.
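For reference, a quick way to confirm the mismatch (a rough sketch, assuming the `polars_test` table created by the example above) is to read the footer schema of each data file with pyarrow:

```python
import pyarrow.parquet as pq
import deltalake

# Sketch: print the Arrow schema stored in each parquet file of the table
# created by the reproducible example above.
dt = deltalake.DeltaTable("polars_test")
for path in dt.file_uris():
    print(path, pq.read_schema(path), sep="\n")
# One file reports `this: large_string`, the other `this: string`.
```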

Issue description

An invalid schema is generated when creating a new delta table. The root cause is that Delta Lake does not distinguish between the Arrow datatypes `Utf8` and `LargeUtf8`.

I believe this is caused by lines 3307-3314 of `frame.py`; see pull request #7616.

There, the code relies on an existing table to coerce the schema into one that is consistent with a delta table schema. To remedy this, we can round-trip the existing `data.schema` object through a deltalake schema object and back. For example, I think that if we replace the code in `frame.py` referenced above with:

```python
data_schema = deltalake.Schema.from_pyarrow(data.schema).to_pyarrow()
data = data.cast(data_schema)
```

then the problem will be resolved for any table that is created.
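A minimal sketch of what that round-trip does, assuming `deltalake.Schema.from_pyarrow` and `to_pyarrow` behave as used above: a `large_string` field comes back as plain `string`, so the data written for a new table matches what the delta schema will declare.

```python
import pyarrow as pa
import deltalake

# Sketch: round-trip an Arrow schema through the deltalake schema type.
src = pa.schema([pa.field("this", pa.large_string())])
normalized = deltalake.Schema.from_pyarrow(src).to_pyarrow()
print(normalized)  # expected: this: string
```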

Expected behavior

New delta table created with valid deltalake schema.

Installed versions

```
--------Version info---------
Polars:              0.18.6
Index type:          UInt32
Platform:            Windows-10-10.0.19044-SP0
Python:              3.11.4 | packaged by conda-forge | (main, Jun 10 2023, 17:59:51) [MSC v.1935 64 bit (AMD64)]
----Optional dependencies----
adbc_driver_sqlite:
connectorx:
deltalake:           0.10.0
fsspec:
matplotlib:
numpy:               1.25.0
pandas:
pyarrow:             12.0.1
pydantic:
sqlalchemy:
xlsx2csv:
xlsxwriter:
```

ion-elgreco commented 11 months ago

I ran into the same issue today and opened an upstream issue in the delta-rs repo: https://github.com/delta-io/delta-rs/issues/1528

philszep commented 11 months ago

I considered posting an issue in delta-rs as well, but I recall seeing some issues there about adding support for Arrow `LargeUtf8` and other data types, so I assumed they are already at least thinking about addressing it.

I also felt like it is the duty of the application writing the data to ensure schema consistency on read/write.

The delta transaction protocol doesn't distinguish between Utf8 and LargeUtf8 types -- string columns in parquet files are just byte arrays of arbitrary size anyway. So the issue is with reading and writing a delta table to/from arrow format, which does distinguish Utf8 vs LargeUtf8.

I'm not familiar enough with the delta-rs implementation, but perhaps there is a solution in which delta-rs accepts an explicit Arrow schema when translating a delta table to Arrow format, treating `Utf8` and `LargeUtf8` as aliases for the single string type in the delta table. The files defining the delta table would not themselves distinguish `Utf8` from `LargeUtf8` (both are just the string type), but the application could still specify on read which Arrow type it needs -- this way the delta tables remain consistent with the protocol. As it stands, I don't think the delta-rs library supports reading delta tables that contain fields requiring the Arrow `LargeUtf8` datatype.
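As a rough illustration of that point (a sketch against the `polars_test` table from the reproducible example): the Delta schema itself only records a generic string type, so the `Utf8`/`LargeUtf8` distinction lives entirely on the Arrow side.

```python
import deltalake

# Sketch: the delta schema stores "string"; converting it to Arrow always
# yields plain Utf8, regardless of which Arrow type wrote the data.
dt = deltalake.DeltaTable("polars_test")
print(dt.schema())               # field "this" has delta type string
print(dt.schema().to_pyarrow())  # this: string
```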

Tomperez98 commented 11 months ago

I have encountered the same issue. I first wrote a delta table to S3 with the following params:


```python
data_to_write.write_delta(
    target=s3_location,
    mode="error",
    storage_options={
        "AWS_REGION": self.region_name,
        "AWS_ACCESS_KEY_ID": self.boto_session.get_credentials().access_key,
        "AWS_SECRET_ACCESS_KEY": self.boto_session.get_credentials().secret_key,
        "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
    },
    overwrite_schema=True,
    delta_write_options={
        "partition_by": [
            "ingested_at_year",
            "ingested_at_month",
            "ingested_at_day",
            "ingested_at_hour",
        ],
        "name": "raw_events",
        "description": "Events loaded from source bucket",
    },
)
```

On the next run, it fails with the following error:

```
ValueError: Schema of data does not match table schema
Table schema:
obj_key: large_string
data: large_string
ingested_at: timestamp[us, tz=UTC]
ingested_at_year: int32
ingested_at_month: uint32
ingested_at_day: uint32
ingested_at_hour: uint32
ingested_at_minute: uint32
ingested_at_second: uint32
Data Schema:
obj_key: string
data: string
ingested_at: timestamp[us]
ingested_at_year: int32
ingested_at_month: int32
ingested_at_day: int32
ingested_at_hour: int32
ingested_at_minute: int32
ingested_at_second: int32
```

I haven't found a solution.

ion-elgreco commented 11 months ago

@philszep I think you can close it here. It's going to be fixed upstream.
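In the meantime, a rough interim workaround (an untested sketch; `table_uri` and `storage_options` stand in for the values used above) is to bypass `write_delta`, cast the Arrow data to the existing table's pyarrow schema, and append through deltalake directly:

```python
import deltalake
import polars as pl

def append_matching_schema(df: pl.DataFrame, table_uri: str, storage_options: dict) -> None:
    # Cast the Arrow data to the table's declared pyarrow schema so both
    # sides agree on string vs large_string (and timestamp timezones).
    table = deltalake.DeltaTable(table_uri, storage_options=storage_options)
    data = df.to_arrow().cast(table.schema().to_pyarrow())
    deltalake.write_deltalake(table_uri, data, mode="append", storage_options=storage_options)
```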

edgBR commented 7 months ago

Same issue here, btw.

Do you know when it will be fixed upstream?

ion-elgreco commented 7 months ago

@edgBR Actually this is a different issue. Can you create one upstream? Then I will look at it; it's probably a trivial fix.

kevinjqliu commented 3 months ago

Ran into a similar issue implementing write support for Iceberg (#15018)

Example to reproduce: a simple polars DataFrame:

```python
import polars as pl

df = pl.DataFrame(
    {
        "foo": [1, 2, 3, 4, 5],
        "bar": [6, 7, 8, 9, 10],
        "ham": ["a", "b", "c", "d", "e"],
    }
)
```

DataFrame schema:

```
> df.schema
OrderedDict([('foo', Int64), ('bar', Int64), ('ham', String)])
```

Arrow schema:

```
> df.to_arrow().schema
foo: int64
bar: int64
ham: large_string
```

`.to_arrow()` casting `string` to `large_string` causes a schema mismatch when the parquet writer writes. Not sure why the type is `large_string` when casting to Arrow.

The `pyarrow.large_string` doc says: "This data type may not be supported by all Arrow implementations. Unless you need to represent data larger than 2GB, you should prefer string()."
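For what it's worth, a small sketch of working around this on the caller's side (using the `df` from the example above): downcast any `large_string` fields back to `string` on the Arrow table before handing it to the writer.

```python
import pyarrow as pa

# Sketch: rebuild the schema with string in place of large_string, then cast.
arrow_table = df.to_arrow()
schema = pa.schema(
    [
        pa.field(f.name, pa.string(), f.nullable)
        if pa.types.is_large_string(f.type)
        else f
        for f in arrow_table.schema
    ]
)
arrow_table = arrow_table.cast(schema)
print(arrow_table.schema)  # ham: string
```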

ion-elgreco commented 3 months ago

@kevinjqliu I resolved it upstream in delta-rs with the `large_dtypes` parameter

kevinjqliu commented 3 months ago

Thanks @ion-elgreco I'll take a look at Iceberg's schema handling

ion-elgreco commented 3 months ago

@kevinjqliu actually I may even be able to let go of this parameter in delta-rs if I just always convert to lower for schema check :p
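Roughly, that normalization could look like this (a hypothetical sketch, not the actual delta-rs code): lower any "large" Arrow types before comparing schemas.

```python
import pyarrow as pa

def _lower(t: pa.DataType) -> pa.DataType:
    # Map "large" Arrow types onto their regular counterparts.
    if pa.types.is_large_string(t):
        return pa.string()
    if pa.types.is_large_binary(t):
        return pa.binary()
    if pa.types.is_large_list(t):
        return pa.list_(_lower(t.value_type))
    return t

def schemas_match(a: pa.Schema, b: pa.Schema) -> bool:
    def lowered(s: pa.Schema) -> pa.Schema:
        return pa.schema([pa.field(f.name, _lower(f.type), f.nullable) for f in s])
    return lowered(a).equals(lowered(b))
```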

kevinjqliu commented 3 months ago

there seem to be two issues to me:

  1. Iceberg doesn't handle large_string type gracefully
  2. Polars dataframe changes type when converting to Arrow (string -> large_string)

Looks like in PyIceberg we're casting `large_string` into the `string` type (link); I'll open an issue for that.

I have no idea why Polars defaults to large_string when converting to Arrow (link)

deanm0000 commented 3 months ago

Polars isn't changing from `string` to `large_string` when it converts to Arrow. It doesn't use `string` at all; it only uses `large_string`. For brevity, it simply names its own dtype `String`, even though it is backed by Arrow's `large_string`.
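A tiny sketch of that behaviour (at least with the polars versions discussed in this thread): even a pyarrow `string` column round-tripped through polars comes back as `large_string`.

```python
import polars as pl
import pyarrow as pa

# Polars stores the column as its String dtype, backed by large_string.
tbl = pa.table({"ham": pa.array(["a", "b"], type=pa.string())})
print(pl.from_arrow(tbl).to_arrow().schema)  # ham: large_string
```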