pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

`POLARS_MAX_THREADS` doesn't work as expected. #15452

Closed baiguoname closed 1 month ago

baiguoname commented 1 month ago

Reproducible example

    std::env::set_var("POLARS_MAX_THREADS", "1");

Log output

No response

Issue description

With this set, I run different DataFrames on different threads, but the CPU still uses only one core. Does this env variable mean that polars will use only one core per "program", rather than one core per DataFrame?

Expected behavior

Even if I set this env variable to 1, when I run different DataFrames on different threads, more than one CPU core should be used.

Installed versions

Polars: 0.38.3

reswqa commented 1 month ago

This controls the maximum number of threads in the rayon thread pool, where all parallel computations in polars take place.
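To picture what "one pool for all parallel computations" implies, here is a minimal sketch using only the standard library. It assumes the pool size is read once per process from an environment variable; `DEMO_MAX_THREADS`, `parse_thread_count`, and `pool_size` are hypothetical names for illustration, and the fallback on an unparsable value is a choice made for this sketch, not necessarily what polars does.

```rust
use std::sync::OnceLock;
use std::thread;

/// Hypothetical helper: turn an optional env value into a thread count.
/// Missing, unparsable, or zero values fall back to `fallback`.
fn parse_thread_count(raw: Option<&str>, fallback: usize) -> usize {
    raw.and_then(|s| s.trim().parse::<usize>().ok())
        .filter(|&n| n > 0)
        .unwrap_or(fallback)
}

/// Size of the single shared pool, computed once per process.
/// `DEMO_MAX_THREADS` is a made-up variable name for this sketch.
fn pool_size() -> usize {
    static SIZE: OnceLock<usize> = OnceLock::new();
    *SIZE.get_or_init(|| {
        let cores = thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(1);
        let raw = std::env::var("DEMO_MAX_THREADS").ok();
        parse_thread_count(raw.as_deref(), cores)
    })
}

fn main() {
    println!("pool size for this process: {}", pool_size());
    // Later calls return the same value, whichever thread asks.
    assert_eq!(pool_size(), pool_size());
}
```

In a design like this the limit applies to the whole process, not to each DataFrame, and changing the variable after the first call has no effect.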

baiguoname commented 1 month ago

So is what I said above right: this env variable sets how many cores are used per program, rather than per DataFrame?

reswqa commented 1 month ago

Not 100%, but most expression evaluations are submitted to rayon for better performance. Also, I don't think this is a bug: all available cores are used by default.

baiguoname commented 1 month ago

This will slow down polars tremendously. For example: I have an expr and a DataFrame; one core takes 1s to compute it, and two cores take 800ms. Now I have 100 DataFrames. If I set the number of cores used to 1 and run all the DataFrames in different threads, it will take 50s. With the current limitation of polars, it will take 80s.

reswqa commented 1 month ago

What I don't understand is why you have to set the number of cores to 1 if you want to use more resources.

baiguoname commented 1 month ago

Because in most situations, two cores (within one DataFrame) can't halve the time compared to one core. But if I can put two DataFrames on two threads, that does halve the time. As I said above, running the DataFrames in parallel with one core each takes 50s, while the current behavior takes 80s.
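The arithmetic behind these figures can be checked with a small sketch. It assumes a machine with two cores (the thread never states the core count, but the 50s figure implies it) and ignores scheduling overhead; the function names are made up for illustration.

```rust
/// Wall time in ms when each frame runs on one core and `cores` frames
/// run at the same time.
fn one_core_per_frame_ms(frames: u64, cores: u64, ms_single: u64) -> u64 {
    assert!(cores > 0, "need at least one core");
    let rounds = (frames + cores - 1) / cores; // ceiling division
    rounds * ms_single
}

/// Wall time in ms when frames run one after another, each frame using
/// every core and taking `ms_parallel`.
fn all_cores_per_frame_ms(frames: u64, ms_parallel: u64) -> u64 {
    frames * ms_parallel
}

fn main() {
    let frames = 100;
    // One core per frame, two frames at a time: 50 rounds of 1000 ms.
    println!("{}", one_core_per_frame_ms(frames, 2, 1000)); // prints 50000
    // Frames run sequentially, two cores each: 100 runs of 800 ms.
    println!("{}", all_cores_per_frame_ms(frames, 800)); // prints 80000
    // Pool limited to one thread: 100 runs of 1000 ms.
    println!("{}", one_core_per_frame_ms(frames, 1, 1000)); // prints 100000
}
```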

reswqa commented 1 month ago

I have a feeling that the difference you see mainly comes from the fact that collect calls on different DFs block each other, whereas with multiple threads they can be executed simultaneously. I think the main problem here is that you set rayon's thread pool to 1, which prevents computation-intensive tasks from executing in parallel.

baiguoname commented 1 month ago

> collect between different DF's is blocked,

Even if I set POLARS_MAX_THREADS to 2, are the collect calls on different DFs (each DF on a different thread) still called sequentially?

reswqa commented 1 month ago

> Even if I set POLARS_MAX_THREADS to 2, are the collect calls on different DFs (each DF on a different thread) still called sequentially?

IIUC, if the DFs are on different threads, they will not block each other. If all the DFs are on a single thread, they are executed sequentially.

baiguoname commented 1 month ago

> Even if I set this env variable to 1, when I run different DataFrames on different threads, more than one CPU core should be used.

Then why is just one core used?

reswqa commented 1 month ago

> Then why is just one core used?

Even if you start a thread for each DataFrame, computation-intensive operations such as expression evaluation will still be performed by a single thread (the env variable was set to 1), while other operations should not take up much CPU. If you expect more than one thread to be allocated to the computation, the solution to your problem is to increase the value of POLARS_MAX_THREADS, isn't it?

reswqa commented 1 month ago

> But if I can put two DataFrames on two threads, that does halve the time. As I said above, running the DataFrames in parallel with one core each takes 50s, while the current behavior takes 80s.

Was the 80s measured with POLARS_MAX_THREADS set to 1 or to more than 1? Or can you show the code of your program?

baiguoname commented 1 month ago

> > Then why is just one core used?

> Even if you start a thread for each DataFrame, computation-intensive operations such as expression evaluation will still be performed by a single thread (the env variable was set to 1), while other operations should not take up much CPU. If you expect more than one thread to be allocated to the computation, the solution to your problem is to increase the value of POLARS_MAX_THREADS, isn't it?

What I guessed is right. I changed the polars source file /src/frame/mod.rs:

    fn try_apply_columns_par(
        &self,
        func: &(dyn Fn(&Series) -> PolarsResult<Series> + Send + Sync),
    ) -> PolarsResult<Vec<Series>> {
        POOL.install(|| {
            let start_time = std::time::Instant::now();
            println!("start ---- {:?}", start_time);
            let res = self.columns.par_iter().map(func).collect();
            println!("end ----- {:?}", start_time);
            res
        })
    }

And this is part of the terminal output:

    start ---- Instant { tv_sec: 2268774, tv_nsec: 825473741 }
    end ----- Instant { tv_sec: 2268774, tv_nsec: 825473741 }
    start ---- Instant { tv_sec: 2268774, tv_nsec: 825669035 }
    end ----- Instant { tv_sec: 2268774, tv_nsec: 825669035 }
    start ---- Instant { tv_sec: 2268774, tv_nsec: 825684175 }
    end ----- Instant { tv_sec: 2268774, tv_nsec: 825684175 }
    start ---- Instant { tv_sec: 2268774, tv_nsec: 825711819 }
    end ----- Instant { tv_sec: 2268774, tv_nsec: 825711819 }

As you can see, each start log is always followed by its end, which means that even when the DFs are on different threads, their collect calls always block each other. So within one program, the DFs' collect calls are always executed sequentially.

baiguoname commented 1 month ago

> > But if I can put two DataFrames on two threads, that does halve the time. As I said above, running the DataFrames in parallel with one core each takes 50s, while the current behavior takes 80s.

> Was the 80s measured with POLARS_MAX_THREADS set to 1 or to more than 1? Or can you show the code of your program?

More than 1. If it were set to 1, it would take 100s.

baiguoname commented 1 month ago

> > Then why is just one core used?

> Even if you start a thread for each DataFrame, computation-intensive operations such as expression evaluation will still be performed by a single thread (the env variable was set to 1), while other operations should not take up much CPU. If you expect more than one thread to be allocated to the computation, the solution to your problem is to increase the value of POLARS_MAX_THREADS, isn't it?

Thank you for your explanation. So why, after the value is set to 1, must the collect calls in different threads run sequentially? What I need is for one collect in one thread to take one core, while I can still call collect in different threads simultaneously.
If I set the value to 2, I guess one collect in one thread may take two cores, which is slower than the approach above.

reswqa commented 1 month ago

> As you can see, each start log is always followed by its end, which means that even when the DFs are on different threads, their collect calls always block each other. So within one program, the DFs' collect calls are always executed sequentially.

The log you added was executed in the rayon thread pool. Is this the result of POLARS_MAX_THREADS=1?

baiguoname commented 1 month ago

> Is this the result of POLARS_MAX_THREADS=1

Yes, it is.

reswqa commented 1 month ago

This is expected then, because now the pool has only one thread, which must execute sequentially.

reswqa commented 1 month ago

Only operations that don't need to be submitted to POOL will be parallelized by the thread you introduced for each df.
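The effect described here can be reproduced without polars. A minimal sketch using only the standard library: `run_callers` is a hypothetical helper that submits one job from each of several caller threads into a single shared queue served by `pool_threads` workers. It is a stand-in for a shared pool, not rayon's or polars' actual implementation.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;

type Job = Box<dyn FnOnce() + Send + 'static>;

/// Starts `pool_threads` workers fed by one queue, then submits one job
/// from each of `callers` caller threads. Returns the start/end events in
/// the order they happened.
fn run_callers(pool_threads: usize, callers: usize) -> Vec<String> {
    let (job_tx, job_rx) = mpsc::channel::<Job>();
    let job_rx = Arc::new(Mutex::new(job_rx));
    let log: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));

    // Shared workers: all heavy work runs here, whoever submitted it.
    let workers: Vec<_> = (0..pool_threads)
        .map(|_| {
            let job_rx = Arc::clone(&job_rx);
            thread::spawn(move || loop {
                // The lock is released at the end of this statement,
                // before the job itself runs.
                let next = job_rx.lock().unwrap().recv();
                match next {
                    Ok(job) => job(),
                    Err(_) => break, // every sender is gone
                }
            })
        })
        .collect();

    // Caller threads: one per "DataFrame"; each blocks until its job is done.
    let caller_handles: Vec<_> = (0..callers)
        .map(|id| {
            let job_tx = job_tx.clone();
            let log = Arc::clone(&log);
            thread::spawn(move || {
                let (done_tx, done_rx) = mpsc::channel();
                let job: Job = Box::new(move || {
                    log.lock().unwrap().push(format!("start {id}"));
                    thread::sleep(Duration::from_millis(20)); // simulated work
                    log.lock().unwrap().push(format!("end {id}"));
                    done_tx.send(()).unwrap();
                });
                job_tx.send(job).unwrap();
                done_rx.recv().unwrap();
            })
        })
        .collect();

    drop(job_tx); // let the workers exit once all callers have finished
    for handle in caller_handles {
        handle.join().unwrap();
    }
    for worker in workers {
        worker.join().unwrap();
    }
    let events = log.lock().unwrap().clone();
    events
}

fn main() {
    for event in run_callers(1, 4) {
        println!("{event}");
    }
}
```

With `run_callers(1, 4)` every `start` is immediately followed by its matching `end`, the same pattern as in the log earlier in the thread, even though four caller threads submitted work. Passing a larger `pool_threads` allows the jobs to overlap.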

baiguoname commented 1 month ago

> Only operations that don't need to be submitted to POOL will be parallelized by the thread you introduced.

Yes, I've got it. Is there a way to let a collect in one thread take just one core, while collects in different threads can still run simultaneously?

reswqa commented 1 month ago

I am not aware of a way to strictly enforce one core per DF, as I'm not very familiar with rayon, but I think one should exist.

How about you create a new issue that explicitly states this requirement? Because our conversation is so long now, I'm worried it will distract others from understanding the nature of the problem.

baiguoname commented 1 month ago

> I am not aware of a way to strictly enforce one core per DF, as I'm not very familiar with rayon, but I think one should exist.

> How about you create a new issue that explicitly states this requirement? Because our conversation is so long now, I'm worried it will distract others from understanding the nature of the problem.

Of course, I'll create a new issue for this problem. By the way, do you have a polars or Rust discussion group in China? I couldn't find one online.

reswqa commented 1 month ago

I closed the question because it is a bit off topic now :)

> By the way, do you have a polars or Rust discussion group in China? I couldn't find one online.

Could you leave your WeChat or QQ number?

baiguoname commented 1 month ago

> I closed the question because it is a bit off topic now :)

> > By the way, do you have a polars or Rust discussion group in China? I couldn't find one online.

> Could you leave your WeChat or QQ number?

wechat account: aruster qq number: 2429357337