duckdb / duckdb_delta

DuckDB extension for Delta Lake
MIT License

Delta+Azure is slow #104

Open nfoerster opened 5 days ago

nfoerster commented 5 days ago

We are running some tests now that the azure and delta extensions work together, but we are having serious trouble writing performant queries against our Delta Lake test table.

The Delta table has around 70 columns and 1.5 billion rows. It is partitioned on two levels: the first, on serial number, has around 270 partitions, and the second, on year-month, has around 10-20 partitions per serial number. All files are Parquet, there is only one Delta table version, the data is compacted and vacuumed, and the metadata history is almost clean.

We run the queries with DuckDB 1.1.1 from an Azure VM inside the same VNet as the blob store.

This is the setup:

LOAD azure;
LOAD delta;

set azure_transport_option_type = 'curl'; -- needed because we otherwise get SSL exceptions, as also noted in the Azure extension section of the DuckDB docs

-- filter on both partition columns
SELECT uuid FROM delta_scan('abfss://q2deltalake/delta/k8s/partition_sn_yyymm_i5m_v15-3') WHERE SN='XXXX0005' and yyyymm='202304';

-- 717931 rows in 2m30s

-- filter on the first partition column only
SELECT uuid FROM delta_scan('abfss://q2deltalake/delta/k8s/partition_sn_yyymm_i5m_v15-3') WHERE SN='XXXX0005';

-- 5493265 rows in 2m35s

As you can see, there is not much difference in runtime between filtering on one or on two partition columns, although the second case covers far less data. I suspect it scans the whole Delta table instead of pushing the filters down to the partitions.

Have you ever made similar observations? Any hints would be appreciated.
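To illustrate what we would expect from partition pruning, here is a minimal sketch in plain Python. It is not how the delta extension works internally; the layout (270 serial numbers, 12 months, one file per partition), the file names, and the helpers `parse_partition_values` and `prune` are all hypothetical. A real Delta table records the partition values of each file in its transaction log rather than relying on the path alone, but the expected effect on the number of files to read is the same.

```python
from typing import Dict, Iterable, List


def parse_partition_values(path: str) -> Dict[str, str]:
    """Extract key=value directory segments from a Hive-style file path."""
    values = {}
    for segment in path.split("/")[:-1]:  # the last segment is the file name
        if "=" in segment:
            key, _, value = segment.partition("=")
            values[key] = value
    return values


def prune(paths: Iterable[str], predicates: Dict[str, str]) -> List[str]:
    """Keep only files whose partition values satisfy every equality predicate."""
    return [
        p for p in paths
        if all(parse_partition_values(p).get(k) == v for k, v in predicates.items())
    ]


# Hypothetical layout: 270 serial numbers x 12 year-month partitions, one file each.
serials = [f"XXXX{i:04d}" for i in range(270)]
months = [f"2023{m:02d}" for m in range(1, 13)]
files = [f"SN={sn}/yyyymm={ym}/part-0.parquet" for sn in serials for ym in months]

one_level = prune(files, {"SN": "XXXX0005"})
two_levels = prune(files, {"SN": "XXXX0005", "yyyymm": "202304"})

# Total files, files left after the SN filter, files left after both filters.
print(len(files), len(one_level), len(two_levels))
```

In this toy layout the SN filter alone already drops all but 12 of 3240 files, and adding the yyyymm filter leaves a single file. If pruning happened on both levels, we would expect the two queries to differ noticeably in runtime instead of both taking about 2m30s.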

nfoerster commented 5 days ago

This is something else we tried: reading the same table with PyArrow. The query time is around 4-5 s, even though it uses a range filter on yyyymm and therefore considers much more data.

import fsspec
from azure.identity.aio import DefaultAzureCredential
import pyarrow.parquet as pq
import time

credentials = DefaultAzureCredential()
fs = fsspec.filesystem("abfs", credential=credentials, account_name="saweud")
# One AND-group of predicates (DNF form): a single serial number and a year-month range.
filters = [[("SN", "==", "XXXX2987"), ("yyyymm", ">=", 202107), ("yyyymm", "<=", 202204)]]
start = time.time()
dt = pq.read_table(
    "q2deltalake/delta/k8s/partition_sn_yyymm_i5m_v15-3",
    partitioning="hive",
    filters=filters,
    filesystem=fs,
    columns=[
        "SN",
        "Date",
        "OperatingMode",
        "OP"
    ],
).to_pandas()
print(dt)
print(time.time() - start)