pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Performance is lower than expected on join query in Rust #3986

Closed andrei-ionescu closed 1 year ago

andrei-ionescu commented 1 year ago

What language are you using?

Rust

Which feature gates did you use?

"polars-io", "parquet", "lazy", "dtype-struct"

Have you tried latest version of polars?

What version of polars are you using?

latest, git master

What operating system are you using polars on?

macOS Monterey 12.3.1

What language version are you using

$ rustc --version
rustc 1.64.0-nightly (7b46aa594 2022-07-05)

$ cargo --version
cargo 1.64.0-nightly (c0bbd42ce 2022-07-03)

Describe your bug.

The processing time in the case of Polars is 4x longer.

What are the steps to reproduce the behavior?

Given the following two datasets from OpenDataBlendIO:

Comparing the processing time of this Polars DataFrame API query:

let time_pass = Instant::now();
let file1 = "data-fact-anonymised_mot_test_item-2021-20220620T090029Z-anonymised_mot_test_item_2021.parquet".to_string();
let file2 = "data-dimension-vehicle-20220620T085710Z-vehicle.parquet".to_string();

let df1 = LazyFrame::scan_parquet(
    file1,
    scan_args(),
)
.unwrap();

let df2 = LazyFrame::scan_parquet(
    file2,
    scan_args(),
)
.unwrap();

let df3 = df2
    .join(
        df1, 
        [col("drv_vehicle_key")],
        [col("drv_vehicle_key")], 
        JoinType::Inner,
    )
    .groupby([col("drv_vehicle_make")])
    .agg([count().alias("cnt")])
    .select([
        col("drv_vehicle_make"),
        col("cat"),
    ])
    .sort(
        &"cnt", 
        SortOptions {
            descending: true,
            nulls_last: true,
        }
    )
    .limit(20)
    .collect()
    .unwrap();

dbg!(df3);
println!("Total time {:?}", time_pass.elapsed());

with the similar one in Spark:

val t = System.currentTimeMillis()
val df1 = spark
    .read
    .format("parquet")
    .load("data-fact-anonymised_mot_test_item-2021-20220620T090029Z-anonymised_mot_test_item_2021.parquet")

val df2 = spark
    .read
    .format("parquet")
    .load("data-dimension-vehicle-20220620T085710Z-vehicle.parquet")

val df3 = df2
    .join(df1, df2("drv_vehicle_key") === df1("drv_vehicle_key"), "inner")
    .groupBy("drv_vehicle_make")
    .agg(count("drv_vehicle_make").alias("cnt"))
    .select("drv_vehicle_make", "cnt")
    .sort(col("cnt").desc)
    .limit(20)
    .show

println(s"Total time: ${System.currentTimeMillis()-t}")

The results are:

Polars (ms)   Spark (ms)
39722         11903
39888         9847
39898         9557
43298         12894
36630         9682

All timings are in milliseconds (ms). Each Spark run was started from scratch with a new Spark instance.
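For anyone who wants to collect a similar set of timings on the Rust side, here is a minimal sketch using only the standard library. The helper names `time_runs` and `median` are hypothetical, and the summing loop is a placeholder for the real query. Note that repeating the work inside one process measures warm runs (file cache already populated), which is not the same as restarting the process each time as was done for Spark.

```rust
use std::time::{Duration, Instant};

/// Runs `work` `runs` times and returns the wall-clock duration of each run.
/// (Hypothetical helper, not part of Polars.)
fn time_runs<F: FnMut()>(runs: usize, mut work: F) -> Vec<Duration> {
    (0..runs)
        .map(|_| {
            let start = Instant::now();
            work();
            start.elapsed()
        })
        .collect()
}

/// Median of the samples (the upper median when the count is even).
fn median(mut samples: Vec<Duration>) -> Option<Duration> {
    if samples.is_empty() {
        return None;
    }
    samples.sort();
    Some(samples[samples.len() / 2])
}

fn main() {
    // Placeholder workload; in the issue this would be the lazy query's `.collect()`.
    let samples = time_runs(5, || {
        let total: u64 = (0..1_000_000u64).sum();
        std::hint::black_box(total);
    });

    for (i, d) in samples.iter().enumerate() {
        println!("run {}: {} ms", i + 1, d.as_millis());
    }
    println!("median: {:?}", median(samples));
}
```

Reporting the median alongside the individual runs makes a single outlier (such as the 43298 ms run above) less likely to skew the comparison.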

What is the actual behavior?

The processing time in the case of Polars is 4x longer.

What is the expected behavior?

The processing time should be lower with Polars than with Spark.

ritchie46 commented 1 year ago

Could you provide me with a docker setup for spark with the right settings so I can reproduce what you did?

What are your timings in the polars python api?

P.S. I changed the title as we do much more than only joins here.

andrei-ionescu commented 1 year ago

@ritchie46: Currently Apache Spark is installed on my local machine, the same machine on which I ran the Polars test. The Spark version is 3.2.1.

I don't plan to use Python because I want to be as close as possible to the hardware.

ritchie46 commented 1 year ago

Could you still get some timings in python? I know that the python bindings have the optimal settings for optimizations and allocators, so that would give me more information.

ghuls commented 1 year ago

@andrei-ionescu Are you sure you compiled in release mode and with optimizations?

In [16]: file1 = 'data_fact_anonymised_mot_test_item_2021_20220620T090029Z_anonymised_mot_test_item_2021.parquet'

In [17]: file2 = 'data_dimension_vehicle_20220620T085710Z_vehicle.parquet'

In [18]: df1 = pl.scan_parquet(file1)

In [19]: df2 = pl.scan_parquet(file2)

In [20]: df3 = (
    ...:     df2
    ...:     .join(
    ...:         df1,
    ...:         left_on=[pl.col("drv_vehicle_key")],
    ...:         right_on=[pl.col("drv_vehicle_key")],
    ...:         how="inner",
    ...:     )
    ...:     .groupby([pl.col("drv_vehicle_make")])
    ...:     .agg([pl.count().alias("cnt")])
    ...:     .select([
    ...:         pl.col("drv_vehicle_make"),
    ...:         pl.col("cnt"),
    ...:     ])
    ...:     .sort(
    ...:         "cnt",
    ...:         reverse=True,
    ...:         nulls_last=True,
    ...:     )
    ...:     .limit(20)
    ...: )

In [21]: %time df4 = df3.collect()
CPU times: user 16.4 s, sys: 2.13 s, total: 18.6 s
Wall time: 2.8 s

In [22]: df4
Out[22]: 
shape: (20, 2)
┌──────────────────┬──────────┐
│ drv_vehicle_make ┆ cnt      │
│ ---              ┆ ---      │
│ str              ┆ u32      │
╞══════════════════╪══════════╡
│ FORD             ┆ 13551161 │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ VAUXHALL         ┆ 9322023  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ VOLKSWAGEN       ┆ 7671689  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ PEUGEOT          ┆ 4152401  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ ...              ┆ ...      │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ MINI             ┆ 1586903  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ SKODA            ┆ 1348856  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ MAZDA            ┆ 1318925  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ VOLVO            ┆ 1234986  │
└──────────────────┴──────────┘

(I changed "cat" to "cnt" in the select statement to get it running.)

andrei-ionescu commented 1 year ago

@ghuls: Could you provide the Rust code counterpart? And what does "compiled in release mode and with optimizations" mean? Do you have a command or some options that I need to use?

ritchie46 commented 1 year ago

@andrei-ionescu could you run the code provided by @ghuls and report the time it takes on your machine?

andrei-ionescu commented 1 year ago

@ritchie46, @ghuls: I just ran the Python code above in Python 3, and it takes around 5 s.

It seems that there is something missing on the Rust side.

ritchie46 commented 1 year ago

There are several things you can do in order of ease:

  1. compile in release mode
  2. target your native cpu flags in release mode
  3. use Jemalloc or Mimalloc as global allocator
  4. activate more polars features that help performance (at the cost of compile time): { "performant", "chunked_ids" }
  5. use single threaded LTO (very slow to compile)

The python polars bindings have all of these except your target cpu flags.
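As a rough illustration of item 3, the sketch below shows the `#[global_allocator]` mechanism using only the standard library. `CountingAlloc` is a hypothetical wrapper around the system allocator, included just to make the hook observable; it is not what the Python bindings use. Assuming the `mimalloc` crate has been added as a dependency, the same attribute would be applied to a `mimalloc::MiMalloc` static instead.

```rust
use std::alloc::{GlobalAlloc, Layout, System};
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical allocator that forwards to the system allocator and counts
/// allocations, purely to show that `#[global_allocator]` takes effect.
struct CountingAlloc;

static ALLOCATIONS: AtomicUsize = AtomicUsize::new(0);

unsafe impl GlobalAlloc for CountingAlloc {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        ALLOCATIONS.fetch_add(1, Ordering::Relaxed);
        unsafe { System.alloc(layout) }
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        unsafe { System.dealloc(ptr, layout) }
    }
}

// Heap allocations in the whole program are routed through this static.
// Assumption: with `mimalloc` in Cargo.toml this would instead read
// `static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;`
#[global_allocator]
static GLOBAL: CountingAlloc = CountingAlloc;

fn allocation_count() -> usize {
    ALLOCATIONS.load(Ordering::Relaxed)
}

fn main() {
    let before = allocation_count();
    let v: Vec<u64> = (0..1024).collect();
    // Keep the vector observable so the allocation is not optimized away.
    std::hint::black_box(&v);
    let after = allocation_count();
    println!(
        "{} elements, {} allocation(s) observed",
        v.len(),
        after - before
    );
}
```

Only one `#[global_allocator]` may exist per program, so the choice is made in the final binary crate rather than in a library.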

andrei-ionescu commented 1 year ago

@ritchie46: Is there some documentation on how to get this on par with Python? If it works that well in Python, which options does the Python build use?

I'm asking because rustc has a lot of parameters to toggle. I'm interested in the ones used when building Polars for Python.

ghuls commented 1 year ago

Even with just cargo build --release:

❯ timeit polars_lazy_scan_parquet/target/release/polars_lazy_scan_parquet
[src/main.rs:54] df3 = shape: (20, 2)
┌──────────────────┬──────────┐
│ drv_vehicle_make ┆ cnt      │
│ ---              ┆ ---      │
│ str              ┆ u32      │
╞══════════════════╪══════════╡
│ FORD             ┆ 13551161 │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ VAUXHALL         ┆ 9322023  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ VOLKSWAGEN       ┆ 7671689  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ PEUGEOT          ┆ 4152401  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ ...              ┆ ...      │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ MINI             ┆ 1586903  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ SKODA            ┆ 1348856  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ MAZDA            ┆ 1318925  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ VOLVO            ┆ 1234986  │
└──────────────────┴──────────┘
Total time 3.284089366s

Time output:
------------

  * Command: polars_lazy_scan_parquet/target/release/polars_lazy_scan_parquet
  * Elapsed wall time: 0:03.33 = 3.33 seconds
  * Elapsed CPU time:
     - User: 19.42
     - Sys: 2.44
  * CPU usage: 655%
  * Context switching:
     - Voluntarily (e.g.: waiting for I/O operation): 29911
     - Involuntarily (time slice expired): 5716
  * Maximum resident set size (RSS: memory) (kiB): 4752296
  * Number of times the process was swapped out of main memory: 0
  * Filesystem:
     - # of inputs: 0
     - # of outputs: 0
  * Exit status: 0

andrei-ionescu commented 1 year ago

@ghuls: It is not working in my case.

This is the code I use:

use polars::chunked_array::object::SortOptions;
use polars::prelude::{LazyFrame, ScanArgsParquet, PolarsError, JoinType, ParallelStrategy};
use polars::lazy::dsl::*;
use std::time::Instant;

fn main() -> Result<(), PolarsError> {

    let time_pass = Instant::now();
    let file1 = "data/parquet/data-fact-anonymised_mot_test_item-2021-20220620T090029Z-anonymised_mot_test_item_2021.parquet".to_string();
    let file2 = "data/parquet/data-dimension-vehicle-20220620T085710Z-vehicle.parquet".to_string();

    let df1 = LazyFrame::scan_parquet(
        file1,
        scan_args(),
    )
    .unwrap();

    let df2 = LazyFrame::scan_parquet(
        file2,
        scan_args(),
    )
    .unwrap();

    let df3 = df2
        .join(
            df1, 
            [col("drv_vehicle_key")],
            [col("drv_vehicle_key")], 
            JoinType::Inner,
        )
        .groupby([col("drv_vehicle_make")])
        .agg([count().alias("cnt")])
        .select([
            col("drv_vehicle_make"),
            col("cnt"),
        ])
        .sort(
            &"cnt", 
            SortOptions {
                descending: true,
                nulls_last: true,
            }
        )
        .limit(20)
        .collect()
        .unwrap();

    dbg!(df3);
    println!("Total time {:?} seconds", time_pass.elapsed().as_secs());
    Ok(())
}

fn scan_args() -> ScanArgsParquet { ScanArgsParquet {
    n_rows: None,
    cache: true,
    parallel: ParallelStrategy::Auto,
    rechunk: true,
    row_count: None,
}}

This is the Cargo.toml:

[package]
name = "my app"
version = "0.1.0"
edition = "2021"

[dependencies]
polars = { path = "/.../polars/polars", features = [ "polars-io", "parquet", "lazy", "dtype-struct" ] }
polars-sql = { path = "/.../polars/polars-sql" }

This is what I do to build:

$ cargo build --release --target=x86_64-apple-darwin

I'm on macOS Monterey 12.3.1.

This is how I run it:

$ cargo run

The output is this:

[src/main.rs:48] df3 = shape: (20, 2)
┌──────────────────┬──────────┐
│ drv_vehicle_make ┆ cnt      │
│ ---              ┆ ---      │
│ str              ┆ u32      │
╞══════════════════╪══════════╡
│ FORD             ┆ 13551161 │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ VAUXHALL         ┆ 9322023  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ VOLKSWAGEN       ┆ 7671689  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ PEUGEOT          ┆ 4152401  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ ...              ┆ ...      │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ MINI             ┆ 1586903  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ SKODA            ┆ 1348856  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ MAZDA            ┆ 1318925  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ VOLVO            ┆ 1234986  │
└──────────────────┴──────────┘
Total time 47 seconds

So...

  1. What am I doing wrong?
  2. What OS are you using?

ghuls commented 1 year ago

I am on Linux.

Invoke the binary directly or use cargo run --release --target=x86_64-apple-darwin. You are running the debug version.

With just cargo run it takes 51.650563074s.
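One way to catch this kind of mix-up from inside the program is to check `debug_assertions`, which Cargo enables in the default dev profile and disables in the default release profile. The sketch below is only a proxy under that assumption: a custom profile can override the setting, and the helper names `build_profile` and `profile_warning` are hypothetical.

```rust
/// Build profile implied by `debug_assertions` (a proxy, not a guarantee:
/// custom Cargo profiles can turn this flag on or off independently).
fn build_profile() -> &'static str {
    if cfg!(debug_assertions) {
        "debug"
    } else {
        "release"
    }
}

/// Hypothetical helper: returns a warning when timings would come from an
/// unoptimized build.
fn profile_warning() -> Option<&'static str> {
    if cfg!(debug_assertions) {
        Some("warning: debug build detected; timings are not representative, rerun with `cargo run --release`")
    } else {
        None
    }
}

fn main() {
    if let Some(msg) = profile_warning() {
        eprintln!("{msg}");
    }
    println!("build profile: {}", build_profile());
    // ... run and time the query here ...
}
```

Printing the profile next to the timing makes it visible in any pasted output whether the numbers came from a debug or a release binary.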

andrei-ionescu commented 1 year ago

@ghuls: Yes! Hooray! That was it. Thank you so much for bearing with me through all of this. This looks very promising for what I'm trying to do.

[src/main.rs:48] df3 = shape: (20, 2)
┌──────────────────┬──────────┐
│ drv_vehicle_make ┆ cnt      │
│ ---              ┆ ---      │
│ str              ┆ u32      │
╞══════════════════╪══════════╡
│ FORD             ┆ 13551161 │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ VAUXHALL         ┆ 9322023  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ VOLKSWAGEN       ┆ 7671689  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ PEUGEOT          ┆ 4152401  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ ...              ┆ ...      │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ MINI             ┆ 1586903  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ SKODA            ┆ 1348856  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ MAZDA            ┆ 1318925  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ VOLVO            ┆ 1234986  │
└──────────────────┴──────────┘
Total time 4 seconds

andrei-ionescu commented 1 year ago

Maybe it would be a good idea to specify this somewhere in the documentation.

@ritchie46, @jorgecarleitao, @ghuls: WDYT?

ritchie46 commented 1 year ago

> Maybe it would be a good idea to specify this somewhere in the documentation.
>
> @ritchie46, @jorgecarleitao, @ghuls: WDYT?

This can be assumed to be common knowledge among Rust users. Have you finished reading the Rust book?

jorgecarleitao commented 1 year ago

I agree, @andrei-ionescu, that it is a bit confusing. cargo build ... essentially creates the binary. To run it, use ./target/{release,debug}/{program} .... Alternatively, use cargo run ... with the same flags, which is basically an alias for the above. I think that cargo should be the place to cover that, as @ritchie46 wrote.

An idea: maybe we could ask users who file Rust Polars issues here to paste the bash command they use to run the example (besides the code)? It could make it easier for everyone involved and does not really hurt (they already have to copy-paste the example...).

ritchie46 commented 1 year ago

> An idea: maybe we could ask users who file Rust Polars issues here to paste the bash command they use to run the example (besides the code)? It could make it easier for everyone involved and does not really hurt (they already have to copy-paste the example...).

Yep, that might save some time for everybody involved. I shall update the issue template.

andrei-ionescu commented 1 year ago

OK. So you're suggesting adding a new section to the issue template where people reporting Rust issues write their build command and execution command. I think that would be a good start. But it still won't spare you from having to check those commands every time an issue is created 😄.

Giving people the tools to check themselves is a better approach in my opinion and minimises the probability of new issues, like this one, being created.