dask-contrib / dask-deltatable

A Delta Lake reader for Dask
BSD 3-Clause "New" or "Revised" License
44 stars 14 forks source link

Order data by partitions if available #70

Open mrocklin opened 6 months ago

mrocklin commented 6 months ago

I've stored a bunch of data partitioned by date, and written it to delta using the deltalake package like so:

for df in dfs:
    write_deltalake("mytable", df, partition_by="date")

(although actually this was done in parallel, and so things maybe got written out of order

├── date=2024-01-01
│   ├── part-00001-5c3d1646-6a8b-4511-87f5-3cd0acf1c0e8-c000.zstd.parquet
│   ├── part-00001-869909e1-1079-49db-83d6-77a43d67370a-c000.zstd.parquet
│   └── part-00001-eeb823e8-b9ed-49bd-86d9-9f28f0f444b7-c000.zstd.parquet
└── date=2024-01-02
    ├── part-00001-32e6c973-6d2a-4132-82dd-e6b431cf5343-c000.zstd.parquet
    ├── part-00001-426c2af2-6e86-4cfe-86c9-853f243c35e6-c000.zstd.parquet
    ├── part-00001-6fa10ba1-6b14-4908-b328-f8a8fdaec258-c000.zstd.parquet
    └── part-00001-79cd49d4-dee1-4957-8cdf-86b6f86f95f6-c000.zstd.parquet

When I go to read it I find that the data isn't sorted by partition

df = ddt.read_deltalake("mytable")
df.date.head()
0    2024-01-02
1    2024-01-02
2    2024-01-02
3    2024-01-02
4    2024-01-02
Name: date, dtype: date32[day][pyarrow]
df.date.tail()
1392523    2024-01-01
1392524    2024-01-01
1392525    2024-01-01
1392526    2024-01-01
1392527    2024-01-01
Name: date, dtype: date32[day][pyarrow]

We should order things if we can I think. I propose the following:

  1. If partitions are available, order by partitions
  2. Maybe within that we can look at partition statistics? These are stored within the "stats" attribute of the deltalake metadata
  3. We could also think about setting an index with the partition value.

Probably both the effort and uncertainty increase as we go down that list. The first item seems pretty straightforward to me though.

mrocklin commented 6 months ago

I'd also like to sort partitions by max value of the partition column, but couldn't find an easy way to get statistics out of the metadata

mrocklin commented 6 months ago

The min/max values are available through the deltalake.DeltaTable.get_add_actions API mentioned in this issue: https://github.com/delta-io/delta-rs/issues/2233#issuecomment-1971991963

mrocklin commented 6 months ago

I'm going to reopen this for now. I think that we could probably do better by looking at the data coming out of the get_add_actions function.