pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Deadlock when using async parquet reader #18086

Open kszlim opened 3 months ago

kszlim commented 3 months ago

Checks

Reproducible example

First, install the dependencies:

pip install -U polars numpy psutil

Then run the following script:
import os
os.environ["POLARS_CONCURRENCY_BUDGET"] = "1500" # not sure if this is needed or even does anything with the async local reader
os.environ["POLARS_FORCE_ASYNC"] = "1"

import polars as pl
import numpy as np
import psutil
import polars.selectors as cs
import shutil

from collections.abc import Generator, Iterable
from itertools import islice
from typing import TypeVar

T = TypeVar("T")

def create_random_dataframe(num_rows=250000, num_cols=50):
    """Create a random dataframe with specified number of rows and columns."""
    data = {f'col_{i}': np.random.randn(num_rows).astype(np.float32) for i in range(num_cols)}
    return pl.DataFrame(data)

def write_hive_partitioned_dataframes(base_path, num_partitions=2000):
    """Write random dataframes to Hive-partitioned directory, creating the Parquet file only once."""
    # Create the base directory if it doesn't exist
    os.makedirs(base_path, exist_ok=True)

    # Create a single random dataframe
    df = create_random_dataframe()

    # Write the dataframe to a temporary Parquet file
    temp_file = os.path.join(base_path, "temp.parquet")
    df.write_parquet(temp_file)

    for i in range(num_partitions):
        # Create the partition directory
        if i % 10 == 0:
            print(f"Writing {i}")
        partition_path = os.path.join(base_path, f"id={i}")
        os.makedirs(partition_path, exist_ok=True)

        # Copy the temporary Parquet file to the partition
        output_path = os.path.join(partition_path, f"part-{i}.parquet")
        if os.path.exists(output_path):
            continue
        shutil.copy(temp_file, output_path)

    # Remove the temporary file
    os.remove(temp_file)

    print(f"Created {num_partitions} Hive-partitioned dataframes in {base_path}")

def batched(iterable: Iterable[T], n: int) -> Generator[tuple[T, ...], None, None]:
    """Yield successive n-sized chunks from iterable."""
    if n < 1:
        msg = "n must be at least one"
        raise ValueError(msg)
    it = iter(iterable)
    while batch := tuple(islice(it, n)):
        yield batch

def get_ldf(base_path):
    ldf = pl.scan_parquet(
        f"{base_path}/**/*.parquet",
        retries=10,
        hive_partitioning=True,
    )
    return ldf

base_path = "hive_partitioned_data"
NUM_IDS = 1500
write_hive_partitioned_dataframes(base_path, NUM_IDS)

base_ldf = get_ldf(base_path)

IDS = list(range(NUM_IDS))

ID_COL = 'id'

exprs = []
for column in base_ldf.columns:
    exprs.append(pl.col(column).over(ID_COL)) # Don't think this is actually required, but seems to make it happen more frequently.

for i in range(200):
    ldfs = []
    for batch in batched(IDS, NUM_IDS // psutil.cpu_count()):
        ldf = base_ldf.filter(pl.col(ID_COL).is_in(batch))
        ldf = ldf.with_columns(exprs)
        ldf = ldf.group_by(ID_COL).agg(
            cs.float().mean().name.suffix("_mean"),
            cs.float().median().name.suffix("_median"),
            cs.float().min().name.suffix("_min"),
            cs.float().max().name.suffix("_max"),
            cs.float().std().name.suffix("_std"),
        )
        ldfs.append(ldf)
    print(f"On iteration: {i}")
    dfs = pl.collect_all(ldfs)
    df = pl.concat(dfs)
    print(df)

Log output

Stops running after some number of iterations.
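
If it helps, one way to actually capture some log output here is to enable Polars' verbose logging before running the script. POLARS_VERBOSE is an existing Polars environment variable; whether it prints anything useful from the async parquet reader right before the hang is an assumption on my part.

```
import os

# Ask Polars to print what the engine is doing during query execution, so the
# output shows how far each scan got before the script stops advancing.
os.environ["POLARS_VERBOSE"] = "1"

import polars as pl  # noqa: E402
```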

Issue description

This code deadlocks after a while; it took a few minutes for me. The time to reproduce probably depends heavily on your machine: I suspect it's a function of core count (though I can't confirm that more cores makes it more likely), and I also think more files help it reproduce more reliably. I'm running this on a c6a.24xlarge in AWS.
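
To confirm it's really a deadlock rather than very slow progress, one option is to dump the Python thread stacks once the script stops advancing. Below is a minimal sketch using the standard-library faulthandler module (the 600-second timeout is an arbitrary choice); running py-spy dump --pid <PID> from another shell gives a similar picture from outside the process.

```
import faulthandler
import sys

# Dump every Python thread's stack to stderr after 10 minutes, and keep
# dumping every 10 minutes after that (repeat=True). A hung run should show
# the main thread still parked inside pl.collect_all().
faulthandler.dump_traceback_later(timeout=600, repeat=True, file=sys.stderr)
```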

Expected behavior

Shouldn't deadlock

Installed versions

```
--------Version info---------
Polars:               1.4.1
Index type:           UInt32
Platform:             Linux-5.10.220-188.869.x86_64-x86_64-with-glibc2.26
Python:               3.11.7 (main, Dec 5 2023, 22:00:36) [GCC 7.3.1 20180712 (Red Hat 7.3.1-17)]

----Optional dependencies----
adbc_driver_manager:
cloudpickle:
connectorx:
deltalake:
fastexcel:
fsspec:
gevent:
great_tables:
hvplot:
matplotlib:
nest_asyncio:
numpy:                2.0.1
openpyxl:
pandas:
pyarrow:
pydantic:
pyiceberg:
sqlalchemy:
torch:
xlsx2csv:
xlsxwriter:
```
kszlim commented 3 months ago

Closing https://github.com/pola-rs/polars/issues/14939 as it contains a lot of irrelevant information and there's now an MRE here.

kszlim commented 3 months ago

I don't believe the collect_all is strictly necessary to induce a deadlock, but it does make the deadlock far easier to reproduce.
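
For context, the variant without collect_all that I'd expect to (eventually) hit the same deadlock just collects each batch sequentially, reusing the definitions from the reproducer above (untested sketch; I've trimmed the agg to a single aggregation for brevity):

```
# Same query shape as the reproducer, but collected one LazyFrame at a time
# instead of concurrently via pl.collect_all().
for i in range(200):
    dfs = []
    for batch in batched(IDS, NUM_IDS // psutil.cpu_count()):
        ldf = base_ldf.filter(pl.col(ID_COL).is_in(batch))
        ldf = ldf.with_columns(exprs)
        ldf = ldf.group_by(ID_COL).agg(cs.float().mean().name.suffix("_mean"))
        dfs.append(ldf.collect())
    print(f"On iteration: {i}")
    print(pl.concat(dfs))
```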

kszlim commented 3 months ago

@coastalwhite curious if you managed to reproduce this?

coastalwhite commented 3 months ago

I have not really looked at this yet, sorry.

kszlim commented 3 months ago

No worries! I just want to make sure you have all you need, happy to help with anything needed.

kszlim commented 2 months ago

One annoying thing about this deadlock is that it only reproduces on machines with a high number of vCPUs.

Try reproducing this on a c6a.24xlarge on AWS. I've confirmed it doesn't reproduce on a MacBook.

Can confirm this is still an issue as of 1.8.2.
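
Since core count seems to be the main variable, it might be useful for anyone attempting a repro to report the effective Polars thread pool size alongside their result. A quick check could look like the following (forcing a large pool via POLARS_MAX_THREADS to emulate a big machine is just a guess at a workaround; 192 is the value I use in the next comment):

```
import os

# Optionally force a large thread pool before importing polars, to emulate a
# high-vCPU machine on smaller hardware.
os.environ["POLARS_MAX_THREADS"] = "192"

import polars as pl

print(pl.thread_pool_size())  # the number of threads Polars will actually use
```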

kszlim commented 2 months ago

I think I managed to get it to reproduce on an M1 MacBook Pro:

First run:

ulimit -n 10240  # This is needed due to the sheer quantity of files

Then run the following script:
import os
os.environ["POLARS_CONCURRENCY_BUDGET"] = "1500" # not sure if this is needed or even does anything with the async local reader
os.environ["POLARS_FORCE_ASYNC"] = "1"
NUM_THREADS = 192
os.environ["POLARS_MAX_THREADS"] = str(NUM_THREADS)

import polars as pl
import numpy as np
import polars.selectors as cs
import shutil

from collections.abc import Generator, Iterable
from itertools import islice
from typing import TypeVar

T = TypeVar("T")

def create_random_dataframe(num_rows=250000, num_cols=50):
    """Create a random dataframe with specified number of rows and columns."""
    data = {f'col_{i}': np.random.randn(num_rows).astype(np.float32) for i in range(num_cols)}
    return pl.DataFrame(data)

def write_hive_partitioned_dataframes(base_path, num_partitions=2000):
    """Write random dataframes to Hive-partitioned directory, creating the Parquet file only once."""
    # Create the base directory if it doesn't exist
    os.makedirs(base_path, exist_ok=True)

    # Create a single random dataframe
    df = create_random_dataframe()

    # Write the dataframe to a temporary Parquet file
    temp_file = os.path.join(base_path, "temp.parquet")
    df.write_parquet(temp_file)

    for i in range(num_partitions):
        # Create the partition directory
        if i % 10 == 0:
            print(f"Writing {i}")
        partition_path = os.path.join(base_path, f"id={i}")
        os.makedirs(partition_path, exist_ok=True)

        # Copy the temporary Parquet file to the partition
        output_path = os.path.join(partition_path, f"part-{i}.parquet")
        if os.path.exists(output_path):
            continue
        shutil.copy(temp_file, output_path)

    # Remove the temporary file
    os.remove(temp_file)

    print(f"Created {num_partitions} Hive-partitioned dataframes in {base_path}")

def batched(iterable: Iterable[T], n: int) -> Generator[tuple[T, ...], None, None]:
    """Yield successive n-sized chunks from iterable."""
    if n < 1:
        msg = "n must be at least one"
        raise ValueError(msg)
    it = iter(iterable)
    while batch := tuple(islice(it, n)):
        yield batch

def get_ldf(base_path):
    ldf = pl.scan_parquet(
        f"{base_path}/**/*.parquet",
        retries=10,
        hive_partitioning=True,
    )
    return ldf

base_path = "hive_partitioned_data"
NUM_IDS = 1000
write_hive_partitioned_dataframes(base_path, NUM_IDS)

base_ldf = get_ldf(base_path)

IDS = list(range(NUM_IDS))

ID_COL = 'id'

exprs = []
for column in base_ldf.columns:
    exprs.append(pl.col(column).over(ID_COL)) # Don't think this is actually required, but seems to make it happen more frequently.

for i in range(200):
    ldfs = []
    for batch in batched(IDS, 5):
        ldf = base_ldf.filter(pl.col(ID_COL).is_in(batch))
        ldf = ldf.with_columns(exprs)
        ldf = ldf.group_by(ID_COL).agg(
            cs.float().mean().name.suffix("_mean"),
            cs.float().median().name.suffix("_median"),
            cs.float().min().name.suffix("_min"),
            cs.float().max().name.suffix("_max"),
            cs.float().std().name.suffix("_std"),
        )
        ldfs.append(ldf)
    print(f"On iteration: {i}")
    dfs = pl.collect_all(ldfs)
    df = pl.concat(dfs)
    print(df)

Seems to take longer to reproduce though.

orlp commented 2 months ago

@kszlim I tried reproducing locally, but I think my MacBook doesn't have enough RAM (it has 32 GB).

kszlim commented 2 months ago

@orlp does it work if you reduce NUM_IDS slightly?
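
If it helps, the only knobs that control the memory footprint of the reproducer are the partition count and the per-file size. Something like the following might fit in 32 GB (the concrete numbers are only guesses, and I haven't verified the deadlock still triggers at this scale):

```
# Illustrative smaller settings; unverified whether the deadlock still
# reproduces at this size.
NUM_IDS = 500  # instead of 1000: fewer hive partitions

# ...and inside write_hive_partitioned_dataframes(), build smaller files:
df = create_random_dataframe(num_rows=100_000, num_cols=25)
```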

kszlim commented 3 weeks ago

Can confirm this still repros for me on polars 1.12.0