pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Write only one dictionary when sinking to IPC #6407

Open ghuls opened 1 year ago

ghuls commented 1 year ago

Problem description

Write only one dictionary when sinking to IPC.

It would be great if, when writing categorical data to an IPC sink, only one unified dictionary were written instead of multiple ones, as the Arrow IPC file format does not support dictionary replacement.

Since Polars internally keeps adding new values to the string cache, it shouldn't be a problem to write the full dictionary at the end: values that do not appear in the latest batch are not forgotten, thanks to the string cache (unless the format physically requires a dictionary to be located close to its batch).
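A minimal sketch (not the sink internals) of why the string cache makes a single trailing dictionary feasible:

import polars as pl

with pl.StringCache():
    # Two "batches" of categorical data created under the same global cache.
    batch1 = pl.Series(["a", "b"], dtype=pl.Categorical)
    batch2 = pl.Series(["c", "a"], dtype=pl.Categorical)
    # Physical codes are assigned by the shared cache, so "a" maps to the same
    # code in both batches and no earlier category is ever lost:
    print(batch1.to_physical().to_list())  # [0, 1]
    print(batch2.to_physical().to_list())  # [2, 0]
    # A writer could therefore emit one unified dictionary after the last batch.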

# Writing categorical data from scan_csv with sink_ipc writes different dictionaries.
In [25]: %time pl.scan_csv("test.tsv", sep="\t", has_header=False).with_columns([pl.col(pl.Utf8).cast(pl.Categorical)]).sink_ipc("test.categorical.ipc")
thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: ArrowError(InvalidArgumentError("Dictionary replacement detected when writing IPC file format. Arrow IPC files only support a single dictionary for a given field across all batches."))', /home/runner/work/polars/polars/polars/polars-lazy/polars-pipe/src/executors/sinks/file_sink.rs:148:54
thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: "SendError(..)"', /home/runner/work/polars/polars/polars/polars-lazy/polars-pipe/src/executors/sinks/file_sink.rs:175:43
[... the same SendError panic is repeated by each of the remaining worker threads ...]
---------------------------------------------------------------------------
PanicException                            Traceback (most recent call last)
<timed eval> in <module>

~/software/anaconda3/envs/polars/lib/python3.8/site-packages/polars/internals/lazyframe/frame.py in sink_ipc(self, path, compression, maintain_order, type_coercion, predicate_pushdown, projection_pushdown, simplify_expression, no_optimization, slice_pushdown)
   1333             streaming=True,
   1334         )
-> 1335         return ldf.sink_ipc(
   1336             path=path,
   1337             compression=compression,

PanicException: called `Result::unwrap()` on an `Err` value: "SendError(..)"

# Collecting the full result first, so only one dictionary is written, solves it for IPC.
In [26]: %time pl.scan_csv(test.tsv", sep="\t", has_header=False).with_columns([pl.col(pl.Utf8).cast(pl.Categorical)]).collect().lazy().sink_ipc("test.categorical.ipc")
CPU times: user 39.2 s, sys: 9.72 s, total: 49 s
Wall time: 18.2 s

# Parquet supports writing different dictionaries.
In [28]: %time pl.scan_csv("test.tsv", sep="\t", has_header=False).with_columns([pl.col(pl.Utf8).cast(pl.Categorical)]).sink_parquet("test.categorical.parquet")
CPU times: user 40.2 s, sys: 2.1 s, total: 42.3 s
Wall time: 12.1 s

Not sure if it should be handled automatically or not, but reading parquet/IPC files with multiple categorical columns requires enabling the StringCache manually.

In [39]: %%time
    ...: df = pl.read_ipc("test.categorical.ipc")
    ...: # df = pl.read_parquet("test.categorical.parquet")
    ...: # df = pl.scan_ipc("test.categorical.ipc").head().collect()
    ...: # df = pl.scan_parquet("test.categorical.parquet").head().collect()
    ...: 
could not mmap compressed IPC file, defaulting to normal read
thread '<unnamed>' panicked at 'should not fail: ComputeError(Borrowed("Cannot concat Categoricals coming from a different source. Consider setting a global StringCache."))', /home/runner/work/polars/polars/polars/polars-core/src/frame/mod.rs:903:36
---------------------------------------------------------------------------
PanicException                            Traceback (most recent call last)
<timed exec> in <module>

~/software/anaconda3/envs/polars/lib/python3.8/site-packages/polars/utils.py in wrapper(*args, **kwargs)
    392         def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
    393             _rename_kwargs(fn.__name__, kwargs, aliases)
--> 394             return fn(*args, **kwargs)
    395 
    396         return wrapper

~/software/anaconda3/envs/polars/lib/python3.8/site-packages/polars/io.py in read_ipc(file, columns, n_rows, use_pyarrow, memory_map, storage_options, row_count_name, row_count_offset, rechunk)
    865             return df
    866 
--> 867         return DataFrame._read_ipc(
    868             data,
    869             columns=columns,

~/software/anaconda3/envs/polars/lib/python3.8/site-packages/polars/internals/dataframe/frame.py in _read_ipc(cls, file, columns, n_rows, row_count_name, row_count_offset, rechunk, memory_map)
    869         projection, columns = handle_projection_columns(columns)
    870         self = cls.__new__(cls)
--> 871         self._df = PyDataFrame.read_ipc(
    872             file,
    873             columns,

In [48]: %%time
    ...: with pl.StringCache():
    ...:     pl.read_ipc("test.categorical.ipc")
    ...:     pl.read_parquet("test.categorical.parquet")
    ...:     pl.scan_ipc("test.categorical.ipc").head().collect()
    ...:     pl.scan_parquet("test.categorical.parquet").head().collect()
    ...: 
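For completeness, enabling the string cache globally for the session also works, so each read does not need its own context manager. A minimal sketch, assuming pl.enable_string_cache() is available in the installed version:

import polars as pl

# Keep one global string cache alive for the rest of the session, so
# categoricals read from different files share a single mapping.
pl.enable_string_cache()

df_ipc = pl.read_ipc("test.categorical.ipc")
df_parquet = pl.read_parquet("test.categorical.parquet")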
ghuls commented 1 year ago

It still does not work by default in 0.19.8:

In [4]: %time pl.scan_csv("test.tsv", separator="\t", has_header=False, comment_char='#').with_columns([pl.col(pl.Utf8).cast(pl.Categorical)]).sink_ipc("test.categorical.ipc")
thread '<unnamed>' panicked at /home/runner/work/polars/polars/crates/polars-pipe/src/executors/sinks/file_sink.rs:248:54:
called `Result::unwrap()` on an `Err` value: ArrowError(InvalidArgumentError("Dictionary replacement detected when writing IPC file format. Arrow IPC files only support a single dictionary for a given field across all batches."))
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
thread '<unnamed>' panicked at /home/runner/work/polars/polars/crates/polars-pipe/src/executors/sinks/file_sink.rs:275:43:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread '<unnamed>' panicked at /home/runner/work/polars/polars/crates/polars-pipe/src/executors/sinks/file_sink.rs:275:43:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread '<unnamed>' panicked at /home/runner/work/polars/polars/crates/polars-pipe/src/executors/sinks/file_sink.rs:275:43:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
---------------------------------------------------------------------------
PanicException                            Traceback (most recent call last)
File <timed eval>:1

File ~/software/anaconda3/envs/polars/lib/python3.8/site-packages/polars/lazyframe/frame.py:2057, in LazyFrame.sink_ipc(self, path, compression, maintain_order, type_coercion, predicate_pushdown, projection_pushdown, simplify_expression, slice_pushdown, no_optimization)
   2010 """
   2011 Evaluate the query in streaming mode and write to an IPC file.
   2012 
   (...)
   2046 
   2047 """
   2048 lf = self._set_sink_optimizations(
   2049     type_coercion=type_coercion,
   2050     predicate_pushdown=predicate_pushdown,
   (...)
   2054     no_optimization=no_optimization,
   2055 )
-> 2057 return lf.sink_ipc(
   2058     path=path,
   2059     compression=compression,
   2060     maintain_order=maintain_order,
   2061 )

PanicException: called `Result::unwrap()` on an `Err` value: "SendError(..)"

In [6]: pl.show_versions()
--------Version info---------
Polars:              0.19.8
Index type:          UInt32
Platform:            Linux-5.15.0-86-generic-x86_64-with-glibc2.10
Python:              3.8.12 | packaged by conda-forge | (default, Oct 12 2021, 21:57:06) 
[GCC 9.4.0]

----Optional dependencies----
adbc_driver_sqlite:  <not installed>
cloudpickle:         2.2.1
connectorx:          <not installed>
deltalake:           <not installed>
fsspec:              <not installed>
gevent:              <not installed>
matplotlib:          <not installed>
numpy:               1.24.4
openpyxl:            <not installed>
pandas:              1.5.3
pyarrow:             11.0.0
pydantic:            <not installed>
pyiceberg:           <not installed>
pyxlsb:              <not installed>
sqlalchemy:          <not installed>
xlsx2csv:            <not installed>
xlsxwriter:          <not installed>
ByteNybbler commented 11 months ago

Here's a small Rust example that reproduces the panic:

use polars::prelude::*;

fn main() {
    let df = df![
        "strings" => &["a", "b", "c", "d", "e"],
    ].unwrap();

    // The cast happens inside the streaming pipeline, so each batch can
    // grow the dictionary, which the IPC writer rejects as a replacement.
    df.lazy()
        .with_columns([
            col("strings").cast(DataType::Categorical(None))
        ])
        .sink_ipc(
            std::path::PathBuf::from("out.arrow"),
            IpcWriterOptions::default(),
        )
        .unwrap()
}
run UdfExec
RUN STREAMING PIPELINE
df -> hstack -> parquet_sink
RefCell { value: [] }
thread '<unnamed>' panicked at /Users/bytenybbler/.cargo/registry/src/index.crates.io-6f17d22bba15001f/polars-pipe-0.35.4/src/executors/sinks/file_sink.rs:286:54:
called `Result::unwrap()` on an `Err` value: InvalidOperation(ErrString("Dictionary replacement detected when writing IPC file format. Arrow IPC files only support a single dictionary for a given field across all batches."))
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
thread 'main' panicked at /Users/bytenybbler/.cargo/registry/src/index.crates.io-6f17d22bba15001f/polars-pipe-0.35.4/src/executors/sinks/file_sink.rs:336:14:
called `Result::unwrap()` on an `Err` value: Any { .. }

On the other hand, performing the categorical cast prior to the streaming operation does not cause a panic:

use polars::prelude::*;

fn main() {
    // Casting eagerly produces the complete dictionary up front, so the
    // streaming sink only ever sees a single dictionary for the field.
    let categoricals = Series::new("strings", ["a", "b", "c", "d", "e"])
        .cast(&DataType::Categorical(None)).unwrap();

    let df = DataFrame::new(vec![categoricals]).unwrap();

    df.lazy()
        .sink_ipc(
            std::path::PathBuf::from("out.arrow"),
            IpcWriterOptions::default(),
        )
        .unwrap()
}
ByteNybbler commented 11 months ago

Seems like DictionaryTracker::insert in crates/polars-arrow/src/io/ipc/write/common.rs emits a replacement dictionary whenever a batch of a categorical column contains string values it has not seen before. Since the IPC file format only allows a single dictionary per field, Polars does not like this.
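To make that constraint concrete, the single-dictionary invariant can be checked from Python. A minimal sketch using pyarrow, assuming the dictionary-encoded column is the first column of the file produced by the collect() workaround above:

import pyarrow as pa
import pyarrow.ipc  # ensure the IPC submodule is available as pa.ipc

# Every record batch of an Arrow IPC file must resolve a given field against
# the same dictionary, which is exactly the invariant the panic enforces.
reader = pa.ipc.open_file("test.categorical.ipc")
dicts = [
    reader.get_batch(i).column(0).dictionary  # assumes column 0 is categorical
    for i in range(reader.num_record_batches)
]
assert all(d.equals(dicts[0]) for d in dicts)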