pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Support lazy schema retrieval in IO Plugins #18638

Open tmct opened 1 week ago

tmct commented 1 week ago

Description

Hi,

I just wrote my first Python IO Plugin, for fetching data from a company-internal data source - very pleased that you can create LazyFrames of completely custom origin now, thanks! This brings us one step closer to expressing entire data processing pipelines declaratively without breaks using the Lazy API.

What is missing from the example, and what would really help me put my scan_custom_data_source into production, is the ability to collect the schema lazily, as has recently become possible for LazyFrames.

Is there already a way to do this, or are changes needed to register_io_source?

Many thanks, Tom

tmct commented 1 week ago

Let me know if this is worth another Issue, but I am also interested in the scenario where I am able to leverage existing built-in LazyFrame scans, but want to do some lazily-defined work beforehand.

Motivating example: I would like to write my own scan_encrypted_parquet method. I can of course write a register_io_source callable which decrypts files and yields DataFrames, but this means that any compute graph defined within that method is disconnected from later compute. And the onus is then on me to pass through the with_columns args etc appropriately. Imagine for example a similar function to register_io_source, whose callable argument returns LazyFrames.
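To sketch the shape of the API I have in mind (register_lazy_io_source and its callable signature are purely hypothetical here, invented for illustration — nothing like this exists in polars today):

```
def register_lazy_io_source(  # hypothetical; does NOT exist in polars
    source: Callable[..., pl.LazyFrame],  # returns a LazyFrame instead of yielding DataFrames
    *,
    schema: SchemaDict | Callable[[], SchemaDict],
) -> pl.LazyFrame: ...

def scan_encrypted_parquet(f: Path) -> pl.LazyFrame:
    # The returned LazyFrame's graph would stay connected to downstream compute,
    # so projection/predicate pushdown is handled by the optimizer rather than by me.
    return register_lazy_io_source(lambda: _decrypt_and_scan(f), schema=...)
```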

tmct commented 1 week ago

Here is my work in progress, illustrating both questions I have:

from io import BytesIO
from pathlib import Path
from typing import Iterator

import polars as pl
from polars._typing import SchemaDict
from polars.io.plugins import register_io_source

# `crypt` and `logger` are company-internal helpers, omitted here.


def scan_parquet_sc(
    f: Path,
    schema: SchemaDict,  # ideally we would collect this lazily (and maybe cache the result if collect_schema is called before collect). But until then, don't get this schema wrong!
) -> pl.LazyFrame:
    """Decrypts a parquet file, then scans it.

    Don't ask why we're encrypting the whole file instead of using the Parquet columnar standard..."""

    def pq_source(
        with_columns: list[str] | None,
        predicate: pl.Expr | None,
        n_rows: int | None,
        batch_size: int | None,
    ) -> Iterator[pl.DataFrame]:
        res = _decrypt_and_scan_parquet_sc(f, with_columns, predicate, n_rows, batch_size)
        if batch_size is not None:
            logger.warning(
                f"Was passed {batch_size=} but will ignore it for now - maybe we should "
                "collect row group by row group, then check whether that's big enough to batch"
            )
        yield res.collect()  # if batch_size is None, I would perhaps like a way to return a LazyFrame?

    return register_io_source(pq_source, schema=schema)


def _decrypt_and_scan_parquet_sc(
    f: Path,
    with_columns: list[str] | None,
    predicate: pl.Expr | None,
    n_rows: int | None,
    _batch_size: int | None,
) -> pl.LazyFrame:
    pq_bytes = BytesIO(crypt.decrypt_bytes(f.read_bytes()))
    df = pl.scan_parquet(pq_bytes)  # once #10413 is released...
    if with_columns is not None:
        df = df.select(with_columns)
    if predicate is not None:
        df = df.filter(predicate)
    if n_rows is not None:
        df = df.head(n_rows)  # I'm pretty sure you want head and not fetch here
    return df


def _collect_schema_parquet_sc(f: Path) -> SchemaDict:
    # TODO implement and hook in when supported
    ...

ritchie46 commented 1 week ago

Hey @tmct, great that you made an IO plugin! That's what they're for.

This example shows the ability to create the schema lazily:

https://github.com/pola-rs/pyo3-polars/tree/main/example/io_plugin

tmct commented 1 week ago

Many thanks Ritchie, I'll give it a go.

tmct commented 1 week ago

Does 'src.schema()' in that example not realise the schema at the time of the scan_... call, though, rather than lazily? I don't understand the significance of that last instance of the Source, so I've likely misunderstood how it works.