lancedb / lance

Modern columnar data format for ML and LLMs implemented in Rust. Convert from Parquet in 2 lines of code for 100x faster random access, vector indexing, and data versioning. Compatible with Pandas, DuckDB, Polars, PyArrow, and PyTorch, with more integrations coming.
https://lancedb.github.io/lance/
Apache License 2.0

Add flag in scan to turn off late materialization #1311

Open wjones127 opened 1 year ago

wjones127 commented 1 year ago

When scanning with a filter, we perform late materialization: we read only the filter columns, evaluate the filter, and then load the remaining columns only for the rows that satisfied the predicate. In the query plan this looks like: `Scan(filter_columns) -> Filter() -> Take(remaining_columns)`.

Late materialization improves query performance when (1) selecting a large blob column and (2) filtering out more than 50% of rows. However, in most other circumstances it can perform substantially worse (around 2x slower). In the future, we may be able to do automatic query optimization to turn this off automatically. For example, there are some cases where we can tell just from fragment-level statistics that > 50% of rows will fulfill a predicate.

However, there will always be some cases where we aren't sure. So we'll likely want a flag users can pass to turn this off.
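To make the two plan shapes concrete, here is a toy in-memory sketch (not Lance's implementation; the table and predicate are made up for illustration). Early materialization loads every column and then filters; late materialization reads only the filter column, computes the surviving row ids, and "takes" the remaining columns at just those ids:

```python
# Toy illustration of early vs. late materialization over an in-memory
# "columnar" table. Plan shapes: Scan(all) -> Filter   versus
# Scan(filter_columns) -> Filter -> Take(remaining_columns).

def scan_filter_early(columns, filter_col, predicate):
    """Early materialization: load every column, then filter rows."""
    n = len(columns[filter_col])
    rows = [{name: col[i] for name, col in columns.items()} for i in range(n)]
    return [row for row in rows if predicate(row[filter_col])]

def scan_filter_late(columns, filter_col, predicate):
    """Late materialization: read only the filter column first, then
    'take' the remaining columns at the surviving row ids."""
    selected = [i for i, v in enumerate(columns[filter_col]) if predicate(v)]
    return [{name: col[i] for name, col in columns.items()} for i in selected]

table = {
    "id": [0, 1, 2, 3],
    "blob": ["a" * 100, "b" * 100, "c" * 100, "d" * 100],
}
pred = lambda v: v >= 2

# Both strategies produce the same result; they differ only in how much
# data is touched along the way.
assert scan_filter_early(table, "id", pred) == scan_filter_late(table, "id", pred)
```

In the late path, the `blob` column is only read for the 2 surviving rows, which is exactly the saving late materialization is after.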

wjones127 commented 1 year ago

cc @eddyxu What do you think of this?

eddyxu commented 1 year ago

Could you explain what exactly late materialization means in this issue? Is it materializing the intermediate query results, or something else?

Also, why/how does it help when selecting large blobs and for highly selective queries (filtering out > 50% of rows)?

wjones127 commented 1 year ago

> Also, why/how does it help when selecting large blobs and for highly selective queries (filtering out > 50% of rows)?

This is from this benchmark: https://github.com/lancedb/lance/pull/1252#issuecomment-1714670847

```
--------------------------------------------------------------------------------------------------------------- benchmark 'scan_table': 16 tests --------------------------------------------------------------------------------------------------------------
Name (time in us)                                                 Min                       Max                      Mean                 StdDev                    Median                    IQR            Outliers         OPS            Rounds  Iterations
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_scan_table_filter_full[0.1] (NOW)                    19,836.9290 (59.15)       41,356.6760 (77.23)       26,247.9567 (69.95)     6,273.6104 (426.88)      24,066.9635 (65.01)     9,841.6900 (682.70)        9;0     38.0982 (0.01)         34           1
test_scan_table_full (NOW)                               142,711.7340 (425.51)     159,066.2180 (297.05)     147,327.2381 (392.62)    5,394.2075 (367.04)     145,630.2340 (393.35)    2,311.9917 (160.38)        1;1      6.7876 (0.00)          7           1
test_scan_table_filter_full[0.5] (NOW)                   252,892.9610 (754.02)     266,721.9670 (498.09)     263,610.9420 (702.50)    5,999.6729 (408.24)     266,013.7550 (718.52)    3,817.9468 (264.85)        1;1      3.7935 (0.00)          5           1
test_scan_table_filter_full[0.9] (NOW)                   467,037.5490 (>1000.0)    472,540.7690 (882.44)     469,588.1778 (>1000.0)   2,271.8836 (154.59)     468,838.2200 (>1000.0)   3,699.4333 (256.62)        2;0      2.1295 (0.00)          5           1
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
```

test_scan_table_full loads all columns with no filter. test_scan_table_filter_full[X] loads all columns with a filter that keeps only proportion X of rows (0.1 means keep 10% of rows and filter out 90%).

Basically, Take is more expensive per row than just scanning, so the IO savings from late materialization need to be substantial for it to come out faster.
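A back-of-envelope cost model captures this trade-off. The per-row costs below are made-up illustrative constants, not measured Lance numbers: sequential scans touch every row cheaply, while Take pays a random-access premium per surviving row:

```python
# Illustrative cost model: sequential scan vs. scan-filter-take.
# All constants are invented for the sketch, not measured from Lance.

def cost_full_scan(n_rows, scan_cost=1.0):
    """Scan every column for every row, then filter in memory."""
    return n_rows * scan_cost

def cost_late_materialization(n_rows, selectivity, scan_cost=1.0,
                              take_cost=5.0, filter_col_fraction=0.1):
    """Scan only the filter columns, then Take the remaining columns
    for the fraction of rows that survive the filter."""
    return (n_rows * scan_cost * filter_col_fraction
            + n_rows * selectivity * take_cost)

n = 1_000_000
# Keeping 10% of rows: the Take is small, late materialization wins.
assert cost_late_materialization(n, 0.1) < cost_full_scan(n)
# Keeping 90% of rows: the per-row Take premium dominates and the
# plain full scan is cheaper.
assert cost_late_materialization(n, 0.9) > cost_full_scan(n)
```

With these placeholder constants the break-even point sits well below 50% selectivity, which matches the shape of the benchmark above.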

wjones127 commented 1 year ago

> Could you explain what exactly late materialization means in this issue? Is it materializing the intermediate query results, or something else?

Added an explanation in the issue description. This is the same terminology used by Redshift and Impala, for example.

eddyxu commented 1 year ago

So is this flag essentially a heuristic stand-in for a cost-based dynamic planner? The target users might not be as sophisticated as data infra users.

Assuming there is no index and no filter pushdown, will all filters be done via scan?

Can we do something simple like:

```
let selected = filter.execute(input);
if count_trues(selected) < 30% || remain_schema.contains_large_column() {
    Take(select(input, selected))
} else {
    Filter(Scan(input), selected)
}
```
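A rough, runnable Python rendering of that heuristic follows. The 30% threshold and the "large column" test are placeholders carried over from the sketch, not tuned values, and the schema representation (a name-to-type dict) is invented for illustration:

```python
# Runnable rendering of the heuristic planner sketch above.
# Thresholds and the large-column check are placeholders, not tuned.

def plan(selected_mask, remaining_schema, large_types=("blob", "binary")):
    """Pick Take-based late materialization or a plain Filter(Scan).

    selected_mask: per-row booleans produced by executing the filter.
    remaining_schema: {column_name: type_name} for the non-filter columns.
    """
    selectivity = sum(selected_mask) / len(selected_mask)
    has_large_column = any(t in large_types for t in remaining_schema.values())
    if selectivity < 0.30 or has_large_column:
        return "Take(select(input, selected))"
    return "Filter(Scan(input), selected)"

# Highly selective filter (10% of rows survive): late materialization.
assert plan([True] + [False] * 9, {"x": "int"}) == "Take(select(input, selected))"
# Low-selectivity filter over scalar columns: plain scan + filter.
assert plan([True] * 9 + [False], {"x": "int"}) == "Filter(Scan(input), selected)"
```

The catch, as noted above, is that this requires executing the filter (or having statistics) before the plan is chosen, which is what makes it behave like a small cost-based planner.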
wjones127 commented 1 year ago

> The target users might not be as sophisticated as data infra users.

If it's just a flag, they can try their query with True and False and choose the faster one. So I don't think it requires much sophistication to tune, unlike a lot of other parameters.

> Assuming there is no index and no filter pushdown, will all filters be done via scan?

Yes

```
let selected = filter.execute(input);
if count_trues(selected) < 30% || remain_schema.contains_large_column() {
    Take(select(input, selected))
} else {
    Filter(Scan(input), selected)
}
```

Oh hmm, that's an interesting idea. Worth a shot at coding that up. Though it does make me wonder: if we can get this to run fast, why can't we get Take more optimized? 🤔

westonpace commented 2 months ago

For cloud storage the limit should be much lower than 30%, probably around 1% or lower, if all columns are scalar.
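The intuition behind the lower cloud threshold can be sketched with the same toy cost model. The numbers here are illustrative, not measured: on object storage each Take is a random read that pays a large per-request latency, while sequential scans stream at high throughput, so the break-even selectivity drops sharply:

```python
# Illustrative (not measured) numbers for why the break-even selectivity
# is much lower on cloud object storage: random reads pay a per-request
# latency, sequential scans amortize it away.

def break_even_selectivity(scan_cost_per_row, take_cost_per_row):
    """Selectivity below which Scan(filter) + Take beats a full scan,
    ignoring the comparatively cheap filter-column scan."""
    return scan_cost_per_row / take_cost_per_row

# Local NVMe: random reads only a few times costlier than sequential,
# so late materialization can win up to ~20-30% selectivity.
assert break_even_selectivity(1.0, 5.0) == 0.2
# Object storage: request latency makes random reads ~100x worse, so
# late materialization only wins when ~1% or fewer rows survive.
assert break_even_selectivity(1.0, 100.0) == 0.01
```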