mschubert / clustermq

R package to send function calls as jobs on LSF, SGE, Slurm, PBS/Torque, or each via SSH
https://mschubert.github.io/clustermq/
Apache License 2.0

Using persistent workers #234

Closed mike-lawrence closed 3 years ago

mike-lawrence commented 3 years ago

Despite reading the technical description and browsing the drake code linked there, I'm still wrapping my head around how to properly use persistent workers. Here's what I've tried:

#set options & import ----
options(clustermq.scheduler = "multicore")
library(clustermq)

# create workers ----
num_workers = 2
w = workers(
    n_jobs = num_workers
)
on.exit(w$finalize())
w$receive_data() #look for confirmation that workers are ready

# define objects to send to workers & send----
z = letters
f = function(x) z[x]
w$set_common_data(
    export = list(z=z,f=f)
    , fun = identity
    , const = list()
    , rettype = list()
    , pkgs = character(0)
    , common_seed = 1
    , token = 'set_common_data_token'
)
w$send_common_data()
w$receive_data() #look for confirmation that common data was received

# send data to workers for computation ----
Q(
    x = as.list(1:2)
    , fun = f
    , workers = w
)

But clearly I'm doing something wrong, because everything simply hangs at the end after the message Running 2 calculations (0 objs/0 Mb common; 1 calls/chunk) .... Any tips?

mschubert commented 3 years ago

Thank you for pointing this out. There is indeed no example of reusing workers in the User Guide, and the Technical Docs are geared towards building your own API on top of clustermq (which is not required for normal users). Both should be documented more clearly.

To answer your question: if you want to reuse workers, separate creating the pool from the calls to Q():

w = workers(n_jobs=2)
Q(..., workers=w)
Q(..., workers=w)
w$cleanup()

Is this what you were after?
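
(For reference, a fuller self-contained sketch of this pattern; the multicore scheduler and the toy functions below are illustrative assumptions, not part of the reply above.)

options(clustermq.scheduler = "multicore")
library(clustermq)

w = workers(n_jobs = 2)                               # start the pool once
r1 = Q(x = 1:4, fun = function(x) x^2, workers = w)   # first call reuses the pool
r2 = Q(x = 1:4, fun = function(x) x + 1, workers = w) # second call, same workers
w$cleanup()                                           # shut the workers down when done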

mike-lawrence commented 3 years ago

Close, but not quite. If I have this:

# create workers ----
num_workers = 2
w = workers(
    n_jobs = num_workers
)
on.exit(w$finalize())
# define objects to send to workers & send----
z = letters
f = function(x) z[x]
# send data to workers for computation ----
Q(
    x = as.list(1:num_workers)
    , fun = f
    , workers = w
)

then when I select it and run it, I get the error:

Running 2 calculations (0 objs/0 Mb common; 1 calls/chunk) ...
Error in summarize_result(job_result, n_errors, n_warnings, cond_msgs,  : 
  2/2 jobs failed (0 warnings). Stopping.
(Error #1) object 'z' not found
(Error #2) object 'z' not found

BUT when I select it and run it again, without restarting the R session, it runs fine. What am I missing?

Obviously the workers need to be sent the z variable that the function f expects to find in the global environment, but my attempts to use w$set_common_data()/w$send_common_data() before the call to Q() result in the hang reported in my first post. I'm not using the const argument to Q() because, for my eventual usage, z is going to be a large object on which multiple subsequent Q() calls will rely, so I don't want to send it repeatedly each time.
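
(For contrast, a minimal sketch of the const route being avoided here: const passes z as an extra named argument to fun on every call, rather than placing it in the workers' global environment. The two-argument f2 is purely illustrative.)

f2 = function(x, z) z[x]   # z arrives as an argument, not via the global env
Q(x = as.list(1:2), fun = f2, const = list(z = z), workers = w)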

wlandau commented 3 years ago

In that case, I think you will need Q(export = list(z = z)).

mike-lawrence commented 3 years ago

(Thanks @wlandau! I was in the midst of writing the below as you responded :) )

Ah, figured it out! The export argument is what I need. So to send a big object up front that subsequent functions will expect to find in the global environment, one should do:

#set options & import ----
options(clustermq.scheduler = "multicore")
library(clustermq)

# create workers ----
num_workers = 3
w = workers(
    n_jobs = num_workers
)
on.exit(w$finalize())

# define objects & export z to the workers ----
z = letters
f = function(x) z[x]
Q(
    x = as.list(1:num_workers)
    , fun = identity  # no-op call; its only purpose is to ship the export
    , workers = w
    , export = list(z=z)
)

# subsequent computations can now rely on z in the workers' global env ----
Q(
    x = as.list(1:num_workers)
    , fun = f
    , workers = w
)
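
(One closing detail, grounded in the earlier reply: once the last Q() call has returned, the pool can be shut down explicitly rather than waiting for the on.exit() handler.)

w$cleanup() # stop the persistent workers when all Q() calls are done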