delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/
Apache License 2.0

delta-rs reading parquet from S3 is 2x slower than pyarrow.dataset #1684

Closed: someaveragepunter closed this 2 months ago

someaveragepunter commented 11 months ago

Description

Opening a new issue from the discussion on the closed issue: https://github.com/delta-io/delta-rs/issues/631#issuecomment-1683036538

Use Case

I would like to use DeltaTable to read from S3 without having to work around this performance limitation by reading the parquet files with pyarrow instead.

Code Example

Reading from S3 (blob store); the dataset size is roughly 700 MB.
a) pyarrow on s3fs is twice as fast as delta-rs.
b) duckdb is twice as fast when loading all columns, but roughly on par when selecting only 2 columns.


import boto3
from swarkn.helpers import timer, init_logger #pip install swarkn
from deltalake import DeltaTable as DT

init_logger()

# simple delta table with 2 parquet files (700 MB and 100 MB), no partitions
path = 's3://nyc-taxi-test/temp/weather_sorted_delta'
columns = ['flow_date', 'ssr'] # column pruning

boto3.setup_default_session()
creds = boto3.DEFAULT_SESSION.get_credentials()

############################## S3FS + pyarrow ##########################

import pyarrow.dataset as ds
from pyarrow import fs

s3 = fs.S3FileSystem()
with timer(): #1s negligible setup cost
    dataset = ds.dataset(path.replace('s3://', ''),
        format="parquet",
        filesystem=s3,
        #partitioning="hive"
    )

with timer(): #80s
    pyarrow_table = dataset.to_table(
        # columns=columns # 8s
    )

#################### DUCKDB #########################
import duckdb

s3 = boto3.resource('s3')
client = boto3.client("s3")
con = duckdb.connect(database=':memory:')

con.execute(f"""
INSTALL httpfs;
LOAD httpfs;
SET s3_region='eu-west-2';
SET s3_access_key_id='{creds.access_key}';
SET s3_secret_access_key='{creds.secret_key}';
""")

with timer(): # 17s
    out = con.execute(f"select flow_date, ssr from read_parquet('{path}/*.parquet')").df()

with timer(): # 89s
    out = con.execute(f"select * from read_parquet('{path}/*.parquet')").df()

#################### DELTA Lake #########################

with timer():
    det = DT(path,
        storage_options=dict(
            AWS_REGION='eu-west-2',
            AWS_ACCESS_KEY_ID=creds.access_key,
            AWS_SECRET_ACCESS_KEY=creds.secret_key,
            AWS_S3_ALLOW_UNSAFE_RENAME='true'
        )
    )
with timer(): # 160s
    dfpd = det.to_pyarrow_table(
        # columns=columns, # 16s
    )

deltalake 0.10.1, duckdb 0.8.1, boto3 1.28.16, pyarrow 12.0.1

Python 3.11 on Windows

ion-elgreco commented 11 months ago

Related issue: https://github.com/delta-io/delta-rs/issues/1569

This workaround, however, makes it even faster by enabling pre-buffering:

import fsspec
import polars as pl
import pyarrow as pa
import pyarrow.dataset
from deltalake import DeltaTable


def read_delta_fast_lazy(
    file: str,
    storage_options: dict | None = None,
    version: int | None = None,
) -> pl.LazyFrame:
    """Fast path to read delta tables lazily, until upstream fixed: https://github.com/delta-io/delta-rs/issues/1569

    Args:
        file (str): file url (abfs://...)
        storage_options (dict): storage_options for fsspec, must contain, client_id, client_secret, tenant_id
        version (int | None, optional): Version number of table. Defaults to None.

    Returns:
        pl.LazyFrame: dataframe
    """
    if storage_options is None:
        fs = fsspec.filesystem("file")
    else:
        storage_options['account_name'] = file.split("@")[1].split('.')[0]
        fs = fsspec.filesystem("abfs", **storage_options)
    dt = DeltaTable(file, version=version, storage_options=storage_options)
    files = dt.file_uris()

    pq_data = pa.dataset.dataset(
        source=files,
        schema=dt.schema().to_pyarrow(),
        partitioning="hive",
        format=pa.dataset.ParquetFileFormat(
            default_fragment_scan_options=pa.dataset.ParquetFragmentScanOptions(pre_buffer=True),
        ),
        filesystem=fs,
    )
    return pl.scan_pyarrow_dataset(pq_data)
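
For reference, a hypothetical usage sketch of the function above; the URL, credentials, and column names below are placeholders, not values from this thread:

lf = read_delta_fast_lazy(
    "abfs://container@myaccount.dfs.core.windows.net/my_table",  # placeholder table URL
    storage_options={"client_id": "...", "client_secret": "...", "tenant_id": "..."},
)
# Polars pushes the column selection down into the pyarrow dataset scan
df = lf.select(["col_a", "col_b"]).collect()
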
someaveragepunter commented 11 months ago

Apologies if I've misunderstood or you've already taken this into account, but your code snippet above doesn't actually read the table; it returns a Polars LazyFrame. Adding .collect() at the end will show the true cost of the read. Alternatively, pl.from_arrow(dataset.to_table()) yields the same result and roughly the same performance.

I did find, however, that pq.read_table(files, filesystem=fs.S3FileSystem()) is about 1.5x faster than your ds.dataset() snippet above for an EC2 instance reading a 100 MB S3 file.
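
For context, a minimal sketch of that pq.read_table variant, assuming the same path, columns, and storage options as in the original benchmark (DeltaTable supplies the file list; paths are made relative to the filesystem, as in the earlier ds.dataset call):

import pyarrow.parquet as pq
from pyarrow import fs
from deltalake import DeltaTable as DT

dt = DT(path, storage_options=storage_options)
# pq.read_table takes paths relative to the filesystem, so strip the scheme
files = [f.replace('s3://', '') for f in dt.file_uris()]

# pre_buffer defaults to True here, unlike ds.dataset's Parquet scan options
table = pq.read_table(files, filesystem=fs.S3FileSystem(), columns=columns)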

ion-elgreco commented 11 months ago

I know; I have another function, read_delta_fast, which calls collect() on the result of read_delta_fast_lazy. This way I can do both lazy and eager execution on a Delta table with Polars by using either of the two functions' outputs.

Are you using the exact snippet, with pre_buffer=True?
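
A minimal sketch of what such an eager wrapper could look like (the actual implementation isn't shown in this thread):

def read_delta_fast(
    file: str,
    storage_options: dict | None = None,
    version: int | None = None,
) -> pl.DataFrame:
    # Eager counterpart: materialize the LazyFrame returned by read_delta_fast_lazy
    return read_delta_fast_lazy(file, storage_options, version).collect()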

someaveragepunter commented 11 months ago

I am indeed, especially with

format=pa.dataset.ParquetFileFormat(
            default_fragment_scan_options=pa.dataset.ParquetFragmentScanOptions(pre_buffer=True),
        ),

If you've not already, it's worth testing yours with pq.read_table() to see how it performs. It'd be interesting if Azure has different performance characteristics from AWS blob storage.

ion-elgreco commented 11 months ago

I'll try it out on Monday at work! Before, I used parquet.dataset, but after changing to ds.dataset I noticed some performance regression; I later found out that pre_buffer defaults to False with ds.dataset, while parquet.dataset has it set to True.
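
To make the default difference explicit, a small sketch of where the pre_buffer knob lives in each API (files and filesystem stand in for the file list and filesystem objects from the earlier snippets):

import pyarrow.dataset as ds
import pyarrow.parquet as pq

# ds.dataset: pre-buffering has to be opted into via the fragment scan options
fmt = ds.ParquetFileFormat(
    default_fragment_scan_options=ds.ParquetFragmentScanOptions(pre_buffer=True),
)
dataset = ds.dataset(files, format=fmt, filesystem=filesystem)

# pyarrow.parquet: pre_buffer defaults to True, written out here for clarity
table = pq.read_table(files, filesystem=filesystem, pre_buffer=True)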

ion-elgreco commented 11 months ago

@someaveragepunter I tried it; in my case pq.read_table is the same speed.

MrPowers commented 10 months ago

So PyArrow Datasets are fast and PyArrow tables are slow.

det.to_pyarrow_table will be slow and det.to_pyarrow_dataset will be fast.

See this notebook for an example I ran locally with DuckDB.

Can you try your benchmarks with Delta Lake + a PyArrow dataset?
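
If it helps, a sketch of what that benchmark could look like, reusing path, columns, and the storage options from the first post (the DuckDB registration is just one way to query the dataset):

import duckdb
from deltalake import DeltaTable as DT

det = DT(path, storage_options=storage_options)  # same options as the original snippet
dataset = det.to_pyarrow_dataset()

with timer():
    pyarrow_table = dataset.to_table(columns=columns)  # column pruning pushed into the scan

# DuckDB can query the dataset once it is registered on the connection
con = duckdb.connect(database=':memory:')
con.register('delta_ds', dataset)
out = con.execute('select flow_date, ssr from delta_ds').df()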

ion-elgreco commented 9 months ago

@someaveragepunter are you still having this issue?

ion-elgreco commented 2 months ago

Closing this one, as we have enabled pre_buffer in PyArrow datasets.