mlr-org / mlr3batchmark

Connector between mlr3 and batchtools
5 stars 3 forks source link

Error in benchmark_grid: A Resampling is instantiated for a task with a different number of observations #26

Closed MislavSag closed 9 months ago

MislavSag commented 9 months ago

I have upgraded all mlr3 packages, including mlr3batchmark.

Now I am getting an error I didn't get before.

The error is:

### [bt]: Generating problem instance for problem '8a7d4bb8e5156165' ...
### [bt]: Applying algorithm 'run_learner' on problem '8a7d4bb8e5156165' for job 3 (seed = 4) ...
INFO  [09:41:37.721] [mlr3] Applying learner 'subsample.dropnacol.dropna.removeconstants_1.fixfactors.winsorizesimple.removeconstants_2.dropcorr.scale_branch.uniformization.scale.scale_unbranch.dropna_v2.nop_union_pca.pca.ica.featureunion.filter_branch.jmi.relief.gausscov_f1st.filter_unbranch.regr.lightgbm.tuned' on task 'taskRetWeek' (iter 1/1)
INFO  [09:41:40.017] [bbotk] Starting to optimize 9 parameter(s) with '<OptimizerHyperband>' and '<TerminatorNone>'
INFO  [09:41:40.128] [bbotk] Evaluating 1 configuration(s)
Error in benchmark_grid(self$task, self$learner, resampling, param_values = list(xss)) : 
  A Resampling is instantiated for a task with a different number of observations
40: (function (e) 
    traceback(2L))()
39: stop("A Resampling is instantiated for a task with a different number of observations")
38: benchmark_grid(self$task, self$learner, resampling, param_values = list(xss))
37: .__ObjectiveTuning__.eval_many(self = self, private = private, 
        super = super, xss = xss, resampling = resampling)
36: private$.eval_many(xss, resampling = list(<environment>))
35: eval(expr, p)
34: eval(expr, p)
33: eval.parent(expr, n = 1L)
32: invoke(private$.eval_many, xss, .args = self$constants$values)
31: .__Objective__eval_many(self = self, private = private, super = super, 
        xss = xss)
30: self$objective$eval_many(xss_trafoed)
29: .__OptimInstance__eval_batch(self = self, private = private, 
        super = super, xdt = xdt)
28: inst$eval_batch(xdt)
27: private$.optimize(inst)
26: doTryCatch(return(expr), name, parentenv, handler)
25: tryCatchOne(expr, names, parentenv, handlers[[1L]])
24: tryCatchList(expr, classes, parentenv, handlers)
23: tryCatch({
        private$.optimize(inst)
    }, terminated_error = function(cond) {
    })
22: optimize_default(inst, self, private)
21: .__Optimizer__optimize(self = self, private = private, super = super, 
        inst = inst)
20: private$.optimizer$optimize(inst)
19: .__TunerFromOptimizer__optimize(self = self, private = private, 
        super = super, inst = inst)
18: self$tuner$optimize(instance)
17: .__AutoTuner__.train(self = self, private = private, super = super, 
        task = task)
16: get_private(learner)$.train(task)
15: .f(learner = <environment>, task = <environment>)
14: eval(expr, p)
13: eval(expr, p)
12: eval.parent(expr, n = 1L)
11: invoke(.f, .args = .args, .opts = .opts, .seed = .seed, .timeout = .timeout)
10: encapsulate(learner$encapsulate["train"], .f = train_wrapper, 
        .args = list(learner = learner, task = task), .pkgs = learner$packages, 
        .seed = NA_integer_, .timeout = learner$timeout["train"])
9: learner_train(learner, task, sets[["train"]], sets[["test"]], 
       mode = mode)
8: workhorse(iteration = job$repl, task = data, learner = learner, 
       resampling = resampling, store_models = store_models, lgr_threshold = lgr::get_logger("mlr3")$threshold)
7: job$algorithm$fun(job = job, data = job$problem$data, instance = instance, 
       ...)
6: (function (...) 
   job$algorithm$fun(job = job, data = job$problem$data, instance = instance, 
       ...))(learner_hash = "b18020b11dad6832", learner_id = "subsample.dropnacol.dropna.removeconstants_1.fixfactors.winsorizesimple.removeconstants_2.dropcorr.scale_branch.uniformization.scale.scale_unbranch.dropna_v2.nop_union_pca.pca.ica.featureunion.filter_branch.jmi.relief.gausscov_f1st.filter_unbranch.regr.lightgbm.tuned", 
       store_models = FALSE)
5: do.call(wrapper, job$algo.pars, envir = .GlobalEnv)
4: with_preserve_seed({
       set_seed(list(seed = seed, rng_kind = rng_kind))
       code
   })
3: with_seed(job$seed, do.call(wrapper, job$algo.pars, envir = .GlobalEnv))
2: execJob.Experiment(job)
1: execJob(job)

It's hard for me to reproduce, since there are lots of steps I do before getting to execJob(job), which produces the error. Additionally, I had to use only parts of the mlr3batchmark package because it didn't work as is before.

I first prepare data, learners and resamplings with:

# Step 1: create the batchtools experiment registry in `dirname_`.
# The package names below are handed to makeExperimentRegistry() through its
# `packages` argument.
print("Create registry")
packages = c(
  "data.table",
  "gausscov",
  "paradox",
  "mlr3",
  "mlr3pipelines",
  "mlr3tuning",
  "mlr3misc",
  "future",
  "future.apply",
  "mlr3extralearners",
  "stats"
)
reg = makeExperimentRegistry(
  file.dir = dirname_,
  seed = 1,
  packages = packages
)

# Step 2: turn the benchmark design(s) into problems/algorithms (jobs) of the
# registry.
print("Batchmark")
batchmark(designs, reg = reg)

# Step 3: persist the registry to disk.
print("Save registry")
saveRegistry(reg = reg)

and then call the script

options(warn = -1)
library(data.table)
library(gausscov)
library(paradox)
library(mlr3)
library(mlr3pipelines)
library(mlr3viz)
library(mlr3tuning)
library(mlr3misc)
library(future)
library(future.apply)
library(mlr3extralearners)
library(batchtools)
library(mlr3batchmark)
library(checkmate)
library(stringi)
library(R6)
library(brew)

# UTILS -------------------------------------------------------------------
# utils functions
# Build the path of the entry `what` inside the registry's (expanded) file.dir.
# NOTE: defining this masks base::dir() for code evaluated in this environment.
dir = function(reg, what) {
  registry_root = fs::path_expand(reg$file.dir)
  fs::path(registry_root, what)
}
# Return the paths "<file.dir>/results/<job.id>.rds" for the given jobs.
# `ids` is either an atomic vector of job ids or an object with a `job.id`
# element (e.g. a table of jobs).
getResultFiles = function(reg, ids) {
  job_ids = if (is.atomic(ids)) ids else ids$job.id
  file_names = sprintf("%i.rds", job_ids)
  fs::path(dir(reg, "results"), file_names)
}
# Wait until the file `fn` becomes visible in its directory listing.
#
# fn:        path of the file to wait for.
# timeout:   number of seconds to wait; 0 disables waiting and returns TRUE
#            immediately.
# must.work: if TRUE, signal an error on timeout; otherwise return FALSE.
#
# Returns TRUE once the file is found (or when timeout is 0).
waitForFile = function(fn, timeout = 0, must.work = TRUE) {
  if (timeout == 0 || fs::file_exists(fn)) {
    return(TRUE)
  }
  "!DEBUG [waitForFile]: `fn` not found via 'file.exists()'"
  deadline = timeout + Sys.time()
  parent_dir = fs::path_dir(fn)
  repeat {
    # poll the directory listing every half second
    Sys.sleep(0.5)
    found = basename(fn) %chin% list.files(parent_dir, all.files = TRUE)
    if (found) {
      return(TRUE)
    }
    if (Sys.time() > deadline) {
      if (must.work) {
        stopf("Timeout while waiting for file '%s'", fn)
      }
      return(FALSE)
    }
  }
}
# Serialize `object` to `file` as RDS (serialization version 2), replacing any
# existing file, and wait until the new file is visible.
# Returns TRUE invisibly.
writeRDS = function(object, file, compress = "gzip") {
  # remove a possibly existing file first (batchtools internal helper)
  batchtools:::file_remove(file)
  saveRDS(object, file = file, version = 2L, compress = compress)
  # wait up to 300 seconds for the file to show up; waitForFile() raises an
  # error on timeout because its must.work argument defaults to TRUE
  waitForFile(file, timeout = 300)
  invisible(TRUE)
}
# R6 class that buffers per-job status updates in a keyed data.table and
# writes the rows that have not been written yet to "<file.dir>/updates".
UpdateBuffer = R6Class(
  "UpdateBuffer",
  cloneable = FALSE,
  public = list(
    # data.table holding one status row per job id (filled in initialize())
    updates = NULL,
    # point in time after which flush() triggers the next save()
    next.update = NA_real_,

    # Create one empty status row per entry of `ids` and schedule the first
    # flush at a random offset of 60 to 300 seconds from now.
    initialize = function(ids) {
      self$updates = data.table(
        job.id = ids,
        started = NA_real_,
        done = NA_real_,
        error = NA_character_,
        mem.used = NA_real_,
        written = FALSE,
        key = "job.id"
      )
      self$next.update = Sys.time() + runif(1L, 60, 300)
    },

    # Store the named values of `x` in row(s) `i` of the update table
    # (modified by reference).
    add = function(i, x) {
      set(self$updates, i = i, j = names(x), value = x)
    },

    # Write all started-but-unwritten rows (without the `written` column) to
    # "<jc$file.dir>/updates/<jc$job.hash>-<first job id>.rds" and mark them
    # as written.
    save = function(jc) {
      pending = self$updates[!is.na(started) & (!written), which = TRUE]
      if (length(pending) == 0L) {
        return(invisible(NULL))
      }
      first_id = self$updates$job.id[pending[1L]]
      target = fs::path(
        jc$file.dir,
        "updates",
        sprintf("%s-%i.rds", jc$job.hash, first_id)
      )
      writeRDS(
        self$updates[pending, !"written"],
        file = target,
        compress = jc$compress
      )
      set(self$updates, i = pending, j = "written", value = TRUE)
    },

    # Save only if the scheduled update time has passed, then schedule the
    # next one 60 to 300 seconds ahead.
    flush = function(jc) {
      current_time = Sys.time()
      if (current_time > self$next.update) {
        self$save(jc)
        self$next.update = current_time + runif(1L, 60, 300)
      }
    }

  )
)

# RUN JOB -----------------------------------------------------------------
# load registry: the test registry when run interactively, the full one
# otherwise
if (interactive()) {
  reg = loadRegistry("experiments_test")
} else {
  reg = loadRegistry("experiments")
}

# extract the index of the job to run from the PBS array environment variable
i = as.integer(Sys.getenv("PBS_ARRAY_INDEX"))
# i = 3L

# as.integer() yields NA when the variable is unset or not a number; fail here
# with a clear message instead of an obscure error further down in getJob()
if (length(i) != 1L || is.na(i) || i < 1L) {
  stop(
    "PBS_ARRAY_INDEX must be set to a positive integer job index",
    call. = FALSE
  )
}

# extract not done ids
ids_not_done = findNotDone(reg = reg)
ids_done = findDone(reg = reg)
# sanity check (only printed, not enforced): done + not done should add up to
# the expected total number of jobs
(nrow(ids_not_done) + nrow(ids_done)) == 8866

# create job collection over all jobs of the registry (ids = NULL)
resources = list(ncpus = 4) # this shouldnt be important
jc = makeJobCollection(ids = NULL,
                       resources = resources,
                       reg = reg)

# alternative: restrict the collection to the jobs that are not done yet
# resources = list(ncpus = 4) # this shouldnt be important
# jc = makeJobCollection(ids = ids_not_done,
#                        resources = resources,
#                        reg = reg)

# the index addresses a row of jc$jobs, so it must not exceed the row count
n_jobs = nrow(jc$jobs)
if (i > n_jobs) {
  stop(
    sprintf(
      "PBS_ARRAY_INDEX (%i) exceeds the number of jobs in the collection (%i)",
      i, n_jobs
    ),
    call. = FALSE
  )
}

# start buffer: one status row per job of the collection, plus the template
# of the status fields that get filled in for the executed job
buf = UpdateBuffer$new(jc$jobs$job.id)
update = list(started = batchtools:::ustamp(), done = NA_integer_, error = NA_character_, mem.used = NA_real_)

# get job
# Fetch job number `i` from the job collection with batchtools' internal
# getJob(). The commented-out block below is the manual construction of the
# same kind of Job object, kept for reference.
cat("Get Job \n")
# job = batchtools:::Job$new(
#   file.dir = jc$file.dir,
#   reader = batchtools:::RDSReader$new(FALSE),
#   id = jc$jobs[i]$job.id,
#   job.pars = jc$jobs[i]$job.pars[[1L]],
#   seed = 1 + jc$jobs[i]$job.id,
#   resources = jc$resources
# )
job = batchtools:::getJob(jc, i)
id = job$id

# execute job
# The order matters: reset the gc statistics, take the start timestamp, then
# run the job.
cat("Execute Job")
gc(reset = TRUE)
update$started = batchtools:::ustamp()
result = execJob(job)

# save job
# Write the result to "<file.dir>/results/<id>.rds" using the compression
# setting of the job collection.
writeRDS(result, file = getResultFiles(jc, id), compress = jc$compress)

# memory usage
# Estimate the memory in use from gc(): the first column of the gc() matrix
# is multiplied by a per-row size factor (28 or 56 depending on the pointer
# size for the first row, 8 for the second) and scaled by 1e6.
# The value is taken from the result of tryCatch(): an assignment made inside
# the error handler would only create a variable local to the handler and
# leave `memory_used` undefined at top level.
memory_used = tryCatch({
  memory.mult = c(if (.Machine$sizeof.pointer == 4L) 28L else 56L, 8L)
  gc_info = gc(verbose = FALSE)
  sum(gc_info[, 1L] * memory.mult) / 1000000L
}, error = function(e) {
  # measurement failed: record a missing value instead of aborting the job
  NA_real_
})

# updates
# Record completion time and memory usage for this job, then persist the
# status row. flush() only saves when the scheduled update time has passed;
# the explicit save() afterwards makes sure the row is written in any case.
update$done = batchtools:::ustamp()
update$mem.used = memory_used
buf$add(i, update)
buf$flush(jc)
buf$save(jc)

I can send example data from experiments_test if needed.

I am not sure, but it seems to me that the number of elements in the exports folder was the same as the number of designs. Now, this is not the case.

MislavSag commented 9 months ago

Solved with https://github.com/mlr-org/mlr3/issues/973