hannes / MonetDBLite

MonetDB reconfigured as a library

Parallel column conversion for query results / appends #14

Open hannes opened 8 years ago

hannes commented 8 years ago

We could probably parallelise dbWriteTable, in particular the value conversions, and likewise on the query side, because every column is essentially independent. Actually, that's a great idea; I will probably add this at some point.
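The per-column independence above can be sketched at the R level with the base parallel package. This is an illustrative outline only, not MonetDBLite's actual internals; `convert_column` is a hypothetical stand-in for whatever SEXP-to-column conversion the library performs, and fork-based mclapply does not parallelise on Windows.

```r
library(parallel)

# Hypothetical stand-in for MonetDBLite's per-column value conversion.
convert_column <- function(x) {
  if (is.factor(x)) as.character(x) else x
}

# Because columns are independent, a data.frame can be converted
# one column per worker.
parallel_convert <- function(df, cores = 2L) {
  mclapply(df, convert_column, mc.cores = cores)
}

str(parallel_convert(mtcars[1:3, 1:2]))
```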

Importing ten large tables on ten cores is 7x as fast with a MonetDB server session that allows multiple connections. Until MonetDBLite can compete with multicore imports, the MonetDB server control code should not be deprecated.

Reproducible script below:

library(DBI)
library(MonetDBLite)
library(MonetDB.R)

# create ten csv files with 1,000,000 records each
for( j in 1:10 ){
    tf <- tempfile()
    write.csv( mtcars[ rep( 1:32 , 31250 ) , ] , tf , row.names = FALSE )
    assign( paste0( "tf" , j ) , tf )
}

# # # # # monetdblite import of 10,000,000 records
db <- dbConnect( MonetDBLite() )
system.time( {
    for( j in 1:10 ) {
        tablename <- basename( get( paste0( 'tf' , j ) ) )
        dbWriteTable( db , tablename , mtcars[ 0 , ] )
        dbSendUpdate(db, paste0("COPY OFFSET 2 INTO ", tablename, " FROM '", get( paste0( 'tf' , j ) ) , "' using delimiters ',','\\n','\"'  NULL AS ''" ) )
    }
} )

# # # # # external mserver import of 10,000,000 records
batfile <-
    monetdb.server.setup(
                    database.directory = paste0( tempdir() , "/MonetDB" ) ,
                    monetdb.program.path = 
                        ifelse( 
                            .Platform$OS.type == "windows" , 
                            "C:/Program Files/MonetDB/MonetDB5" , 
                            "" 
                        ) ,
                    dbname = "mydb" ,
                    dbport = 50000
    )

dbname <- "mydb"
dbport <- 50000
monetdb.server.start( batfile )
monet.url <- paste0( "monetdb://localhost:" , dbport , "/" , dbname )
mydb <- dbConnect( MonetDB.R() , monet.url , wait = TRUE )
pid <- as.integer( dbGetQuery( mydb , "SELECT value FROM env() WHERE name = 'monet_pid'" )[[1]] )

library(snow)
cl <- makeCluster( 10 , type = "SOCK" )

myfun <-
    function( myfile , murl ){

        library(DBI)
        library(MonetDBLite)
        library(MonetDB.R)
        tablename <- basename( myfile ) 
        con <- dbConnect( MonetDB.R() , murl , wait = TRUE )
        dbSendUpdate(con, paste0("COPY OFFSET 2 INTO ", tablename, " FROM '", myfile , "' using delimiters ',','\\n','\"'  NULL AS ''" ) )

        TRUE
    }

system.time({
for( j in 1:10 ) {
    tablename <- basename( get( paste0( 'tf' , j ) ) )
    dbWriteTable( mydb , tablename , mtcars[ 0 , ] )
}
clusterApply( cl , sapply( paste0( 'tf' , 1:10 ) , get ) , myfun , murl = monet.url )
})

stopCluster(cl)
monetdb.server.stop( pid )

Results:

# monetdblite time:
# user  system elapsed 
#58.28    2.87   98.41 

# external mserver time:
# user  system elapsed 
#0.04    0.02   14.12
guilhermejacob commented 7 years ago

@ajdamico , 👍!

hannes commented 7 years ago

could use dataflow for this

ajdamico commented 7 years ago

Just to clarify: data import parallelization seems most useful as a monetdb/mserver improvement and not as a monetdblite-specific enhancement? Thanks

hannes commented 7 years ago

No, in this case it's lite-specific, because we need to convert SEXPs to MonetDB columns.

hannes commented 7 years ago

The way to go here is to add column conversion for query results / appends to the MAL plan. Need to come up with a reasonable C interface for this.

hannes commented 7 years ago

Can't really do this for query results; most of the time is spent in string columns, and those need to be single-threaded because of R's global string hash table...
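A rough R-level analogue of that cost (not MonetDBLite's actual C conversion path, just an illustration): producing a numeric vector is essentially a bulk copy, while producing a character vector must intern every value in R's global string cache, which is why the string side cannot be split across threads.

```r
n <- 1e6

# Numeric result columns: cheap, thread-friendly bulk conversion.
system.time( as.double( seq_len( n ) ) )

# String result columns: every element goes through R's global
# string hash table, so this work is inherently serial.
system.time( as.character( seq_len( n ) ) )
```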

hannes commented 6 years ago

I like the non-blocking dbSendQuery idea described here: https://github.com/r-dbi/DBI/issues/69 A typical interaction could look like this:

q1 <- dbSendQuery(c1, "COPY INTO t1 ...")
q2 <- dbSendQuery(c2, "COPY INTO t2 ...")
r1 <- dbFetch(q1)
r2 <- dbFetch(q2)

The two queries would run at the same time, but dbFetch would only return once the result of the corresponding query (in this case, an "OK") is available. Comments?
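Until a non-blocking dbSendQuery exists, a similar effect can be approximated today with fork-based process parallelism (a hedged sketch: it assumes a running server reachable at `monet.url` as in the script above, hypothetical target tables `t1`/`t2`, and mcparallel, which does not fork on Windows):

```r
library(parallel)

# Each child opens its own connection and runs a blocking COPY INTO;
# the blocking happens in the child process, not in the parent session.
run_copy <- function( sql , murl ) {
  library(DBI)
  library(MonetDB.R)
  con <- dbConnect( MonetDB.R() , murl , wait = TRUE )
  on.exit( dbDisconnect( con ) )
  dbSendUpdate( con , sql )
  "OK"
}

# launch both imports concurrently...
j1 <- mcparallel( run_copy( "COPY INTO t1 ..." , monet.url ) )
j2 <- mcparallel( run_copy( "COPY INTO t2 ..." , monet.url ) )

# ...then wait for both, mirroring the proposed dbFetch(q1); dbFetch(q2)
mccollect( list( j1 , j2 ) )
```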