lancedb / lance

Modern columnar data format for ML and LLMs implemented in Rust. Convert from parquet in 2 lines of code for 100x faster random access, vector index, and data versioning. Compatible with Pandas, DuckDB, Polars, Pyarrow, with more integrations coming..
https://lancedb.github.io/lance/
Apache License 2.0
3.55k stars 184 forks source link

perf: Chunk read performance on wide tables #2367

Open rok opened 1 month ago

rok commented 1 month ago

In light of recent Parquet discussion my colleague decided to benchmark random chunk ("row group") reading performance on Lance in comparison to current Parquet implementation. We expected Parquet to do worse due to inefficiencies at wide schema but we see a constant multiple (parquet ~25x faster) in performance no matter the parameters. It might be the benchmark is not using the most efficient code path in Lance. Any suggestions on how to optimize chunk reads in lance to take advantage of faster metadata deserialization?

import time
import numpy as np

import lance
import lance.fragment
import pyarrow as pa
import pyarrow.parquet as pq

row_groups = 50
columns = 2000
chunk_size = 1000
rows = row_groups * chunk_size
work_items = 1

parquet_path = "/tmp/my.parquet"
lancedbase_path = "/tmp/"

def get_table():
    data = np.random.rand(rows, columns)
    pa_arrays = [pa.array(data[:, i]) for i in range(columns)]
    column_names = [f'column_{i}' for i in range(columns)]
    return pa.Table.from_arrays(pa_arrays, names=column_names)

def worker_arrow():
   for rg in range(0, row_groups):
        pr = pq.ParquetReader()
        pr.open(parquet_path)        
        data = pr.read_row_groups([rg], use_threads=False)
        data = data

def worker_lance():

    for rg in range(0, row_groups):
        dataset = lance.dataset(lancedbase_path)
        data = dataset.to_table(offset=rg * chunk_size, limit=chunk_size)
        data = data

def genrate_data(table, store_schema):

   t = time.time()
   print(f"writing parquet file, columns={columns}, row_groups={row_groups}, rows={rows}")
   pq.write_table(table, parquet_path, row_group_size=chunk_size, use_dictionary=False, write_statistics=False, store_schema=store_schema, compression=None)
   dt = time.time() - t
   print(f"finished writing parquet file in {dt:.2f} seconds")

   lance.write_dataset(table, lancedbase_path, mode = "overwrite")

def measure_reading(worker):

    t = time.time()
    for i in range(0, work_items):
        worker()

    return time.time() - t

table = get_table()
genrate_data(table, False)

print(f"Reading all row groups using arrow (single-threaded) {measure_reading(worker_arrow):.3f} seconds")
print(f"Reading all row groups using lance (single-threaded) {measure_reading(worker_lance):.3f} seconds")
# /bin/python3 /workspace/benchmarks/benchmark_lancedb.py
writing parquet file, columns=2000, row_groups=50, rows=50000
finished writing parquet file in 1.85 seconds
Reading all row groups using arrow (single-threaded) 2.810 seconds
Reading all row groups using lance (single-threaded) 71.201 seconds

# /bin/python3 /workspace/benchmarks/benchmark_lancedb.py
writing parquet file, columns=1000, row_groups=50, rows=50000
finished writing parquet file in 1.51 seconds
Reading all row groups using arrow (single-threaded) 1.332 seconds
Reading all row groups using lance (single-threaded) 35.571 seconds

# /bin/python3 /workspace/benchmarks/benchmark_lancedb.py
writing parquet file, columns=1000, row_groups=100, rows=100000
finished writing parquet file in 6.67 seconds
Reading all row groups using arrow (single-threaded) 4.512 seconds
Reading all row groups using lance (single-threaded) 119.911 seconds
rok commented 1 month ago

cc @westonpace

westonpace commented 1 month ago

Thanks for playing around with lance! Your experiment is currently using lance v1. Were you intending to test the new v2 format? It's still early days in the v2 format and we haven't yet fully connected it to lance datasets.

If you're going to compare formats then the most direct comparison would probably be to use LanceFileReader and LanceFileWriter. The LanceFileReader has a get_range method which is perfect for this test.

If you want to compare parquet files against lance datasets (testing both the file format and the table format) then you will have to do a few things to get the comparison closer. First, use the use_experimental_writer flag which toggles our beta support for lance v2 in the table format. Next, it seems that we aren't actually pushing down limit / offset in our scanner today. So each of your queries was loading all of the data and then discarding the parts it didn't want. The closest thing we can do is probably to do a take with the desired indices. Lance's scanner will recognize the indices are sorted and contiguous and translate the request into a get_range call under the hood.

import time
import numpy as np

import lance
import lance.fragment
import pyarrow as pa
import pyarrow.parquet as pq

from lance.file import LanceFileReader, LanceFileWriter

row_groups = 50
columns = 2000
chunk_size = 1000
rows = row_groups * chunk_size
work_items = 1

parquet_path = "/tmp/my.parquet"
lancefile_path = "/tmp/my.lance"
lancedb_path = "/tmp/lancedb"

def get_table():
    data = np.random.rand(rows, columns)
    pa_arrays = [pa.array(data[:, i]) for i in range(columns)]
    column_names = [f"column_{i}" for i in range(columns)]
    return pa.Table.from_arrays(pa_arrays, names=column_names)

def worker_arrow():
    for rg in range(0, row_groups):
        pr = pq.ParquetReader()
        pr.open(parquet_path)
        data = pr.read_row_groups([rg], use_threads=False)
        data = data

def worker_lance():
    for rg in range(0, row_groups):
        lr = LanceFileReader(lancefile_path)
        data = lr.read_range(rg * chunk_size, chunk_size).to_table()
        data = data

def worker_lance_dataset():
    for rg in range(0, row_groups):
        ds = lance.dataset(lancedb_path)
        rows = [idx for idx in range(rg * chunk_size, (rg + 1) * chunk_size)]
        data = ds.take(rows)
        data = data

def genrate_data(table, store_schema):
    t = time.time()
    print(
        f"writing parquet file, columns={columns}, row_groups={row_groups}, rows={rows}"
    )
    pq.write_table(
        table,
        parquet_path,
        row_group_size=chunk_size,
        use_dictionary=False,
        write_statistics=False,
        store_schema=store_schema,
        compression=None,
    )
    dt = time.time() - t
    print(f"finished writing parquet file in {dt:.2f} seconds")

    with LanceFileWriter(lancefile_path, table.schema) as writer:
        writer.write_batch(table)

    lance.write_dataset(
        table, lancedb_path, use_experimental_writer=True, mode="overwrite"
    )

def measure_reading(worker):
    t = time.time()
    for i in range(0, work_items):
        worker()

    return time.time() - t

table = get_table()
genrate_data(table, False)

print(
    f"Reading all row groups using arrow (single-threaded) {measure_reading(worker_arrow):.3f} seconds"
)
print(
    f"Reading all row groups using lance (multi-threaded) {measure_reading(worker_lance):.3f} seconds"
)
print(
    f"Reading all row groups using lance dataset (multi-threaded) {measure_reading(worker_lance_dataset):.3f} seconds"
)
rok commented 1 month ago

Thanks @westonpace! We're indeed aiming to try v2 and your help is as always invaluable.

rok commented 1 month ago

Here are my results:

Change in columns
=================
writing parquet file, columns=1000, row_groups=50, rows=50000
finished writing parquet file in 1.80 seconds
Reading all row groups using arrow (single-threaded) 1.797 seconds
Reading all row groups using lance (multi-threaded) 0.408 seconds
Reading all row groups using lance dataset (multi-threaded) 0.746 seconds

writing parquet file, columns=2000, row_groups=50, rows=50000
finished writing parquet file in 1.80 seconds
Reading all row groups using arrow (single-threaded) 3.627 seconds
Reading all row groups using lance (multi-threaded) 0.988 seconds
Reading all row groups using lance dataset (multi-threaded) 1.648 seconds

writing parquet file, columns=3000, row_groups=50, rows=50000
finished writing parquet file in 4.98 seconds
Reading all row groups using arrow (single-threaded) 5.755 seconds
Reading all row groups using lance (multi-threaded) 1.312 seconds
Reading all row groups using lance dataset (multi-threaded) 2.872 seconds

Change in row_groups
====================
writing parquet file, columns=1000, row_groups=50, rows=50000
finished writing parquet file in 1.91 seconds
Reading all row groups using arrow (single-threaded) 1.808 seconds
Reading all row groups using lance (multi-threaded) 0.412 seconds
Reading all row groups using lance dataset (multi-threaded) 0.743 seconds

writing parquet file, columns=1000, row_groups=100, rows=100000
finished writing parquet file in 3.50 seconds
Reading all row groups using arrow (single-threaded) 5.421 seconds
Reading all row groups using lance (multi-threaded) 0.820 seconds
Reading all row groups using lance dataset (multi-threaded) 1.496 seconds

writing parquet file, columns=1000, row_groups=150, rows=150000
finished writing parquet file in 5.02 seconds
Reading all row groups using arrow (single-threaded) 11.432 seconds
Reading all row groups using lance (multi-threaded) 1.255 seconds
Reading all row groups using lance dataset (multi-threaded) 2.303 seconds
westonpace commented 1 month ago

That matches what I was seeing as well. I think, in the extremely wide schema case (tens of thousands of columns) we will still probably want specialized encodings (e.g. something that pivots the data) but it is nice to see that Lance is handling that reasonably well today.

rok commented 1 month ago

Great to see lance's design pay off!

Why would you want to pivot? To avoid column handling overhead? What would be an example of such an encoding?

westonpace commented 1 month ago

The case I had in my head was if you have many sparse columns. If you use a sparse encoding I think you end up with a lot of very tiny pages (because there is not enough data in any single column).

That being said, if your use case is to read a few columns at a time, this is probably fine. If your use case was reading in the entire table (into some kind of sparse feature matrix) then you would want a specialized encoding. I'm not sure what it would look like. If we had a variant type then you could do map<string,variant> (which I guess is not really a specialized encoding).

Also, if this is important to you, and your use case is to read a few columns at a time, then there is another optimization / TODO here that you might be interested in. It would cut down on the time required to open a reader.

rok commented 1 month ago

The case I had in my head was if you have many sparse columns. If you use a sparse encoding I think you end up with a lot of very tiny pages (because there is not enough data in any single column).

That being said, if your use case is to read a few columns at a time, this is probably fine. If your use case was reading in the entire table (into some kind of sparse feature matrix) then you would want a specialized encoding. I'm not sure what it would look like. If we had a variant type then you could do map<string,variant> (which I guess is not really a specialized encoding).

Interesting point! At the moment my use case is getting a few dense columns though.

Also, if this is important to you, and your use case is to read a few columns at a time, then there is another optimization / TODO here that you might be interested in. It would cut down on the time required to open a reader.

That's a nice optimization too yeah. Opening the reader currently doesn't dominate our cost but I imagine it'd be good for Lance to optimize this!