Eventual-Inc / Daft

Distributed DataFrame for Python designed for the cloud, powered by Rust
https://getdaft.io
Apache License 2.0

add an option to export pyarrow record batch #2679

Closed: djouallah closed this issue 3 weeks ago

djouallah commented 1 month ago

It seems Daft only supports exporting an Arrow Table, which materializes all of the data in memory. I have a scenario where I run Daft in an environment with limited RAM, so support for record batches would be useful because the data would be streamed in smaller batches.

thanks

universalmind303 commented 1 month ago

@djouallah you should be able to iterate over the partitions to get a similar effect

import daft

df = daft.read_parquet('/path/to/files')

for partition in df.iter_partitions():
    tbl = partition.to_arrow()  # materializes only this partition
    # do something with tbl

But I agree, a native to_arrow_iter may be more intuitive
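For tighter memory control, each partition's Arrow Table can also be split into smaller RecordBatches with pyarrow's Table.to_batches. A minimal sketch building on the snippet above (the 64_000-row chunk size is just an illustrative choice, not something from this thread):

import daft

df = daft.read_parquet('/path/to/files')

for partition in df.iter_partitions():
    tbl = partition.to_arrow()
    # Split the materialized partition into smaller Arrow RecordBatches so a
    # downstream writer never has to hold more than one chunk at a time.
    for batch in tbl.to_batches(max_chunksize=64_000):
        ...  # do something with each pyarrow.RecordBatch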

djouallah commented 1 month ago

Please don't close a bug report unless it is confirmed that the fix resolves the issue.

This does not work for my use case:

df = df.with_column('DATE', col('SETTLEMENTDATE').cast(DataType.date()))
df = df.with_column('year', col('SETTLEMENTDATE').dt.year())
write_deltalake(
    f"/lakehouse/default/Tables/T10/daft",
    df.to_arrow_iter(),
    schema=df.schema().to_pyarrow_schema(),
    mode="append",
    partition_by=['year'],
    storage_options=storage_options,
)

I'm getting this error:

File ~/jupyter-env/python3.10/lib/python3.10/site-packages/pyarrow/dataset.py:1030, in write_dataset(data, base_dir, basename_template, format, partitioning, partitioning_flavor, schema, filesystem, file_options, use_threads, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group, file_visitor, existing_data_behavior, create_dir)
   1027         raise ValueError("Cannot specify a schema when writing a Scanner")
   1028     scanner = data
-> 1030 _filesystemdataset_write(
   1031     scanner, base_dir, basename_template, filesystem, partitioning,
   1032     file_options, max_partitions, file_visitor, existing_data_behavior,
   1033     max_open_files, max_rows_per_file,
   1034     min_rows_per_group, max_rows_per_group, create_dir
   1035 )

File ~/jupyter-env/python3.10/lib/python3.10/site-packages/pyarrow/_dataset.pyx:4010, in pyarrow._dataset._filesystemdataset_write()

File ~/jupyter-env/python3.10/lib/python3.10/site-packages/pyarrow/error.pxi:92, in pyarrow.lib.check_status()

ArrowTypeError: Could not unwrap RecordBatch from Python object of type 'pyarrow.lib.Table'

djouallah commented 1 month ago

@jaychia I am doing a presentation in two months and would really like to show a different story.

jaychia commented 1 month ago


Looks like our .to_arrow_iter() API currently returns an Arrow Table rather than a RecordBatch.

@djouallah here's a quick workaround that converts the Tables into RecordBatches. I'll look into making this native to Daft tomorrow.

import daft
from daft import DataType, col

df = daft.from_pydict({"SETTLEMENTDATE": ["2021-01-01", "2021-01-01", "2021-01-01"]})
df = df.with_column('DATE', col('SETTLEMENTDATE').cast(DataType.date()))
df = df.with_column('year', col('DATE').dt.year())

# Stream the DataFrame as an iterator of Arrow Tables
arrow_iter = df.to_arrow_iter()

# Convert the tables into record batches as a streaming iterator
arrow_batches_iter = (batch for tbl in arrow_iter for batch in tbl.to_batches())

next(arrow_batches_iter)  # Output: the first pyarrow.RecordBatch
pyarrow.RecordBatch
SETTLEMENTDATE: large_string
DATE: date32[day]
year: int32
----
SETTLEMENTDATE: ["2021-01-01","2021-01-01","2021-01-01"]
DATE: [2021-01-01,2021-01-01,2021-01-01]
year: [2021,2021,2021]

It seems like returning RecordBatch is the more canonical API here; it was our mistake to return Tables. We'll rectify that, and likely provide alternative .to_arrow_record_batches() / .to_arrow_tables() APIs as well. Let me check with the team tomorrow to see what they think 🙂
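In the meantime, the same workaround can feed the original write_deltalake call, since (judging from the traceback above) the writer expects an iterable of RecordBatches when an explicit schema is passed. A sketch, assuming df and storage_options are already defined as in the earlier snippet:

from deltalake import write_deltalake

# Flatten Daft's stream of Arrow Tables into a stream of RecordBatches.
arrow_batches_iter = (batch for tbl in df.to_arrow_iter() for batch in tbl.to_batches())

write_deltalake(
    "/lakehouse/default/Tables/T10/daft",
    arrow_batches_iter,
    schema=df.schema().to_pyarrow_schema(),
    mode="append",
    partition_by=['year'],
    storage_options=storage_options,
)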

samster25 commented 1 month ago

Hi @djouallah,

This should be in the next release!

djouallah commented 3 weeks ago

Thank you, it is working.