HenrikBengtsson / future.callr

:rocket: R package future.callr: A Future API for Parallel Processing using 'callr'
https://future.callr.futureverse.org

The worker is not freed when an unresolved future loses its reference. #20

Open randy3k opened 2 years ago

randy3k commented 2 years ago

The "worker" is not freed when an unresolved future loses its reference.

future::plan(future.callr::callr, workers = 10)

future::nbrOfFreeWorkers()
#> [1] 10

x <- future.callr::callr(1)
x <- NULL

future::nbrOfFreeWorkers()
#> [1] 9
  1. If the future is finished, we should remove the future from the registry
  2. If the future is still running, there should be a mechanism to stop the R subprocess
randy3k commented 2 years ago

One workaround that I found is to clear the db object manually.

assign("db", list(), envir = environment(environment(future.callr:::nbrOfFreeWorkers.callr)$FutureRegistry))

I guess you could make use of weak references to keep track of the futures in the registry.

HenrikBengtsson commented 2 years ago

Thanks for reporting. Yes, "this is expected". Modulo issue #21, at least it should always be possible to create one more future in this situation, because when nbrOfFreeWorkers() == 0, then (and only then), an attempt is made to resolve and collect (= "free up") one of the active futures.

Before anything else, I'm curious, how do you end up in this situation?

  1. If the future is finished, we should remove the future from the registry

Yes, one could hook into R's garbage collector by registering a finalizer using one dummy environment per future. When finalized, we could free up the internal registry. FWIW, this is what happens when we lose references to R connections; they're cleaned out when running the GC.
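The finalizer approach can be sketched in base R with reg.finalizer() on a dummy environment; the registry structure below is illustrative only, not future.callr's actual internals:

```r
## Sketch of GC-based cleanup (base R only; 'registry' is a stand-in
## for the internal FutureRegistry, not the real implementation)
registry <- new.env()
registry$db <- list()

register_future <- function(id) {
  token <- new.env()  ## one dummy environment per future
  reg.finalizer(token, function(e) {
    ## Runs when 'token' is garbage collected: free the registry slot
    registry$db[[id]] <- NULL
  })
  registry$db[[id]] <- TRUE
  token
}

token <- register_future("future-1")
length(registry$db)  ## one slot in use
rm(token)            ## the future loses its reference ...
gc()                 ## ... and the next GC frees the slot
length(registry$db)  ## back to zero
```

The key detail is that the finalizer closes over the id, not over the future itself, so dropping the token is enough to let the GC reclaim it.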

  1. If the future is still running, there should be a mechanism to stop the R subprocess

This is possible but less straightforward, because it has to fit in with the Future API, which doesn't yet have a concept of terminating futures. It also introduces a side effect, i.e. terminating a future instead of letting its expression complete, and these types of behaviors must work the same regardless of future backend. It's on the roadmap to figure this out.

randy3k commented 2 years ago

I noticed that no workers were available after I interrupted a call to future.apply::future_lapply.

r$> future::plan(future.callr::callr, workers = 5)

r$> future.apply::future_lapply(1:5, function(.) Sys.sleep(1000))
^C

r$> future::nbrOfFreeWorkers()
[1] 0
randy3k commented 2 years ago

But I just realized that it is a more general problem, as it also affects multisession:

r$> future::plan(future::multisession, workers = 5)

r$> future.apply::future_lapply(1:5, function(.) Sys.sleep(1000))
^C

r$> future::nbrOfFreeWorkers()
[1] 0
randy3k commented 2 years ago

Yes, one could hook into R's garbage collector by registering a finalizer using one dummy environment per future. When finalized, we could free up the internal registry. FWIW, this is what happens when we lose references to R connections; they're cleaned out when running the GC.

If the registry is keeping references of the futures, the futures will never be gc'ed. That's why I mentioned weak references a bit earlier.
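This point can be demonstrated in base R: as long as a registry holds a strong reference to an object, that object's finalizer never runs. Names here are illustrative, not the package's internals:

```r
## Why strong references in the registry defeat GC-based cleanup
state <- new.env()
state$finalized <- FALSE

db <- list()
f <- new.env()  ## stand-in for a future object
reg.finalizer(f, function(e) state$finalized <- TRUE)

db[["f"]] <- f  ## the registry keeps a strong reference to the future
rm(f)           ## drop the user's reference
gc()
state$finalized ## still FALSE: the registry keeps 'f' alive
```

A weak reference (which base R only exposes at the C level, via R_MakeWeakRef) would let the registry track the future without keeping it alive.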

HenrikBengtsson commented 2 years ago

Workaround

I don't think I ever explained how to get out of this state:

library(future.apply)
plan(multisession, workers = 2)
y <- future_lapply(1:2, function(.) Sys.sleep(1000)) ## Ctrl-C
#> ^C
nbrOfFreeWorkers()
#> [1] 0

where there are no more available workers. To get out of this state, we currently have to force a new set of workers;

plan(sequential)
plan(multisession, workers = 2)
nbrOfFreeWorkers()
#> [1] 2

Moving forward

Having said this, I think there's more that can be done by the future framework here. For example, after the original futures have finished, which takes 1000 seconds here, we should be able to create new futures, but currently we can't.

One approach could be: if nbrOfFreeWorkers() == 0L, attempt to garbage collect internal futures before creating the next future.
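A self-contained sketch of that idea, using a mock registry rather than future's internals:

```r
## Proposed flow: when no workers appear free, run the GC first so that
## finalizers of dropped futures can release their slots (mock registry)
registry <- new.env()
registry$db <- list()

launch <- function(id) {
  token <- new.env()
  reg.finalizer(token, function(e) registry$db[[id]] <- NULL)
  registry$db[[id]] <- TRUE
  token
}
n_free <- function(total) total - length(registry$db)

t1 <- launch("f1")
t2 <- launch("f2")          ## two workers, both busy
n_free(2L)                  ## 0
rm(t1)                      ## one future loses its reference
if (n_free(2L) == 0L) gc()  ## the proposed pre-launch GC step
n_free(2L)                  ## one slot reclaimed; a new future can launch
```

This would only help for futures that have lost their references; futures still reachable from user code would remain registered, as they should.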

randy3k commented 2 years ago

The workaround only works with multisession, but not callr workers.

HenrikBengtsson commented 2 years ago

The workaround only works with multisession, but not callr workers.

Correct; I forgot about this. There's a similar problem when creating a 'cluster' future with a BYO 'cluster';

> cl <- parallelly::makeClusterPSOCK(2L)
> plan(cluster, workers = cl)
> for (kk in 1:2) future(Sys.sleep(1000))
> nbrOfFreeWorkers()
[1] 0
> plan(sequential)
> nbrOfFreeWorkers()
[1] 1
> plan(cluster, workers = cl)
# stalls until one of the `cl` workers is free

I understand why this happens in both cases. The plan(sequential) workaround actually only works for plan(multisession), plan(cluster, workers = nworkers), and some other cases.