HenrikBengtsson / future

R package: future: Unified Parallel and Distributed Processing in R for Everyone
https://future.futureverse.org

Threads/session finalization. #87

Closed: RebelionTheGrey closed this issue 8 years ago

RebelionTheGrey commented 8 years ago

Hello.

I use code like this:

library("parallel")  ## detectCores(), makeCluster(), clusterExport(), parLapply(), stopCluster()
library("future")    ## plan(), %<-%
library("listenv")   ## listenv()
library("Gmedian")   ## kGmedian(); amap::Kmeans() is called with its namespace prefix

## Draw random polynomial coefficients and return the roots, sorted by
## real then imaginary part, as a two-column (Re, Im) matrix
RootBootstrapping <- function(mean, sd)
{
  polyCoeffs <- rnorm(length(mean), mean = mean, sd = sd)

  rawResult <- as.complex(polyroot(polyCoeffs))
  roots <- rawResult[order(Re(rawResult), Im(rawResult))]

  rootMatrix <- matrix(nrow = length(polyCoeffs) - 1, ncol = 2)
  colnames(rootMatrix) <- c("Re", "Im")

  rootMatrix[, "Re"] <- Re(roots)
  rootMatrix[, "Im"] <- Im(roots)

  rootMatrix
}

GetRootsQuantiles <- function(polyMatrixCoeff, quantileLevel = c(0.05, 0.95), points)
{
  ## Step 1: bootstrap the roots on a PSOCK cluster that is created and stopped here
  maxThreads <- max(1, detectCores() - 1)
  internalCluster <- makeCluster(maxThreads)
  clusterExport(internalCluster, c("polyMatrixCoeff", "RootBootstrapping"), envir = environment())

  meanRoots <- as.complex(polyroot(polyMatrixCoeff[, 1]))
  rootsCount <- length(polyMatrixCoeff[, 1]) - 1

  print("Start matrix generation")

  rootsMatrix <- do.call("rbind", parLapply(internalCluster, 1:points, function(i)
  {
    RootBootstrapping(mean = polyMatrixCoeff[, "mean"], sd = polyMatrixCoeff[, "sd"])
  }))

  stopCluster(internalCluster)

  ## Step 2: run the different clusterings as futures
  currentPlan <- plan()
  print(currentPlan)

  print("Start kmeans calculation")

  plan(multiprocess)

  colnames(rootsMatrix) <- c("Re", "Im")

  result <- listenv()
  methods <- c("euclidean", "maximum", "manhattan", "canberra")

  result[["kmean"]] %<-% { kmeans(rootsMatrix, rootsCount, nstart = rootsCount) }
  result[["kmedian"]] %<-% { kGmedian(rootsMatrix, nstart = rootsCount) }

  for (i in seq_along(methods))
  {
    result[[methods[i]]] %<-% { amap::Kmeans(rootsMatrix, rootsCount, nstart = rootsCount, method = methods[i], iter.max = 10000) }
  }

  plan(currentPlan)

  ## Accessing result[[i]] collects the value of the corresponding future
  devNull <- sapply(1:6, function(i) { print(result[[i]]$centers) })

  rootsMatrix <- NULL
  gc(FALSE, FALSE)
}

points <- 100000
polyMatrixCoeff <- matrix(nrow = 3, ncol = 2)
polyMatrixCoeff[1, 1] <- 1
polyMatrixCoeff[1, 2] <- 0

polyMatrixCoeff[2, 1] <- -0.7
polyMatrixCoeff[2, 2] <- 0.02

polyMatrixCoeff[3, 1] <- 0.1
polyMatrixCoeff[3, 2] <- 0.01

colnames(polyMatrixCoeff) <- c("mean", "sd")
GetRootsQuantiles(polyMatrixCoeff, c(0.05, 0.95), points)

gc(FALSE, FALSE)

So, after execution, Rscript processes stay in memory without being garbage collected. Is there a similar finalization technique in the 'future' package?

Thank you.

HenrikBengtsson commented 8 years ago

Hi, thanks for this feedback. This will be a long reply and possibly a bit technical, but hopefully it clarifies what is happening and why. I'm also all ears for ideas for a nice solution to garbage collect remote/external R processes.

The issue

I assume you're on Windows, because there plan(multiprocess) is effectively plan(multisession), which spawns off background R sessions (using parallel::makeCluster()). On Unix-like systems it becomes plan(multicore), which forks off processes; those are one-offs and terminate as soon as their values are collected. So, with plan(multisession) in action, I think the following minimal example illustrates your point:

> library("future")
> plan(multisession)  ## So that the example also works on Unix
> a %<-% { integer(length = 100e6) }
> object.size(a)
400000040 bytes

At this point, we have that:

  1. The main R process, which holds object a of size ~400 MB.
  2. The background R process that resolved the future, which also holds an unnamed object of the same size.

Your point is that you want the memory allocated by the background process to be released automatically at this point, correct? That won't happen unless the garbage collector of that background session runs, which won't really happen unless the background session needs the memory for something else.

How to garbage collect the memory in background processes?

It should be clarified here that these background multisession processes are not one-off processes. Instead, they sit in the background waiting to serve the main process. More on their life span below.

So, the short answer is that it is not easy to clear out the memory of the background sessions right now. This stems from the fact that the background R sessions serving the main R process are internally regular PSOCK cluster sessions, basically created as

cl <- parallel::makeCluster(availableCores())
plan(cluster, workers=cl)

and AFAIK, as with any such PSOCK cluster, it is not easy to have the workers automatically cleaned out and garbage collected after their results have been collected.

The only approach I am aware of is to call the background sessions again and ask them to explicitly clean up, e.g. let each background session execute gc(), possibly preceded by rm(list=ls(all.names=TRUE)). I have thought about doing this when the value of the future has been collected. However, since garbage collection may take some time to run, and we do need to wait for the background session to finish the cleanup (each request for execution needs to be paired with a retrieval-of-results request), I didn't want to make that the default.

I'll try to think about this more. Maybe it could be an option (as in options()), an argument to plan(), and / or an argument to each individual future.

Also, I think automatic cleanup and garbage collection after the result has been retrieved could ideally be implemented as an optional post-return hook in the parallel package itself. This could also be done such that the main process is not blocked. Such a feature would benefit many more users, not just users of the future package.
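The explicit cleanup described above can be sketched with plain parallel calls. This is a minimal sketch under stated assumptions, not part of the future API: cleanWorkers() is a hypothetical helper, and it assumes you hold the PSOCK cluster object cl yourself (as with the makeCluster() + plan(cluster, workers=cl) setup above). It removes everything from each worker's global environment, runs gc() twice, and returns how many Mb each worker's R session reports in use afterwards.

```r
library("parallel")

## Hypothetical helper (not part of future or parallel): ask every worker of a
## PSOCK cluster to drop its global variables and run the garbage collector.
## Returns the memory (Mb) that R on each worker reports as in use afterwards.
cleanWorkers <- function(cl, removeAll = TRUE) {
  if (removeAll) {
    ## clusterEvalQ() evaluates its expression in each worker's global environment
    clusterEvalQ(cl, rm(list = ls(all.names = TRUE)))
  }
  usedMb <- clusterEvalQ(cl, {
    gc()               # first pass
    sum(gc()[, 2])     # second pass; column 2 holds the "used" memory in Mb
  })
  unlist(usedMb)
}

cl <- makeCluster(2)
## Leave a ~40 MB object behind in each worker's global environment
invisible(clusterEvalQ(cl, { big <- integer(length = 10e6); NULL }))
hadBig <- unlist(clusterEvalQ(cl, exists("big")))
usedMb <- cleanWorkers(cl)
hasBig <- unlist(clusterEvalQ(cl, exists("big")))
stopCluster(cl)
```

The Mb figures come from gc() on the workers, i.e. R's own accounting; they may differ from what the operating system reports for the Rscript processes.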

When are multisession processes terminated?

I guess the above setup of multisession background sessions begs the question: how long are the background processes alive?

The short answer is: until they are garbage collected. As explained above, plan(multisession) is just a neat version of plan(cluster, workers=parallel::makeCluster(availableCores())). That parallel::makeCluster(availableCores()) object is recorded in an internal registry (future:::ClusterRegistry("get")), and as long as it is around, all background processes are kept alive. When that object disappears, it can be garbage collected, and then those background sessions are terminated.

So, when and how is this "cluster" object removed? This happens automatically when:

  1. You set up a new cluster that is different from the one you had in the past, e.g. plan(cluster, workers=...). Unfortunately, it is not enough to change strategy, e.g. plan(eager).
  2. The future package is unloaded, e.g. unloadNamespace("future") and / or detach("package:future").
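To make the lifetime rule concrete, here is a hypothetical sketch of such a registry. It is not future's internal ClusterRegistry() code; makeClusterRegistry() and its "get"/"start"/"stop" actions are illustrative names. Where the thread describes workers terminating once the old cluster object is garbage collected, this sketch calls stopCluster() explicitly, which shuts the workers down deterministically.

```r
library("parallel")

## Hypothetical sketch (NOT future's internal code): the registry holds a
## reference to the current PSOCK cluster, which keeps its workers alive.
## Starting a new cluster or stopping the registry shuts the old workers down.
makeClusterRegistry <- function() {
  current <- NULL
  function(action = c("get", "start", "stop"), workers = 1L) {
    action <- match.arg(action)
    if (action == "get") return(current)
    ## Both "start" and "stop" first shut down any existing cluster
    if (!is.null(current)) {
      stopCluster(current)
      current <<- NULL
    }
    if (action == "start") current <<- makeCluster(workers)
    invisible(current)
  }
}

registry <- makeClusterRegistry()
registry("start", workers = 2)
nRunning <- length(registry("get"))        # number of worker nodes
registry("start", workers = 1)             # a new setup replaces the old cluster
nAfterReplace <- length(registry("get"))
registry("stop")
afterStop <- registry("get")
```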

You could hack it and clear the internal registry as:

future:::ClusterRegistry("stop")

Maybe I should add a public function for achieving this.

Also, and importantly, you can always gain full control by doing something like:

cl <- parallel::makeCluster(availableCores())
oplan <- plan(cluster, workers=cl)

...

plan(oplan)
parallel::stopCluster(cl)
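The explicit pattern above can also be wrapped in a function so that the workers are stopped even when the code in between fails. withCluster() is a hypothetical helper and this sketch uses only the parallel package; with future, one would presumably also call plan(cluster, workers = cl) inside fun() and restore the previous plan there, as in the lines above.

```r
library("parallel")

## Hypothetical helper: create a PSOCK cluster, pass it to fun(), and always
## stop the cluster afterwards, even if fun() throws an error.
withCluster <- function(workers, fun) {
  cl <- makeCluster(workers)
  on.exit(stopCluster(cl), add = TRUE)
  fun(cl)
}

squares <- withCluster(2, function(cl) {
  unlist(parLapply(cl, 1:4, function(x) x^2))
})

## The cluster is stopped even though fun() fails
err <- tryCatch(withCluster(1, function(cl) stop("boom")),
                error = conditionMessage)
```

Using on.exit() ties the lifetime of the workers to the function call rather than to when the cluster object happens to be garbage collected.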

Thanks again for this use case. It's really helpful. Here are a few comments about your setup independent of this issue:

RebelionTheGrey commented 8 years ago

Hello!

Thank you for your answer. I tried to use %tweak% list(gc=TRUE) and plan(..., gc=TRUE), but with no effect: gc is not called in each session after code execution has finished. I also tried to use rm(list=ls(all.names=TRUE)) and gc(F,F) in the future's code block, but got an error:

[1] "Start matrix generation"
[1] "Start kmeans calculation"
Error in result[[i]]$centers : $ operator is invalid for atomic vectors

9 print(result[[i]]$centers) at QuantileDistribution.R#43
8 FUN(X[[i]], ...)
7 lapply(X = X, FUN = FUN, ...)
6 sapply(1:6, function(i) { print(result[[i]]$centers) }) at QuantileDistribution.R#43
5 GetRootsQuantiles(coeffMatrixAR, c(0.05, 0.95), points) at Main.R#53
4 eval(expr, envir, enclos)
3 eval(ei, envir)
2 withVisible(eval(ei, envir))
1 source("C:/User Root/Repository/R Projects/Roots weeding bootstrap/Main.R")
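One plausible explanation for this error (an inference; the thread does not confirm it): the value of a future is the value of the last expression in its block, so a block that ends with gc(F,F) returns gc()'s numeric matrix instead of the clustering result, and $centers then fails on that atomic matrix. The sketch below reproduces this with a plain local() block standing in for the future expression, and shows a variant that cleans up but still returns the result.

```r
set.seed(1)
x <- matrix(rnorm(200), ncol = 2)

## Stand-in for a future block that ends with cleanup: its value is whatever
## the last expression returns, here gc()'s numeric matrix
bad <- local({
  fit <- kmeans(x, centers = 2)
  rm(list = ls(all.names = TRUE))
  gc(FALSE, FALSE)
})
msg <- tryCatch(bad$centers, error = conditionMessage)

## Clean up, but keep the result as the last expression of the block
good <- local({
  fit <- kmeans(x, centers = 2)
  rm(list = setdiff(ls(all.names = TRUE), "fit"))
  invisible(gc(FALSE, FALSE))
  fit
})
```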

So, it would be great to have an exported function in the future package for garbage collection, and a similar function for destroying the cluster.

future:::ClusterRegistry("stop") kills the worker processes, but then I need to call plan(multiprocess) again each time before running parallel tasks. Also, I haven't tested it with a nested topology. Does it work in this example?

plan(multiprocess)

pid <- Sys.getpid()
a %<-% {
  cat("Resolving 'a' ...\n")
  Sys.getpid()
}
b %<-% {
  cat("Resolving 'b' ...\n")
  b1 %<-% {
    cat("Resolving 'b1' ...\n")
    Sys.getpid()
  }
  b2 %<-% {
    cat("Resolving 'b2' ...\n")
    Sys.getpid()
  }
  c(b.pid = Sys.getpid(), b1.pid = b1, b2.pid = b2)
}

Thank you. For now I will use the plan topology and wait for the next release. It's a really good and useful project for me!

P.S. I attached the .R scripts I used to test the behavior of gc().
http://rgho.st/79GZqxbZT http://rgho.st/7xdJQvnZh http://rgho.st/7PL2vcKtN

Also, I think it would be great to implement this mechanism without relying on events, because events don't work in RStudio and R.NET; this problem stems from using the source() function.

HenrikBengtsson commented 8 years ago

Hi, sorry for not being clear. I was giving lots of technical details on what's going on under the hood. Basically, in the current version there is no built-in support for garbage collection of multisession / cluster workers. But I'll certainly work on getting this supported.

However, below is a function you can set up and call to explicitly garbage collect your background processes:

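gcCluster <- function() {
  ## Internal (unexported) registry holding the current cluster, if any
  cl <- future:::ClusterRegistry("get")
  if (length(cl) == 0) return()
  ## Run gc() twice on every worker (*)
  for (kk in 1:2) parallel::clusterEvalQ(cl, { gc() })
}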

Then call gcCluster() manually whenever you wish to clean up the background processes.

(*) The reason for using two gc() calls and not just one is explained in https://github.com/HenrikBengtsson/Wishlist-for-R/issues/27

HenrikBengtsson commented 8 years ago

Hi again, feel free to try out the following version:

source("http://callr.org/install#HenrikBengtsson/future@feature/gc-cluster")

Cluster futures (i.e. cluster, multisession, and multiprocess on Windows) created with gc=TRUE will now clean up and garbage collect the background process as soon as the value has been collected. For example:

> library("future")
> plan(multisession, gc=TRUE)

> a %<-% { integer(length = 100e6) }
## At this point the value has not yet been collected and is therefore kept
## in the background R session.

## The following will collect the value of the future and thereby cleanup
## the memory of the background process.
> str(a)
 int [1:100000000] 0 0 0 0 0 0 0 0 0 0 ...

Let me know if this is what you were looking for.

RebelionTheGrey commented 8 years ago

Hello. I updated the future package from the link above and tried to run the code I used earlier. Neither the gcCluster() function above nor gc=TRUE in the updated package has any effect. Only stopping the cluster or issuing a new plan(multiprocess) command works. I don't know why...

HenrikBengtsson commented 8 years ago

What operating system are you using?

RebelionTheGrey commented 8 years ago

Windows 10 x64 Professional, MRO 3.2.5 + RStudio. I tried many variants: printing the values of result, setting them to NULL, setting the main list to NULL. No effect. Maybe this is related to the MRO version, which is based on R 3.2.5. I'll check the code with R 3.3.1 tonight and get back to you.

HenrikBengtsson commented 8 years ago

Ok, have you tried my minimal example, i.e.

> library("future")
> plan(multisession, gc=TRUE)
> a %<-% { integer(length = 100e6) }
> str(a)

Does that work for you? If not, I have to go back to the drawing board. It could also be MRO 3.2.5 specific (hopefully not).

If that one works, it could be that you're looking at the R background processes created by your own internalCluster <- makeCluster(maxThreads), which are independent of the future package.

RebelionTheGrey commented 8 years ago

Yes, your example works fine. I tried to extend it to functions and it also works well. stopCluster() from parallel shuts down the cluster normally. I'll try to rewrite the code without using parallel directly. Maybe it's a bug/anomaly in my own code. I'll report back with the results. Thank you.

HenrikBengtsson commented 8 years ago

Great to hear.

Now an FYI, which I held back on: there's a noticeable overhead involved when using futures. Thus, if the future expression only takes a very short time to evaluate and you call it many times, it might not be worth it. For instance, in your case it looks like you're calling RootBootstrapping() 100,000 times, and that function doesn't look like it takes very long. This can be avoided by chunking, i.e. doing multiple calls to RootBootstrapping() in each future to lower the impact of the overhead.

For the record: there are plans to provide mechanisms in the future package to avoid some of this overhead. For instance, full manual control of global variables will certainly help here, cf. https://github.com/HenrikBengtsson/future/issues/84.
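The chunking idea can be sketched as follows. runChunked() is a hypothetical helper, not a future or parallel API: it splits n repetitions into nChunks groups with parallel::splitIndices(), so whatever backend runs each chunk (here the sequential lapply() by default) pays its per-task overhead once per chunk rather than once per call. With futures, each chunk would become one future. RootBootstrapping() below is a compact rewrite of the function from the original post.

```r
## Hypothetical helper: call fun() n times, grouped into nChunks chunks, and
## row-bind all results. chunkApply runs the chunks; with a parallel backend
## each chunk is one task, so per-task overhead is paid nChunks times, not n.
runChunked <- function(n, nChunks, fun, chunkApply = lapply) {
  chunks <- parallel::splitIndices(n, nChunks)
  perChunk <- chunkApply(chunks, function(idx) {
    ## Many cheap calls inside a single task
    do.call(rbind, lapply(idx, function(i) fun()))
  })
  do.call(rbind, perChunk)
}

## Compact version of RootBootstrapping() from the original post
RootBootstrapping <- function(mean, sd) {
  polyCoeffs <- rnorm(length(mean), mean = mean, sd = sd)
  rawResult <- as.complex(polyroot(polyCoeffs))
  roots <- rawResult[order(Re(rawResult), Im(rawResult))]
  cbind(Re = Re(roots), Im = Im(roots))
}

coefMean <- c(1, -0.7, 0.1)
coefSd <- c(0, 0.02, 0.01)
rootsMatrix <- runChunked(1000, nChunks = 4,
                          fun = function() RootBootstrapping(coefMean, coefSd))
```

To run the chunks on a PSOCK cluster cl instead, one could pass chunkApply = function(X, FUN) parallel::parLapply(cl, X, FUN) after exporting RootBootstrapping, coefMean and coefSd to the workers with clusterExport(). Note that parLapply() already splits its input into one chunk per worker by default, so the per-call overhead matters mainly when each call is wrapped in its own future.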