
[Feature Request][Spark] Pushdown "order by" with "limit" operation by using Delta metadata #2421


rishitesh-snt commented 10 months ago

Feature request

Which Delta project/connector is this regarding?

Spark

Overview

A very common use case during exploratory analysis is checking the latest records with a limit, mostly to understand data patterns and behaviour, e.g.

select * from table order by timestamp desc limit 10

In the normal scenario Spark would read all the files to find the top 10 records. However, if the timestamp column forms mostly disjoint ranges across files, we can read just the per-file min/max and record counts to determine which files contain the top 10 records.

Even in the case of non-disjoint sets, we can improve performance by reading only a subset of files, at most as many files as the limit specifies. In the above example that would be at most 10 files.

Motivation

Sorting the whole table can take several minutes for tables of 500 GB and larger. Reading the metadata can provide the same information in seconds.

Further details

An example with disjoint sets

Query: select * from table order by timestamp desc limit 10

| File   | Number of Records | Timestamp Min | Timestamp Max |
| ------ | ----------------- | ------------- | ------------- |
| file 1 | 100               | 0             | 100           |
| file 2 | 100               | 101           | 200           |
| file 3 | 100               | 201           | 300           |

With the query select * from table order by timestamp desc limit 10, today we need to read all the files. However, if we make use of the metadata, we only need to read file 3.
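For the disjoint case the pruning is just a cumulative record count over files sorted by their max. Here is a minimal Scala sketch, using a hypothetical FileStats record to stand in for the per-file numRecords/min/max statistics Delta already keeps in the transaction log (names and shapes are illustrative, not Delta's actual API):

```scala
// Hypothetical stand-in for the per-file statistics Delta stores
// in the transaction log (numRecords, min/max per column).
case class FileStats(name: String, numRecords: Long, min: Long, max: Long)

object DescLimitPruning {
  // For "order by ts desc limit n" over disjoint ranges: take files
  // in descending order of their max until the records gathered so
  // far reach the limit.
  def filesToRead(stats: Seq[FileStats], limit: Long): Seq[FileStats] = {
    val byMaxDesc = stats.sortBy(f => -f.max)
    // Running record count *before* each file in that order.
    val countBefore = byMaxDesc.scanLeft(0L)(_ + _.numRecords)
    byMaxDesc.zip(countBefore).takeWhile(_._2 < limit).map(_._1)
  }

  def main(args: Array[String]): Unit = {
    val stats = Seq(
      FileStats("file 1", 100, 0, 100),
      FileStats("file 2", 100, 101, 200),
      FileStats("file 3", 100, 201, 300)
    )
    // Only file 3 is needed for "order by timestamp desc limit 10".
    println(filesToRead(stats, 10).map(_.name)) // List(file 3)
  }
}
```

The first file in descending-max order already covers the limit of 10, so only file 3 is read.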

An example with non-disjoint sets

Query: select * from table order by timestamp asc limit 10

| File   | Number of Records | Timestamp Min | Timestamp Max |
| ------ | ----------------- | ------------- | ------------- |
| file 1 | 100               | 0             | 100           |
| file 2 | 100               | 50            | 150           |
| file 3 | 100               | 70            | 90            |
| file 4 | 100               | 40            | 170           |

While working with non-disjoint sets of files we can follow the algorithm below (a sketch of it follows the list):

  1. Sort all the min/max values and keep them in a linear form (0, 40, 50, 70, 90, 100, 150, 170).
  2. Each value is associated with its corresponding file, e.g. 0 -> file 1, 40 -> file 4, and so on.
  3. Read a number of files based on the limit value, not on the number of records in each file. E.g. for limit 2, read file 1 and file 4 while traversing the list created in step 1.
  4. Why only two files? Any value less than 40 has to be in file 1; if there is no such value, 40 itself will act as that value. Every other file has a min of at least 50, so the two smallest records can only come from file 1 and file 4.
  5. The same thing can be done, traversing the list from the other end, to find the max 2 records.
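A minimal Scala sketch of this algorithm, again using a hypothetical FileStats record in place of Delta's actual per-file statistics (illustrative only):

```scala
// Hypothetical stand-in for Delta's per-file statistics, as above.
case class FileStats(name: String, numRecords: Long, min: Long, max: Long)

object NonDisjointPruning {
  // Steps 1-3: flatten every min and max into one sorted list of
  // (boundary value, file name) pairs, then walk it collecting
  // distinct files until `limit` files have been seen.
  def filesForAscLimit(stats: Seq[FileStats], limit: Int): Seq[String] = {
    val boundaries = stats
      .flatMap(f => Seq(f.min -> f.name, f.max -> f.name))
      .sortBy(_._1)
    val picked = scala.collection.mutable.LinkedHashSet.empty[String]
    val it = boundaries.iterator
    while (picked.size < limit && it.hasNext) picked += it.next()._2
    picked.toSeq
  }

  // Step 5: the descending case is symmetric; negating the ranges
  // reuses the ascending walk from the other end of the list.
  def filesForDescLimit(stats: Seq[FileStats], limit: Int): Seq[String] =
    filesForAscLimit(stats.map(f => f.copy(min = -f.max, max = -f.min)), limit)

  def main(args: Array[String]): Unit = {
    val stats = Seq(
      FileStats("file 1", 100, 0, 100),
      FileStats("file 2", 100, 50, 150),
      FileStats("file 3", 100, 70, 90),
      FileStats("file 4", 100, 40, 170)
    )
    // "order by timestamp asc limit 2": the sorted boundaries start
    // 0 (file 1), 40 (file 4), so only those two files are read.
    println(filesForAscLimit(stats, 2))  // List(file 1, file 4)
    println(filesForDescLimit(stats, 2)) // List(file 4, file 2)
  }
}
```

Running main reproduces step 3 above: for limit 2 ascending, only file 1 and file 4 need to be read.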

The same principle can be applied even after a partition filter has pruned the candidate files.

Limitation: it would be applicable only in the case of a single column in the order by clause.

Even though this applies to a very limited set of queries, the frequency of such queries is very high.

Willingness to contribute

The Delta Lake Community encourages new feature contributions. Would you or another member of your organization be willing to contribute an implementation of this feature?

zzl-7 commented 10 months ago

Someone please correct me if I am mistaken, but it looks like there is a plan to do this via this issue: https://github.com/delta-io/delta/issues/2229

rishitesh-snt commented 10 months ago

@zzl-7 This issue is specifically for reading from Spark, in line with the PR https://github.com/delta-io/delta/pull/1525. It is just one of the data skipping mechanisms and can be included in the kernel as well, so that connectors can benefit from it. IMO https://github.com/delta-io/delta/issues/2229 deals with the overall design of the data skipping mechanism for the kernel. Happy to get more feedback on this so that it can proceed in the right direction.