kedro-org / kedro-plugins

First-party plugins maintained by the Kedro team.
Apache License 2.0

Polars Datatype Catalog Entry Cannot Partition on Saving Parquet #908

Open alexdavis24 opened 1 month ago

alexdavis24 commented 1 month ago

Description

Context

Steps to Reproduce

  1. Sample dataset that runs locally with no issues:

    import polars as pl

    df = pl.DataFrame(
        {"A": [1, 2, 3],
         "B": [4, 5, 6]}
    )
    path = "tmp/test.parquet"

    # partitioned write through the pyarrow (C++) backend
    df.write_parquet(
        path,
        use_pyarrow=True,
        pyarrow_options={"partition_cols": ["B"]},
    )

    # this also runs with no issues (native Rust writer)
    df.write_parquet(
        path,
        partition_by=["B"],
    )
  2. Sample Kedro pipeline following the same implementation (catalog entries below):

    # pipelines.py
    import polars as pl

    from kedro.pipeline import Pipeline, node
    from kedro.pipeline.modular_pipeline import pipeline


    def my_func():
        return pl.DataFrame(
            {"A": [1, 2, 3],
             "B": [4, 5, 6]}
        )


    def create_pipeline() -> Pipeline:
        return pipeline(
            [
                node(
                    func=my_func,
                    inputs=None,
                    outputs="my_entry",
                    name="partition_polars",
                )
            ]
        )
# catalog.yml
# using the native Rust writer
my_entry:
  # also tried with polars.LazyPolarsDataset
  type: polars.EagerPolarsDataset
  filepath: /tmp/test.parquet
  file_format: parquet
  save_args:
    partition_by:
      - B
# catalog.yml
# using the pyarrow (C++) writer
my_entry:
  type: polars.EagerPolarsDataset
  filepath: /tmp/test.parquet
  file_format: parquet
  save_args:
    use_pyarrow: True
    pyarrow_options:
      partition_cols:
        - B
  fs_args:
    filesystem: pyarrow._fs.FileSystem

Expected Result

The save succeeds and writes a parquet dataset partitioned by column B, matching the behaviour of calling write_parquet directly as in step 1.

Actual Result

From the Rust implementation:

DatasetError: Failed while saving data to data set
EagerPolarsDataset(file_format=parquet, filepath=/tmp/test.parquet,
load_args={}, protocol=file, save_args={'partition_by': ['dt1y']}).
'BytesIO' object cannot be converted to 'PyString'
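
The Rust error looks consistent with the dataset handing Polars a file-like object (opened via fsspec) where a partitioned write needs a directory path. A minimal sketch that reproduces the same error outside Kedro, assuming that is the mechanism:

    import io

    import polars as pl

    df = pl.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})

    # Writing to a buffer is fine for a plain, non-partitioned parquet file.
    df.write_parquet(io.BytesIO())

    # A partitioned write needs a directory path, not a file-like object,
    # so this raises the same "'BytesIO' object cannot be converted to
    # 'PyString'" error as the dataset save above.
    df.write_parquet(io.BytesIO(), partition_by=["B"])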

From the pyarrow implementation:

DatasetError: Failed while saving data to data set
 LazyPolarsDataset(filepath=/tmp/test.parquet, load_args={}, protocol=file, 
save_args={'pyarrow_options': {'compression': zstd, 'partition_cols': ['dt1y'],
'write_statistics': True}, 'use_pyarrow': True}).
Argument 'filesystem' has incorrect type (expected pyarrow._fs.FileSystem, got 
NoneType)
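
The pyarrow error points at the filesystem argument: whatever reaches pyarrow as filesystem is None rather than a pyarrow._fs.FileSystem instance, so the class path given under fs_args does not appear to make it through as a filesystem object. For comparison, a sketch of the direct Polars call with an explicit filesystem instance passed through pyarrow_options; the LocalFileSystem here is illustrative and is not what the catalog entry currently produces:

    import polars as pl
    from pyarrow import fs as pa_fs

    df = pl.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})

    # With use_pyarrow=True and partition_cols set, Polars hands the write
    # off to pyarrow.parquet.write_to_dataset, which accepts a FileSystem
    # instance (not a class path string) via the filesystem option.
    df.write_parquet(
        "/tmp/test.parquet",
        use_pyarrow=True,
        pyarrow_options={
            "partition_cols": ["B"],
            "filesystem": pa_fs.LocalFileSystem(),
        },
    )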

Your Environment

datajoely commented 1 month ago

Just want to say thanks for such a clear write up and investigation 💪

SajidAlamQB commented 1 month ago

Thank you @alexdavis24 for reporting this, we'll have a look.