ArmanAttaran closed this issue 3 years ago.
I'd never heard of it before, but it looks very interesting. Thanks for filing the request. Yes, it is possible.
Thanks, let me know if you need any help. If you want a shortcut you could just run the dplyr code through dbplyr, but that would do DuckDB a huge injustice. Also make sure you set up the pragmas to use multithreading: https://duckdb.org/docs/sql/pragmas. The developers are super responsive, so I'm sure they would not mind lending a hand.
Help would be appreciated; otherwise it may not happen very soon. There are other tools waiting to be added, as well as some improvements.
Hi @jangorecki @ArmanAttaran
I had never heard of duckdb either, so I wanted to give it a test drive. I have some code below with some basic benchmarks to give @jangorecki insight into the potential performance. I have no clue at this point about optimizing duckdb inside R, so I ran with the basic example from the docs. Also, @jangorecki, I ran into a data.table error when trying to process 1.7 billion rows; not sure if that is expected or not. The error I was running into was: "Error in gforce(thisEnv, jsub, o__, f__, len__, irows) : Internal error: Failed to allocate counts or TMP when assigning g in gforce"
Nonetheless, here is the code I was running. I tested 32 threads and 64 threads using AVG / mean and MIN / min across two numeric grouping variables (I used the Kaggle Walmart data set and appended it repeatedly to grow its size for subsequent runs). The thread setting only affected data.table, as I'm unaware of a way to alter thread usage in duckdb. However, I didn't see CPU usage go above 3-4% when running the duckdb operations.
# DuckDB initialization
library("DBI")
con <- dbConnect(duckdb::duckdb(), ":memory:")
# 32 Threads
# data.table initialization
data.table::setDTthreads(threads = parallel::detectCores() / 2)
data.table::getDTthreads(verbose = TRUE)
# omp_get_num_procs() 64
# R_DATATABLE_NUM_PROCS_PERCENT unset (default 50)
# R_DATATABLE_NUM_THREADS unset
# R_DATATABLE_THROTTLE unset (default 1024)
# omp_get_thread_limit() 2147483647
# omp_get_max_threads() 64
# OMP_THREAD_LIMIT unset
# OMP_NUM_THREADS unset
# RestoreAfterFork true
# data.table is using 32 threads with throttle==1024. See ?setDTthreads.
# load walmart data
data <- data.table::fread("https://www.dropbox.com/s/2str3ek4f4cheqi/walmart_train.csv?dl=1")
# Convert IDate to Date
data[, Date := as.Date(Date)]
# Prepare for duckdb
dbWriteTable(con, "data", data, overwrite = TRUE)
# Basic tests
# Avg
microbenchmark::microbenchmark(
times = 1,
dbGetQuery(con, 'SELECT AVG("Weekly_Sales") FROM data GROUP BY "Store","Dept"'),
data[, mean(Weekly_Sales), by = list(Store,Dept)])
# Unit: milliseconds
# expr min lq mean median uq max neval
# dbGetQuery(con, "SELECT AVG(\"Weekly_Sales\") FROM data GROUP BY \"Store\",\"Dept\"") 7.880001 7.880001 7.880001 7.880001 7.880001 7.880001 1
# data[, mean(Weekly_Sales), by = list(Store, Dept)] 7.792000 7.792000 7.792000 7.792000 7.792000 7.792000 1
# Min
microbenchmark::microbenchmark(
times = 1,
dbGetQuery(con, 'SELECT MIN("Weekly_Sales") FROM data GROUP BY "Store","Dept"'),
data[, min(Weekly_Sales), by = list(Store,Dept)])
# Unit: milliseconds
# expr min lq mean median uq max neval
# dbGetQuery(con, "SELECT MIN(\"Weekly_Sales\") FROM data GROUP BY \"Store\",\"Dept\"") 7.224600 7.224600 7.224600 7.224600 7.224600 7.224600 1
# data[, min(Weekly_Sales), by = list(Store, Dept)] 7.431101 7.431101 7.431101 7.431101 7.431101 7.431101 1
# Create 3.7 Million records
data <- data.table::rbindlist(list(data, data, data, data, data, data, data, data))
dbWriteTable(con, "data", data, overwrite = TRUE)
# Basic tests
# Avg
microbenchmark::microbenchmark(
times = 1,
dbGetQuery(con, 'SELECT AVG("Weekly_Sales") FROM data GROUP BY "Store","Dept"'),
data[, mean(Weekly_Sales), by = list(Store,Dept)])
# Unit: milliseconds
# expr min lq mean median uq max neval
# dbGetQuery(con, "SELECT AVG(\"Weekly_Sales\") FROM data GROUP BY \"Store\",\"Dept\"") 50.8000 50.8000 50.8000 50.8000 50.8000 50.8000 1
# data[, mean(Weekly_Sales), by = list(Store, Dept)] 50.5613 50.5613 50.5613 50.5613 50.5613 50.5613 1
# Min
microbenchmark::microbenchmark(
times = 1,
dbGetQuery(con, 'SELECT MIN("Weekly_Sales") FROM data GROUP BY "Store","Dept"'),
data[, min(Weekly_Sales), by = list(Store,Dept)])
# Unit: milliseconds
# expr min lq mean median uq max neval
# dbGetQuery(con, "SELECT MIN(\"Weekly_Sales\") FROM data GROUP BY \"Store\",\"Dept\"") 45.2676 45.2676 45.2676 45.2676 45.2676 45.2676 1
# data[, min(Weekly_Sales), by = list(Store, Dept)] 65.0869 65.0869 65.0869 65.0869 65.0869 65.0869 1
# Create 27 Million records
data <- data.table::rbindlist(list(data, data, data, data, data, data, data, data))
dbWriteTable(con, "data", data, overwrite = TRUE)
# Basic tests
# Avg
microbenchmark::microbenchmark(
times = 1,
dbGetQuery(con, 'SELECT AVG("Weekly_Sales") FROM data GROUP BY "Store","Dept"'),
data[, mean(Weekly_Sales), by = list(Store,Dept)])
# Unit: milliseconds
# expr min lq mean median uq max neval
# dbGetQuery(con, "SELECT AVG(\"Weekly_Sales\") FROM data GROUP BY \"Store\",\"Dept\"") 390.6504 390.6504 390.6504 390.6504 390.6504 390.6504 1
# data[, mean(Weekly_Sales), by = list(Store, Dept)] 276.7906 276.7906 276.7906 276.7906 276.7906 276.7906 1
# Min
microbenchmark::microbenchmark(
times = 1,
dbGetQuery(con, 'SELECT MIN("Weekly_Sales") FROM data GROUP BY "Store","Dept"'),
data[, min(Weekly_Sales), by = list(Store,Dept)])
# Unit: milliseconds
# expr min lq mean median uq max neval
# dbGetQuery(con, "SELECT MIN(\"Weekly_Sales\") FROM data GROUP BY \"Store\",\"Dept\"") 347.6161 347.6161 347.6161 347.6161 347.6161 347.6161 1
# data[, min(Weekly_Sales), by = list(Store, Dept)] 397.8378 397.8378 397.8378 397.8378 397.8378 397.8378 1
# Create 216 Million records
data <- data.table::rbindlist(list(data, data, data, data, data, data, data, data))
dbWriteTable(con, "data", data, overwrite = TRUE)
# Avg
microbenchmark::microbenchmark(
times = 1,
dbGetQuery(con, 'SELECT AVG("Weekly_Sales") FROM data GROUP BY "Store","Dept"'),
data[, mean(Weekly_Sales), by = list(Store,Dept)])
# Unit: seconds
# expr min lq mean median uq max neval
# dbGetQuery(con, "SELECT AVG(\"Weekly_Sales\") FROM data GROUP BY \"Store\",\"Dept\"") 3.117599 3.117599 3.117599 3.117599 3.117599 3.117599 1
# data[, mean(Weekly_Sales), by = list(Store, Dept)] 1.996483 1.996483 1.996483 1.996483 1.996483 1.996483 1
# Min
microbenchmark::microbenchmark(
times = 1,
dbGetQuery(con, 'SELECT MIN("Weekly_Sales") FROM data GROUP BY "Store","Dept"'),
data[, min(Weekly_Sales), by = list(Store,Dept)])
# Unit: seconds
# expr min lq mean median uq max neval
# dbGetQuery(con, "SELECT MIN(\"Weekly_Sales\") FROM data GROUP BY \"Store\",\"Dept\"") 2.767846 2.767846 2.767846 2.767846 2.767846 2.767846 1
# data[, min(Weekly_Sales), by = list(Store, Dept)] 2.969684 2.969684 2.969684 2.969684 2.969684 2.969684 1
# Create 1.7 Billion records
data <- data.table::rbindlist(list(data, data, data, data, data, data, data, data))
dbWriteTable(con, "data", data, overwrite = TRUE)
# Avg: data.table error = Error in gforce(thisEnv, jsub, o__, f__, len__, irows) : Internal error: Failed to allocate counts or TMP when assigning g in gforce
microbenchmark::microbenchmark(
times = 1,
dbGetQuery(con, 'SELECT AVG("Weekly_Sales") FROM data GROUP BY "Store","Dept"')
#,data[, mean(Weekly_Sales), by = list(Store,Dept)]
)
# Unit: seconds
# expr min lq mean median uq max neval
# dbGetQuery(con, "SELECT AVG(\"Weekly_Sales\") FROM data GROUP BY \"Store\",\"Dept\"") 24.73299 24.73299 24.73299 24.73299 24.73299 24.73299 1
# Min: data.table error = Error in gforce(thisEnv, jsub, o__, f__, len__, irows) : Internal error: Failed to allocate counts or TMP when assigning g in gforce
microbenchmark::microbenchmark(
times = 1,
dbGetQuery(con, 'SELECT MIN("Weekly_Sales") FROM data GROUP BY "Store","Dept"')
#,data[, min(Weekly_Sales), by = list(Store,Dept)]
)
# Unit: seconds
# expr min lq mean median uq max neval
# dbGetQuery(con, "SELECT MIN(\"Weekly_Sales\") FROM data GROUP BY \"Store\",\"Dept\"") 21.84974 21.84974 21.84974 21.84974 21.84974 21.84974 1
# Close connect and cleanup
dbDisconnect(conn = con)
gc()
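As a sanity check on toy data (not the Walmart set), the SQL aggregate and the data.table expression above compute the same grouped means. Unlike the benchmark query, this sketch also selects the grouping keys and orders both results so they can be compared; it assumes the DBI, duckdb, and data.table packages are installed.

```r
library(DBI)
library(data.table)

con <- dbConnect(duckdb::duckdb(), ":memory:")
dt <- data.table(Store = c(1, 1, 2), Dept = c(1, 1, 1), Weekly_Sales = c(10, 20, 30))
dbWriteTable(con, "data", dt)

# Same grouped mean, once in SQL and once in data.table
sql_res <- dbGetQuery(con, '
  SELECT "Store", "Dept", AVG("Weekly_Sales") AS V1
  FROM data GROUP BY "Store", "Dept" ORDER BY "Store", "Dept"')
dt_res <- dt[, .(V1 = mean(Weekly_Sales)), by = .(Store, Dept)][order(Store, Dept)]

all.equal(sql_res$V1, dt_res$V1)  # TRUE

dbDisconnect(con, shutdown = TRUE)
```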
# @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
# clear everything and restart R
# 64 Threads
# data.table initialization
data.table::setDTthreads(threads = parallel::detectCores())
data.table::getDTthreads(verbose = TRUE)
# omp_get_num_procs() 64
# R_DATATABLE_NUM_PROCS_PERCENT unset (default 50)
# R_DATATABLE_NUM_THREADS unset
# R_DATATABLE_THROTTLE unset (default 1024)
# omp_get_thread_limit() 2147483647
# omp_get_max_threads() 64
# OMP_THREAD_LIMIT unset
# OMP_NUM_THREADS unset
# RestoreAfterFork true
# data.table is using 64 threads with throttle==1024. See ?setDTthreads.
# load walmart data
data <- data.table::fread("https://www.dropbox.com/s/2str3ek4f4cheqi/walmart_train.csv?dl=1")
# DuckDB initialization
library("DBI")
con <- dbConnect(duckdb::duckdb(), ":memory:")
# Convert IDate to Date
data[, Date := as.Date(Date)]
# Prepare for duckdb
dbWriteTable(con, "data", data, overwrite = TRUE)
# Basic tests
# Avg
microbenchmark::microbenchmark(
times = 1,
dbGetQuery(con, 'SELECT AVG("Weekly_Sales") FROM data GROUP BY "Store","Dept"'),
data[, mean(Weekly_Sales), by = list(Store,Dept)])
# Unit: milliseconds
# expr min lq mean median uq max neval
# dbGetQuery(con, "SELECT AVG(\"Weekly_Sales\") FROM data GROUP BY \"Store\",\"Dept\"") 7.791700 7.791700 7.791700 7.791700 7.791700 7.791700 1
# data[, mean(Weekly_Sales), by = list(Store, Dept)] 9.424201 9.424201 9.424201 9.424201 9.424201 9.424201 1
# Min
microbenchmark::microbenchmark(
times = 1,
dbGetQuery(con, 'SELECT MIN("Weekly_Sales") FROM data GROUP BY "Store","Dept"'),
data[, min(Weekly_Sales), by = list(Store,Dept)])
# Unit: milliseconds
# expr min lq mean median uq max neval
# dbGetQuery(con, "SELECT MIN(\"Weekly_Sales\") FROM data GROUP BY \"Store\",\"Dept\"") 7.237801 7.237801 7.237801 7.237801 7.237801 7.237801 1
# data[, min(Weekly_Sales), by = list(Store, Dept)] 8.878701 8.878701 8.878701 8.878701 8.878701 8.878701 1
# Create 3.7 Million records
data <- data.table::rbindlist(list(data, data, data, data, data, data, data, data))
dbWriteTable(con, "data", data, overwrite = TRUE)
# Basic tests
# Avg
microbenchmark::microbenchmark(
times = 1,
dbGetQuery(con, 'SELECT AVG("Weekly_Sales") FROM data GROUP BY "Store","Dept"'),
data[, mean(Weekly_Sales), by = list(Store,Dept)])
# Unit: milliseconds
# expr min lq mean median uq max neval
# dbGetQuery(con, "SELECT AVG(\"Weekly_Sales\") FROM data GROUP BY \"Store\",\"Dept\"") 51.3822 51.3822 51.3822 51.3822 51.3822 51.3822 1
# data[, mean(Weekly_Sales), by = list(Store, Dept)] 65.8879 65.8879 65.8879 65.8879 65.8879 65.8879 1
# Min
microbenchmark::microbenchmark(
times = 1,
dbGetQuery(con, 'SELECT MIN("Weekly_Sales") FROM data GROUP BY "Store","Dept"'),
data[, min(Weekly_Sales), by = list(Store,Dept)])
# Unit: milliseconds
# expr min lq mean median uq max neval
# dbGetQuery(con, "SELECT MIN(\"Weekly_Sales\") FROM data GROUP BY \"Store\",\"Dept\"") 45.6323 45.6323 45.6323 45.6323 45.6323 45.6323 1
# data[, min(Weekly_Sales), by = list(Store, Dept)] 78.5993 78.5993 78.5993 78.5993 78.5993 78.5993 1
# Create 27 Million records
data <- data.table::rbindlist(list(data, data, data, data, data, data, data, data))
dbWriteTable(con, "data", data, overwrite = TRUE)
# Basic tests
# Avg
microbenchmark::microbenchmark(
times = 1,
dbGetQuery(con, 'SELECT AVG("Weekly_Sales") FROM data GROUP BY "Store","Dept"'),
data[, mean(Weekly_Sales), by = list(Store,Dept)])
# Unit: milliseconds
# expr min lq mean median uq max neval
# dbGetQuery(con, "SELECT AVG(\"Weekly_Sales\") FROM data GROUP BY \"Store\",\"Dept\"") 389.5782 389.5782 389.5782 389.5782 389.5782 389.5782 1
# data[, mean(Weekly_Sales), by = list(Store, Dept)] 299.4557 299.4557 299.4557 299.4557 299.4557 299.4557 1
# Min
microbenchmark::microbenchmark(
times = 1,
dbGetQuery(con, 'SELECT MIN("Weekly_Sales") FROM data GROUP BY "Store","Dept"'),
data[, min(Weekly_Sales), by = list(Store,Dept)])
# Unit: milliseconds
# expr min lq mean median uq max neval
# dbGetQuery(con, "SELECT MIN(\"Weekly_Sales\") FROM data GROUP BY \"Store\",\"Dept\"") 345.8003 345.8003 345.8003 345.8003 345.8003 345.8003 1
# data[, min(Weekly_Sales), by = list(Store, Dept)] 421.7540 421.7540 421.7540 421.7540 421.7540 421.7540 1
# Create 216 Million records
data <- data.table::rbindlist(list(data, data, data, data, data, data, data, data))
dbWriteTable(con, "data", data, overwrite = TRUE)
# Avg
microbenchmark::microbenchmark(
times = 1,
dbGetQuery(con, 'SELECT AVG("Weekly_Sales") FROM data GROUP BY "Store","Dept"'),
data[, mean(Weekly_Sales), by = list(Store,Dept)])
# Unit: seconds
# expr min lq mean median uq max neval
# dbGetQuery(con, "SELECT AVG(\"Weekly_Sales\") FROM data GROUP BY \"Store\",\"Dept\"") 3.121465 3.121465 3.121465 3.121465 3.121465 3.121465 1
# data[, mean(Weekly_Sales), by = list(Store, Dept)] 2.080287 2.080287 2.080287 2.080287 2.080287 2.080287 1
# Min
microbenchmark::microbenchmark(
times = 1,
dbGetQuery(con, 'SELECT MIN("Weekly_Sales") FROM data GROUP BY "Store","Dept"'),
data[, min(Weekly_Sales), by = list(Store,Dept)])
# Unit: seconds
# expr min lq mean median uq max neval
# dbGetQuery(con, "SELECT MIN(\"Weekly_Sales\") FROM data GROUP BY \"Store\",\"Dept\"") 2.759468 2.759468 2.759468 2.759468 2.759468 2.759468 1
# data[, min(Weekly_Sales), by = list(Store, Dept)] 3.045216 3.045216 3.045216 3.045216 3.045216 3.045216 1
# Close connect and cleanup
dbDisconnect(conn = con)
gc()
@AdrianAntico Thank you for the benchmarks. As for the data.table error, it is a regression, reported here: https://github.com/Rdatatable/data.table/issues/4818
@AdrianAntico you need to set duckdb to use multithreading here https://duckdb.org/docs/sql/pragmas
@ArmanAttaran inside R I ran the code below, but given the warning I'm not sure that is how to run PRAGMA statements. Can you let me know if it's correct and, if not, how to run them?
*Edit - looks like this is the way?
DBI::dbSendQuery(conn = con, 'PRAGMA threads=32')
<duckdb_result f5760 connection=7c910 statement='PRAGMA threads=32'>
not this way
dbGetQuery(con, 'PRAGMA threads=32')
data frame with 0 columns and 0 rows
Warning message:
In dbFetch(rs, n = n, ...) :
Should not call dbFetch() on results that do not come from SELECT
Yes, DBI statements that do not retrieve results should be sent rather than fetched. I'm not sure whether using duckdb from R is the proper way to benchmark it, as there might be significant overhead related to pulling query results into R memory. That requires exploration, or comments from someone who has already explored it.
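A minimal sketch of that send-vs-get pattern, assuming the DBI and duckdb packages: DBI::dbExecute() sends a statement and frees its result in one call, which avoids the dangling result set that dbSendQuery() leaves behind. The threads pragma is the one documented at the link above.

```r
library(DBI)

con <- dbConnect(duckdb::duckdb(), ":memory:")

# Statements that return no rows: execute, don't fetch
dbExecute(con, "PRAGMA threads=8")

# Queries that return rows: fetch as usual
dbGetQuery(con, "SELECT 42 AS answer")

dbDisconnect(con, shutdown = TRUE)
```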
@jangorecki - thanks for the info on the tmp issue.
It would be nice to start incorporating other operations as well. For example, speed to build lags or rolling stats is probably the most important to me for forecasting applications. While duckdb appears to be solid for aggregation operations, the significant difference in performance for lag operations is too much to overcome to make use of. The code below generates lags 1 through 10 (I typically generate way more than that).
@ArmanAttaran - let me know if the query I set up below is incorrect or can be optimized, because the run times aren't promising as is: data.table mean run time 95 milliseconds (357x faster); duckdb mean run time 33,969 milliseconds.
# DuckDB initialization
con <- DBI::dbConnect(duckdb::duckdb(), ":memory:")
# Set to 32 Threads
# Set duckdb multithreading
DBI::dbSendQuery(conn = con, 'PRAGMA threads=32')
# data.table initialization
data.table::setDTthreads(threads = parallel::detectCores() / 2)
data.table::getDTthreads(verbose = TRUE)
# omp_get_num_procs() 64
# R_DATATABLE_NUM_PROCS_PERCENT unset (default 50)
# R_DATATABLE_NUM_THREADS unset
# R_DATATABLE_THROTTLE unset (default 1024)
# omp_get_thread_limit() 2147483647
# omp_get_max_threads() 64
# OMP_THREAD_LIMIT unset
# OMP_NUM_THREADS unset
# RestoreAfterFork true
# data.table is using 32 threads with throttle==1024. See ?setDTthreads.
# load walmart data
data <- data.table::fread("https://www.dropbox.com/s/2str3ek4f4cheqi/walmart_train.csv?dl=1")
# Convert IDate to Date
data[, Date := as.Date(Date)]
# Prepare for duckdb
DBI::dbWriteTable(con, "data", data, overwrite = TRUE)
Query <- paste0('
SELECT
*,
lag(Weekly_Sales, 1) OVER(PARTITION BY "Store", "Dept" ORDER BY "Date" ASC) AS Lag_1,
lag(Weekly_Sales, 2) OVER(PARTITION BY "Store", "Dept" ORDER BY "Date" ASC) AS Lag_2,
lag(Weekly_Sales, 3) OVER(PARTITION BY "Store", "Dept" ORDER BY "Date" ASC) AS Lag_3,
lag(Weekly_Sales, 4) OVER(PARTITION BY "Store", "Dept" ORDER BY "Date" ASC) AS Lag_4,
lag(Weekly_Sales, 5) OVER(PARTITION BY "Store", "Dept" ORDER BY "Date" ASC) AS Lag_5,
lag(Weekly_Sales, 6) OVER(PARTITION BY "Store", "Dept" ORDER BY "Date" ASC) AS Lag_6,
lag(Weekly_Sales, 7) OVER(PARTITION BY "Store", "Dept" ORDER BY "Date" ASC) AS Lag_7,
lag(Weekly_Sales, 8) OVER(PARTITION BY "Store", "Dept" ORDER BY "Date" ASC) AS Lag_8,
lag(Weekly_Sales, 9) OVER(PARTITION BY "Store", "Dept" ORDER BY "Date" ASC) AS Lag_9,
lag(Weekly_Sales, 10) OVER(PARTITION BY "Store", "Dept" ORDER BY "Date" ASC) AS Lag_10
FROM
data')
# Avg
microbenchmark::microbenchmark(
times = 30,
DBI::dbGetQuery(con, Query),
data[, paste0("Lag_", 1L:10L) := data.table::shift(x = Weekly_Sales, n = 1L:10L, type = "lag"), by = list(Store,Dept)])
# Unit: milliseconds
# expr min lq mean median uq max neval
# DBI::dbGetQuery(con, Query)
# 33882.7288 33924.5433 33969.42098 33963.94245 34003.2308 34098.5795 30
# data[, `:=`(paste0("Lag_", 1L:10L), data.table::shift(x = Weekly_Sales, n = 1L:10L, type = "lag")), by = list(Store, Dept)]
# 75.5192 77.7901 95.12174 86.01295 88.5566 252.3392 30
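For readers unfamiliar with the data.table expression being benchmarked, here is a toy illustration (not the Walmart data) of how a single shift() call with a vector of lag sizes creates all the lag columns per group:

```r
library(data.table)

dt <- data.table(Store = c(1, 1, 1, 2, 2), Weekly_Sales = c(10, 20, 30, 40, 50))

# n = 1:2 produces one shifted vector per lag size; := assigns each to its own column
dt[, paste0("Lag_", 1:2) := shift(Weekly_Sales, n = 1:2, type = "lag"), by = Store]

# dt$Lag_1 is now c(NA, 10, 20, NA, 40)
# dt$Lag_2 is now c(NA, NA, 10, NA, NA)
```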
Ok, give me some time; I'll look at it over the weekend.
@ArmanAttaran
While duckdb appears to be solid for aggregation operations, the significant difference in performance for lag operations is too much to overcome to make use of.
This is an example where the overhead of pulling query results into R will be more clearly visible. Lag and rolling stats return the same number of rows as their input, so the data copied from the duckdb engine into R is bigger than in the aggregation case, where output is usually much smaller than input. Although I haven't made any tests, so I cannot confirm whether that is the case here.
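A hypothetical way to see this effect (a sketch assuming the DBI and duckdb packages; the table and column names are made up): compare a query whose result is tiny, an aggregation, against one whose result is as large as its input, a window function, over the same data.

```r
library(DBI)

con <- dbConnect(duckdb::duckdb(), ":memory:")
dbWriteTable(con, "t", data.frame(g = rep(1:100, 1000), v = runif(100000)))

# ~100 rows cross the DuckDB -> R boundary
system.time(dbGetQuery(con, "SELECT g, AVG(v) FROM t GROUP BY g"))

# all 100,000 rows cross the boundary
system.time(dbGetQuery(con, "SELECT g, v, LAG(v) OVER (PARTITION BY g ORDER BY v) FROM t"))

dbDisconnect(con, shutdown = TRUE)
```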
As for "tasks" like lag and rolling statistics, I would like to encourage you to create an issue requesting them, ideally explaining your use case and providing example code. I would like this project to be more community driven, so the more upvotes an issue gets, the higher its priority will be.
@jangorecki Thanks for the response. I'll put something together for the request.
I'm not familiar with `lag`? But if it is as you say, that it passes all the data through, then that is really the wrong approach with DuckDB. DuckDB is not fast at fetching rows, as it is a columnar store, not a row store. The more columns you have, the slower the fetching, since it has to jump between columns on disk. So to compare DuckDB you should run aggregation queries.
These results look pretty good for both systems; indeed the setting of `threads=...` is necessary to make this comparison fair. The only query where I can see big differences is the last one with the window function `lag`. I have a question regarding the equivalent in `data.table`. The code used there is
data.table::shift(x = Weekly_Sales, n = 1L:10L, type = "lag")), by = list(Store, Dept)]
Does this include an implicit sorting step? Is the data kept in insertion order, which happens to be correct? Because the window expression used for DuckDB does include such a sorting step, which is of course not free.
@hannesmuehleisen data.table doesn't do a sorting step here, but it also sorts really fast. I just updated the code snippet for the lag and moving-average operations below. Note that I re-shuffled the data after the first run and that I ran each operation only once for both tests, since the time to sort already-sorted data might bias the result. Nonetheless, I think @jangorecki was correct that the benchmark times look bad for duckdb because of the load-data-into-R step that is taking place.
# DuckDB initialization
con <- DBI::dbConnect(duckdb::duckdb(), ":memory:")
# Set to 32 Threads
# Set duckdb multithreading
DBI::dbSendQuery(conn = con, 'PRAGMA threads=32')
# data.table initialization
data.table::setDTthreads(threads = parallel::detectCores() / 2)
data.table::getDTthreads(verbose = TRUE)
# omp_get_num_procs() 64
# R_DATATABLE_NUM_PROCS_PERCENT unset (default 50)
# R_DATATABLE_NUM_THREADS unset
# R_DATATABLE_THROTTLE unset (default 1024)
# omp_get_thread_limit() 2147483647
# omp_get_max_threads() 64
# OMP_THREAD_LIMIT unset
# OMP_NUM_THREADS unset
# RestoreAfterFork true
# data.table is using 32 threads with throttle==1024. See ?setDTthreads.
# load walmart data
data <- data.table::fread("https://www.dropbox.com/s/2str3ek4f4cheqi/walmart_train.csv?dl=1")
# Convert IDate to Date
data[, Date := as.Date(Date)]
# Shuffle data
data <- data[order(runif(.N))]
# Prepare for duckdb
DBI::dbWriteTable(con, "data", data, overwrite = TRUE)
# Create query
Query <- paste0('
SELECT
*,
lag(Weekly_Sales, 1) OVER(PARTITION BY "Store", "Dept" ORDER BY "Date" ASC) AS Lag_1,
lag(Weekly_Sales, 2) OVER(PARTITION BY "Store", "Dept" ORDER BY "Date" ASC) AS Lag_2,
lag(Weekly_Sales, 3) OVER(PARTITION BY "Store", "Dept" ORDER BY "Date" ASC) AS Lag_3,
lag(Weekly_Sales, 4) OVER(PARTITION BY "Store", "Dept" ORDER BY "Date" ASC) AS Lag_4,
lag(Weekly_Sales, 5) OVER(PARTITION BY "Store", "Dept" ORDER BY "Date" ASC) AS Lag_5,
lag(Weekly_Sales, 6) OVER(PARTITION BY "Store", "Dept" ORDER BY "Date" ASC) AS Lag_6,
lag(Weekly_Sales, 7) OVER(PARTITION BY "Store", "Dept" ORDER BY "Date" ASC) AS Lag_7,
lag(Weekly_Sales, 8) OVER(PARTITION BY "Store", "Dept" ORDER BY "Date" ASC) AS Lag_8,
lag(Weekly_Sales, 9) OVER(PARTITION BY "Store", "Dept" ORDER BY "Date" ASC) AS Lag_9,
lag(Weekly_Sales, 10) OVER(PARTITION BY "Store", "Dept" ORDER BY "Date" ASC) AS Lag_10
FROM
data')
# Create data.table function
Sort_Lag <- function() {
data.table::setorderv(x = data, cols = c("Store", "Dept", "Date"), order = c(1, 1, -1))
data[, paste0("Lag_", 1L:10L) := data.table::shift(x = Weekly_Sales, n = 1L:10L, type = "lag"), by = list(Store,Dept)]
}
# Lags
microbenchmark::microbenchmark(
times = 1,
DBI::dbGetQuery(con, Query),
Sort_Lag())
# Unit: milliseconds
# expr min lq mean median uq max neval
# DBI::dbGetQuery(con, Query)
# 33882.7288 33924.5433 33969.42098 33963.94245 34003.2308 34098.5795 30
# data[, `:=`(paste0("Lag_", 1L:10L), data.table::shift(x = Weekly_Sales, n = 1L:10L, type = "lag")), by = list(Store, Dept)]
# 75.5192 77.7901 95.12174 86.01295 88.5566 252.3392 30
Query <- paste0('
SELECT
*,
avg(Weekly_Sales) OVER(PARTITION BY "Store", "Dept" ORDER BY "Date" ASC ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS MA_2,
avg(Weekly_Sales) OVER(PARTITION BY "Store", "Dept" ORDER BY "Date" ASC ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS MA_3,
avg(Weekly_Sales) OVER(PARTITION BY "Store", "Dept" ORDER BY "Date" ASC ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) AS MA_4,
avg(Weekly_Sales) OVER(PARTITION BY "Store", "Dept" ORDER BY "Date" ASC ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS MA_5,
avg(Weekly_Sales) OVER(PARTITION BY "Store", "Dept" ORDER BY "Date" ASC ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) AS MA_6,
avg(Weekly_Sales) OVER(PARTITION BY "Store", "Dept" ORDER BY "Date" ASC ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) AS MA_7,
avg(Weekly_Sales) OVER(PARTITION BY "Store", "Dept" ORDER BY "Date" ASC ROWS BETWEEN 7 PRECEDING AND CURRENT ROW) AS MA_8,
avg(Weekly_Sales) OVER(PARTITION BY "Store", "Dept" ORDER BY "Date" ASC ROWS BETWEEN 8 PRECEDING AND CURRENT ROW) AS MA_9,
avg(Weekly_Sales) OVER(PARTITION BY "Store", "Dept" ORDER BY "Date" ASC ROWS BETWEEN 9 PRECEDING AND CURRENT ROW) AS MA_10
FROM
data')
# Create function
Sort_MA <- function() {
data.table::setorderv(x = data, cols = c("Store", "Dept", "Date"), order = c(1, 1, -1))
data[, paste0("MA_", 2L:10L) := data.table::frollmean(x = Weekly_Sales, n = 2L:10L, fill = NA, algo = "fast", na.rm = FALSE, hasNA = FALSE, adaptive = FALSE), by = list(Store,Dept)]
}
# Remove lag columns
data <- data[, .SD, .SDcols = c("Store", "Dept", "Date", "Weekly_Sales", "IsHoliday")]
# Shuffle data
data <- data[order(runif(.N))]
# Prepare for duckdb
DBI::dbWriteTable(con, "data", data, overwrite = TRUE)
# Moving averages
microbenchmark::microbenchmark(
times = 1,
DBI::dbGetQuery(con, Query),
Sort_MA())
# Unit: milliseconds
# expr min lq mean median uq max neval
# DBI::dbGetQuery(con, Query) 35319.0412 35319.0412 35319.0412 35319.0412 35319.0412 35319.0412 1
# Sort_MA() 577.6384 577.6384 577.6384 577.6384 577.6384 577.6384 1
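Two side notes on the moving-average comparison: in SQL, a trailing n-row average needs an explicit frame clause such as ROWS BETWEEN n-1 PRECEDING AND CURRENT ROW (without a frame, avg(...) OVER(... ORDER BY ...) computes a cumulative running average instead); and on the data.table side, frollmean() is right-aligned by default, so the n-period value at row i averages rows i-n+1 through i. A toy check of the latter:

```r
library(data.table)

x <- c(1, 2, 3, 4, 5)

# Right-aligned window of 3: the first two positions cannot be filled
frollmean(x, n = 3)
# [1] NA NA  2  3  4
```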
We just merged some improvements to window functions (https://github.com/cwida/duckdb/pull/1500), when running with those the timings look as follows on our machine (with 8 threads):
1491.62164 1576.14964 1611.20061 1621.64613 1643.79515 1685.7462 30
84.06547 87.35325 91.70508 88.16163 90.50311 120.1314 30
Looks much better I'd say!
@hannesmuehleisen I can see the difference to some extent on my side, but the run time is still dominated by pulling the data into R memory after the calculations are complete. Previous run times for DuckDB were roughly 33k milliseconds and now I'm seeing 29.5k milliseconds.
Getting data into R fast is a very important task which is why I tend to store flat files for machine learning scoring purposes (that's what I do professionally). Pulling data out of most DB systems is pretty slow while data.table::fread() is super fast. For on-demand scoring tasks, there is usually some upper limit of time to deliver predictions and the less time spent going from a warehouse to R (or Python / Julia) means that more time can be spent on recreating features (feature engineering) before model scoring, and feature engineering is where I tend to get the most uplift from an ML-performance perspective.
I'd love to be able to take advantage of the DB benefits of storing data (versus flat files) and I'd be thrilled if DuckDB could find a way to be a leader in this area. Is there any way the dev team could prioritize making data transfer from DuckDB to R as fast as data.table::fread()? I would think the solution would be relatively similar for Python and Julia as well, which would make a lot of users really happy.
@AdrianAntico I fully agree pulling data into R is important. I think DuckDB is already the fastest SQL database wrt transfer into R (by far). Whether this can be as fast as `fread` remains to be seen. We don't only integrate with R, but also with Pandas etc. How many rows are you pulling exactly? Strings? I would not be surprised if we spend too much time in `Rf_mkCharLenCE`. We could potentially address this with an `ALTREP` lazy conversion.
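A hypothetical micro-check of the string-conversion suspicion (a sketch assuming the DBI and duckdb packages; range() is DuckDB's table-generating function): fetch the same number of rows once as numbers and once as strings. If character allocation dominates, the varchar fetch should be markedly slower.

```r
library(DBI)

con <- dbConnect(duckdb::duckdb(), ":memory:")
dbExecute(con, "
  CREATE TABLE t AS
  SELECT i AS num, CAST(i AS VARCHAR) AS str
  FROM range(1000000) tbl(i)")

system.time(dbGetQuery(con, "SELECT num FROM t"))  # numeric transfer
system.time(dbGetQuery(con, "SELECT str FROM t"))  # string transfer (one SEXP per value)

dbDisconnect(con, shutdown = TRUE)
```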
@hannesmuehleisen For the tables that are of concern we're looking at between 10M and 300M records. The tables have a mix of numeric, int, categorical (lower cardinality), string (free-hand comment types), date, and datetime types. The company I'm at is a Microsoft shop so I can do some run-time comparisons. I've tried working with Microsoft's bcp CLI and even their SSIS packages to download data as flat files and then loading via data.table::fread(), but they are a headache to get working properly in a short time.
I've implemented a prototype `ALTREP` version of string transfer from DuckDB into R in https://github.com/hannesmuehleisen/duckdb/tree/altrepstrings . What it does is delay conversion of strings into SEXPs for DuckDB query results until they are actually accessed. We will do some more testing on this and then merge into master, since it's clearly worth it. @AdrianAntico, maybe you'd like to give it a try.
Overall, is there anything else blocking the addition of DuckDB on the db-benchmark website? CC @jangorecki
@hannesmuehleisen I tried running the below and ran into an error. Do you have any ideas about how to proceed?
devtools::install_github(repo = "hannesmuehleisen/duckdb", subdir = "tools/Rpkg", dependencies = FALSE)
# Downloading GitHub repo hannesmuehleisen/duckdb@master
# √ checking for file 'C:\Users\Bizon\AppData\Local\Temp\RtmpEBqDTr\remotes2fb012fe2d20\hannesmuehleisen-duckdb-99a6f64\tools\rpkg/DESCRIPTION'
# Installing package into 'C:/Users/Bizon/Documents/R/win-library/4.0' (as 'lib' is unspecified)
# ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# compilation terminated. make: *** [C:/PROGRA~1/R/R-40~1.3/etc/x64/Makeconf:229: duckdbr.o] Error 1
# ERROR: compilation failed for package 'duckdb'
Yeah, the `devtools::install_github` route does not work yet; just clone the repo, go into that subdirectory, and call `R CMD INSTALL .`
@hannesmuehleisen only the lack of time for this project at the moment.
Can I do anything to help?
I'll get started on a PR, unless there are any political reasons not to add DuckDB?
See #200
Is it possible to add duckdb to this test?