HenrikBengtsson / doFuture

:rocket: R package: doFuture - Use Foreach to Parallelize via Future Framework
https://doFuture.futureverse.org

Using doFuture with packages already using foreach #21

Closed hummuscience closed 6 years ago

hummuscience commented 6 years ago

I am trying to use a package that uses foreach (strucchange). The idea is to let a remote worker take care of the processing.

I asked a similar question a while back in the future issue tracker (https://github.com/HenrikBengtsson/future/issues/184).

First I tested whether everything works as I want on the local machine with 4 cores as follows:

library("doFuture")
registerDoFuture()

plan(multiprocess)

slopeFinder <- function (x, breaks = 2, range, h = 3, closest_to = 0, hpc = "none") {
    if (closest_to == 0){

    ts.x <- ts(x[range])
    l <- length(ts.x)
    tt <- 1:l
    ts.break <- strucchange::breakpoints(ts.x ~ tt, breaks = breaks, h = h, hpc = hpc)
    ts.bs <- splines::bs(tt, deg=1, knots = ts.break$breakpoints)
    bs.regress <- lm(ts.x ~ ts.bs)
    ts.coefs <- bs.regress$coefficients
    data <- data.frame(Slope =  ts.coefs["ts.bs2"],
                       Breakpoint = ts.break$breakpoints[1],
                       MaxAmp = max(ts.x[ts.break$breakpoints[1]:ts.break$breakpoints[2]]))

    return(data)

    }else{

    ts.x <- ts(x[range])
    l <- length(ts.x)
    tt <- 1:l
    ts.break <- strucchange::breakpoints(ts.x ~ tt, breaks = breaks, h = h)
    closest <- findInterval(closest_to, ts.break$breakpoints)+1
    ts.bs <- splines::bs(tt, deg=1, knots = ts.break$breakpoints)
    bs.regress <- lm(ts.x ~ ts.bs)
    ts.coefs <- bs.regress$coefficients
    data <- data.frame(Slope =  ts.coefs[closest+1],
                       Breakpoint = ts.break$breakpoints[closest],
                       MaxAmp = max(ts.x[ts.break$breakpoints[closest]:ts.break$breakpoints[closest+1]]))
    return(data)

     }         
}

temp.break <- slopeFinder(temp[,2], breaks = 6, h=20, hpc = "foreach")

This works as planned with all cores being used.

When I want to send this to a remote machine, I used the following as a plan:

plan(list(tweak(remote, workers = "monster"), multiprocess))

and instead of <- I used %<-%. If I understood it correctly, the %<-% would peel away the first layer (sending it to a remote machine) and then the function that uses foreach can run the code.

However, it does not work and when I call the object (temp.break) I get the following error:

Error: could not find function "%dopar%"

HenrikBengtsson commented 6 years ago

Turns out to be a bug(*) in strucchange. Here's a minimal example in a fresh R session:

> library("strucchange")
> bp.nile <- breakpoints(Nile ~ 1, hpc = "foreach")
Error in foreach::foreach(i = 1:(n - h + 1)) %dopar% RSSi(i) : 
  could not find function "%dopar%"
> traceback()
2: breakpoints.formula(Nile ~ 1, hpc = "foreach")
1: breakpoints(Nile ~ 1, hpc = "foreach")

The workaround is to make sure 'foreach' is attached before breakpoints() is called, e.g.

> library("foreach")
> bp.nile <- breakpoints(Nile ~ 1, hpc = "foreach")
Warning: executing %dopar% sequentially: no parallel backend registered

In your case with nested, remote futures, the workaround is to attach foreach in the R session where breakpoints() is called, e.g.

slopeFinder <- function (x, breaks = 2, range, h = 3, closest_to = 0, hpc = "none") {
  if (hpc == "foreach") library("foreach")
  ...
}

Details

The bug in strucchange::breakpoints() is that it uses an unqualified %dopar% in:

RSS.triang <- if(hpc == "none") sapply(1:(n-h+1), RSSi) else foreach::foreach(i = 1:(n-h+1)) %dopar% RSSi(i)

rather than foreach::%dopar%. I'll contact the maintainer about this.
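To illustrate the lookup issue described above, here is a minimal sketch, assuming only that the foreach package is installed. The function names f_unqualified and f_qualified are hypothetical and not part of strucchange. The second one shows one possible way for a function to bind the operator from the foreach namespace, which is not necessarily how the strucchange fix is implemented.

```r
# Sketch: an unqualified %dopar% is looked up on the search path, so it is
# only found when foreach is attached. Function names here are hypothetical.

# Mirrors the pattern in breakpoints(): foreach() is namespace-qualified,
# but %dopar% is not. Calling this in a session where foreach is not
# attached fails with: could not find function "%dopar%"
f_unqualified <- function(n) {
  foreach::foreach(i = seq_len(n)) %dopar% sqrt(i)
}

# One possible fix: bind the operator from the foreach namespace locally,
# so the function no longer depends on library("foreach") being called.
f_qualified <- function(n) {
  `%dopar%` <- foreach::`%dopar%`
  foreach::foreach(i = seq_len(n)) %dopar% sqrt(i)
}

# Register the sequential backend to avoid the "no parallel backend
# registered" warning; any registered backend would do.
foreach::registerDoSEQ()
res <- f_qualified(3)  # a list holding sqrt(1), sqrt(2), sqrt(3)
```

Until a package does something like this internally, attaching foreach in the calling session remains the practical workaround.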

HenrikBengtsson commented 6 years ago

This is now fixed in the next release of strucchange; Achim said he'll try to submit "to CRAN in the not-so-distant future". In the meantime, use the workaround I suggested above.

hummuscience commented 6 years ago

I tried your workaround with all different options of placing library("foreach") in the function without much luck...

slopeFinder <- function (x, breaks = 2, range, h = 3, closest_to = 0, hpc = "none") {
    if (hpc == "foreach") {
        library("foreach")
        }

    if (closest_to == 0){

    ts.x <- ts(x[range])
    l <- length(ts.x)
    tt <- 1:l
    ts.break <- strucchange::breakpoints(ts.x ~ tt, breaks = breaks, h = h, hpc = hpc)
    ts.bs <- splines::bs(tt, deg=1, knots = ts.break$breakpoints)
    bs.regress <- lm(ts.x ~ ts.bs)
    ts.coefs <- bs.regress$coefficients
    data <- data.frame(Slope =  ts.coefs["ts.bs2"],
                       Breakpoint = ts.break$breakpoints[1],
                       MaxAmp = max(ts.x[ts.break$breakpoints[1]:ts.break$breakpoints[2]]))

    return(data)

    }else{

    ts.x <- ts(x[range])
    l <- length(ts.x)
    tt <- 1:l
    ts.break <- strucchange::breakpoints(ts.x ~ tt, breaks = breaks, h = h, hpc = hpc)
    closest <- findInterval(closest_to, ts.break$breakpoints)+1
    ts.bs <- splines::bs(tt, deg=1, knots = ts.break$breakpoints)
    bs.regress <- lm(ts.x ~ ts.bs)
    ts.coefs <- bs.regress$coefficients
    data <- data.frame(Slope =  ts.coefs[closest+1],
                       Breakpoint = ts.break$breakpoints[closest],
                       MaxAmp = max(ts.x[ts.break$breakpoints[closest]:ts.break$breakpoints[closest+1]]))
    return(data)

    }
}

temp.break %<-% slopeFinder(temp[,2], breaks = 6, h=20, hpc = "foreach")

When I access temp.break, R appears to be busy computing. On the remote machine, however, only one core is in use, so nothing runs in parallel.

HenrikBengtsson commented 6 years ago

Ok, so the original error was solved, correct?

Try with:

temp.break %<-% {
  doFuture::registerDoFuture()
  slopeFinder(temp[,2], breaks = 6, h=20, hpc = "foreach")
}

It's not a beautiful solution, but should do it.
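The reason this helps is that a foreach backend is registered per R session, so calling registerDoFuture() on the local machine does not carry over to the R session on the remote worker. An alternative sketch is to do the registration inside the function that runs on the worker. The wrapper name breakpoints_parallel is hypothetical, and the sketch assumes that doFuture, foreach and strucchange are installed wherever the function is evaluated.

```r
# Hypothetical wrapper (not part of doFuture or strucchange): it prepares
# the evaluating R session and then calls breakpoints() with hpc = "foreach".
breakpoints_parallel <- function(formula, ...) {
  # Workaround for the unqualified %dopar% in strucchange
  library("foreach")
  # Register doFuture in the session that evaluates this function
  doFuture::registerDoFuture()
  strucchange::breakpoints(formula, hpc = "foreach", ...)
}

# Called directly, this runs under whatever future plan is in effect.
bp <- breakpoints_parallel(Nile ~ 1)
```

With a nested plan like the one used in this thread, assigning the result with %<-% instead of <- would evaluate the wrapper, and therefore the registration, on the remote worker.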

hummuscience commented 6 years ago

The original error was solved and your new suggestion works! 👍

HenrikBengtsson commented 6 years ago

FYI, I've created a to-do for trying to automate doFuture::registerDoFuture(): https://github.com/HenrikBengtsson/doFuture/issues/22. I don't see an obvious solution that can be implemented immediately, so it'll take some time. Thanks for bringing this use case to my attention.