mschubert / clustermq

R package to send function calls as jobs on LSF, SGE, Slurm, PBS/Torque, or each via SSH
https://mschubert.github.io/clustermq/
Apache License 2.0

targets stalls using the worker API with the multiprocess backend #278

Closed: mschubert closed this 2 years ago

mschubert commented 2 years ago

Discussed in https://github.com/mschubert/clustermq/discussions/266

Originally posted by **wlandau** June 2, 2021

`targets` is having trouble with the `multiprocess` backend for nontrivially large pipelines. The following example always hangs on my MacBook after it gets most of the way through. All the workers are still alive when it hangs.

```{r}
library(targets)
tar_script({
  options(clustermq.scheduler = "multiprocess")
  list(
    tar_target(index_batch, seq_len(100)),
    tar_target(data_continuous, index_batch, pattern = map(index_batch)),
    tar_target(data_discrete, index_batch, pattern = map(index_batch)),
    tar_target(fit_continuous, data_continuous, pattern = map(data_continuous)),
    tar_target(fit_discrete, data_discrete, pattern = map(data_discrete))
  )
})
tar_make_clustermq(workers = 4, callr_function = NULL)
```

It always hangs [here in the `targets` source](https://github.com/ropensci/targets/blob/fd7d2a3432451dd35e7a477ad83692ebf5024186/R/class_clustermq.R#L205):

```{r}
message <- if_any(self$workers > 0L, self$crew$receive_data(), list())
```

This only happens to me with the multiprocess backend. (I tried multicore and SGE, and both seem to work fine, although with multicore I sometimes get "error: unable to shutdown some parallel processes" at the very end of my unit tests.)

I am having a tough time reproducing it outside `targets`. The following event loop seems to work fine:

```{r}
options(clustermq.scheduler = "multiprocess")
library(clustermq)

w <- workers(n_jobs = 4)
token <- w$set_common_data(
  fun = identity,
  const = list(),
  pkgs = character(0),
  common_seed = 0,
  rettype = "list",
  export = list()
)
x <- w$receive_data()
w$send_common_data()
x <- w$receive_data()

# Send 100 small calls, receiving one result after each.
for (rep in seq_len(100)) {
  w$send_call(2 + 2)
  x <- w$receive_data()
}

# Shut down 3 of the 4 workers, then the last one.
for (worker in seq_len(3)) {
  w$send_shutdown_worker()
  x <- w$receive_data()
}
w$send_shutdown_worker()
w$cleanup()
w$finalize()
```
mschubert commented 2 years ago

Duplicate of #272