Rdatatable / data.table

R's data.table package extends data.frame:
http://r-datatable.com
Mozilla Public License 2.0

Enforcing use of multithread (parallelize) when groupby #5200

Open matthewgson opened 2 years ago

matthewgson commented 2 years ago

Hi,

I have a question that is related to the internals of data.table and potentially to a feature request. (Cross-posted on SO.) I have a large dataset with more than 1 billion observations, and I need to perform some string operations on it, which are slow.

My code is as simple as this:

DT[, var := some_function(var2)]

If I'm not mistaken, data.table uses multithreading when it is called with by (maybe not always), and I'm trying to take advantage of this to parallelize the operation. To do so, I can make an interim grouper variable, such as

DT[, grouper := .I %/% 100]  

and do

DT[, var := some_function(var2), by = grouper]

I tried some benchmarking with a small sample of the data, but surprisingly I did not see a performance improvement. However, in yet another experiment, I found that the grouped operation did improve the speed, which left me confused.
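A comparison of this kind can be sketched as below. This is only an illustration on made-up data: `some_function` here is a hypothetical stand-in (a plain `toupper()` call) for the real string operation, and the timings will vary by machine, so none are shown.

```r
library(data.table)

# Hypothetical stand-in for the slow string function from the question
some_function <- function(x) toupper(x)

# Small synthetic table; the real data has over 1 billion rows
DT <- data.table(var2 = sprintf("row-%06d", 1:200000))
DT[, grouper := .I %/% 100]   # interim grouping variable, as in the question

# Ungrouped call: some_function() is evaluated once on the whole column
t_plain <- system.time(DT[, var_plain := some_function(var2)])

# Grouped call: some_function() is evaluated once per group
t_by <- system.time(DT[, var_by := some_function(var2), by = grouper])

print(t_plain)
print(t_by)
```

Both calls should produce the same column; only the elapsed times differ.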

So my questions are:

FYI, I see that multithreading is enabled with half of my cores when I load data.table, so I guess there's no OpenMP issue.
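For reference, the thread settings can be inspected and changed with `getDTthreads()` and `setDTthreads()`. The snippet below is a minimal sketch; what it prints depends on the machine.

```r
library(data.table)

# Number of threads data.table will currently use in its parallel regions
n_before <- getDTthreads()
print(n_before)

# Print the detailed OpenMP and environment settings data.table detected
getDTthreads(verbose = TRUE)

# Request all logical CPUs; setDTthreads() invisibly returns the old setting
old <- setDTthreads(percent = 100)
n_after <- getDTthreads()
print(n_after)

# Restore the previous setting
setDTthreads(old)
```

Note that this only controls the code paths data.table has parallelized internally; it does not make an arbitrary function in j run on several threads.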

MichaelChirico commented 2 years ago

You mentioned string operations -- basically it's hard for us to parallelize anything with strings. Your best bet is probably to parallelize manually.

A representative (smaller-scale, e.g. 1e8) example would of course help :)

ColeMiller1 commented 2 years ago

See also #3962.

But AFAIU, generic functions in j are not parallelized. data.table gains speed because it reuses memory across by groups, and for common functions like sum(), internal optimizations enable multithreading.

I’m not sure users can force multithreading, but users can choose optimization levels to prevent multithreading or automatic indexing (see ?datatable.optimize or google - I’m not at a computer to check).
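As a rough sketch of what choosing an optimization level looks like: the `datatable.optimize` option defaults to `Inf`, and, as I understand `?datatable.optimize`, levels below 2 skip the GForce optimization for functions such as `sum()`. The toy table is made up for illustration, and `verbose = TRUE` is used only so that data.table reports which optimizations it applied.

```r
library(data.table)

DT <- data.table(g = rep(1:3, each = 4), v = as.numeric(1:12))

# Remember the current option value, then use full optimization (the default)
old <- options(datatable.optimize = Inf)
res_optimized <- DT[, .(s = sum(v)), by = g, verbose = TRUE]

# Lower the level; as far as I know this disables GForce for sum()
options(datatable.optimize = 1L)
res_plain <- DT[, .(s = sum(v)), by = g, verbose = TRUE]

# Restore the previous option value
options(old)

print(res_optimized)
```

The results are the same either way; the option only changes how the grouped sum is computed internally.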

jangorecki commented 2 years ago

Finding the groups for the "by" argument is still parallelized, even for a UDF in j.

matthewgson commented 2 years ago

If I summarize,

Please let me know if I am mistaken.

Before I realized this, my server ran out of vector memory (even though it had 1 TB of RAM and the data took up 200 GB) when the code was run without the by argument. So I did a manual split-apply with doFuture, using split(DT, by='grouper') with foreach and %dopar%. It ran quite fast.

Thank you very much, data.table dev team. Please feel free to close this issue.
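A minimal sketch of this manual split-apply pattern is below. It is not the code that was actually run: it swaps in `parallel::mclapply()` from base R for doFuture/foreach, uses a made-up table, and `some_function` is a hypothetical stand-in for the real string operation. Forking is not available on Windows, so the sketch falls back to a single core there.

```r
library(data.table)
library(parallel)

# Hypothetical stand-in for the slow string function
some_function <- function(x) toupper(gsub("-", "_", x, fixed = TRUE))

# Small synthetic table with a chunking variable (4 chunks of 2500 rows)
DT <- data.table(var2 = sprintf("id-%05d-abc", 1:10000))
DT[, grouper := (.I - 1L) %/% 2500L]

# mclapply() forks worker processes, which only works on Unix-alikes
n_cores <- if (.Platform$OS.type == "windows") 1L else 2L

# Split into a list of data.tables, one per chunk
pieces <- split(DT, by = "grouper")

# Apply the function to each chunk in a separate worker
done <- mclapply(pieces, function(piece) {
  piece[, var := some_function(var2)][]
}, mc.cores = n_cores)

# Stitch the chunks back together in their original order
result <- rbindlist(done)
```

Each worker only ever holds one chunk, which is one plausible reason this approach can stay within memory where a single ungrouped call does not.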

ben-schwen commented 2 years ago

More or less your summary is correct, but gforce optimization is kind of tricky, and the current optimization won't always kick in even when the functions have a gforce implementation. See also #3815.

There is also the open issue #519 about documentation.

jangorecki commented 1 year ago

That would probably be a big change, but an alternative parallelization strategy could be used in some cases, like a UDF in j.

A working example of parallelizing a UDF using the parallel package is in #5575:

x = rnorm(1e5)
n = 500
setDTthreads(1)
system.time(
  th1 <- frollapply(x, n, median, simplify=unlist)
)
#   user  system elapsed
#  4.106   0.008   4.115
setDTthreads(4)
system.time(
  th4 <- frollapply(x, n, median, simplify=unlist)
)
#   user  system elapsed
#  5.778   0.140   1.498
all.equal(th1, th4)
#[1] TRUE