pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Add BytesIO support to `scan_csv` #4950

Open nebfield opened 1 year ago

nebfield commented 1 year ago

Problem Description

Firstly, thank you for making this fantastic library 😀

I have the following use case:

import zstandard
import polars as pl

with open(path, 'rb') as f:
    dctx = zstandard.ZstdDecompressor()
    with dctx.stream_reader(f) as reader:
        df = pl.read_csv(reader, sep='\t')

Here path is the path to a Zstandard-compressed TSV file. I'm working with bioinformatics data, and bioinformaticians love to generate massive CSVs/TSVs and then compress them.

I would like to use scan_csv to read the decompressed BytesIO stream instead and take advantage of all the cool lazy-evaluation features to reduce memory usage. Alternatively, it would be great if scan_csv supported Zstandard-compressed file paths directly.
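
For illustration, the requested usage might look something like this (hypothetical: scan_csv only accepts file paths today, and the selected columns are made-up .pvar-style names):

import zstandard
import polars as pl

with open(path, 'rb') as f:
    dctx = zstandard.ZstdDecompressor()
    with dctx.stream_reader(f) as reader:
        # hypothetical: scan_csv does not currently accept file-like objects
        lf = pl.scan_csv(reader, sep='\t')
        df = lf.select(['#CHROM', 'POS', 'ID']).collect()  # only these columns would be parsed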

Thanks for your time!

ghuls commented 1 year ago

Out of curiosity, is this Zstandard compressed TSV file public data or is it from an internal pipeline? Never saw any Zstandard compressed TSV files in the wild myself unfortunately.

nebfield commented 1 year ago

The file I'm trying to work with is a variant information file from plink2 (.pvar.zst or .bim.zst). plink is a popular toolkit used in genome-wide association studies and related areas.

Some public data are available here.

.pvar spec

indigoviolet commented 1 year ago

+1 to this feature request; in the meantime, you can use pandas.read_csv or pyarrow.csv.open_csv to get some of this behavior.
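
For example, a minimal sketch of the pyarrow route, which reads the CSV in record batches instead of materializing it whole (the file name and the "POS" column are illustrative):

import pyarrow.csv as pv
import polars as pl

# open_csv returns a streaming reader that yields record batches
reader = pv.open_csv("data.tsv", parse_options=pv.ParseOptions(delimiter="\t"))
chunks = [
    pl.from_arrow(batch).filter(pl.col("POS") > 0)  # process each batch with bounded memory
    for batch in reader
]
df = pl.concat(chunks)  # only the filtered rows are kept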

ritchie46 commented 1 year ago

Why go via pandas? pl.read_csv accepts a BytesIO?

AroneyS commented 1 year ago

This would also help with https://github.com/pola-rs/polars/issues/7514

corneliusroemer commented 1 year ago

I managed to get read_csv to work using `with xopen("data.csv.zst", "rb") as f:`, see #7287
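
For reference, a minimal version of that pattern (assuming the third-party xopen package, which picks a decompressor based on the file extension):

from xopen import xopen
import polars as pl

with xopen("data.csv.zst", "rb") as f:
    df = pl.read_csv(f)  # works, but is eager: the whole decompressed file ends up in memory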

@ghuls at Nextstrain we've started to zst compress everything, so yeah, there are now metadata.tsv.zst's in the wild :)

corneliusroemer commented 1 year ago

> @ritchie46: Why go via pandas? pl.read_csv accepts a BytesIO?

See #9266 for why pl.read_csv doesn't work in this case: unless I'm doing something wrong, read_csv reads the whole file into memory. I haven't found a way to emulate pandas' usecols behaviour for zstd-compressed CSVs with polars.
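
(For comparison, the pandas behaviour being referred to, which still decompresses the whole file but only materializes the requested columns; assumes pandas >= 1.4 with the zstandard package installed, and the column names are illustrative:)

import pandas as pd

df = pd.read_csv("data.tsv.zst", sep="\t", usecols=["#CHROM", "POS"], compression="zstd")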

ghuls commented 1 year ago

You can use parquet-fromcsv in the meantime to convert compressed CSV/TSV files to parquet and use pl.scan_parquet on them: https://github.com/pola-rs/polars/issues/9283#issuecomment-1594512827

itamarst commented 11 months ago

Looking through the code, it seems like the limitation eventually comes down to LazyScanBuilder::scan_csv(), which accepts an impl Into<std::path::PathBuf>. It opens that path and converts it into a File. After a bit of back and forth this gets turned into a ReaderBytes. Later it makes a LogicalPlan::Scan, which takes a PathBuf.

LogicalPlan::Scan eventually becomes an ALogicalPlan::Scan, I think, and there's some code in physical_plan/planner/lp.rs which creates a CsvExec from that, which in turn creates a CsvReader... specifically for CSVs. An ALogicalPlan::Scan can also be parquet etc., so the options are either splitting the work across multiple PRs, with an intermediate stage where #10413 is not yet done (but closer), or doing it in one big change, which seems worse.

There are likely other code paths that interact with (A)LogicalPlan::Scan.

CsvReader works off of anything implementing the MmapBytesReader trait (as does ParquetReader).

Specifically for the BytesIO case, to support MmapBytesReader one would call getbuffer(), which at least prevents the buffer from being resized. One could then use the buffer API to extract a pointer to the underlying data, and multiple threads could read from that without holding the GIL, so long as a reference is also kept to the BytesIO object (or the Python buffer object) so it isn't GCed.
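
For reference, a small pure-Python demonstration of the getbuffer() pinning behaviour described above (the Rust side would presumably go through PyO3's buffer API instead):

import io

buf = io.BytesIO(b"a,b\n1,2\n")
view = buf.getbuffer()   # zero-copy view of the underlying buffer
try:
    buf.truncate(0)      # resizing is refused while the view is exported
except BufferError as err:
    print(err)           # "Existing exports of data: object cannot be re-sized"
view.release()           # after release, the BytesIO can be resized again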

itamarst commented 11 months ago

My first thought for an implementation, based on the above, is replacing all the PathBufs in the various types (LogicalPlan::Scan etc.) with:

enum FileSource {
    Path(PathBuf),
    Reader(Box<dyn MmapBytesReader>),
}

itamarst commented 11 months ago

But... that's sort of limiting. The more general solution is making the CsvStreamer not assume the CSV fits in memory. In particular, the csv CoreReader could be modified to take a GetReaderBytes trait implementor as input instead of a ReaderBytes: it would let you get a ReaderBytes at some position, ReaderBytes could implement GetReaderBytes for itself, and much of the code would stay the same.

trait GetReaderBytes {
    fn read(&self, offset: usize, length: usize) -> ReaderBytes;
}

MaxPowerWasTaken commented 4 months ago

> You can use parquet-fromcsv in the meantime to convert compressed CSV/TSV files to parquet and use pl.scan_parquet on them: #9283 (comment)

I think I'm understanding correctly that this is a recommendation to use a rust library. Any advice for the less-cool among us who are still working pretty exclusively from a python environment?

ghuls commented 4 months ago

> I think I'm understanding correctly that this is a recommendation to use a rust library. Any advice for the less-cool among us who are still working pretty exclusively from a python environment?

It is a command line tool, but part of the rust arrow library.
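
(For a pure-Python equivalent of that one-time conversion, something like the following should also work, assuming pyarrow and zstandard are installed; file names are illustrative:)

import pyarrow.csv as pv
import pyarrow.parquet as pq
import zstandard

# Stream the compressed TSV into a parquet file without holding it all in memory,
# then query it lazily with pl.scan_parquet("data.parquet").
with open("data.tsv.zst", "rb") as f:
    with zstandard.ZstdDecompressor().stream_reader(f) as reader:
        csv_reader = pv.open_csv(reader, parse_options=pv.ParseOptions(delimiter="\t"))
        with pq.ParquetWriter("data.parquet", csv_reader.schema) as writer:
            for batch in csv_reader:
                writer.write_batch(batch)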

MaxPowerWasTaken commented 4 months ago

> It is a command line tool, but part of the rust arrow library.

awesome, thanks for the correction, yeah cli tools work for my use case.

AlexanderNenninger commented 2 weeks ago

+1 to this. Implementing this proposal would allow for scanning memory-mapped files that need additional processing, e.g. parsing custom headers.
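
As a rough sketch of that use case (file name and header layout are made up): today one can mmap the file, skip the custom header, and hand the remainder to read_csv, but only eagerly:

import io
import mmap
import polars as pl

with open("data_with_custom_header.csv", "rb") as f:
    mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
    body = mm[mm.find(b"\n") + 1:]       # skip a one-line custom header
    df = pl.read_csv(io.BytesIO(body))   # eager; scan_csv support would make this lazy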