pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

GroupBy on 1000MillionRows.parquet Succeeds, but Filter Fails to Complete #8533

Closed hkpeaks closed 1 year ago

hkpeaks commented 1 year ago

Polars version checks

Issue description

Many friends have suggested using the Parquet format.

Previously, the GroupBy function ran successfully.

Today I tried the filter function with read/write CSV and read/write Parquet: CSV succeeded, but Parquet failed to complete.

I recorded a demo video so you can see what happens with resource utilisation.

The Parquet demo starts at 6:43; before that is CSV: https://www.youtube.com/watch?v=pH8l6RUnY9o

Reproducible example

import polars as pl
import time
import pathlib

s = time.time()

# read parquet file
df = pl.scan_parquet('Input/1000MillionRows.parquet')
filter = df.filter(
    (((pl.col('Ledger') >= "L10") & (pl.col('Ledger') <= "L15"))
     | ((pl.col('Ledger') >= "L50") & (pl.col('Ledger') <= "L55"))
     | ((pl.col('Ledger') >= "L82") & (pl.col('Ledger') <= "L88")))
    & (((pl.col('Account') >= 12222) & (pl.col('Account') <= 12888))
       | ((pl.col('Account') >= 15555) & (pl.col('Account') <= 16888)))
    & ((pl.col('Project') > "B28") | (pl.col('Project') < "B22"))
)

# write parquet file
a = filter.collect(streaming=True)
print("Number of selected rows: {}", filter.select(pl.count()).collect());
path: pathlib.Path = "Output/Polars-Filter1000M.parquet"
a.write_csv(path)

e = time.time()
print(f"Duration: {(e-s):.3f} seconds")

==============================================================================

import polars as pl
import time
import pathlib
s = time.time()

q = (
    pl.scan_parquet("Input/1000MillionRows.parquet")
    .groupby(by=["Ledger", "Account", "PartNo", "Contact", "Project", "Unit Code", "D/C", "Currency"])
    .agg([
        pl.count('Quantity').alias('Quantity(Count)'),
        pl.max('Quantity').alias('Quantity(Max)'),
        pl.min('Quantity').alias('Quantity(Min)'),
        pl.sum('Quantity').alias('Quantity(Sum)'),
    ])
)

a = q.collect(streaming=True)
path: pathlib.Path = "Output/Polars-GroupBy1000M.parquet"
a.write_parquet(path)

e = time.time()
print("Polars GroupBy 1000M Time = {}".format(e-s))

Expected behavior

Parquet results should be the same as the CSV results.


Installed versions

``` polars-0.17.9 ```
deanm0000 commented 1 year ago
path: pathlib.Path = "Output/Polars-Filter1000M.parquet"
a.write_csv(path)

Why are you writing a CSV file to a path whose extension is .parquet? Is that an accident/typo?

It's not clear what the problem is, since you didn't post any of the output. I don't think many people want to watch a video to find the answer to the above. You just say it doesn't match.
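For reference, a minimal sketch of the two consistent variants (illustrative stand-in data; the output paths are taken from the original script):

```python
import polars as pl

# Illustrative stand-in for the collected result `a` from the script above.
a = pl.DataFrame({"Ledger": ["L10"], "Account": [12222]})

# Either write Parquet to the .parquet path...
a.write_parquet("Output/Polars-Filter1000M.parquet")

# ...or write CSV to a path with a matching .csv extension.
a.write_csv("Output/Polars-Filter1000M.csv")
```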

hkpeaks commented 1 year ago

On the day I tested many scenarios, I overlooked an inconsistent typo at the end.

I suggest the app generate an error message to prompt users in this case.

I have now re-tested the script above (source file is Parquet) with the scenarios below => no output and no error message:

Scenario 1

write parquet file

a = filter.collect(streaming=True)
print("Number of selected rows: {}", filter.select(pl.count()).collect())
path: pathlib.Path = "Output/Polars-Filter1000M.parquet"
a.write_parquet(path)

Scenario 2

write csv file

a = filter.collect(streaming=True)
print("Number of selected rows: {}", filter.select(pl.count()).collect())
path: pathlib.Path = "Output/Polars-Filter1000M.csv"
a.write_csv(path)

hkpeaks commented 1 year ago

After changing to 100M, it ran successfully and was able to output the same number of selected records as my app.

D:\Benchmark>python FilterParquet100M.py Number of selected rows: {} shape: (1, 1) ┌─────────┐ │ count │ │ --- │ │ u32 │ ╞═════════╡ │ 1511000 │ └─────────┘

D:\Benchmark>peaks filter100M CurrentSetting{StreamMB(1000)Thread(100)}

Select{100MillionRows.csv | Ledger(L10..L15,L50..L55,L82..L88)Account(12222..12888,15555..16888)Project(>B28,<B22)~ Peaks-Filter100M.csv} Byte: 7216384229 | Stream: 7 | Thread: 100 1 2 3 4 5 6 7 Peaks-Filter100M.csv(14 x 1511000)

If you watch the video comparing the resource utilization of Polars (CSV at the beginning) and Peaks (CSV at the end), it is clear that there is room for Polars to improve its CPU and memory utilization. Many people have said that Rust is much more efficient than Golang in terms of resource utilization. You will have the opportunity to test Polars and Peaks on your own machine after I publish Peaks by June 2023. Peaks is accounting software; there is no plan for it to become another Pandas.

I guess the issue may be due to the heavy use of Arrow. I mainly use byte arrays - direct byte-to-byte conversion.

qqlearn123 commented 1 year ago

What exactly is the issue/bug? Speed? Memory/CPU utilization? It is very unclear as far as I can see.

hkpeaks commented 1 year ago

Overall, I think Polars performs well, but it seems to use too many resources (you can compare Polars and Peaks in the video above).

- No output when the data is a 1-billion-row Parquet file and the filter function is used (GroupBy succeeds).
- Output is produced when the data is a 100-million-row CSV/Parquet file and the filter function is used (GroupBy succeeds).
- For JoinTable on 1 billion rows, both CSV and Parquet formats fail; unlike the streaming model of GroupBy, it does not seem to be real streaming.

I guess Polars demands more hardware resources mainly due to the use of Arrow. If Polars used byte arrays as its in-memory dataset, it would most likely be faster than Peaks for most query functions. I am not prepared to design my own hashmap, so I rely on Google's hashmap, which is slow but very safe.

I started coding Peaks less than 3 months ago, motivated by the high performance of Polars. Peaks implements real streaming for Distinct, GroupBy, JoinTable and Filter (sorting is in the design stage) throughout read file -> query -> write file in parallel. Even when the output file for JoinTable or Filter runs to billions of rows, it is not an issue, given sufficient hard disk space. After I release the trial version of Peaks by June 2023, the Polars development team can compare the resource utilization directly. As Polars motivated me to develop better algorithms, I also want Peaks to benefit the Polars team.

My next open-source focus will be DuckDB, to enrich my current development of Peaks. I am a one-man band with no plan to hire contributors for Peaks, so I will keep Peaks as small as possible. The current source code size is only 117KB.

ritchie46 commented 1 year ago

You collect the result into memory. That will go OOM if the result doesn't fit into memory. Try sink_parquet. If sink_parquet fails, that query is not yet supported in our streaming engine.
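A minimal sketch of that approach (reusing a simplified version of the filter from the original example; whether the full original query is already supported by the streaming engine is not guaranteed):

```python
import polars as pl

# Lazy scan plus a simplified version of the filter from the original example.
lf = (
    pl.scan_parquet("Input/1000MillionRows.parquet")
    .filter((pl.col("Ledger") >= "L10") & (pl.col("Ledger") <= "L15"))
)

# Stream the result straight to disk instead of collecting it into memory.
# If sink_parquet raises because an operation is unsupported, the query
# cannot yet run in the streaming engine.
lf.sink_parquet("Output/Polars-Filter1000M.parquet")
```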

Our streaming engine is in the alpha stage, so opening issues about functionality that isn't yet supported is counter-productive, as we are already aware of that.

hkpeaks commented 1 year ago

I have nearly completed my testing of Polars with respect to big tables. Next, I shall refocus on coding my own project, which will be released as a first 100-day trial version (limited to 100 seconds per script) and a free version (limited to 100MB CSV) by June 2023. Obviously, without Polars's outstanding performance compared with Pandas, I would not have had the ambition to reactivate my old project WebNameSQL-GoVersion, which had been suspended for 7 months. It has become another new project, as most of the code brought over from the old project has been removed (mainly the parsing and script validation is reused), because I changed the in-memory table from key-value to byte arrays plus a parallel streaming model throughout the ETL processes -- a fundamental change. During the 7-month suspension I actually spent my time studying bioinformatics. The DNA data model gave me a new idea for how to implement it for dataframes. I guess you have a member who specializes in bioinformatics. When Polars releases a real streaming model for JoinTable, please inform me so I can update the benchmark. Sorting billions of rows is in fact still in the design phase for me; it is not that simple, but I am confident I can excel at it.

I am not preparing to build another Polars/Pandas. You will gradually see that I am building alternative software which is significantly different from yours.

hkpeaks commented 1 year ago

@ritchie46 Today I published a final release version of an interactive CSV file viewer: https://github.com/do-account/PeaksDataFrameViewer. It also provides a sample Polars script, which may benefit your user group. It is fast, up to 1 million rows/s on an 8-core CPU, which may be the upper limit of .NET. This is why I moved to Golang and enjoy up to a 10X performance gain. I also tested Rust before using Go, but Rust gave me trouble with "::", so I could not move code freely across different source files.

hkpeaks commented 1 year ago

Last evening I used 0.17.13 to follow up on this bug - filtering data from 1000MillionRows.parquet still fails and produces no output; the expected output is shown below. If I use 100MillionRows.parquet, it succeeds. If I use GroupBy to process the 1000MillionRows.parquet, it produces output properly. I created the parquet file with Polars itself, using this 1MillionRows.csv https://github.com/hkpeaks/peaks-consolidation/blob/main/Polars-PeaksBenchmarking/Input/1MillionRows.7z, expanded and converted to Parquet format.

Polars is fast in GroupBy but slow in Filter, particularly for a large dataset in CSV format; I do not yet know the result for Parquet format. If you have good news about better performance, I suggest you inform me directly.

PS D:\Peaks> python polars.py
Number of selected rows: {} shape: (1, 1)
┌───────┐
│ count │
│ ---   │
│ u32   │
╞═══════╡
│ 92000 │
└───────┘
Number of selected rows: {} shape: (1, 1)
┌───────┐
│ count │
│ ---   │
│ u32   │
╞═══════╡
│ 45600 │
└───────┘
Number of selected rows: {} shape: (1, 1)
┌────────┐
│ count  │
│ ---    │
│ u32    │
╞════════╡
│ 308000 │
└────────┘
Number of selected rows: {} shape: (1, 1)
┌────────┐
│ count  │
│ ---    │
│ u32    │
╞════════╡
│ 308000 │
└────────┘

import polars as pl
import time
import pathlib

s = time.time()

table1 = pl.scan_parquet("Input/1000MillionRows.parquet")

table2 = table1.filter((pl.col('Ledger') == "L10") & (pl.col('Project') == "B25"))

print("Number of selected rows: {}", table2.select(pl.count()).collect())

table3 = table2.filter(
    ((pl.col('Quantity') >= 100) & (pl.col('Quantity') <= 300))
    | ((pl.col('Quantity') >= 600) & (pl.col('Quantity') <= 900))
)

print("Number of selected rows: {}", table3.select(pl.count()).collect())

table4 = table3.filter((pl.col('Contact') >= "C32") & (pl.col('Contact') <= "C39"))

table5 = table4.filter(pl.col('Contact') != "C36")

print("Number of selected rows: {}", table5.select(pl.count()).collect())

path: pathlib.Path = "Output/Polars-Filter.csv"

final = table5.collect(streaming=True)

final.write_csv(path)

e = time.time()
print("Polars Filter Data Time = {}".format(round(e-s,3)))

zundertj commented 1 year ago

I am closing this as you are not responding to fair questions asked by others, so there is no way anyone else here can actually help you.