pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Support reading of zstd compressed csv files #9283

Open corneliusroemer opened 1 year ago

corneliusroemer commented 1 year ago

Problem description

I wish I could read zstd compressed csv files using read_csv and ideally also scan_csv. See https://stackoverflow.com/questions/76417610/how-to-read-csv-a-zstd-compressed-file-using-python-polars/76420788#76420788
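
Until something like this is supported natively, one possible workaround is to decompress the file in Python and hand the bytes to read_csv. This is only a minimal sketch, not a Polars feature: it assumes the third-party zstandard package is installed, the helper names is_zstd and read_csv_zstd are made up for illustration, and the whole file is decompressed into memory, so it does not replace scan_csv for files larger than RAM.

```python
import io

# Regular zstd frames start with these four bytes (magic number 0xFD2FB528, little-endian).
ZSTD_MAGIC = b"\x28\xb5\x2f\xfd"


def is_zstd(path):
    """Return True if the file at `path` starts with the zstd frame magic number."""
    with open(path, "rb") as fh:
        return fh.read(4) == ZSTD_MAGIC


def read_csv_zstd(path, **read_csv_kwargs):
    """Hypothetical helper: decompress a zstd-compressed CSV in memory, then parse it."""
    # Imported lazily so that is_zstd() works without the third-party packages.
    import polars as pl
    import zstandard  # assumption: installed with `pip install zstandard`

    with zstandard.open(str(path), "rb") as fh:
        data = fh.read()  # the whole decompressed file is held in memory
    return pl.read_csv(io.BytesIO(data), **read_csv_kwargs)
```

Usage would look like read_csv_zstd("data.csv.zst", separator=","), where the keyword arguments are passed through to read_csv unchanged.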

Possibly related:

ghuls commented 1 year ago

For now you can use parquet-fromcsv from arrow-rs to convert CSV/TSV files to parquet.

parquet-fromcsv has supported compressed input (snappy, gzip, brotli, lz4 and zstd, in addition to uncompressed) only relatively recently: https://github.com/apache/arrow-rs/issues/3721

parquet-fromcsv is also useful in general to read compressed CSV/TSV files and convert them to parquet, so you can use pl.scan_parquet on very big files.

# Clone the Rust arrow repo and change into it.
git clone https://github.com/apache/arrow-rs
cd arrow-rs

# Build the Arrow CLI tools.
cargo build --release --features=cli

# Current list of Arrow CLI tools:
$ file target/release/* | grep ELF
target/release/arrow-file-to-stream:              ELF 64-bit LSB shared object, x86-64, version 1 (SYSV), dynamically linked, interpreter /lib64/ld-linux-x86-64.so.2, for GNU/Linux 3.2.0, with debug_info, not stripped, too many notes (256)
target/release/arrow-json-integration-test:       ELF 64-bit LSB shared object, x86-64, version 1 (SYSV), dynamically linked, interpreter /lib64/ld-linux-x86-64.so.2, for GNU/Linux 3.2.0, with debug_info, not stripped, too many notes (256)
target/release/arrow-stream-to-file:              ELF 64-bit LSB shared object, x86-64, version 1 (SYSV), dynamically linked, interpreter /lib64/ld-linux-x86-64.so.2, for GNU/Linux 3.2.0, with debug_info, not stripped, too many notes (256)
target/release/flight-test-integration-client:    ELF 64-bit LSB shared object, x86-64, version 1 (SYSV), dynamically linked, interpreter /lib64/ld-linux-x86-64.so.2, for GNU/Linux 3.2.0, with debug_info, not stripped, too many notes (256)
target/release/flight-test-integration-server:    ELF 64-bit LSB shared object, x86-64, version 1 (SYSV), dynamically linked, interpreter /lib64/ld-linux-x86-64.so.2, for GNU/Linux 3.2.0, with debug_info, not stripped, too many notes (256)
target/release/gen:                               ELF 64-bit LSB shared object, x86-64, version 1 (SYSV), dynamically linked, interpreter /lib64/ld-linux-x86-64.so.2, for GNU/Linux 3.2.0, with debug_info, not stripped, too many notes (256)
target/release/libparquet_derive.so:              ELF 64-bit LSB shared object, x86-64, version 1 (SYSV), dynamically linked, with debug_info, not stripped, too many notes (256)
target/release/parquet-concat:                    ELF 64-bit LSB shared object, x86-64, version 1 (SYSV), dynamically linked, interpreter /lib64/ld-linux-x86-64.so.2, for GNU/Linux 3.2.0, with debug_info, not stripped, too many notes (256)
target/release/parquet-fromcsv:                   ELF 64-bit LSB shared object, x86-64, version 1 (SYSV), dynamically linked, interpreter /lib64/ld-linux-x86-64.so.2, for GNU/Linux 3.2.0, with debug_info, not stripped, too many notes (256)
target/release/parquet-index:                     ELF 64-bit LSB shared object, x86-64, version 1 (SYSV), dynamically linked, interpreter /lib64/ld-linux-x86-64.so.2, for GNU/Linux 3.2.0, with debug_info, not stripped, too many notes (256)
target/release/parquet-layout:                    ELF 64-bit LSB shared object, x86-64, version 1 (SYSV), dynamically linked, interpreter /lib64/ld-linux-x86-64.so.2, for GNU/Linux 3.2.0, with debug_info, not stripped, too many notes (256)
target/release/parquet-read:                      ELF 64-bit LSB shared object, x86-64, version 1 (SYSV), dynamically linked, interpreter /lib64/ld-linux-x86-64.so.2, for GNU/Linux 3.2.0, with debug_info, not stripped, too many notes (256)
target/release/parquet-rewrite:                   ELF 64-bit LSB shared object, x86-64, version 1 (SYSV), dynamically linked, interpreter /lib64/ld-linux-x86-64.so.2, for GNU/Linux 3.2.0, with debug_info, not stripped, too many notes (256)
target/release/parquet-rowcount:                  ELF 64-bit LSB shared object, x86-64, version 1 (SYSV), dynamically linked, interpreter /lib64/ld-linux-x86-64.so.2, for GNU/Linux 3.2.0, with debug_info, not stripped, too many notes (256)
target/release/parquet-schema:                    ELF 64-bit LSB shared object, x86-64, version 1 (SYSV), dynamically linked, interpreter /lib64/ld-linux-x86-64.so.2, for GNU/Linux 3.2.0, with debug_info, not stripped, too many notes (256)
target/release/parquet-show-bloom-filter:         ELF 64-bit LSB shared object, x86-64, version 1 (SYSV), dynamically linked, interpreter /lib64/ld-linux-x86-64.so.2, for GNU/Linux 3.2.0, with debug_info, not stripped, too many notes (256)

# Use parquet-fromcsv to convert a CSV/TSV file to parquet, which you can then use with Polars.
$ target/release/parquet-fromcsv --help
Binary to convert csv to Parquet

Usage: parquet-fromcsv [OPTIONS] --schema <SCHEMA> --input-file <INPUT_FILE> --output-file <OUTPUT_FILE>

Options:
  -s, --schema <SCHEMA>
          message schema for output Parquet

  -i, --input-file <INPUT_FILE>
          input CSV file

  -o, --output-file <OUTPUT_FILE>
          output Parquet file

  -f, --input-format <INPUT_FORMAT>
          input file format

          [default: csv]
          [possible values: csv, tsv]

  -b, --batch-size <BATCH_SIZE>
          batch size

          [env: PARQUET_FROM_CSV_BATCHSIZE=]
          [default: 1000]

  -h, --has-header
          has header

  -d, --delimiter <DELIMITER>
          field delimiter

          default value: when input_format==CSV: ',' when input_format==TSV: 'TAB'

  -r, --record-terminator <RECORD_TERMINATOR>
          record terminator

          [possible values: lf, crlf, cr]

  -e, --escape-char <ESCAPE_CHAR>
          escape character

  -q, --quote-char <QUOTE_CHAR>
          quote character

  -D, --double-quote <DOUBLE_QUOTE>
          double quote

          [possible values: true, false]

  -C, --csv-compression <CSV_COMPRESSION>
          compression mode of csv

          [default: UNCOMPRESSED]

  -c, --parquet-compression <PARQUET_COMPRESSION>
          compression mode of parquet

          [default: SNAPPY]

  -w, --writer-version <WRITER_VERSION>
          writer version

  -m, --max-row-group-size <MAX_ROW_GROUP_SIZE>
          max row group size

      --enable-bloom-filter <ENABLE_BLOOM_FILTER>
          whether to enable bloom filter writing

          [possible values: true, false]

      --help
          display usage help

  -V, --version
          Print version

# Example: Reading a gzipped TSV file and converting it to parquet.
$ parquet-fromcsv \
    --schema fragments.schema \
    --input-format tsv \
    --csv-compression gzip \
    --parquet-compression zstd \
    --input-file fragments.raw.tsv.gz \
    --output-file fragments.parquet
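
If you prefer to drive the conversion from Python, the same call can be wrapped with subprocess. This is a sketch under assumptions: the function names fromcsv_command and convert are made up for illustration, parquet-fromcsv is assumed to be on PATH, and only flags listed in the --help output above are used. For a zstd-compressed CSV, passing csv_compression="zstd" should presumably work in the same way, but I have not verified that value here.

```python
import subprocess


def fromcsv_command(schema, input_file, output_file, input_format="csv",
                    csv_compression="UNCOMPRESSED", parquet_compression="SNAPPY",
                    binary="parquet-fromcsv"):
    """Build the parquet-fromcsv argument list (defaults mirror the --help output)."""
    return [
        binary,
        "--schema", str(schema),
        "--input-format", input_format,
        "--csv-compression", csv_compression,
        "--parquet-compression", parquet_compression,
        "--input-file", str(input_file),
        "--output-file", str(output_file),
    ]


def convert(**kwargs):
    """Run the conversion; raises CalledProcessError if parquet-fromcsv exits non-zero."""
    subprocess.run(fromcsv_command(**kwargs), check=True)


# Same arguments as the gzipped TSV example.
cmd = fromcsv_command(
    schema="fragments.schema",
    input_file="fragments.raw.tsv.gz",
    output_file="fragments.parquet",
    input_format="tsv",
    csv_compression="gzip",
    parquet_compression="zstd",
)
print(" ".join(cmd))
```

Passing the arguments as a list avoids shell quoting problems with file names that contain spaces.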

# To get the schema file:
#   - Extract a small uncompressed sample from the compressed CSV/TSV file.
#   - Read the sample with Polars and write it to parquet.
#   - Extract the schema from that parquet file with parquet-schema.
$ zcat fragments.raw.tsv.gz | head -n 10000 > /tmp/fragments.head10000.tsv

# In Python (after `import polars as pl`):
pl.read_csv("/tmp/fragments.head10000.tsv", separator="\t", has_header=False).write_parquet("/tmp/fragments.head10000.parquet")

$ parquet-schema /tmp/fragments.head10000.parquet | grep -A 1000000 '^message' > fragments.schema

$ cat fragments.schema
message root {
  OPTIONAL BYTE_ARRAY column_1 (STRING);
  OPTIONAL INT64 column_2;
  OPTIONAL INT64 column_3;
  OPTIONAL BYTE_ARRAY column_4 (STRING);
  OPTIONAL INT64 column_5;
}
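
If you already know the column types, the sampling step and the schema file can also be produced with the Python standard library alone. This is a rough sketch: write_head, message_schema and the PARQUET_TYPES mapping are hypothetical names, and the mapping only covers a few dtypes that I am assuming correspond to the Parquet types shown.

```python
import gzip
from itertools import islice

# Assumed mapping from Polars dtype names to Parquet schema types; extend as needed.
PARQUET_TYPES = {
    "String": "BYTE_ARRAY {name} (STRING)",
    "Utf8": "BYTE_ARRAY {name} (STRING)",
    "Int64": "INT64 {name}",
    "Float64": "DOUBLE {name}",
    "Boolean": "BOOLEAN {name}",
}


def write_head(gz_path, out_path, n_lines=10000):
    """Python equivalent of `zcat file.gz | head -n N > out`."""
    with gzip.open(gz_path, "rb") as src, open(out_path, "wb") as dst:
        dst.writelines(islice(src, n_lines))


def message_schema(columns):
    """Render {column name: dtype name} as a Parquet message schema with optional fields."""
    lines = ["message root {"]
    for name, dtype in columns.items():
        lines.append("  OPTIONAL " + PARQUET_TYPES[dtype].format(name=name) + ";")
    lines.append("}")
    return "\n".join(lines) + "\n"


# Reproduces the layout of the fragments.schema file shown above.
print(message_schema({
    "column_1": "String", "column_2": "Int64", "column_3": "Int64",
    "column_4": "String", "column_5": "Int64",
}), end="")
```

Going through Polars and parquet-schema is still the safer route when the types are not known in advance, since the types are then inferred from the sample rather than written by hand.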

AndreaBarbon commented 5 months ago

Very useful, thanks!

Can we use this on a batch of files?

PS: Note that there's a typo in the number of zeros: Should be consistently 1000 or 10000

ghuls commented 2 months ago

> Can we use this on a batch of files?

With GNU parallel it is quite easy to run a similar command with different arguments.

# Make a list of all files you want to process (`ls -1` or `cat file_list`)
# Run 4 conversions at the same time (`-j 4`), where `{}` represents a certain line (a `tsv.gz` file in this case) and `{..}` the same minus the `.tsv.gz` part at the end (strip extension twice).
ls -1 *.tsv.gz \
  | parallel -j 4 --plus \
        parquet-fromcsv \
            --schema fragments.schema \
            --input-format tsv \
            --csv-compression gzip \
            --parquet-compression zstd \
            --input-file {} \
            --output-file {..}.parquet
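
Without GNU parallel, roughly the same batch run can be sketched in Python with a thread pool. The function names output_path, convert_one and convert_all are made up for illustration, and parquet-fromcsv is assumed to be on PATH; the options are the ones from the command above.

```python
import subprocess
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path


def output_path(input_file):
    """Strip two extensions and append .parquet: a.tsv.gz -> a.parquet (like `{..}.parquet`)."""
    return Path(input_file).with_suffix("").with_suffix(".parquet")


def convert_one(input_file, schema="fragments.schema"):
    """Run parquet-fromcsv on one gzipped TSV file and return the output path."""
    out = output_path(input_file)
    subprocess.run(
        [
            "parquet-fromcsv",
            "--schema", schema,
            "--input-format", "tsv",
            "--csv-compression", "gzip",
            "--parquet-compression", "zstd",
            "--input-file", str(input_file),
            "--output-file", str(out),
        ],
        check=True,  # raise if a conversion fails
    )
    return out


def convert_all(directory=".", jobs=4):
    """Convert every *.tsv.gz in `directory`, running at most `jobs` conversions at once."""
    files = sorted(Path(directory).glob("*.tsv.gz"))
    with ThreadPoolExecutor(max_workers=jobs) as pool:
        return list(pool.map(convert_one, files))
```

Threads are enough here because each worker just waits on an external process; calling convert_all(".", jobs=4) corresponds to `-j 4`.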

> PS: Note that there's a typo in the number of zeros: Should be consistently 1000 or 10000

Fixed