mrc-ide / rrq

:runner::runner::runner: Lightweight Redis queues
https://mrc-ide.github.io/rrq/

queuing calls inside other functions #4

Open jamieRowen opened 6 years ago

jamieRowen commented 6 years ago

Love this package, as I am a big fan of the Python rq package and this feels like the closest I can get in R at the moment. I notice that you have been doing a fair amount of work in this space, so thanks very much and kudos.

I have come across a problem that yields an error:

[2018-04-10 13:30:49] STOP ERROR
worker_1 | This is an uncaught error in rrq, probably a bug!
worker_1 | Error in storr_copy(dest, self, list, namespace, skip_missing) :
worker_1 | Missing values; can't copy:
worker_1 | - from namespace 'objects', key: '6717f2823d3202449301145073ab8719'
worker_1 | Calls: rrq_worker ... tryCatch -> tryCatchList -> tryCatchOne ->
worker_1 | Execution halted

As a reproducible example, I have a script file "myfuns.R":

slowdouble <- function(x) {
  Sys.sleep(x)
  print(x * 2)
  x*2
}

I create a context and save it separately to be loaded in both the master and worker with

context_holder = "context"
dir.create(context_holder)
root = paste0(context_holder, "/test_context")
out = paste0(context_holder,"/context.RData")
orig_source = "myfuns.R"

new_source = paste0(context_holder,"/",orig_source)
file.copy(orig_source,new_source)

context = context::context_save(root,sources = new_source)
context = context::context_load(context, new.env(parent = .GlobalEnv))
saveRDS(context, out)

Run the worker in one R process

library(rrq)
context = readRDS("context/context.RData")
rrq_worker(context, redux::hiredis(host = host, port = port))

Then in the master

library(rrq)
context = readRDS("context/context.RData")
obj = rrq_controller(context,redux::hiredis(host = host,port = port))

to create the controller object.

It works fine if I run

obj$enqueue(slowdouble(1))

but breaks if I do

f = function(x){
  obj$enqueue(slowdouble(x))
}
f(1)

Any ideas on why this might be or what I might do to fix it?

richfitz commented 6 years ago

Hi @jamieRowen - sorry for missing this; I don't see most GitHub notifications, as they get a bit overwhelming!

This slightly modified version of your script works for me:

library(rrq)
Sys.setenv(R_TESTS = "")
root <- tempfile()
context <- context::context_save(root, sources = "myfuns.R")
context <- context::context_load(context, new.env(parent = .GlobalEnv))
obj <- rrq_controller(context, redux::hiredis())
on.exit(obj$destroy())

## For testing, use: worker_command(obj)
wid <- workers_spawn(obj, timeout = 5, progress = PROGRESS)

t <- obj$enqueue(slowdouble(1))
obj$task_wait(t)

f <- function(x) {
  obj$enqueue(slowdouble(x))
}
t <- f(1)
obj$task_wait(t)

I think the issue is that the use of saveRDS/readRDS is messing up your environments.
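To illustrate the general point about environments, here is a minimal base R sketch. It does not use rrq, context, or Redis. The `ctx` list is a hypothetical stand-in for a loaded context, not the real object that `context::context_load` returns. It only shows that an environment sent through saveRDS/readRDS comes back as a new, detached copy rather than the same environment.

```r
# Base R only. `ctx` is a hypothetical stand-in for a loaded context:
# a plain list that carries an environment, NOT the real context object.
envir <- new.env(parent = globalenv())
assign("slowdouble", function(x) x * 2, envir = envir)
ctx <- list(name = "demo", envir = envir)

# Round-trip the object through disk, as in the original script.
path <- tempfile(fileext = ".rds")
saveRDS(ctx, path)
ctx_restored <- readRDS(path)

# The names stored in the environment survive the round trip...
same_contents <- identical(ls(ctx_restored$envir), ls(envir))

# ...but the restored environment is a different object from the original.
same_object <- identical(ctx_restored$envir, envir)

# Changes made to the original are therefore not seen by the copy.
assign("y", 1, envir = envir)
visible_in_copy <- exists("y", envir = ctx_restored$envir, inherits = FALSE)

print(c(same_contents = same_contents,
        same_object = same_object,
        visible_in_copy = visible_in_copy))
```

If this is what is happening here, building the context in each process with context_save/context_load, as in the script above, instead of passing it through an .RData file should avoid the problem.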