lancedb / lance

Modern columnar data format for ML and LLMs implemented in Rust. Convert from Parquet in 2 lines of code for 100x faster random access, vector index, and data versioning. Compatible with Pandas, DuckDB, Polars, PyArrow, with more integrations coming.
https://lancedb.github.io/lance/
Apache License 2.0

Differing behavior of `dataset.to_batches` depending on if `batch_size` is set. #2699

Open · keunhong opened this issue 2 months ago

keunhong commented 2 months ago

Hello!

We are experiencing differing behavior of `to_batches` depending on whether or not we set `batch_size`.

When using a filter and `with_row_id=True`, we get the correct behavior if we set `batch_size`:

>>> rows = next(iter(db.to_batches(
    columns=["column_name"],
    filter="column_name IS NOT NULL",
    with_row_id=True,
    batch_size=1024,
)))
>>> rows["_rowid"]
<pyarrow.lib.UInt64Array object at 0x7a9950190340>
[
  2,
  4,
  5,
  6,
  7,
  9,
  11,
  12,
  13,
  14,
  ...
  1000,
  1001,
  1003,
  1009,
  1011,
  1015,
  1016,
  1017,
  1020,
  1021
]

But if we don't set `batch_size`, the filter is ignored and all the rows are returned:

>>> rows = next(iter(db.to_batches(
    columns=["column_name"],
    filter="column_name IS NOT NULL",
    with_row_id=True,
    batch_size=None,
)))
>>> rows["_rowid"]
<pyarrow.lib.UInt64Array object at 0x7a99502cb640>
[
  0,
  1,
  2,
  3,
  4,
  5,
  6,
  7,
  8,
  9,
  ...
  1014,
  1015,
  1016,
  1017,
  1018,
  1019,
  1020,
  1021,
  1022,
  1023
]

We are using pylance==0.16.0. The dataset is using the V1 file format.
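
(As a sketch of how the environment and the column's Arrow type could be double-checked, using the same `db` and `column_name` placeholders as in the snippets above; the actual Arrow type is not shown in this thread:)

import lance

# Sketch: print the installed pylance version and the Arrow type of the
# column being filtered on ("db" and "column_name" are placeholders from above).
print(lance.__version__)                    # e.g. 0.16.0
print(db.schema.field("column_name").type)  # the column's Arrow data type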

wjones127 commented 1 month ago

What data type is `column_name`? You say you are using the V1 file format, which doesn't support nulls for most data types, yet you are filtering on `IS NOT NULL`.

I've attempted to repro with a string column (where we do support nulls), but I seem to be getting correct answers:

import lance
import pyarrow as pa

data = pa.table({
    'id': ['a', 'b', None, 'c', 'd', None, 'e']
})
ds = lance.write_dataset(data, 'test', mode='overwrite')

scan_batch_size = ds.scanner(
    columns=['id'],
    filter="id IS NOT NULL",
    with_row_id=False,
    batch_size=3,
)
print(scan_batch_size.explain_plan())
scan_batch_size.to_table()

ProjectionExec: expr=[id@0 as id]
  FilterExec: id@0 IS NOT NULL
    LanceScan: uri=Users/willjones/Documents/notebooks/test/data, projection=[id], row_id=true, row_addr=false, ordered=true

pyarrow.Table
id: string
----
id: [["a","b"],["c","d"],["e"]]

scan_no_batch_size = ds.scanner(
    columns=['id'],
    filter="id IS NOT NULL",
    with_row_id=False,
    batch_size=None,
)
print(scan_no_batch_size.explain_plan())
scan_no_batch_size.to_table()

LancePushdownScan: uri=Users/willjones/Documents/notebooks/test/data, projection=[id], predicate=id IS NOT NULL, row_id=false, row_addr=false, ordered=true

pyarrow.Table
id: string
----
id: [["a","b","c","d","e"]]

scan_no_row_id = ds.scanner(
    columns=['id'],
    filter="id IS NOT NULL",
    with_row_id=False,
    batch_size=None,
)
print(scan_no_row_id.explain_plan())
scan_no_row_id.to_table()

LancePushdownScan: uri=Users/willjones/Documents/notebooks/test/data, projection=[id], predicate=id IS NOT NULL, row_id=false, row_addr=false, ordered=true

pyarrow.Table
id: string
----
id: [["a","b","c","d","e"]]

keunhong commented 1 month ago

It is a byte column, so I suppose the fact that it works is just a coincidence then.
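
(For reference, a minimal sketch that mirrors the string repro above but with a binary column and the `with_row_id=True` / `batch_size=None` combination from the original report; the `blob` column and `test_binary` path are made-up names, and whether this actually reproduces the issue is not confirmed in this thread:)

import lance
import pyarrow as pa

# Sketch: same shape as the string repro, but with a binary column and the
# with_row_id=True / batch_size=None combination from the report.
data = pa.table({
    'blob': pa.array([b'a', b'b', None, b'c', None], type=pa.binary())
})
ds = lance.write_dataset(data, 'test_binary', mode='overwrite')

batch = next(iter(ds.to_batches(
    columns=['blob'],
    filter="blob IS NOT NULL",
    with_row_id=True,
    batch_size=None,
)))
print(batch['_rowid'])           # expected: row ids of the non-null rows only
print(batch['blob'].null_count)  # expected: 0 if the filter was applied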