pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

100,000 Rows File x 3,000 #9201

Closed hkpeaks closed 1 year ago

hkpeaks commented 1 year ago

Polars version checks

Issue description

Polars can process a single CSV file with 300 million rows, but fails when the same data is split across 3,000 files.

D:\Benchmark>python PolarsFilterGroupByCSV.py
memory allocation of 16392 bytes failed

Reproducible example

import polars as pl
import time

s = time.time()

# scan_csv already returns a LazyFrame, so the .lazy() call below is redundant
table1 = (
    pl.scan_csv("Input/Copy3000/*.csv")
    .filter((pl.col("Ledger") >= "L30") & (pl.col("Ledger") <= "L70"))
    .groupby(by=["Ledger", "Account", "DC", "Currency"])
    .agg([
        pl.sum("Base_Amount").alias("Total_Base_Amount"),
    ])
)

path = "Output/PolarsFilterGroupByCSV.csv"
table1.lazy().collect(streaming=True).write_csv(path)

e = time.time()
print("Polars FilterGroupBy CSV Time = {}".format(round(e - s, 3)))

One of the input files:
https://github.com/hkpeaks/peaks-consolidation/blob/main/Benchmark20230602/1.csv

Expected behavior

D:\Benchmark>python PolarsDBFilterGroupByCSV.py
Polars FilterGroupBy CSV Time = ??.???

Installed versions

``` 0.18.0 ```
ritchie46 commented 1 year ago

Can you create an MWE?

hkpeaks commented 1 year ago

Every file is identical, so you can use a script to copy the files on your machine very easily; it is a simple case. I tested it again: it completes with 1,000 files but fails when I use 1,200 or more. So I will use 1,000 files for benchmarking against Spark, DuckDB and Peaks.

zundertj commented 1 year ago

Can you include the download and file-copying code in the example, so we can just run those lines and have the same setup as you have?

On the MWE, at a minimum, you should try to address the following (a stripped-down sketch follows this list):

  1. Is the timing code needed? If not, please remove it.
  2. Does the problem still occur if any or all of the following are taken out: the filter, the groupby (and can any of the grouping columns be dropped)? Take out as much as possible while still triggering the problem.
  3. Does the problem occur if you do not call write_csv at the end?
  4. Does streaming=True vs. streaming=False make a difference?
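
For reference, a stripped-down variant along the lines of points 2-4 might look like this (an untested sketch; the glob path is taken from the original report):

```python
import polars as pl

# no timing, no filter/groupby, no write_csv: just scan and collect
lf = pl.scan_csv("Input/Copy3000/*.csv")

# toggle streaming=True / streaming=False to see whether it makes a difference
df = lf.collect(streaming=True)
print(df.shape)
```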

Can you also provide the output of pl.show_versions() per the issue template?

hkpeaks commented 1 year ago

This file-copying code came from Bing Chat. It is written in Go; I suggest you ask Bing to provide the Python equivalent.

package main

import (
    "fmt"
    "io"
    "os"
    "strconv"
)

func main() {
    source := "D:\\Go\\Input\\100000.csv"
    var target string

    for i := 0; i < 3000; i++ {
        // Open the source file
        srcFile, err := os.Open(source)
        if err != nil {
            fmt.Println(err)
            return
        }

        target = "D:\\Go\\Input\\Copy3000\\" + strconv.Itoa(i) + ".csv"

        // Create the target file
        dstFile, err := os.Create(target)
        if err != nil {
            srcFile.Close()
            fmt.Println(err)
            return
        }

        // Copy the contents of the source file to the target file
        _, err = io.Copy(dstFile, srcFile)

        // Close explicitly each iteration; defer inside a loop would keep
        // every file handle open until main returns.
        srcFile.Close()
        dstFile.Close()

        if err != nil {
            fmt.Println(err)
            return
        }
    }

    fmt.Println("Files copied successfully!")
}
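
For readers who would rather not run Go, a minimal Python equivalent of the copy loop might be (a sketch, assuming the same hypothetical paths as the Go snippet):

```python
import shutil

source = r"D:\Go\Input\100000.csv"

# copy the same 100,000-row file 3,000 times into the input folder
for i in range(3000):
    shutil.copy(source, rf"D:\Go\Input\Copy3000\{i}.csv")
```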

hkpeaks commented 1 year ago

The root problem may be the design of the streaming engine. The author has told me more than once that it is in an alpha stage. So I have opened my streaming source code for the Polars team to learn from: https://github.com/hkpeaks/peaks-consolidation. This project is only 3 months old, but I consider the streaming engine mature for a single big file; it can process a JoinTable of over a billion rows. Now I am extending it to support many small/big files in a folder.

For over 1,000 files (each file has 100,000 rows), Peaks may also encounter an out-of-memory error, because Peaks has not yet implemented streaming for multiple files. The Polars team can check whether their code implements streaming for multiple files. DuckDB can process all 3,000 files in only 22 seconds, so I believe DuckDB has implemented streaming for multiple files.

For this case, the MWE would be a folder containing two files. Output to screen or file is not relevant; in fact I used GroupBy, so the output file is very small. It is an out-of-memory case: the memory needed for the total number of rows exceeds the memory size. I tested it again: it supports 1,000 files but triggers out-of-memory somewhere above 1,200 files. I can accept this as a limitation of the software, so my coming benchmark will test Polars on 1,000 files.

The most important part of the streaming design in Peaks is the function ParallelStreaming(). It lets Read File --> Query --> Write File run in parallel. If Polars implemented this design, it could handle a billion-row JoinTable and output a billion-row file for a filter setting. Billion-row sorting is another game; I have not yet started that development.
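
To illustrate that read --> query --> write parallelism, here is a minimal sketch in Python using bounded queues as the "water tank"; the names and batch contents are purely illustrative and are not Peaks or Polars APIs:

```python
import queue
import threading

read_q = queue.Queue(maxsize=4)    # bounded buffer: the "water tank" for incoming batches
write_q = queue.Queue(maxsize=4)   # bounded buffer for outgoing results
SENTINEL = object()

def reader(batches):
    # stand-in for a CurrentExtraction-style job: push raw batches, blocking when full
    for batch in batches:
        read_q.put(batch)
    read_q.put(SENTINEL)

def writer(out):
    # stand-in for a CurrentWriteFile-style job: drain results as they arrive
    while True:
        result = write_q.get()
        if result is SENTINEL:
            break
        out.append(result)

batches = [list(range(i, i + 5)) for i in range(0, 20, 5)]
results = []
threading.Thread(target=reader, args=(batches,)).start()
t_writer = threading.Thread(target=writer, args=(results,))
t_writer.start()

# the "query" stage runs here, overlapping with reading and writing
while True:
    batch = read_q.get()
    if batch is SENTINEL:
        break
    write_q.put([x * 2 for x in batch])

write_q.put(SENTINEL)
t_writer.join()
print(results)
```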

hkpeaks commented 1 year ago

Implementing separation of concerns is very important for a streaming model. For example, if Filter and GroupBy are the first two query commands you build, and they are kept separate from how the system streams and manages in-memory partitions, then once those first two commands work in streaming mode, a third command such as JoinTable supports streaming automatically as well. No matter how many commands you build afterwards, they all get streaming for free. I was a non-code consultant working with FlexSystem for a decade, and I learnt a lot of software design principles from the CEO, Adam Lok; what I learnt from him is in fact valuable to my current design of Peaks Consolidation.

If your JoinTable fails to support a billion-row full join, it may be because separation of concerns has not been implemented correctly. Go does not support OO, but it still lets me implement separation of concerns, even though the current code is not perfect.
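
As an illustration of that principle (not Peaks or Polars code), a streaming driver can be written against a small operator interface, so that any command implementing the interface runs batch by batch without knowing anything about the streaming machinery:

```python
from typing import Iterable, List


class Operator:
    """Interface every query command implements; the driver knows nothing else."""
    def process_batch(self, batch: List[dict]) -> List[dict]:
        raise NotImplementedError

    def finalize(self) -> List[dict]:
        return []


class Filter(Operator):
    def __init__(self, predicate):
        self.predicate = predicate

    def process_batch(self, batch):
        return [row for row in batch if self.predicate(row)]


class GroupBySum(Operator):
    def __init__(self, key, value):
        self.key, self.value, self.acc = key, value, {}

    def process_batch(self, batch):
        for row in batch:
            self.acc[row[self.key]] = self.acc.get(row[self.key], 0) + row[self.value]
        return []  # nothing to emit until the whole stream has been seen

    def finalize(self):
        return [{self.key: k, "Total": v} for k, v in self.acc.items()]


def run_streaming(batches: Iterable[List[dict]], ops: List[Operator]) -> List[dict]:
    out: List[dict] = []
    for batch in batches:
        for op in ops:
            batch = op.process_batch(batch)
        out.extend(batch)
    for op in ops:
        out.extend(op.finalize())
    return out


# a JoinTable-style operator added later would plug into run_streaming unchanged
batches = [[{"Ledger": "L30", "Base_Amount": 10}], [{"Ledger": "L80", "Base_Amount": 5}]]
ops = [Filter(lambda r: "L30" <= r["Ledger"] <= "L70"), GroupBySum("Ledger", "Base_Amount")]
print(run_streaming(batches, ops))
```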

zundertj commented 1 year ago

The only way we can help you is if you help us. In particular, you haven't come back on the questions by both Ritchie and myself to create an MWE, the same question that was also asked in the issue template. See this article for how to create an MWE. Idem for the output of pl.show_versions().

Instead, you have posted a lot of text, almost all off-topic and irrelevant, such as talk about other software, your background and other programming languages. Please answer on topic in a constructive way.

Edit: I see that https://github.com/pola-rs/polars/pull/9214 is suggested to be a potential fix; could you test the latest main version?

hkpeaks commented 1 year ago

I am a developer of similar software, so I understand the issue. The error I report is not about whether it can print, output a file, or calculate correctly; it is "Out of Memory". I assume it is a limitation of the software, so running 1,000 files is fine for me for benchmarking. Doing what you request will not help fix the issue, since the script and the file are already provided. This test used 0.18.0, which is also the current version I see. Below are my past reported issues:

https://github.com/pola-rs/polars/issues/8231 https://github.com/pola-rs/polars/issues/8533 https://github.com/pola-rs/polars/issues/8693

The software is not mine; whether this is fixed or not will not affect my development agenda, as I am not prepared to use Polars as a third-party library. I have tried building Rust code and noticed a lot of limitations. Now I am considering using the DuckDB parquet engine instead. But I prefer to write code by my own effort, at least to save the time spent handling bug fixes. Not spending time reporting bugs in third-party software would in fact save me time.

Pandas' reply to my issue report was very simple: "In general, pandas doesn't support out-of-core at all, and it is recommended that you use other libraries, such as Dask when your memory doesn't fit in RAM." https://github.com/pandas-dev/pandas/issues/53249

DuckDB's replies to my reported issues were responsive and solved the problems directly. https://github.com/duckdb/duckdb/issues/7776 https://github.com/duckdb/duckdb/discussions/7609 https://github.com/duckdb/duckdb/discussions/7605 https://github.com/duckdb/duckdb/discussions/7796

Apache is exceptionally helpful: https://github.com/apache/arrow-go/issues/54

Peaks is new; there is only one issue report so far: https://github.com/hkpeaks/peaks-consolidation/issues/2

mcrumiller commented 1 year ago

@hkpeaks we are happy to help you diagnose the issue, but we also request that you listen to us.

I am a developer of similar software, so I understand the issue

I am not sure that you do. The issue is that you have not provided a minimally working example, which is code that we can run without spending a bunch of our own time figuring out how to get it to run. Your initial code is almost there, except that it assumes we have an Input/Copy3000 folder populated with CSV files. We don't, and it's not obvious how we might do that. There are two ways to help us here:

  1. Place a .zip file on the internet that we can download and extract, so that your code runs, or
  2. add a little bit of code that somehow supplies the files (try using requests.get()), and maybe copies it 3000 times.

Bidek56 commented 1 year ago

MWE on a Windows 11 machine with 32 GB of memory, Python 3.11.3, Polars 0.18:

  1. Download the sample file:

    import pathlib
    import urllib.request

    pathlib.Path("data").mkdir(exist_ok=True)
    csv_url = 'https://raw.githubusercontent.com/hkpeaks/peaks-consolidation/main/Benchmark20230602/1.csv'
    urllib.request.urlretrieve(csv_url, './data/1.csv')

  2. Create 3000 copies of the 1.csv file using this Python code:

    import shutil

    src = './data/1.csv'

    for i in range(3000):
        shutil.copy(src, f'./data/1_{i}.csv')

  3. Execute the following Python script:

    import polars as pl
    import time
    import pathlib

    s = time.time()
    pathlib.Path("Output").mkdir(exist_ok=True)

    table1 = (
        pl.scan_csv("data/*.csv")
        .filter((pl.col('Ledger') >= "L30") & (pl.col('Ledger') <= "L70"))
        .groupby(by=["Ledger", "Account", "DC", "Currency"])
        .agg([
            pl.sum('Base_Amount').alias('Total_Base_Amount'),
        ])
    )

    path = "Output/PolarsFilterGroupByCSV.csv"
    table1.lazy().collect(streaming=True).write_csv(path)

    e = time.time()
    print("Polars FilterGroupBy CSV Time = {}".format(round(e - s, 3)))

This script works with fewer than 2,600 files on my Windows 11 laptop. Somewhere above 2,600 files, the script just crashes with no error message. I hope this helps. Thanks

Bidek56 commented 1 year ago

Similar Rust code on the same 3K files works just fine.

use polars::prelude::*;

fn main() {
    let path = "./data/*.csv";
    let ldf = LazyCsvReader::new(path)
        .has_header(true)
        .finish()
        .unwrap()
        .filter(col("Ledger").is_in(lit("L30")))
        .groupby(["DC"])
        .agg([
            count().alias("Count"),
        ]);

    print!("{:?}", ldf.collect());
}
hkpeaks commented 1 year ago

If a streaming model is designed properly, there should be no limit on the number of files and rows other than hard-disk space. I have implemented a water-tank model for my app, so I have solved a billion-row JoinTable using only 32GB of memory. Now I am exploring how to upgrade my computer's SSD to test million-file and trillion-row scenarios. I have opened my source code for the Polars team; whether they learn from it or not is their option. Our house water tank is small, but I enjoy an unlimited water supply. Sometimes the water supply is suspended to clean the water tank, but I have never heard of a water tank "crashing". I love Pandas because they told me the software does not support data bigger than memory and suggested I use Dask.

Extracted from my project, this is the most important function of the streaming design. The first goroutine, go CurrentExtraction(), runs a parallel job to read data; the second goroutine, go CurrentWriteFile(), runs a job to write data. The middle of the code queries the data. A timer, for batch > len(extraction_batch) { time.Sleep(100 * time.Millisecond) }, waits for imported data. Once a table partition has been consumed, it is deleted: if _, found := bytestream_partition[current_partition]; found { delete(bytestream_partition, current_partition) }. You can ask Bing Chat to convert it to Rust; to get around the 2,000-character limit, you can ask Bing Chat how to increase the chat box from 2,000 to 20,000 characters.

func ParallelStreaming(table_partition_store map[string]map[int]Cache, task Task, rule Rule, ir InternalRule, total_byte int64, file os.File) map[int]Cache {

rule.byte_per_stream = int64(rule.streamMB * 1000000)
var streaming_count int = int(total_byte / rule.byte_per_stream)

if streaming_count < 1 {
    streaming_count = 1
}

var total_partition int = streaming_count * rule.thread

if total_byte > 50000000 {
    rule.thread = total_partition
} else {
    rule.thread = 10
}

ds := *FileStructure(rule, total_byte, file)
rule.thread = total_partition / streaming_count
fmt.Println("Total Bytes:", total_byte, "| Total Batches of Stream:", streaming_count)
partition_address := *PartitionAddress(ds)
bytestream_partition := make(map[int][]byte)
extraction_batch := make(map[int]int)
query_batch := make(map[int]int)
combine_table_partition := make(map[int]Cache)
result_table_partition_store := make(map[string]map[int]Cache)
result_table_partition := make(map[int]Cache)

go CurrentExtraction(extraction_batch, query_batch, bytestream_partition, streaming_count, rule, file, partition_address)

var total_row int32
var total_column int
var csv_string strings.Builder
var f *os.File
var err error
write_partition := make(map[int]CachePartition)
var p int

for batch := 1; batch <= streaming_count; batch++ {
    for current_partition := 0; current_partition < rule.thread*(len(query_batch)-1); current_partition++ {
        partition.Lock()
        if _, found := bytestream_partition[current_partition]; found {
            delete(bytestream_partition, current_partition)
        }
        partition.Unlock()
    }

    for batch > len(extraction_batch) {
        time.Sleep(100 * time.Millisecond)
    }

    table_partition := make(map[int]Cache)

    var mutex2 sync.Mutex
    var parallel2 sync.WaitGroup
    parallel2.Add(rule.thread)
    var i int = 0

    for current_partition := (batch - 1) * rule.thread; current_partition < rule.thread*batch; current_partition++ {
        go func(current_partition int) {
            defer parallel2.Done()
            result_table := *CellAddress2(ds, false, current_partition, rule, ir.source_table_name, bytestream_partition)
            mutex2.Lock()
            if batch == 1 {
                for _, column := range ir.validate_all_column_name {

                    if _, found := result_table.upper_column_name2id[strings.ToUpper(column)]; !found {

                        if strings.ToUpper(column) != "NULL" && strings.Trim(column, "") != "" {
                            if strings.ToUpper(column) != "$COLUMN" {
                                fmt.Println("** Column", column, "not found **")
                                os.Exit(0)
                            }
                        }
                    }
                }
            }
            table_partition[i] = *&result_table
            i++
            mutex2.Unlock()
        }(current_partition)
    }

    parallel2.Wait()

    table_partition_store[strings.ToUpper(ir.return_table_name)] = *&table_partition
    ir.source_table_name = ir.return_table_name

    result_table := *CurrentStream(batch, &table_partition_store, task, rule, ir)

    if strings.Contains(strings.ToUpper(ir.return_table_name), ".CSV") {
    } else {
        ir.column_id_seq = result_table.ir.column_id_seq
        ir.column_name = result_table.ir.column_name
        result_table_partition = *&result_table.table_partition

        for i := 0; i < len(result_table_partition); i++ {

            if len(result_table_partition[i].bytestream) > 0 {
                combine_table_partition[p] = result_table_partition[i]
                p++
            }
        }
    }

    query_batch[batch] = 1
    fmt.Print(batch, " ")

    if _, found := ir.full_streaming_command[strings.ToUpper(RemoveCommandIndex(task.command))]; found {

        if strings.Contains(strings.ToUpper(ir.return_table_name), ".CSV") {

            for current_partition := 0; current_partition < len(result_table.table_partition); current_partition++ {
                total_row += result_table.table_partition[current_partition].partition_row
            }

            partition.Lock()
            write_partition[batch] = *&result_table
            partition.Unlock()

            for len(write_partition) > 5 {
                time.Sleep(100 * time.Millisecond)
            }

            if batch == 1 {

                total_column = len(result_table.table_partition[0].column_name)

                csv_string.WriteString(result_table.table_partition[0].column_name[0])

                for x := 1; x < len(result_table.table_partition[0].column_name); x++ {
                    csv_string.WriteString(",")
                    csv_string.WriteString(result_table.table_partition[0].column_name[x])
                }

                csv_string.WriteString("\r\n")

                go CurrentWriteFile(ir, csv_string, streaming_count, write_partition, err, f)
            }
        }
    }
}

if _, found := ir.full_streaming_command[strings.ToUpper(RemoveCommandIndex(task.command))]; found {

    if strings.Contains(strings.ToUpper(ir.return_table_name), ".CSV") {
        var result_table Cache
        result_table.partition_row = total_row
        result_table.total_column = total_column
        combine_table_partition[0] = result_table

        for len(write_partition) > 0 {
            time.Sleep(100 * time.Millisecond)
        }

        return &combine_table_partition

    } else {

        for current_partition := 0; current_partition < len(combine_table_partition); current_partition++ {
            total_row += combine_table_partition[current_partition].partition_row
        }

        total_column = len(combine_table_partition[0].column_name)

        return &combine_table_partition
    }

} else {

    result_table_partition_store["TEMP"] = *&combine_table_partition

    ir.source_table_name = "TEMP"
    column_id_seq, column_name, upper_column_name2id, data_type := AddColumnmNameWithSeq2(&result_table_partition_store, rule, ir)
    ir.column_id_seq = column_id_seq
    ir.column_name = column_name

    result_table_partition[0] = *CurrentCommand(1, combine_table_partition, 0, len(combine_table_partition), task, ir, rule, upper_column_name2id, data_type)

    return &result_table_partition
}

}

My streaming is currently implemented for a single big file; in the coming few days I will implement support for many big/small files within a folder. Small files run many at a time with one thread allocated per file; a large file runs on its own with many threads. So from the moment of getting the file list from the OS, I need the file-size information, then sort the list into two groups, small files and large files. If small files and large files run at the same time, there is a very high risk of triggering calculation errors.

I implement this logic to classify small and large files: if total_byte > 100,000,000 { rule.thread = 10 } else { rule.thread = 1 }. If small files are run with multiple threads, my experience is that many CSV files downloaded from Kaggle fail to extract data properly. So I implement this safe default value, but allow the user to override it via a script setting. A rough sketch of the classification is shown below.
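
For illustration only (the threshold and folder name are just the example values from this thread), the small/large split could look like this in Python:

```python
from pathlib import Path

THRESHOLD_BYTES = 100_000_000  # files above this size are treated as "large"

def classify_csv_files(folder: str):
    small, large = [], []
    for path in Path(folder).glob("*.csv"):
        (large if path.stat().st_size > THRESHOLD_BYTES else small).append(path)
    return small, large

small_files, large_files = classify_csv_files("Input/Copy3000")
print(len(small_files), "small files,", len(large_files), "large files")
```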

ritchie46 commented 1 year ago

Thank you for creating a repro @Bidek56.

On my linux machine it fails with

thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: Io(Os { code: 24, kind: Uncategorized, message: "Too many open files" })', /home/ritchie46/code/polars/polars/polars-lazy/polars-pipe/src/executors/sources/csv.rs:57:14

So I am looking into that. Maybe on Windows it stack-overflows when many file handles are open? This is just a guess on my side; I have no expertise on that.
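
For what it is worth, on Linux/macOS the per-process open-file limit behind that errno 24 can be inspected from Python (a sketch only; this does not apply to Windows):

```python
import resource

soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"open-file limit: soft={soft}, hard={hard}")

# The soft limit can be raised up to the hard limit for the current process, e.g.:
# resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
```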

hkpeaks commented 1 year ago

I had a similar experience when I extended Peaks to support a folder containing 100,000 files. After many, many amendments to the code I solved the issues, but I do not remember clearly which change made 100,000 files work. https://github.com/hkpeaks/peaks-consolidation/blob/main/src/file.go The current code supports many files in-memory only; it has not yet been extended to support streaming. Supporting multiple files is my recent development. For every function I build, I usually run a stress test to identify potential bugs. I built a folder containing 1 million files and, knowing that performance drops significantly, I simply say that I do not support a million files at the moment; 100,000 files is still very efficient. Creating a million files is in fact a very high-risk exercise, as it may damage the Windows file system, so I do it on the Windows OS partition rather than the data partition.

ritchie46 commented 1 year ago

Our house water tank is small, but I enjoy an unlimited water supply. Sometimes the water supply is suspended to clean the water tank, but I have never heard of a water tank "crashing".

@hkpeaks just curious. Is this an analogy?

hkpeaks commented 1 year ago

I test on Windows, you test on Linux; the development environments are not exactly the same.

Maybe because my background is in accounting, I do not fully understand the need for an MWE to support your bug fix in this scenario. My MWE for many files is very simple: https://github.com/hkpeaks/peaks-consolidation/blob/main/CombineFile.txt

ReadFile{C:/peaks-consolidation-main/Input/1000SmallFiles/*.csv ~ Table} WriteFile{Table ~ PeaksCombineFile.csv}

When I moved from working as a hawker to being an accountant, I could not understand the prevalent practices of accountants either. Without COVID-19, I could not have enjoyed retirement life or escaped from non-code IT consulting. I enjoy pure coding but hate marketing; my recent marketing work has in fact discouraged my mood for coding itself. Without benchmarking against other strong software, I find no way to promote my app. Repeatedly comparing against weak-performance software such as Pandas is not my personality. If you are no longer strong in performance, I will avoid comparing against you.

hkpeaks commented 1 year ago

You may also be concerned about the processing speed for these 3,000 files when comparing with DuckDB.

hkpeaks commented 1 year ago

Congratulations on achieving the second stage of success with Polars! I hope you continue to excel and reach the third stage of success with the AI/BI apps on a new cloud platform. Best of luck! 😊

Now I am rewriting my Peaks to provide both Python and Go versions using a new ETL framework. https://github.com/hkpeaks/pypeaks

Since there are no official support Golang can build binary library for Windows platform, so it is no proper way to create Python bindings. So the ETL Framework for Python I will use Apache Arrow compatible python dataframe libraries as calc engine. If I use Rust, I do not comfort with the use of Rayon-rs. Reconsider using C# is alternative option when Microsoft in recent years is very active to involve the Python language. But the disadvantage of C# is framework dependent. And .net is changing the game very frequently.