pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs
Other
30.26k stars 1.95k forks source link

Polars runs out of memory when collecting a Parquet file #18009

Open cfkstat opened 3 months ago

cfkstat commented 3 months ago

Checks

Reproducible example

in_plf = pl.scan_parquet("sample.parquet").collect()

Polars running more than 100% of the memory causes the program to crash, and reading parquet when run again will use only 1 cpu core, running very slowly, but if run again successfully (execution time is particularly long), running read again can use 100% of the CPU resources.

Log output

No response

Issue description

Running a normal Polars task again after a failed execution can only use 1 core cpu, and it is uncertain what parameters control the behavior of polars.

Expected behavior

Running a normal Polars task again after a failed execution can only use 1 core cpu, and it is uncertain what parameters control the behavior of polars.

Installed versions

``` Polars 1.3 ```
coastalwhite commented 3 months ago

Is there any chance you can provide a sample parquet file that shows this behaviour?

cfkstat commented 3 months ago

Data in the production environment, considering customer privacy, can not provide data, this kind of program behavior is not in the earlier version of polars. Observing the program, the CPU rate in the main thread is very low and you don't know what parameters will prevent the code from using more CPU.

coastalwhite commented 3 months ago

Could you provide some further information?

kszlim commented 3 months ago

Data in the production environment, considering customer privacy, can not provide data, this kind of program behavior is not in the earlier version of polars. Observing the program, the CPU rate in the main thread is very low and you don't know what parameters will prevent the code from using more CPU.

It might be worthwhile vending a tool that anonymizes parquet data for reproduction cases:

import pyarrow as pa
import pyarrow.parquet as pq
import hashlib
import random
import string

def anonymize_string(s):
    if isinstance(s, str):
        return str(hashlib.md5(s.encode()).hexdigest())
    elif isinstance(s, bytes):
        return str(hashlib.md5(s).hexdigest())
    else:
        return str(hashlib.md5(str(s).encode()).hexdigest())

def anonymize_column_name(name):
    return f"column_{anonymize_string(name)[:8]}"

def generate_random_data(dtype, length):
    if pa.types.is_string(dtype) or pa.types.is_large_string(dtype):
        return [''.join(random.choices(string.ascii_letters + string.digits, k=10)) for _ in range(length)]
    elif pa.types.is_integer(dtype):
        return [random.randint(0, 1000000) for _ in range(length)]
    elif pa.types.is_floating(dtype):
        return [random.uniform(0, 1000000) for _ in range(length)]
    elif pa.types.is_boolean(dtype):
        return [random.choice([True, False]) for _ in range(length)]
    elif pa.types.is_list(dtype):
        return [generate_random_data(dtype.value_type, random.randint(0, 5)) for _ in range(length)]
    elif pa.types.is_struct(dtype):
        return [{field.name: generate_random_data(field.type, 1)[0] for field in dtype} for _ in range(length)]
    elif pa.types.is_map(dtype):
        return [{k: v for k, v in zip(
            generate_random_data(dtype.key_type, random.randint(1, 5)),
            generate_random_data(dtype.item_type, random.randint(1, 5))
        )} for _ in range(length)]
    else:
        raise ValueError(f"Unsupported dtype: {dtype}")

def anonymize_schema(schema):
    return pa.schema([
        pa.field(anonymize_column_name(field.name), field.type, field.nullable)
        for field in schema
    ])

def anonymize_array(array):
    return pa.array(generate_random_data(array.type, len(array)))

def anonymize_column(column):
    if pa.types.is_struct(column.type):
        return pa.StructArray.from_arrays(
            [anonymize_array(column.field(i)) for i in range(len(column.type))],
            fields=[pa.field(anonymize_column_name(f.name), f.type) for f in column.type]
        )
    elif pa.types.is_list(column.type):
        return pa.ListArray.from_arrays(
            column.offsets, 
            anonymize_array(column.values)
        )
    elif pa.types.is_map(column.type):
        return pa.MapArray.from_arrays(
            column.offsets,
            anonymize_array(column.keys),
            anonymize_array(column.items)
        )
    else:
        return anonymize_array(column)

def get_compression_info(parquet_file):
    metadata = parquet_file.metadata
    compressions = set()
    compression_levels = set()

    for i in range(metadata.num_row_groups):
        for j in range(metadata.num_columns):
            col_metadata = metadata.row_group(i).column(j)
            compressions.add(col_metadata.compression)
            if col_metadata.compression != 'UNCOMPRESSED':
                compression_levels.add(getattr(col_metadata, 'compression_level', None))

    compression = max(compressions, key=list(compressions).count)
    compression_level = max(compression_levels) if compression_levels else None

    return compression, compression_level

def anonymize_parquet(input_file, output_file):
    table = pq.read_table(input_file)
    parquet_file = pq.ParquetFile(input_file)

    compression, compression_level = get_compression_info(parquet_file)

    new_schema = anonymize_schema(table.schema)
    new_columns = [anonymize_column(column) for column in table.columns]

    new_table = pa.Table.from_arrays(new_columns, schema=new_schema)

    parquet_metadata = parquet_file.metadata

    pq.write_table(
        new_table, 
        output_file,
        version=parquet_metadata.format_version,
        compression=compression,
        compression_level=compression_level,
        write_statistics=True,
        data_page_size=parquet_metadata.row_group(0).column(0).data_page_offset,
        flavor='spark',
        use_dictionary=parquet_metadata.row_group(0).column(0).encodings[0] == 'PLAIN_DICTIONARY',
        write_batch_size=parquet_metadata.row_group(0).num_rows,
    )

input_file = 'input.parquet'
output_file = 'anonymized.parquet'
anonymize_parquet(input_file, output_file)

This might be a starting point, but ideally the script would reproduce the layout of the parquet file as much as possible.

I'd give that a go and see if the issue still reproduces, and if it does, then you might be able to share it.