lancedb / lance

Modern columnar data format for ML and LLMs implemented in Rust. Convert from parquet in 2 lines of code for 100x faster random access, vector index, and data versioning. Compatible with Pandas, DuckDB, Polars, Pyarrow, with more integrations coming.
https://lancedb.github.io/lance/
Apache License 2.0

Add `n_jobs` parameter to `lance.write_dataset` to speed up writing large in-memory tables #1980

Open mkleinbort-ic opened 5 months ago

mkleinbort-ic commented 5 months ago

I was just testing the lance dataset writer, and to my surprise there is a lot of headroom when using multi-processing.

This is what I did:

I have many identical tables (polars dataframes), and I'm writing them to Azure Blob Storage.

I repeated each test 3 times; these are the median values:

- Writing 1 table: 3 minutes (3 min/table)
- Writing 2 tables, each in its own thread (2 threads): 3 minutes (1.5 min/table)
- Writing 4 tables, each in its own thread (4 threads): 4.5 minutes (1.125 min/table)
- Writing 12 tables, each in its own thread (12 threads, maxing out RAM and other system resources): 15 minutes (1.25 min/table)

It seems to me the writer is not using all the cores/network available: when writing a single table I see < 10% CPU usage and only 20% network bandwidth usage.

Btw, this is what I used to write the tables in parallel:

from typing import Callable

import lance
from joblib import Parallel, delayed, parallel_backend

def exec_in_parallel(callables: list[tuple[Callable, list, dict]], n_jobs=1):
    '''Use joblib to execute each callable as c(*args, **kwargs) for (c, args, kwargs) in callables'''

    with parallel_backend('threading', n_jobs=n_jobs):
        ans = Parallel(verbose=False)(delayed(c)(*args, **kwargs) for c, args, kwargs in callables)

    return ans

def test_punish_write_lance(dfs, n_files, n_cores):

    files = [f'tf{i}.lance' for i in range(n_files)]

    exec_in_parallel([
            (lance.write_dataset, [], {'uri': f'az://lance/{f}', 'data_obj': df}) for df, f in zip(dfs, files)
        ], n_jobs=n_cores)

At least in my setup I benefit from writing my tables in parallel, but it'd be much better to have the writer write each table at maximum speed, since that's usually what most people want.

I'll repeat my testing when v0.2 comes out.

mkleinbort-ic commented 5 months ago

EDIT:

In case somebody asks - it's a big table (5m+ rows / 200+ columns).

df.estimated_size('gb') 
>>> 17

wjones127 commented 5 months ago

This isn't surprising. The writer just writes one file at a time and treats the input data as a stream. It doesn't assume the data is all in memory, and it assumes it wouldn't be acceptable to buffer too much data in memory.

If you want to write in parallel to the same table, you could (see the sketch after this list):

  1. Call write_fragments() on separate slices of your table (docs)
  2. Commit LanceOperation.Append with the resulting fragments using LanceDataset.commit() (example)
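
Put together, a minimal sketch of these two steps might look like the following; it assumes an in-memory pyarrow table tbl and an existing dataset at uri, and it mirrors the API usage in the fuller script further down this thread.

import lance
from lance.fragment import write_fragments

# Step 1: write separate slices of the table as fragments. These calls are
# independent, so each one could run in its own thread.
half = len(tbl) // 2
fragments = []
for part in (tbl.slice(0, half), tbl.slice(half)):
    fragments.extend(write_fragments(part, uri))

# Step 2: commit the new fragments to the dataset as a single Append.
dataset = lance.dataset(uri)
operation = lance.LanceOperation.Append(fragments, read_version=dataset.version)
dataset = lance.LanceDataset.commit(uri, operation)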

Are you starting from an in-memory dataframe? I wonder if we could automatically detect when you have a large in-memory df and dispatch to a parallel write in that case.

mkleinbort-ic commented 5 months ago

Yes, this is one big table in memory.

mkleinbort-ic commented 5 months ago

I think an n_jobs parameter, or some other parameter to control resource utilization, would be nice in the lance.write_dataset API.
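
Purely for illustration, the requested knob might look like the call below; n_jobs here is hypothetical and not an existing lance.write_dataset parameter.

# Hypothetical API: `n_jobs` is the parameter being requested, not an
# argument that lance.write_dataset accepts today.
lance.write_dataset(df.to_arrow(), 'az://lance/tf0.lance', n_jobs=8)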

wjones127 commented 5 months ago

Yeah we could add an n_jobs parameter to write_dataset. 👍

mkleinbort-ic commented 4 months ago

Any update on this? I have a workflow where I have large parquet files that I "copy" to lance, but this takes 15min+ per parquet file:

- ~1 min loading the parquet file
- ~14 min writing the lance file (admittedly, with scalar indexing on two columns)
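
For context, this is roughly the shape of the parquet-to-lance copy being described; the file and column names below are made up for illustration.

import lance
import pyarrow.parquet as pq

# Load the parquet file into memory, then write it out as a Lance dataset.
tbl = pq.read_table('big_file.parquet')  # illustrative file name
ds = lance.write_dataset(tbl, 'big_file.lance')

# Scalar indexes on two columns (column names are illustrative).
ds.create_scalar_index('col_a', 'BTREE')
ds.create_scalar_index('col_b', 'BTREE')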

wjones127 commented 4 months ago

Hi @mkleinbort-ic. I haven't had time to work on this.

Here is a quick script to do parallel writes with a polars dataframe, using the approach suggested above. Do you want to try it out and see if it makes your writes substantially faster?

If you are interested, I would welcome a PR to add this parameter.

from threading import Thread

import lance
from lance.fragment import write_fragments
import polars as pl

def ceildiv(a, b):
    # ceiling division: smallest integer >= a / b
    return -(a // -b)

def write_lance_parallel(data: pl.DataFrame, uri, n_jobs, **kwargs):
    # split data between parts
    part_size = ceildiv(len(data), n_jobs)
    data_parts = []
    for i in range(n_jobs):
        start = i * part_size
        end = min((i + 1) * part_size, len(data))
        data_parts.append(data[start:end])

    # Each thread writes its slice as one or more fragments and collects
    # the resulting fragment metadata.
    fragments = []
    def task(uri, data, **kwargs):
        new_frags = write_fragments(data.to_arrow(), uri, **kwargs)
        fragments.extend(new_frags)

    # Write each part in a different thread
    threads = []
    for i in range(n_jobs):
        t = Thread(target=task, args=(uri, data_parts[i]), kwargs=kwargs)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

    # Commit the dataset: append if it already exists, otherwise create it
    try:
        dataset = lance.dataset(uri)
        operation = lance.LanceOperation.Append(fragments, read_version=dataset.version)
    except Exception:  # dataset doesn't exist yet
        dataset = None
        operation = lance.LanceOperation.Overwrite(data.to_arrow().schema, fragments)

    dataset = lance.LanceDataset.commit(uri, operation)
    return dataset

# Test
df = pl.DataFrame({
    "a": [1, 2, 3, 4, 5],
    "b": [6, 7, 8, 9, 10]
})

write_lance_parallel(df, "./test", 2)

ds = lance.dataset("./test")
print(ds.to_table())