pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Another Approach for scan_delta #12217

Closed aersam closed 8 months ago

aersam commented 1 year ago

Description

Hi there

scan_delta currently uses the PyArrow dataset feature of the deltalake package, so scan_delta is basically just a thin wrapper around scan_pyarrow_dataset.

This has some downsides:

I don't think PyArrow datasets will gain the missing features anytime soon, so I did something different, which works for deltalake version 2 tables:

import os

import polars as pl
from deltalake import DeltaTable


def scan_delta_union(delta_table: DeltaTable) -> pl.LazyFrame:
    # Map logical column names to their physical names (Delta column mapping)
    colmaps: dict[str, str] = {}
    for field in delta_table.schema().fields:
        colmaps[field.name] = field.metadata.get("delta.columnMapping.physicalName", field.name)

    all_ds = []
    for ac in delta_table.get_add_actions(flatten=True).to_pylist():
        fullpath = os.path.join(delta_table.table_uri, ac["path"])
        base_ds = pl.scan_parquet(fullpath, storage_options=delta_table._storage_options)
        selects = []
        part_cols = []
        for ln, pn in colmaps.items():
            if "partition_values" in ac and pn in ac["partition_values"]:
                # Partition value lives in the add action, not in the parquet file
                part_vl = ac["partition_values"][pn]
                part_cols.append(pl.lit(part_vl).alias(ln))
            elif "partition." + pn in ac:
                # Flattened add actions expose partition values as "partition.<name>"
                part_vl = ac["partition." + pn]
                part_cols.append(pl.lit(part_vl).alias(ln))
            else:
                # Regular column: select by physical name, expose it under the logical name
                selects.append(pl.col(pn).alias(ln))
        ds = base_ds.select(*selects).with_columns(*part_cols)
        all_ds.append(ds)
    return pl.concat(all_ds)
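
For illustration, usage could look like the sketch below; the table path and the partition column "year" are made-up placeholders, not something from this issue. The idea is that filters on partition columns only touch lit columns, so they never have to be evaluated against the parquet data itself.

from deltalake import DeltaTable
import polars as pl

# Hypothetical table location and partition column, purely for illustration.
dt = DeltaTable("/data/events")
lf = scan_delta_union(dt)

# "year" only exists as a `lit` column per file, so this filter does not need
# to read that column from parquet.
df = lf.filter(pl.col("year") == "2023").collect()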

Basically I'm just doing a union of the files, with the partition columns added as lit columns so that Polars can optimize filters on partition columns. What's still missing here:

ion-elgreco commented 1 year ago

Read speeds will still be slower with Polars' parquet reader; PyArrow has faster IO with parquet.

Also, nowadays you can pass a list of paths to scan_parquet with hive_partitioning set to True. This simplifies your code a lot.

ion-elgreco commented 1 year ago

Hmm, interesting: Polars' reader is 2x faster than a PyArrow dataset:

import polars as pl
from deltalake import DeltaTable

def custom_read_delta(dt: DeltaTable) -> pl.DataFrame:
    # Map physical parquet column names back to the logical Delta column names
    colmaps: dict[str, str] = {}
    for field in dt.schema().fields:
        physical = field.metadata.get("delta.columnMapping.physicalName", field.name)
        colmaps[physical] = field.name
    return pl.scan_parquet(source=dt.file_uris(), hive_partitioning=True).rename(colmaps).collect()

dominikpeter commented 1 year ago

@aersam Would it even be possible to support deletion vectors with this approach?

aersam commented 1 year ago

@aersam Would it even be possible to support deletion vectors with this approach?

Yes. Decoding the deletion vectors is not that easy, but it would be possible (I've already done that in Rust some time ago).
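
On the Polars side, a minimal sketch of skipping deleted rows could look like the function below, assuming the deletion vector has already been decoded into a plain list of row indexes (the decoding is the hard part mentioned above); with_row_index is the current name of the former with_row_count.

import polars as pl

def drop_deleted_rows(file_lf: pl.LazyFrame, deleted_rows: list[int]) -> pl.LazyFrame:
    # `deleted_rows` are the physical row positions within one parquet file that
    # the (already decoded) deletion vector marks as removed.
    return (
        file_lf
        .with_row_index("__row_idx")
        .filter(~pl.col("__row_idx").is_in(deleted_rows))
        .drop("__row_idx")
    )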

aersam commented 1 year ago

Hive partitioning only works for lower Delta versions; newer versions often use random file prefixes.

dominikpeter commented 1 year ago

@aersam But the rows can be exposed by the delta-rs API, no? Could they not be filtered with Polars' filter method?

ion-elgreco commented 1 year ago

Actually, it's only faster with local files in some cases. A large delta table on Azure is read in 38 secs with PyArrow; Polars simply errors out with a connection error after 2.5 minutes:

ComputeError: Generic MicrosoftAzure error: Error performing token request: response error "request error", after 0 retries: error sending request for url

dominikpeter commented 1 year ago

Actually, it's only faster with local files in some cases. A large delta table on Azure is read in 38 secs with PyArrow; Polars simply errors out with a connection error after 2.5 minutes:

ComputeError: Generic MicrosoftAzure error: Error performing token request: response error "request error", after 0 retries: error sending request for url

Pretty sure Polars has some improvement potential here 😉

ion-elgreco commented 1 year ago

Reading the files one by one with @aersam's code will work, but it's slower than PyArrow and you also need to fix the datetime casting inside the function.


Actually, it's only faster with local files in some cases. A large delta table on Azure is read in 38 secs with PyArrow; Polars simply errors out with a connection error after 2.5 minutes:

ComputeError: Generic MicrosoftAzure error: Error performing token request: response error "request error", after 0 retries: error sending request for url

Actually, this is due to me forgetting to add storage_options to scan_parquet 😅. However, it now complains that the schema of all files in a single scan_parquet must be equal.
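
For reference, a sketch of passing the credentials through to both the DeltaTable and the parquet scan; the table URI and the option keys are placeholders, and the exact option names accepted can differ per setup.

import polars as pl
from deltalake import DeltaTable

# Placeholder credentials; exact option names depend on your storage setup.
storage_options = {"account_name": "<storage-account>", "account_key": "<key>"}

dt = DeltaTable(
    "abfss://container@account.dfs.core.windows.net/my_table",
    storage_options=storage_options,
)

lf = pl.scan_parquet(
    dt.file_uris(),
    hive_partitioning=True,
    storage_options=storage_options,  # this was the missing piece
)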

aersam commented 1 year ago

Well, the data type mapping is something one would need to take a deeper look at. Delta supports fewer data types than Parquet, so it should be relatively easy (see the spec).
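
As a starting point, a possible mapping from Delta's primitive types to Polars dtypes could look roughly like the sketch below; decimals and nested types are left out, the timestamp timezone handling is an assumption, and it assumes deltalake's PrimitiveType exposes its name via .type.

import polars as pl

# Rough mapping of Delta Lake primitive type names to Polars dtypes.
DELTA_TO_POLARS = {
    "string": pl.Utf8,
    "long": pl.Int64,
    "integer": pl.Int32,
    "short": pl.Int16,
    "byte": pl.Int8,
    "float": pl.Float32,
    "double": pl.Float64,
    "boolean": pl.Boolean,
    "binary": pl.Binary,
    "date": pl.Date,
    "timestamp": pl.Datetime("us", "UTC"),  # assumed: Delta stores timestamps as UTC microseconds
}

def cast_to_delta_schema(lf: pl.LazyFrame, delta_schema) -> pl.LazyFrame:
    # Cast scanned columns to the dtypes implied by the Delta schema
    # (primitive types only; decimals and nested types would need extra handling).
    casts = []
    for field in delta_schema.fields:
        type_name = getattr(field.type, "type", None)
        if type_name in DELTA_TO_POLARS:
            casts.append(pl.col(field.name).cast(DELTA_TO_POLARS[type_name]))
    return lf.with_columns(casts)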

I also had a deeper look into the Hive partitioning stuff; it seems to be an implementation detail you cannot rely on. Databricks uses Hive partitioning for Delta Lake v1, but as soon as you turn on column mapping you have to use random file prefixes; they claim it optimizes distribution across S3 servers (see here). There is nothing mentioned about it in the spec, however.

The deletion vectors are currently only exposed as pretty basic metadata in delta-rs. I could do a PR there to expose the actual indexes to be skipped, which requires decoding some roaring bitmaps. However, I'd wait some time for that, since there are large refactorings ongoing in that repo as part of introducing delta-kernel. Maybe it's easier to wait for that.

ion-elgreco commented 1 year ago

@aersam we currently also don't support writing anything above v2 in the writer protocol, and the reader is also still at v1 afaik.

aersam commented 1 year ago

Yes, that's true. For the writer I guess we're out of luck; supporting v2 will be impossible without support from delta-rs. For the reader, however, it's possible to implement all Delta Lake features; it's sufficient if delta-rs exposes all the required metadata.

Our use case is creating the files with Databricks/Spark/Microsoft Fabric and reading them with Polars later on. I can imagine we're not the only ones who would like to do something like that.

ion-elgreco commented 1 year ago

Probably not, but I could argue you don't need Spark in many cases :) In most cases, vertically scaled compute with Polars is faster.

aersam commented 1 year ago

Yes, but Polars still needs improvements in its deltalake support 😉

aersam commented 8 months ago

If anyone finds this: there is now a package providing this approach, deltalake2db. I'll close this for now.

ion-elgreco commented 8 months ago

@aersam this may be interesting for you; I started working on a native Polars reader: https://github.com/ion-elgreco/polars-deltalake

aersam commented 8 months ago

Yes, this is promising! I'll watch it and maybe I dare to copy your tests :)