pola-rs / r-polars

Bring polars to R
https://pola-rs.github.io/r-polars/

Background execution and R process pool #311

Closed Sicheng-Pan closed 10 months ago

Sicheng-Pan commented 11 months ago

This PR aims to implement an interface to create asynchronous jobs that can be executed in background threads.

Specifically, this will provide an alternative option to run the Lazy query in background without blocking the R console.

In order to run map and apply on the LazyFrame without using the existing R interpreter, new R processes have to be created in background to handle the evaluation tasks.

The user will get thread handles for the background jobs, and can check if they are done (non-blocking) or wait for them to finish (blocking).

Example:

> handle <- pl$LazyFrame(mtcars)$with_column(pl$col("mpg")$map_in_background(function(x) x * 0.43)$alias("kml"))$collect_in_background()
> handle$is_finished() |> unwrap()
[1] TRUE
> handle$join() |> unwrap()
shape: (32, 12)
┌──────┬─────┬───────┬───────┬───┬─────┬──────┬──────┬───────┐
│ mpg  ┆ cyl ┆ disp  ┆ hp    ┆ … ┆ am  ┆ gear ┆ carb ┆ kml   │
│ ---  ┆ --- ┆ ---   ┆ ---   ┆   ┆ --- ┆ ---  ┆ ---  ┆ ---   │
│ f64  ┆ f64 ┆ f64   ┆ f64   ┆   ┆ f64 ┆ f64  ┆ f64  ┆ f64   │
╞══════╪═════╪═══════╪═══════╪═══╪═════╪══════╪══════╪═══════╡
│ 21.0 ┆ 6.0 ┆ 160.0 ┆ 110.0 ┆ … ┆ 1.0 ┆ 4.0  ┆ 4.0  ┆ 9.03  │
│ 21.0 ┆ 6.0 ┆ 160.0 ┆ 110.0 ┆ … ┆ 1.0 ┆ 4.0  ┆ 4.0  ┆ 9.03  │
│ 22.8 ┆ 4.0 ┆ 108.0 ┆ 93.0  ┆ … ┆ 1.0 ┆ 4.0  ┆ 1.0  ┆ 9.804 │
│ 21.4 ┆ 6.0 ┆ 258.0 ┆ 110.0 ┆ … ┆ 0.0 ┆ 3.0  ┆ 1.0  ┆ 9.202 │
│ …    ┆ …   ┆ …     ┆ …     ┆ … ┆ …   ┆ …    ┆ …    ┆ …     │
│ 15.8 ┆ 8.0 ┆ 351.0 ┆ 264.0 ┆ … ┆ 1.0 ┆ 5.0  ┆ 4.0  ┆ 6.794 │
│ 19.7 ┆ 6.0 ┆ 145.0 ┆ 175.0 ┆ … ┆ 1.0 ┆ 5.0  ┆ 6.0  ┆ 8.471 │
│ 15.0 ┆ 8.0 ┆ 301.0 ┆ 335.0 ┆ … ┆ 1.0 ┆ 5.0  ┆ 8.0  ┆ 6.45  │
│ 21.4 ┆ 4.0 ┆ 121.0 ┆ 109.0 ┆ … ┆ 1.0 ┆ 4.0  ┆ 2.0  ┆ 9.202 │
└──────┴─────┴───────┴───────┴───┴─────┴──────┴──────┴───────┘
sorhawell commented 11 months ago

hi @Sicheng-Pan this PR is very interesting ! :)

I'd like to hear your thoughts on the final syntax. What do you think of the below?

I think $collect_in_background() is a meaningful variation of $collect(). The former does not block the R session, at the cost of launching at least one extra R process (no extra process is needed unless map or apply is used). Also, $collect_in_background() returns a handle, not a <DataFrame>.

However, for $map_in_background() I suggest using the regular $map() and resolving whether map is pushed to another R process based on whether $collect() or $collect_in_background() was used, and/or some global polars_options.

It can also be meaningful to map in another process even without calling $collect_in_background(), when several R lambdas (pure functions) could be mapped in parallel.

It could also be advantageous to have an option like pl$set_polars_options(r_extra_process_pool = 5), with a default of 0. If the user sets this, processes are spun up and recycled for further jobs.

eitsupi commented 11 months ago

Looks amazing! Thanks for working on this!

Could you please resolve merge conflicts?

Sicheng-Pan commented 11 months ago

@sorhawell Since we currently do not have a global options store on the Rust side, I think it is better to leave map and map_in_background as separate expressions and leave the choice directly to the users. Besides, map and map_in_background do have some differences: map can have side effects in the main process, while map_in_background cannot have side effects and can only support pure R functions for now.

sorhawell commented 11 months ago

I propose 3 last things before merging:

Doesn't that make sense?

If you like I can add one of these.

sorhawell commented 11 months ago

In this PR, LazyFrame now has collect() + collect_in_background() + collect_background().

Could we cut away collect_background()?

sorhawell commented 11 months ago

For RThreadHandle, print.RThreadHandle and is_finished() do not work after the handle is exhausted. I have added some expectations to the unit tests but not resolved them.

sorhawell commented 11 months ago

I have added two scenarios "low io - low cpu" and "low io - high cpu".

In the "low io - high cpu" scenario, mapping in background R processes is twice as fast. However, it does not matter what the capacity is set to, and it appears only two processes are spawned.

> f_all_cols(lf)$collect() |> system.time()
   user  system elapsed 
  7.023   0.148   7.136 
> pl$set_global_rpool_cap(1)
> f_all_cols(lf, in_background = TRUE)$collect() |> system.time()  #burn-in start processes
   user  system elapsed 
  0.013   0.010   3.935 
> f_all_cols(lf, in_background = TRUE)$collect() |> system.time()
   user  system elapsed 
  0.014   0.012   3.940 
> pl$set_global_rpool_cap(4)
> f_all_cols(lf, in_background = TRUE)$collect() |> system.time() #burn-in start processes
   user  system elapsed 
  0.014   0.016   3.955 
> f_all_cols(lf, in_background = TRUE)$collect() |> system.time()
   user  system elapsed 
  0.013   0.011   3.642 
> pl$get_global_rpool_cap()
$available
[1] 2

$capacity
[1] 4

In "low io - low cpu", in_background = FALSE is much faster, as expected. Sometimes when running with in_background = TRUE many times in a row, the performance just becomes 10x faster and matches the single process. I cannot explain that. Maybe it skips running in the background process.

> pl$set_global_rpool_cap(4)
> f_sum_all_cols(lf, in_background = TRUE)$collect() |> system.time()
   user  system elapsed 
  0.006   0.006   0.330 
> f_sum_all_cols(lf, in_background = TRUE)$collect() |> system.time()
   user  system elapsed 
  0.005   0.007   0.022 
> f_sum_all_cols(lf, in_background = TRUE)$collect() |> system.time()
   user  system elapsed 
  0.007   0.007   0.025 
Sicheng-Pan commented 11 months ago

@sorhawell It seems that the R-CMD-check is failing on CI. The tests can pass locally on my side. May I ask if you know any possible reasons behind this?

sorhawell commented 11 months ago

@Sicheng-Pan

Error: Error in eval(ei, envir) : checking tests ... [18s/18s] ERROR
  Running ‘testthat.R’ [18s/18s]
Running the tests in ‘tests/testthat.R’ failed.
Last 13 lines of output:
  <RPolarsErr_error/error/condition>
  Error: Execution halted with the following contexts
     0: In R: in $collect():
     0: During function call [test_check("polars")]
     1: When calling $collect() on LazyFrame
     2: ComputeError(ErrString("Execution halted with the following contexts\n   1: When trying to rent a R process from the global R process pool\n   2: When trying to spawn a background R process\n   3: Os { code: 8, kind: Uncategorized, message: \"Exec format error\" }\n"))

It seems there is an OS-specific issue with the system call to start a process on Ubuntu/Linux. We do not see it on a Mac. The Rust crate you use to make system calls has likely produced this error: Os { code: 8, kind: Uncategorized, message: "Exec format error" }. You may find clues there for how to read it. Maybe some calls are not supported on all platforms, while others are.

If you find no clues, you or I can try to debug it in an Ubuntu Docker container (rocker docker). I have one already installed for another project.
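For reference, the OS error above is surfaced by std::process::Command::spawn as an io::Error. A minimal sketch (not the r-polars code) of how to read the error kind and the raw OS code, e.g. 8 for ENOEXEC / "Exec format error":

```rust
use std::process::Command;

// Minimal sketch (not the r-polars implementation): spawn a child process
// and surface the OS-level error details from the returned io::Error.
fn try_spawn(program: &str) -> Result<(), String> {
    match Command::new(program).arg("--version").spawn() {
        Ok(mut child) => {
            let _ = child.wait(); // reap the child so it does not linger
            Ok(())
        }
        Err(e) => Err(format!(
            // e.raw_os_error() would be Some(8) for "Exec format error" (ENOEXEC)
            "failed to spawn `{}`: kind={:?}, os_code={:?}",
            program,
            e.kind(),
            e.raw_os_error()
        )),
    }
}
```

Inspecting kind() and raw_os_error() distinguishes a missing binary (NotFound) from a binary the kernel refuses to exec (ENOEXEC), which helps produce a targeted error message.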

In the latest build you see an error from printing RThreadHandle

    Running 'testthat.R' [25s]
Running the tests in 'tests/testthat.R' failed.
Last 13 lines of output:
  Error: Execution halted with the following contexts
     0: During function call [test_check("polars")]

  Backtrace:
      ▆
   1. ├─base::print(handle) at test-rbackground.R:43:2
   2. └─polars:::print.RThreadHandle(handle)
   3.   ├─base::cat(as.character(x), "\n") at polars/R/rbackground.R:19:22
   4.   ├─base::as.character(x) at polars/R/rbackground.R:19:22
   5.   └─polars:::as.character.RThreadHandle(x) at polars/R/rbackground.R:19:22
   6.     └─polars:::unwrap(.pr$RThreadHandle$thread_description(x)) at polars/R/rbackground.R:14:29

I caused that by adding the unit-test expectation. I could fix the error, but I left it undone so as not to mess with your often fine design choices :)

Sicheng-Pan commented 11 months ago

@sorhawell I examined the error messages but I still don't have any clue why this could fail. It seems that Command::spawn failed on Ubuntu, but I suppose this command should have cross-platform support. If you know what's going on, feel free to fix it (or point it out in the comments so that I can fix it)!

sorhawell commented 11 months ago

I will try to test the commits between the last pass and the first error to pinpoint the issue. Maybe it is the shared mem feature.

sorhawell commented 11 months ago

maybe something with quotes

sorhawell commented 11 months ago

@Sicheng-Pan I got a little closer. When trying to run only R it fails; when running ls it does not fail, but it likely hangs as there is no handler.

https://github.com/pola-rs/r-polars/pull/328/commits/29e146ff48e3b9dba8a565f3a017a1fcc9986ed3

I would take a step back and start a new branch from main where you unit-test starting processes with R. Maybe another way of starting processes is needed to work on all platforms.

... or just stay in this branch but disable all current process unit tests and then slowly add on more and more complicated process calls and see the green check for each commit.

Sicheng-Pan commented 11 months ago

> @Sicheng-Pan I got a little closer. When trying to run only R it fails; when running ls it does not fail, but it likely hangs as there is no handler.
>
> 29e146f
>
> I would take a step back and start a new branch from main where you unit-test starting processes with R. Maybe another way of starting processes is needed to work on all platforms.
>
> ... or just stay in this branch but disable all current process unit tests and then slowly add on more and more complicated process calls and see the green check for each commit.

@sorhawell Are you sure the R command is present in the CI environment? Could you please try which R and see the output?

sorhawell commented 11 months ago

@Sicheng-Pan you might be right, as which points to r_check_bin/R, which is a devilish little shell script:

echo "'R' should not be used without a path -- see par. 1.6 of the manual"
exit 1

We need to find some cross-platform way for R to locate itself. Maybe there is some variable in R itself specifying this. Then we need to start a command with the path to the right binary in Rust.

sorhawell commented 11 months ago

A simple solution would be to have an environment variable saying where the R binary is. If Rust cannot start a process, the error suggests that the user specify the path to the R binary.
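A sketch of that lookup order (R_POLARS_R_BINARY is a made-up variable name for illustration; R_HOME is the variable R itself sets for a running session):

```rust
use std::env;
use std::path::PathBuf;

// Illustrative lookup order, not the actual r-polars implementation:
// an explicit path wins, then a hypothetical env var, then R_HOME
// (set inside any running R session), then plain "R" on the PATH.
fn locate_r_binary(explicit: Option<&str>) -> PathBuf {
    if let Some(path) = explicit {
        return PathBuf::from(path);
    }
    if let Ok(path) = env::var("R_POLARS_R_BINARY") {
        return PathBuf::from(path);
    }
    if let Ok(r_home) = env::var("R_HOME") {
        return PathBuf::from(r_home).join("bin").join("R");
    }
    PathBuf::from("R")
}
```

The resulting path would then be handed to Command::new instead of the bare "R" that the CI check script shadows.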

Sicheng-Pan commented 11 months ago

@sorhawell I've made some small modifications to the code. Now all checks pass on Ubuntu/macOS, but it seems that there are new errors on the Windows platform.

sorhawell commented 11 months ago

@Sicheng-Pan to fix the Windows issues you could revert to the previous way for just Windows, something like...

if R!("R.version$os == 'Windows'").expect("in any R session").as_bool().expect("== produces a bool") {
  Command::new("R")
} else {
 ....
}.spawn()
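If calling back into the R session is not possible on the spawning thread, a compile-time branch avoids the R! call entirely. A sketch under that assumption (the binary names are illustrative, not verified r-polars code):

```rust
use std::process::Command;

// Sketch of an alternative: branch on the target OS at compile time with
// cfg!, so no call into the R session is needed off the main thread.
// "R.exe" vs "R" is an assumption about platform binary names.
fn r_command() -> Command {
    if cfg!(target_os = "windows") {
        Command::new("R.exe")
    } else {
        Command::new("R")
    }
}
```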
Sicheng-Pan commented 11 months ago

> @Sicheng-Pan to fix the Windows issues you could revert to the previous way for just Windows, something like...
>
> if R!("R.version$os == 'Windows'").expect("in any R session").as_bool().expect("== produces a bool") {
>   Command::new("R")
> } else {
>  ....
> }.spawn()

@sorhawell We probably couldn't use anything from the R side, because this function may not be called from the main thread.

sorhawell commented 11 months ago

It seems in_background = TRUE is equally ~2x faster than in_background = FALSE in "low io - high cpu" for 0, 1, and 2 processes. Having 8 processes is faster than 2. Could there be some edge condition?

The last benchmark is updated in 97b2fb9.

### 3a -----------  Use R processes in parallel, low io, high cpu
lf <- pl$LazyFrame(lapply(1:12,\(i) rep(i,5)))
f_all_cols <-  \(lf,...) lf$select(pl$all()$map(\(x) {
  for(i in 1:10000) y = sum(rnorm(1000))
  sum(x)
},...))

> pl$set_global_rpool_cap(1)
> f_all_cols(lf, in_background = TRUE)$collect() |> system.time() #burn-in start processes
   user  system elapsed 
  0.007   0.015   4.124 
> f_all_cols(lf, in_background = TRUE)$collect() |> system.time()
   user  system elapsed 
  0.006   0.014   3.871 
> f_all_cols(lf, in_background = FALSE)$collect() |> system.time()
   user  system elapsed 
  8.209   0.350   8.362 
> 
> ##Appears to be a factor overhead when comparing single process
> pl$set_global_rpool_cap(0)
> f_all_cols(lf, in_background = TRUE)$collect() |> system.time() #burn-in start processes
   user  system elapsed 
  0.007   0.015   4.314 
> f_all_cols(lf, in_background = TRUE)$collect() |> system.time()
   user  system elapsed 
  0.007   0.015   4.491 
> f_all_cols(lf, in_background = FALSE)$collect() |> system.time()
   user  system elapsed 
  8.815   0.956   9.725 
> 
> pl$set_global_rpool_cap(2)
> f_all_cols(lf, in_background = TRUE)$collect() |> system.time() #burn-in start processes
   user  system elapsed 
  0.006   0.015   3.989 
> f_all_cols(lf, in_background = TRUE)$collect() |> system.time()
   user  system elapsed 
  0.023   0.038   3.803 
> f_all_cols(lf, in_background = FALSE)$collect() |> system.time()
   user  system elapsed 
  8.943   1.151   9.886 
> 
> pl$set_global_rpool_cap(8)
> f_all_cols(lf, in_background = TRUE)$collect() |> system.time() #burn-in start processes
   user  system elapsed 
  0.005   0.010   3.048 
> f_all_cols(lf, in_background = TRUE)$collect() |> system.time()
   user  system elapsed 
  0.006   0.015   2.425 
> f_all_cols(lf, in_background = FALSE)$collect() |> system.time()
   user  system elapsed 
  8.261   0.317   8.436 
> pl$get_global_rpool_cap() #only 2 processes appears to be spawned
$available
[1] 8

$capacity
[1] 8
Sicheng-Pan commented 10 months ago

> It seems in_background = TRUE is equally ~2x faster than in_background = FALSE in "low io - high cpu" for 0, 1, and 2 processes. Having 8 processes is faster than 2. Could there be some edge condition?
>
> The last benchmark is updated in 97b2fb9.

@sorhawell I have not examined the polars code base closely, but I would assume that the maps are run in parallel threads (for the benchmark case). Without the R background process pool, all these threads have to share the main R thread with the user, which can become the bottleneck of the computation. With the R background process pool, each thread can get its own R process for evaluation. A pool of size 8 might be faster than a pool of size 2 because, with a larger pool, it is more likely for a polars thread to inherit a used R process (after the computation in another thread completes) rather than creating a new one. This may only lead to marginal benefits, though, since the overhead of creating a new process is small relative to the computation in the low io - high cpu scenario.

Sicheng-Pan commented 10 months ago

@sorhawell It seems that the docs check CI keeps hanging. Do you have any ideas why this is the case?

sorhawell commented 10 months ago

> @sorhawell It seems that the docs check CI keeps hanging. Do you have any ideas why this is the case?

@Sicheng-Pan Just got a theory. It is likely because all the other workflows build the package, whereas the docs workflow only loads it in the session. The other processes cannot handle the requests without the polars package, which causes the polars query in the main process to stall/hang.

I will try to modify the workflow.

Sicheng-Pan commented 10 months ago

@sorhawell Thanks! Anything else I should address for this PR?

sorhawell commented 10 months ago

@Sicheng-Pan I have to run for today; I'll just flush all changes and continue tomorrow.

I have found a bug in RBackgroundPool that explains the strange benchmark. It is good to dive deep into benchmarks of multithreading stuff; sometimes there is gold to be found. RBackgroundPool::lease would always spawn a new process if none was available in the pool, disregarding the cap. Processes would still be discarded if capacity was exceeded. This explains the poor performance in the low io - low cpu tests, because most time was spent starting and stopping R processes.

I made a poor man's sleep mechanism where a thread will only spawn a new process if the capacity is not exceeded. Otherwise it will perform some adaptive sleeping between 0.2-16.3 ms, waiting for an R process to become available. A better mechanism would park and queue threads until an R process idles in the pool.

Sometimes in benchmark test 3a I even get performance which is 100 times faster. I think there is one more bug to be found, and then the overhead in low io - low cpu will be near zero.
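The adaptive sleep could look roughly like this (the doubling schedule and constants are assumptions derived from the 0.2-16.3 ms range quoted above; the exact schedule in the PR may differ):

```rust
use std::time::Duration;

// Roughly-doubling backoff between retry attempts, starting near 0.2 ms
// and capped near 16.3 ms. Constants are taken from the range mentioned
// in the discussion, not from the r-polars source.
fn backoff_delay(attempt: u32) -> Duration {
    let micros = (200u64 << attempt.min(7)).min(16_300);
    Duration::from_micros(micros)
}
```

A thread that fails to lease a process would sleep(backoff_delay(n)) and retry with n + 1, so contention starts cheap and degrades gracefully.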

Sicheng-Pan commented 10 months ago

> @Sicheng-Pan I have to run for today; I'll just flush all changes and continue tomorrow.
>
> I have found a bug in RBackgroundPool that explains the strange benchmark. It is good to dive deep into benchmarks of multithreading stuff; sometimes there is gold to be found. RBackgroundPool::lease would always spawn a new process if none was available in the pool, disregarding the cap. Processes would still be discarded if capacity was exceeded. This explains the poor performance in the low io - low cpu tests, because most time was spent starting and stopping R processes.
>
> I made a poor man's sleep mechanism where a thread will only spawn a new process if the capacity is not exceeded. Otherwise it will perform some adaptive sleeping between 0.2-16.3 ms, waiting for an R process to become available. A better mechanism would park and queue threads until an R process idles in the pool.
>
> Sometimes in benchmark test 3a I even get performance which is 100 times faster. I think there is one more bug to be found, and then the overhead in low io - low cpu will be near zero.

@sorhawell Thanks for the investigation! Actually this is not a bug. I implemented it this way on purpose, so that the RBackgroundPool behaves like a buffer (which can overflow) instead of a container (which can't). Imagine the case where you have heavy R computations that can be done in parallel in the background, but the user specified a pool capacity of only 1. If no new R processes can be spawned, this becomes a bottleneck. Meanwhile, the overhead of creating a new process is acceptable from my perspective. If users want to eliminate the overhead, they may try to set the capacity to around the number of CPU cores (with hyper-threading), assuming that polars will not spawn more threads than that. Also, I thought the low CPU case is not so relevant, because in that scenario in_background = FALSE could be the better option.
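The buffer-vs-container behaviour described here can be sketched as follows (Worker stands in for a background R process handle; this is an illustration, not the actual RBackgroundPool code):

```rust
use std::sync::Mutex;

// Stand-in for a background R process handle.
struct Worker;

// A "soft cap" pool: leasing never blocks. If the pool is empty, a fresh
// worker is created even beyond the cap; on return, workers over capacity
// are dropped (i.e. the process is shut down) instead of recycled.
struct SoftCapPool {
    idle: Mutex<Vec<Worker>>,
    cap: usize,
}

impl SoftCapPool {
    fn new(cap: usize) -> Self {
        SoftCapPool { idle: Mutex::new(Vec::new()), cap }
    }

    fn lease(&self) -> Worker {
        // Take an idle worker if any, otherwise "spawn" a new one.
        self.idle.lock().unwrap().pop().unwrap_or(Worker)
    }

    fn put_back(&self, w: Worker) {
        let mut idle = self.idle.lock().unwrap();
        if idle.len() < self.cap {
            idle.push(w); // recycle
        }
        // else: `w` is dropped here, the overflow process goes away
    }
}
```

With cap = 1 and two concurrent leases, both succeed immediately; on return, only one worker is pooled and the other is discarded, which is exactly the overflow behaviour (and the start/stop overhead) discussed in this thread.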

sorhawell commented 10 months ago

I will write a counter-suggestion and we will compare on the benchmark. It might be OK with an unbounded pool. However, some users might like to bound their memory usage via the pool size. Also, polars uses 2x the threads of physical cores, e.g. 8 threads on my machine. I don't think it is always beneficial to have that many R sessions open at once. With 8 threads and cap=2, this explains the slowness, because about 6 threads must continuously create new processes and destroy them again. Maybe you could patch the unbounded pool to not continuously open and close processes, but it seems difficult to me.

Sicheng-Pan commented 10 months ago

> I will write a counter-suggestion and we will compare on the benchmark. It might be OK with an unbounded pool. However, some users might like to bound their memory usage via the pool size. Also, polars uses 2x the threads of physical cores, e.g. 8 threads on my machine. I don't think it is always beneficial to have that many R sessions open at once. With 8 threads and cap=2, this explains the slowness, because about 6 threads must continuously create new processes and destroy them again. Maybe you could patch the unbounded pool to not continuously open and close processes, but it seems difficult to me.

I can also add an option for the user to configure whether the upper limit is a soft limit or a hard limit.

sorhawell commented 10 months ago

Before spawning processes in parallel:

[1] "test 3a"
usr:   51ms sys:   15ms elp:   85ms ||  - 3a +io %bitrate %cpu foreground 
usr:   65ms sys:   86ms elp: 2054ms ||  - 3a +io %bitrate %cpu pool=8 background burn-in  
usr:   63ms sys:   87ms elp:   76ms ||  - 3a +io %bitrate %cpu pool=8 background 
usr:   58ms sys:   68ms elp:  186ms ||  - 3a +io %bitrate %cpu pool=4 background 
usr:   60ms sys:   69ms elp:  182ms ||  - 3a +io %bitrate %cpu pool=2 background 
usr:   60ms sys:   58ms elp:  325ms ||  - 3a +io %bitrate %cpu pool=1 background 
[1] "test 3b"
usr:10888ms sys:  452ms elp:11047ms ||  - 3b %io %bitrate +cpu foreground 
usr:    4ms sys:   12ms elp: 4415ms ||  - 3b %io %bitrate +cpu pool=8 background burn-in  
usr:    3ms sys:    7ms elp: 2891ms ||  - 3b %io %bitrate +cpu pool=8 background 
usr:    4ms sys:    8ms elp: 3273ms ||  - 3b %io %bitrate +cpu pool=6 background 
usr:    4ms sys:    7ms elp: 3709ms ||  - 3b %io %bitrate +cpu pool=4 background 
usr:    4ms sys:    6ms elp: 5648ms ||  - 3b %io %bitrate +cpu pool=2 background 
usr:    4ms sys:    8ms elp:11798ms ||  - 3b %io %bitrate +cpu pool=1 background 

After spawning processes in parallel, burn-in times are lower:

[1] "test 3a - parallel"
usr:   55ms sys:   16ms elp:   96ms ||  - 3a +io %bitrate %cpu foreground 
usr:   65ms sys:   74ms elp:  713ms ||  - 3a +io %bitrate %cpu pool=8 background burn-in  
usr:   62ms sys:   85ms elp:   99ms ||  - 3a +io %bitrate %cpu pool=8 background 
usr:   70ms sys:  105ms elp:  142ms ||  - 3a +io %bitrate %cpu pool=4 background 
usr:   59ms sys:   67ms elp:  174ms ||  - 3a +io %bitrate %cpu pool=2 background 
usr:   54ms sys:   56ms elp:  321ms ||  - 3a +io %bitrate %cpu pool=1 background 
[1] "test 3b - parallel"
usr:10941ms sys:  459ms elp:11152ms ||  - 3b %io %bitrate +cpu foreground 
usr:    5ms sys:   13ms elp: 3474ms ||  - 3b %io %bitrate +cpu pool=8 background burn-in  
usr:    3ms sys:    9ms elp: 2656ms ||  - 3b %io %bitrate +cpu pool=8 background 
usr:    4ms sys:    7ms elp: 2993ms ||  - 3b %io %bitrate +cpu pool=6 background 
usr:    4ms sys:    7ms elp: 3562ms ||  - 3b %io %bitrate +cpu pool=4 background 
usr:    4ms sys:    7ms elp: 5637ms ||  - 3b %io %bitrate +cpu pool=2 background 
usr:    3ms sys:    6ms elp:11131ms ||  - 3b %io %bitrate +cpu pool=1 background 
sorhawell commented 10 months ago

MacBook Pro 2015 - 4 cores - Intel x64 - 16 GB RAM. rust-polars defaults to n_workers = cores x 2.

previous pool impl

[1] "test 1a - sequential"
usr:  257ms sys:   31ms elp:  344ms ||  - 1a +io %bitrate %cpu foreground 
usr:  126ms sys:   47ms elp:  518ms ||  - 1a +io %bitrate %cpu background1 
usr:  132ms sys:   48ms elp:  534ms ||  - 1a +io %bitrate %cpu background2 
[1] "test 1b - sequential"
usr:  196ms sys:  114ms elp:  311ms ||  - 1b -io +bitrate %cpu foreground 
usr:  277ms sys:  353ms elp: 1651ms ||  - 1b -io +bitrate %cpu background1 
usr:  285ms sys:  338ms elp: 1665ms ||  - 1b -io +bitrate %cpu background2 
[1] "test 2a - sequential"
usr: 2444ms sys:  585ms elp: 3032ms ||  - 2a %io ~bitrate +cpu foreground 
usr:    1ms sys:    5ms elp: 3003ms ||  - 2a %io ~bitrate +cpu background 
usr:    2ms sys:    8ms elp: 2927ms ||  - 2a %io ~bitrate +cpu background2 
[1] "test 3a - parallel"
usr:   69ms sys:   22ms elp:  131ms ||  - 3a +io %bitrate %cpu foreground 
usr:   68ms sys:   91ms elp: 2112ms ||  - 3a +io %bitrate %cpu pool=8 background burn-in  
usr:   65ms sys:   90ms elp:   80ms ||  - 3a +io %bitrate %cpu pool=8 background 
usr:   66ms sys:   93ms elp: 2609ms ||  - 3a +io %bitrate %cpu pool=4 background 
usr:  150ms sys:  412ms elp:114714ms ||  - 3a +io %bitrate %cpu pool=2 background 
usr:  165ms sys:  506ms elp:149871ms ||  - 3a +io %bitrate %cpu pool=1 background 
[1] "test 3b - parallel"
usr:10863ms sys:  429ms elp:11048ms ||  - 3b %io %bitrate +cpu foreground 
usr:    5ms sys:   12ms elp: 4166ms ||  - 3b %io %bitrate +cpu pool=8 background burn-in  
usr:    4ms sys:    9ms elp: 2642ms ||  - 3b %io %bitrate +cpu pool=8 background 
usr:    4ms sys:   11ms elp: 3288ms ||  - 3b %io %bitrate +cpu pool=6 background 
usr:    4ms sys:   13ms elp: 3807ms ||  - 3b %io %bitrate +cpu pool=4 background 
usr:    5ms sys:   14ms elp: 4245ms ||  - 3b %io %bitrate +cpu pool=2 background 
usr:    6ms sys:   19ms elp: 4979ms ||  - 3b %io %bitrate +cpu pool=1 background 
[1] "test 3c - parallel"
usr:10988ms sys:  715ms elp:11511ms ||  - 3c %io +bitrate +cpu foreground  
usr:  166ms sys:  132ms elp: 4276ms ||  - 3c %io +bitrate +cpu pool=8 background burn-in  
usr:  246ms sys:  123ms elp: 2798ms ||  - 3c %io +bitrate +cpu pool=8 background 
usr:  162ms sys:   95ms elp: 3393ms ||  - 3c %io +bitrate +cpu pool=6 background 
usr:  166ms sys:  108ms elp: 3810ms ||  - 3c %io +bitrate +cpu pool=4 background 
usr:  131ms sys:   90ms elp: 4257ms ||  - 3c %io +bitrate +cpu pool=2 background 
usr:  124ms sys:   97ms elp: 4929ms ||  - 3c %io +bitrate +cpu pool=1 background 
[1] "test 3d - parallel + r-polars conversion"
usr:  418ms sys:   78ms elp:  494ms ||  - 3d %io +bitrate +cpu foreground  
usr:  513ms sys: 2706ms elp: 2869ms ||  - 3d %io +bitrate +cpu pool=8 background burn-in  
usr:  619ms sys: 6368ms elp: 1548ms ||  - 3d %io +bitrate +cpu pool=8 background 
usr:  568ms sys: 2132ms elp: 2079ms ||  - 3d %io +bitrate +cpu pool=6 background 
usr:  480ms sys:  986ms elp: 4128ms ||  - 3d %io +bitrate +cpu pool=4 background 
usr:  414ms sys:  655ms elp: 6512ms ||  - 3d %io +bitrate +cpu pool=2 background 
usr:  437ms sys:  518ms elp: 7901ms ||  - 3d %io +bitrate +cpu pool=1 background 

new queued pool impl

EDIT: closed the browser and reran the benchmark. ~8% improvement

[1] "test 1a - sequential"
usr:  283ms sys:   35ms elp:  414ms ||  - 1a +io %bitrate %cpu foreground 
usr:  130ms sys:   50ms elp:  534ms ||  - 1a +io %bitrate %cpu background1 
usr:  132ms sys:   51ms elp:  542ms ||  - 1a +io %bitrate %cpu background2 
[1] "test 1b - sequential"
usr:  201ms sys:  123ms elp:  324ms ||  - 1b -io +bitrate %cpu foreground 
usr:  284ms sys:  357ms elp: 1692ms ||  - 1b -io +bitrate %cpu background1 
usr:  282ms sys:  331ms elp: 1679ms ||  - 1b -io +bitrate %cpu background2 
[1] "test 2a - sequential"
usr: 2437ms sys:  522ms elp: 2967ms ||  - 2a %io ~bitrate +cpu foreground 
usr:    1ms sys:    5ms elp: 2974ms ||  - 2a %io ~bitrate +cpu background 
usr:    1ms sys:    7ms elp: 3057ms ||  - 2a %io ~bitrate +cpu background2 
[1] "test 3a - parallel"
usr:   68ms sys:   21ms elp:  151ms ||  - 3a +io %bitrate %cpu foreground 
usr:   66ms sys:   76ms elp:  526ms ||  - 3a +io %bitrate %cpu pool=8 background burn-in  
usr:   63ms sys:   87ms elp:   83ms ||  - 3a +io %bitrate %cpu pool=8 background 
usr:   69ms sys:   92ms elp:  111ms ||  - 3a +io %bitrate %cpu pool=4 background 
usr:   54ms sys:   57ms elp:  164ms ||  - 3a +io %bitrate %cpu pool=2 background 
usr:   56ms sys:   54ms elp:  314ms ||  - 3a +io %bitrate %cpu pool=1 background 
[1] "test 3b - parallel"
usr:10911ms sys:  531ms elp:11068ms ||  - 3b %io %bitrate +cpu foreground 
usr:    4ms sys:   11ms elp: 3150ms ||  - 3b %io %bitrate +cpu pool=8 background burn-in  
usr:    3ms sys:    9ms elp: 2510ms ||  - 3b %io %bitrate +cpu pool=8 background 
usr:    4ms sys:    8ms elp: 3061ms ||  - 3b %io %bitrate +cpu pool=6 background 
usr:    3ms sys:    7ms elp: 3325ms ||  - 3b %io %bitrate +cpu pool=4 background 
usr:    4ms sys:    6ms elp: 5594ms ||  - 3b %io %bitrate +cpu pool=2 background 
usr:    4ms sys:    6ms elp:11030ms ||  - 3b %io %bitrate +cpu pool=1 background 
[1] "test 3c - parallel"
usr:11018ms sys:  732ms elp:11368ms ||  - 3c %io +bitrate +cpu foreground  
usr:  171ms sys:  192ms elp: 3127ms ||  - 3c %io +bitrate +cpu pool=8 background burn-in  
usr:  257ms sys:  123ms elp: 2437ms ||  - 3c %io +bitrate +cpu pool=8 background 
usr:  182ms sys:  100ms elp: 2947ms ||  - 3c %io +bitrate +cpu pool=6 background 
usr:  127ms sys:   61ms elp: 3306ms ||  - 3c %io +bitrate +cpu pool=4 background 
usr:   95ms sys:   59ms elp: 5712ms ||  - 3c %io +bitrate +cpu pool=2 background 
usr:   89ms sys:   48ms elp:11436ms ||  - 3c %io +bitrate +cpu pool=1 background 
[1] "test 3d - parallel + r-polars conversion"
usr:  469ms sys:   78ms elp:  592ms ||  - 3d %io +bitrate +cpu foreground  
usr:  502ms sys: 5470ms elp: 1812ms ||  - 3d %io +bitrate +cpu pool=8 background burn-in  
usr:  599ms sys: 6734ms elp: 1593ms ||  - 3d %io +bitrate +cpu pool=8 background 
usr:  532ms sys: 2434ms elp:  906ms ||  - 3d %io +bitrate +cpu pool=6 background 
usr:  434ms sys:  835ms elp:  658ms ||  - 3d %io +bitrate +cpu pool=4 background 
usr:  355ms sys:  387ms elp:  819ms ||  - 3d %io +bitrate +cpu pool=2 background 
usr:  324ms sys:  286ms elp: 1383ms ||  - 3d %io +bitrate +cpu pool=1 background 
sorhawell commented 10 months ago

Conclusion, as far as I can see: the previous pool impl is equally fast if the number of processes equals the number of polars workers, or for any sequential maps. See "test 3abc pool=8 not burn-in" and "1x+2x".

The previous pool impl was slower in parallel, especially for IO-bounded tasks at lower pool sizes (see test 3a), because some or most processes would be created and destroyed after each worker iteration. For the CPU-bound test 3c, the previous pool impl is actually faster, but that is because it was never limited in the number of processes and could use 8 on this 4-core machine.

In general the IO overhead of background is about 2x more than foreground. The bitrate overhead (inter-process communication) is about 5x (or perhaps up to 100x); it is tough to compete with arrow's zero-cost abstraction. If using the r-polars conversion to R vectors, which is CPU bound, then bitrate is not especially limiting; see test "3d".

The downside of the new impl is that it adds +200 lines of code. The upside is that it allows using background R processes in more scenarios which are not extremely CPU bounded, e.g. for aggregating over groups. Also, users can now set the actual max number of R processes via pl$set_global_rpool_cap(), which may be useful to reduce memory usage.

So far the old impl seemed to hit a deadlock 3 times out of perhaps 100 tests. I have not seen any deadlocks in the new pool impl yet. But this is very spurious and the data is limited, so who knows. Having multiple workers that contend for two Mutexes could perhaps lead to a deadlock if two workers each got one and are waiting for the other. This is just a theory, maybe not an issue.

sorhawell commented 10 months ago

> I will write a counter-suggestion and we will compare on the benchmark. It might be OK with an unbounded pool. However, some users might like to bound their memory usage via the pool size. Also, polars uses 2x the threads of physical cores, e.g. 8 threads on my machine. I don't think it is always beneficial to have that many R sessions open at once. With 8 threads and cap=2, this explains the slowness, because about 6 threads must continuously create new processes and destroy them again. Maybe you could patch the unbounded pool to not continuously open and close processes, but it seems difficult to me.
>
> I can also add an option for the user to configure whether the upper limit is a soft limit or a hard limit.

I could not see an easy way to add a hard limit without some global accounting of total leased/pool process-handlers. How do you find this updated pool impl?

Sicheng-Pan commented 10 months ago

> I could not see an easy way to add a hard limit without some global accounting of total leased/pool process-handlers. How do you find this updated pool impl?

@sorhawell I was thinking about this and came to the conclusion that a counter for leased processes is necessary. Also, we would need either a Condvar or a queue of mpsc::Sender so that threads can wait for the in-use R processes, and I planned to make this an Option<_> so that we could tell whether we have a hard limit (Some(condvar/queue)) or a soft limit (None). But I thought this would change a lot in the old implementation, especially because I would inevitably need to write an inner struct for the thread pool, so I did not proceed. It seems that you implemented all of this except for the Option<_> part, and the code looks pretty good to me. Thanks for the help!
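For illustration, a minimal sketch of the Condvar-plus-leased-counter approach discussed here (the types and names are made up, not the r-polars implementation):

```rust
use std::sync::{Condvar, Mutex};

// Hard-limit pool sketch: a counter of leased workers guarded by a Mutex,
// plus a Condvar so waiting threads are parked (not spinning) until a
// worker slot is returned.
struct HardCapPool {
    leased: Mutex<usize>,
    cap: usize,
    returned: Condvar,
}

impl HardCapPool {
    fn new(cap: usize) -> Self {
        HardCapPool { leased: Mutex::new(0), cap, returned: Condvar::new() }
    }

    /// Blocks until a slot is free, then takes it.
    fn lease(&self) {
        let mut leased = self.leased.lock().unwrap();
        while *leased >= self.cap {
            // Releases the lock while parked; reacquires it on wake-up.
            leased = self.returned.wait(leased).unwrap();
        }
        *leased += 1;
    }

    fn put_back(&self) {
        *self.leased.lock().unwrap() -= 1;
        self.returned.notify_one(); // wake exactly one parked waiter
    }
}
```

Wrapping the Condvar in an Option<_>, as proposed above, would then select between this blocking behaviour (hard limit) and the overflow behaviour (soft limit).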

sorhawell commented 10 months ago

@eitsupi I have changed the Makefile so that make install does not handle dependencies. I guess make requirements could do that. I have added install to make docs because some examples require polars being built as a package, to be loaded in background R processes.

sorhawell commented 10 months ago

@Sicheng-Pan As far as I can see, this huge PR is ready to go. Many thanks for your patience!

sorhawell commented 10 months ago

@Sicheng-Pan many thanks !!