stefan-m-lenz / JuliaConnectoR

A functionally oriented interface for calling Julia from R

How to Run Processes in Parallel Using `foreach` and `doParallel`? #19

Closed TheCedarPrince closed 1 year ago

TheCedarPrince commented 1 year ago

Hi @stefan-m-lenz ,

I was recently experimenting with JuliaConnectoR and was wondering if you could help me parallelize some code using the `foreach` and `doParallel` packages. I am using Julia 1.8.0 and the latest release of JuliaConnectoR, on a Linux machine. Here is a reproducible example of what I was attempting:

```r
library(doParallel)
library(dplyr)
library(foreach)
library(JuliaConnectoR)
library(parallel)
library(tibble)

Sys.setenv("JULIA_NUM_THREADS" = parallel::detectCores())
startJuliaServer()

pkg <- juliaImport("Pkg")

pkg$activate("OHDSI", shared = TRUE) 

pkg$add(c("HealthSampleData", "OMOPCDMCohortCreator", "SQLite"))

hsd <- juliaImport("HealthSampleData")

eunomia <- hsd$Eunomia()

slt <- juliaImport("SQLite")

conn <- slt$DB(eunomia)

occ <- juliaImport("OMOPCDMCohortCreator")

occ$GenerateDatabaseDetails(juliaEval(":sqlite"), "main")

occ$GenerateTables(conn)

indices <- c(1, 2, 3, 4, 5)
starts <- c(1, 10, 20, 30, 40)
ends <- c(9, 19, 29, 39, 49)

myCluster <- makeCluster(4, type = "FORK")
registerDoParallel(myCluster)

system.time({
    values <- foreach(idx = indices) %dopar% { 
        occ$GetPatientGender(starts[idx]:ends[idx], conn)
    }
})

stopCluster(myCluster)
```

I am able to successfully run all of my code up until the `foreach` statement, where I attempt to spawn the parallel processes. Unfortunately, this call does not terminate and my R REPL hangs on this line. I do not know what is happening, as my computer does report multiple R processes running and high CPU usage from these sub-processes. I am not married to using `foreach` and `doParallel` for my work, but I do want to get parallel processing working with my Julia package, OMOPCDMCohortCreator.

Thanks and I look forward to hearing back from you! Let me know if I can provide additional information. :)

TheCedarPrince commented 1 year ago

Hey @stefan-m-lenz, I actually got the parallel processing to work as I had hoped! Here is an example that you can run on your own machine:

```r
library(doParallel)
library(foreach)
library(JuliaConnectoR)
library(parallel)

Sys.setenv("JULIA_NUM_THREADS" = parallel::detectCores(), "DATADEPS_ALWAYS_ACCEPT" = TRUE)

myCluster <- makeCluster(4, type = "FORK")
registerDoParallel(myCluster)

pkg <- juliaImport("Pkg")
pkg$activate("OHDSI", shared = TRUE) 
pkg$add(c("OMOPCDMCohortCreator", "HealthSampleData", "SQLite"))

hsd <- juliaImport("HealthSampleData")
eunomia <- hsd$Eunomia()

indices <- c(1, 2, 3, 4, 5)
starts <- c(1, 1, 1, 1, 1)
ends <- c(4000, 4000, 4000, 4000, 4000)

values <- foreach(idx = indices) %dopar% { 

    pkg <- juliaImport("Pkg")
    pkg$activate("OHDSI", shared = TRUE) 

    hsd <- juliaImport("HealthSampleData")
    slt <- juliaImport("SQLite")
    occ <- juliaImport("OMOPCDMCohortCreator")

    eunomia <- hsd$Eunomia()
    conn <- slt$DB(eunomia)

    occ$GenerateDatabaseDetails(juliaEval(":sqlite"), "main")
    occ$GenerateTables(conn)

    data.frame(occ$GetPatientGender(starts[idx]:ends[idx], conn))
}

stopCluster(myCluster)
```

A question that I had is this: is it possible to parallelize this better? I had thought that with the "FORK" option, the state of the R session would be copied to each worker. That appears to be the case for R objects, but not for Julia objects/state, which is why I have to put the following lines within the `foreach` call:

    pkg <- juliaImport("Pkg")
    pkg$activate("OHDSI", shared = TRUE) 

    hsd <- juliaImport("HealthSampleData")
    slt <- juliaImport("SQLite")
    occ <- juliaImport("OMOPCDMCohortCreator")

    eunomia <- hsd$Eunomia()
    conn <- slt$DB(eunomia)

    occ$GenerateDatabaseDetails(juliaEval(":sqlite"), "main")
    occ$GenerateTables(conn)

Ideally, it would be nice to have the Julia objects somehow forked too, but I am not sure if that is possible. That way, precompilation wouldn't have to run repeatedly across the R workers for the Julia operations being run. Thanks!

stefan-m-lenz commented 1 year ago

Hi @TheCedarPrince, I have to say that using the "FORK" option is probably not advisable. The JuliaConnectoR package holds a reference to the TCP connection to the Julia process, and this reference is copied across processes if the R process is forked. Copying a TCP connection object across processes is not safe. However, by calling `startJuliaServer()` you achieve that all new R processes spawned from the main R process access the same Julia session, but run on different threads. This means that no additional precompilation is necessary and global objects can be shared. (But of course, be aware of thread-safety issues when accessing global objects in Julia from different R processes / Julia threads.)
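To sketch what this sharing looks like: the following assumes the workers are plain new R processes (e.g. a PSOCK cluster) started after `startJuliaServer()`, so that they pick up the connection information from the environment; `shared_x` is just an illustrative name, not anything from your code.

```r
library(JuliaConnectoR)
library(parallel)

startJuliaServer()           # R processes started afterwards reuse this Julia session
juliaEval("shared_x = 42")   # define a global in the shared Julia session

cl <- makeCluster(2, type = "PSOCK")   # new R processes, no forking
clusterEvalQ(cl, {
  library(JuliaConnectoR)
  juliaEval("shared_x")      # each worker reads the same global from the shared session
})
stopCluster(cl)
```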

I should probably add a caveat to the documentation that process forking on Linux is best avoided when using the JuliaConnectoR. I thought I had written that somewhere, but I can't find it. I need to think it through again first.

What can also lead to problems is the fact that you call `juliaImport` in the `foreach` block. If precompilation is required, it might happen in parallel here, because all threads or processes determine that precompilation is required and start it. But Julia can't handle different processes or threads precompiling the same package simultaneously. It is better to call `juliaImport` once before the parallelization to ensure that no packages need to be precompiled later on.

You should also move the pkg$activate out of the foreach because it is only necessary once in the Julia process, which is shared among the R processes.
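Putting these suggestions together, your second example could be restructured roughly as follows. This is only a sketch of the idea, not a tested solution: it uses the default PSOCK workers instead of forking, assumes `startJuliaServer()` is called before the cluster is created so that the workers connect to the same Julia session, and re-imports the packages in each worker only to create the R-side bindings (the one-time `juliaImport` calls in the main process should already have taken care of any precompilation):

```r
library(doParallel)
library(foreach)
library(JuliaConnectoR)
library(parallel)

Sys.setenv("JULIA_NUM_THREADS" = parallel::detectCores(),
           "DATADEPS_ALWAYS_ACCEPT" = TRUE)

# Start the Julia session first so that all R worker processes connect to it
startJuliaServer()

# One-time setup in the shared Julia session: activate the environment and
# import the packages so that any precompilation happens here, only once
pkg <- juliaImport("Pkg")
pkg$activate("OHDSI", shared = TRUE)
pkg$add(c("OMOPCDMCohortCreator", "HealthSampleData", "SQLite"))

hsd <- juliaImport("HealthSampleData")
slt <- juliaImport("SQLite")
occ <- juliaImport("OMOPCDMCohortCreator")

eunomia <- hsd$Eunomia()

indices <- 1:5
starts <- rep(1, 5)
ends <- rep(4000, 5)

myCluster <- makeCluster(4)   # default PSOCK workers, no forking
registerDoParallel(myCluster)

values <- foreach(idx = indices) %dopar% {
    library(JuliaConnectoR)

    # The packages are already loaded in the shared Julia session;
    # these calls only recreate the R-side bindings in the worker
    slt <- juliaImport("SQLite")
    occ <- juliaImport("OMOPCDMCohortCreator")

    # Each worker opens its own connection to the SQLite file
    conn <- slt$DB(eunomia)
    occ$GenerateDatabaseDetails(juliaEval(":sqlite"), "main")
    occ$GenerateTables(conn)

    data.frame(occ$GetPatientGender(starts[idx]:ends[idx], conn))
}

stopCluster(myCluster)
```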