pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Multiprocessing Dataframe Broken between 0.20.6 and 0.20.7 #14448

Closed: teddy661 closed 6 months ago

teddy661 commented 6 months ago

Reproducible example

import ast
import itertools
import multiprocessing as mp
import os
import platform
import requests
from io import BytesIO
from pathlib import Path
from typing import Callable

import numpy as np
import polars as pl


def parallelize_dataframe(
    df: pl.DataFrame,
    func: Callable[[pl.DataFrame], pl.DataFrame],
    n_cores: int = 4,
) -> pl.DataFrame:
    """
    Enable parallel processing of a dataframe by splitting it by the number
    of cores and then recombining the results.
    """
    # Rows per chunk; the last chunk absorbs the remainder.
    rows_per_dataframe = df.height // n_cores
    remainder = df.height % n_cores
    num_rows = [rows_per_dataframe] * (n_cores - 1)
    num_rows.append(rows_per_dataframe + remainder)
    start_pos = [0]
    for n in num_rows:
        start_pos.append(start_pos[-1] + n)
    # Slice the dataframe into one chunk per worker.
    df_split = []
    for start, rows in zip(start_pos, num_rows):
        df_split.append(df.slice(start, rows))
    func_list = list(itertools.repeat(func, len(df_split)))
    pool_args = list(zip(df_split, func_list))
    pool = mp.Pool(n_cores)
    new_df = pl.concat(pool.map(process_chunk, pool_args))
    pool.close()
    pool.join()
    return new_df


def process_chunk(args: tuple) -> pl.DataFrame:
    """Process a chunk of the dataframe."""
    df, func = args
    new_df = func(df)
    return new_df


def convert_wrapper(df: pl.DataFrame) -> pl.DataFrame:
    df = df.with_columns(
        pl.col("NER").map_elements(lambda x: ast.literal_eval(x)).alias("NER"),
    )
    return df


def main():
    num_cpus = 4
    url = "https://drive.google.com/uc?export=download&id=1Weu5A0XWf5nWtVbvBkGDu7sD9woZm9GQ"
    # r = requests.get(url, allow_redirects=True)
    target_file = Path("./polars-0.20.7-bug.csv")
    df = pl.read_csv(target_file)
    df = parallelize_dataframe(df, convert_wrapper, num_cpus)
    print(df.head())


if __name__ == "__main__":
    main()

Log output

Process SpawnPoolWorker-2:
Traceback (most recent call last):
  File "C:\Python\Python311\Python311\Lib\multiprocessing\process.py", line 314, in _bootstrap
    self.run()
  File "C:\Python\Python311\Python311\Lib\multiprocessing\process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Python\Python311\Python311\Lib\multiprocessing\pool.py", line 114, in worker
    task = get()
           ^^^^^
  File "C:\Python\Python311\Python311\Lib\multiprocessing\queues.py", line 367, in get
    return _ForkingPickler.loads(res)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\edbrown\Documents\01-Berkeley\281\281-venv\Lib\site-packages\polars\series\series.py", line 626, in __setstate__
    self._s.__setstate__(state)
polars.exceptions.ComputeError: out-of-spec: ExpectedBuffer
Process SpawnPoolWorker-4:
Traceback (most recent call last):
  File "C:\Python\Python311\Python311\Lib\multiprocessing\process.py", line 314, in _bootstrap
    self.run()
  File "C:\Python\Python311\Python311\Lib\multiprocessing\process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Python\Python311\Python311\Lib\multiprocessing\pool.py", line 114, in worker
    task = get()
           ^^^^^
  File "C:\Python\Python311\Python311\Lib\multiprocessing\queues.py", line 367, in get
    return _ForkingPickler.loads(res)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\edbrown\Documents\01-Berkeley\281\281-venv\Lib\site-packages\polars\series\series.py", line 626, in __setstate__
    self._s.__setstate__(state)
polars.exceptions.ComputeError: out-of-spec: ExpectedBuffer
Process SpawnPoolWorker-3:
Traceback (most recent call last):
  File "C:\Python\Python311\Python311\Lib\multiprocessing\process.py", line 314, in _bootstrap
    self.run()
  File "C:\Python\Python311\Python311\Lib\multiprocessing\process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Python\Python311\Python311\Lib\multiprocessing\pool.py", line 114, in worker
    task = get()
           ^^^^^
  File "C:\Python\Python311\Python311\Lib\multiprocessing\queues.py", line 367, in get
    return _ForkingPickler.loads(res)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\edbrown\Documents\01-Berkeley\281\281-venv\Lib\site-packages\polars\series\series.py", line 626, in __setstate__
    self._s.__setstate__(state)
polars.exceptions.ComputeError: out-of-spec: ExpectedBuffer

Issue description

Splitting DataFrames for multiprocessing is broken in 0.20.7; it worked in 0.20.6. The example I submitted would be better done another way, but it serves to illustrate the problem.
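
The traceback shows each worker dying while unpickling its task, inside `Series.__setstate__`, so the failure can likely be reduced to the pickle round-trip that `multiprocessing` performs on every chunk. A minimal sketch (plain `pickle` stands in here for multiprocessing's `_ForkingPickler`; that slicing is required to trigger it is an assumption):

```
import pickle

import polars as pl

df = pl.read_csv("./polars-0.20.7-bug.csv")
chunk = df.slice(0, 1000)  # a chunk like parallelize_dataframe produces

# multiprocessing pickles each chunk to send it to a worker; on 0.20.7
# the loads() side reportedly fails with
# ComputeError: out-of-spec: ExpectedBuffer
roundtripped = pickle.loads(pickle.dumps(chunk))
print(roundtripped.height)
```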

Expected behavior

```
shape: (5, 7)
┌──────────┬───────────────────────┬───────────────────────────────────┬───────────────────────────────────┬───────────────────────────────────┬──────────┬───────────────────────────────────┐
│ column_0 ┆ title                 ┆ ingredients                       ┆ directions                        ┆ link                              ┆ source   ┆ NER                               │
│ ---      ┆ ---                   ┆ ---                               ┆ ---                               ┆ ---                               ┆ ---      ┆ ---                               │
│ i64      ┆ str                   ┆ str                               ┆ str                               ┆ str                               ┆ str      ┆ list[str]                         │
╞══════════╪═══════════════════════╪═══════════════════════════════════╪═══════════════════════════════════╪═══════════════════════════════════╪══════════╪═══════════════════════════════════╡
│ 0        ┆ No-Bake Nut Cookies   ┆ ["1 c. firmly packed brown sugar… ┆ ["In a heavy 2-quart saucepan, m… ┆ www.cookbooks.com/Recipe-Details… ┆ Gathered ┆ ["brown sugar", "milk", … "bite … │
│ 1        ┆ Jewell Ball'S Chicken ┆ ["1 small jar chipped beef, cut … ┆ ["Place chipped beef on bottom o… ┆ www.cookbooks.com/Recipe-Details… ┆ Gathered ┆ ["beef", "chicken breasts", … "s… │
│ 2        ┆ Creamy Corn           ┆ ["2 (16 oz.) pkg. frozen corn", … ┆ ["In a slow cooker, combine all … ┆ www.cookbooks.com/Recipe-Details… ┆ Gathered ┆ ["frozen corn", "cream cheese", … │
│ 3        ┆ Chicken Funny         ┆ ["1 large whole chicken", "2 (10… ┆ ["Boil and debone chicken.", "Pu… ┆ www.cookbooks.com/Recipe-Details… ┆ Gathered ┆ ["chicken", "chicken gravy", … "… │
│ 4        ┆ Reeses Cups(Candy)    ┆ ["1 c. peanut butter", "3/4 c. g… ┆ ["Combine first four ingredients… ┆ www.cookbooks.com/Recipe-Details… ┆ Gathered ┆ ["peanut butter", "graham cracke… │
└──────────┴───────────────────────┴───────────────────────────────────┴───────────────────────────────────┴───────────────────────────────────┴──────────┴───────────────────────────────────┘
```

Installed versions

```
In [3]: pl.show_versions()
--------Version info---------
Polars:               0.20.7
Index type:           UInt32
Platform:             Windows-10-10.0.22631-SP0
Python:               3.11.7 (tags/v3.11.7:fa7a6f2, Dec 4 2023, 19:24:49) [MSC v.1937 64 bit (AMD64)]
----Optional dependencies----
adbc_driver_manager:  0.9.0
cloudpickle:          3.0.0
connectorx:           0.3.2
deltalake:
fsspec:               2024.2.0
gevent:               23.9.1
hvplot:
matplotlib:           3.8.2
numpy:                1.26.4
openpyxl:             3.1.2
pandas:               2.2.0
pyarrow:              15.0.0
pydantic:             2.6.1
pyiceberg:
pyxlsb:               1.0.10
sqlalchemy:           2.0.26
xlsx2csv:             0.8.2
xlsxwriter:           3.1.9
```
teddy661 commented 6 months ago

polars-0.20.7-bug.csv

ritchie46 commented 6 months ago

Can you try to clean up your issue report a bit? The formatting is a mess.

teddy661 commented 6 months ago

I will attempt to get to it this afternoon. It is a bit of a mess, apologies.

cmdlineluser commented 6 months ago

I think there are just some extra/missing backquotes in the example/expected sections.

It looks like the repro for polars-0.20.7-bug.csv is:

import ast
import itertools
import multiprocessing as mp
import os
import platform
import requests
from io import BytesIO
from pathlib import Path
from typing import Callable

import numpy as np
import polars as pl

def parallelize_dataframe(
    df: pl.DataFrame,
    func: Callable[[pl.DataFrame], pl.DataFrame],
    n_cores: int = 4,
) -> pl.DataFrame:
    """
    Enable parallel processing of a dataframe by splitting it by the number of cores
    and then recombining the results.
    """
    rows_per_dataframe = df.height // n_cores
    remainder = df.height % n_cores
    num_rows = [rows_per_dataframe] * (n_cores - 1)
    num_rows.append(rows_per_dataframe + remainder)
    start_pos = [0]
    for n in num_rows:
        start_pos.append(start_pos[-1] + n)
    df_split = []
    for start, rows in zip(start_pos, num_rows):
        df_split.append(df.slice(start, rows))
    func_list = list(itertools.repeat(func, len(df_split)))
    pool_args = list(zip(df_split, func_list))
    pool = mp.get_context("spawn").Pool(n_cores)
    new_df = pl.concat(pool.map(process_chunk, pool_args))
    pool.close()
    pool.join()
    return new_df

def process_chunk(args: tuple) -> pl.DataFrame:
    """
    Process a chunk of the dataframe
    """
    df, func = args
    new_df = func(df)
    return new_df

def convert_wrapper(df: pl.DataFrame) -> pl.DataFrame:
    df = df.with_columns(
        pl.col("NER").map_elements(lambda x: ast.literal_eval(x)).alias("NER"),
    )
    return df

def main():
    num_cpus = 4
    #url = "https://drive.google.com/uc?export=download&id=1Weu5A0XWf5nWtVbvBkGDu7sD9woZm9GQ"
    #r = requests.get(url, allow_redirects=True)
    target_file = Path("./polars-0.20.7-bug.csv")
    df = pl.scan_csv(target_file).collect()
    df = parallelize_dataframe(df, convert_wrapper, num_cpus)
    print(df.head())

if __name__ == "__main__":
    main()
teddy661 commented 6 months ago

I've confirmed that the reformatted code above reproduces the problem in 0.20.7. Thank you @cmdlineluser
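
As a stopgap until the fix lands, one possible workaround (a sketch, assuming the Arrow IPC path is unaffected by the pickling regression) is to pass Arrow IPC bytes to the workers instead of pickled DataFrames:

```
import io

import polars as pl

def to_ipc_bytes(df: pl.DataFrame) -> bytes:
    # Serialize via Arrow IPC so multiprocessing only pickles plain bytes.
    buf = io.BytesIO()
    df.write_ipc(buf)
    return buf.getvalue()

def process_ipc_chunk(payload: bytes) -> bytes:
    # Deserialize in the worker, transform, and ship bytes back.
    df = pl.read_ipc(io.BytesIO(payload))
    return to_ipc_bytes(convert_wrapper(df))
```

Here `convert_wrapper` is the function from the repro above; the pool would map `process_ipc_chunk` over `[to_ipc_bytes(c) for c in df_split]`, and the parent would rebuild the result with `pl.concat([pl.read_ipc(io.BytesIO(b)) for b in results])`.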

ritchie46 commented 6 months ago

Fix coming up.