We've now aligned the interfaces for Fragment.merge_columns and LanceDataset.add_columns. However, the process of using these APIs is fairly complex and, with features like balanced storage, may be getting even more complex.
I would like to prototype a "task" API similar to what we have for compaction. The basic usage would work like this:
```python
## On head node
add_column_task = dataset.start_add_column(new_col_name, new_col_type)
results = []

## On workers
# can pickle and send add_column_task across network
results.append(add_column_task.add_ordinal_data(some_new_data, row_start))
# If you aren't adding a new value for every row you can also do
results.append(add_column_task.add_id_data(some_new_data, row_ids))

## On head node
commit_plan = add_column_task.plan_commit(results)
commit_results = []

## On workers
# can pickle commit tasks
for commit_task in commit_plan["tasks"]:
    commit_results.append(commit_task.execute())

## On head node
commit_plan.finish_commit(commit_results)
```
The workflow is as follows:
First, the user calls start_add_column. This sets up the job and returns a pickleable task object; it does not make any changes to the dataset. The task can be sent to remote workers if the user needs to distribute the data-generation work.
Next, the user (perhaps on worker nodes) calls add_ordinal_data or add_id_data on the task.
The first method (ordinal) assumes the user is generating values for every row in the dataset. Here, row_start is an offset into the dataset (e.g. 10 means the 10th row). For example, to add a column to a 1M-row dataset, the user can send the task to 1000 workers and have each worker generate values for 1000 rows.
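To make the ordinal path concrete, here is a rough sketch of that fan-out under the proposed API; `compute_values` and the chunk bookkeeping are made up for illustration, and the data is assumed to be passed as a pyarrow table:

```python
import pyarrow as pa

CHUNK = 1000
num_rows = dataset.count_rows()          # e.g. 1_000_000

# Head node: one unit of work per contiguous chunk of rows.
chunks = [(start, min(CHUNK, num_rows - start)) for start in range(0, num_rows, CHUNK)]

# Worker: generate values for its chunk and attach them by offset.
def generate_chunk(task, row_start, length):
    values = compute_values(row_start, length)     # hypothetical user function
    batch = pa.table({new_col_name: values})       # exactly one value per row in the chunk
    return task.add_ordinal_data(batch, row_start)

# Shown in-process here; in practice each call would run on a remote worker
# after pickling add_column_task across the network.
results = [generate_chunk(add_column_task, start, length) for start, length in chunks]
```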
The second method (id) assumes that the generated data contains a _rowid column and it will only work on datasets that have stable row ids.
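For the id path, a sketch of what a sparse update might look like; the row ids and values are invented, the data is again assumed to be a pyarrow table, and whether _rowid lives in the table, in the separate argument, or both is an open detail of the proposal:

```python
import pyarrow as pa

# Values for only three rows; every other row will get a null when the
# commit tasks materialize the new column.
row_ids = [17, 42, 1005]                 # stable row ids obtained elsewhere (e.g. from a scan)
batch = pa.table({
    "_rowid": pa.array(row_ids, type=pa.uint64()),
    new_col_name: pa.array(["a", "b", "c"]),
})
results.append(add_column_task.add_id_data(batch, row_ids))
```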
Both of the above methods, when run, will trigger write_fragments_internal to create new fragments on the dataset. These fragments will not be part of any version (yet). The API will be similar to write_dataset. The methods will return a small pickleable result object which will contain the UUID of the fragment (and the id set / ordinal range). The user should return these to the head node if distributing the work.
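Purely to pin down what a "small pickleable result object" could carry, here is a hypothetical shape; none of the names are part of the proposal:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class AddDataResult:                      # hypothetical; illustrative only
    fragment_uuid: str                    # file written by write_fragments_internal
    ordinal_range: Optional[tuple] = None # (row_start, row_end) for add_ordinal_data results
    row_ids: Optional[list] = None        # id set for add_id_data results
    # Small and pickleable, so workers can ship it back to the head node cheaply.
```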
The next step is to run plan_commit on the task. This creates a commit task based on the set of results. The commit task has a series of sub-tasks (this is exactly the same as plan_compaction).
Lance will need to look at the various IDs / ranges and figure out what compaction steps are needed.
Next, the user will send out the tasks in the commit plan and run them, collecting the results. Each of these tasks will compact the files written by the add_..._data steps into fragment-sized files, inserting nulls as needed, and return the UUID of the resulting file.
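Since the commit tasks pickle and are independent, any executor works for this step; here is a minimal local sketch with a process pool, assuming the commit_plan and execute() semantics proposed above:

```python
from concurrent.futures import ProcessPoolExecutor

def run_commit_task(task):
    # The executor pickles the task to a worker process; execute() compacts the
    # files written earlier and returns the UUID of the resulting file.
    return task.execute()

with ProcessPoolExecutor() as pool:
    commit_results = list(pool.map(run_commit_task, commit_plan["tasks"]))
```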
Finally, the user will run finish_commit to perform the actual commit.
Compared to add_columns / merge_columns this has a few advantages:
- A friendlier API, similar to plan_compaction.
- Writing fragments with balanced storage is actually quite complex: the user needs to create at least two different fragments, using different compaction thresholds. This complexity can all be hidden in the tasks.
- The add_id_data method lets users add new columns without specifying a value for every single row and takes care of inserting nulls.
We can follow this up with a similar API for bulk ingestion.