mlr-org / mlr3pipelines

Dataflow Programming for Machine Learning in R
https://mlr3pipelines.mlr-org.com/
GNU Lesser General Public License v3.0

`targetmutate` doesn't work with more than one target? #694

Open bblodfon opened 1 year ago

bblodfon commented 1 year ago

Hi,

I want to change the target of a survival task, which has two target variables: time and status. The following fails (I haven't set a trafo function, but that is irrelevant here):

library(mlr3proba)
#> Loading required package: mlr3
library(mlr3pipelines)
task = tsk('lung')

trgmut = po('targetmutate')
trgmut$train(list(task))
#> Error in initialize(...): unused argument (target = c("time", "status"))
#> This happened PipeOp targetmutate's $train()

Created on 2022-09-22 with reprex v2.0.2

The pipeline I want to build depends on this. With a SurvTask as an input I would like to:

Could you help me with this? Of course it can be done manually, but it would be nicer to do it with mlr3pipelines.

mb706 commented 1 year ago

I think this is because mlr3::convert_task assumes that every TaskXxx$new() has a `target` argument.

@mllg do you think that mlr3proba is violating some implicit assumptions here, or that mlr3 should be adjusted?
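
The failure mode suspected above can be sketched in base R. This is only an illustration of how an "unused argument" error arises: `new_task_regr`, `new_task_surv`, and `convert` are hypothetical stand-ins, not the real mlr3 or mlr3proba constructors, and the argument names `time` and `event` are assumptions.

```r
# Hypothetical constructors (NOT the real mlr3 / mlr3proba code).
# One accepts `target`, the other describes its targets differently.
new_task_regr = function(id, backend, target) {
  list(id = id, target = target)
}
new_task_surv = function(id, backend, time, event) {
  list(id = id, time = time, event = event)
}

# A generic converter that always forwards a `target` argument,
# i.e. the implicit assumption discussed above.
convert = function(constructor, id, backend, target) {
  constructor(id = id, backend = backend, target = target)
}

ok = convert(new_task_regr, "demo", NULL, "y")  # works

# The survival-style constructor has no `target` argument, so R
# raises an "unused argument" error; capture its message here.
msg = tryCatch(
  convert(new_task_surv, "lung", NULL, c("time", "status")),
  error = function(e) conditionMessage(e)
)
print(msg)
```

Either the converter has to learn about constructors with other signatures, or every constructor has to accept `target`; which of the two is appropriate is exactly the open question above.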

mb706 commented 1 year ago

@bblodfon I think a workaround currently would be to write your own PipeOp that does what you want here. The book chapter may seem a bit daunting, but I think the solution here would be relatively straightforward.

An example that shuffles the "time" target column could look like the following. Please be aware that I don't know mlr3proba very well, so I am not sure that what I do inside the .train_task() function is correct. You should test this and see if it does what you expect.

library("mlr3pipelines")
library("mlr3proba")

lung.task <- tsk("lung")

library("paradox")  # for paramset of our new po-class

# inherit from PipeOpTaskPreproc. It takes care of
# defining 'Task' (in our case TaskSurv) input and output.
# We have to overwrite the .train_task()/.predict_task()-functions.
PipeOpTimeShuffle <- R6::R6Class("PipeOpTimeShuffle", inherit = PipeOpTaskPreproc,
  public = list(
    # default initialize function header for a concrete pipeop class:
    # it is a good idea to have the `id = "xxx"` and `param_vals = list()`
    # arguments and pass them on to super$initialize.
    initialize = function(id = "timeshuffle", param_vals = list()) {
      # as a demo I am also including a hyperparameter here
      p <- ps(replace = p_lgl(tags = "required"))
      p$values = list(replace = FALSE)
      super$initialize(id = id, param_set = p, param_vals = param_vals,
        can_subset_cols = FALSE, task_type = "TaskSurv"
      )
    }
  ),
  private = list(
    .train_task = function(task) {
      pvals <- self$param_set$get_values()
      newtime <- task$data(cols = "time")
      if (nrow(newtime) > 1) {  # sample misbehaves when 1st argument has len 1!
        newtime$time <- sample(newtime$time, replace = pvals$replace)
      }
      # $cbind() overwrites old task columns.
      # I am not sure if this breaks inside resample(),
      # you should test it...
      task$cbind(newtime)
    },
    .predict_task = function(task) task
  )
)

pots <- PipeOpTimeShuffle$new()

lung.task$head(2)
#>    time status age inst meal.cal pat.karno ph.ecog ph.karno sex wt.loss
#> 1:  306   TRUE  74    3     1175       100       1       90   m      NA
#> 2:  455   TRUE  68    3     1225        90       0       90   m      15
pots$train(list(lung.task))[[1]]$head(2)
#>    time status age inst meal.cal pat.karno ph.ecog ph.karno sex wt.loss
#> 1:  351   TRUE  74    3     1175       100       1       90   m      NA
#> 2:   13   TRUE  68    3     1225        90       0       90   m      15
pots$train(list(lung.task))[[1]]$head(2)  # different
#>    time status age inst meal.cal pat.karno ph.ecog ph.karno sex wt.loss
#> 1:  310   TRUE  74    3     1175       100       1       90   m      NA
#> 2:  177   TRUE  68    3     1225        90       0       90   m      15
pots$predict(list(lung.task))[[1]]$head(2)  # unchanged during prediction
#>    time status age inst meal.cal pat.karno ph.ecog ph.karno sex wt.loss
#> 1:  306   TRUE  74    3     1175       100       1       90   m      NA
#> 2:  455   TRUE  68    3     1225        90       0       90   m      15
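
The `nrow(newtime) > 1` guard in .train_task() is there because of a base R quirk: when the first argument of `sample()` is a single number of at least 1, it samples from `1:x` instead of returning `x`. A small sketch of the pitfall and one way around it, where `safe_shuffle` is a hypothetical helper name and not part of any package:

```r
# sample() with a single number x >= 1 permutes 1:x.
surprising = sample(5)
length(surprising)  # 5 values, not 1

# Hypothetical helper: shuffle by index, which also behaves
# as expected for vectors of length 1.
safe_shuffle = function(x, replace = FALSE) {
  x[sample.int(length(x), replace = replace)]
}

safe_shuffle(5)               # stays a single value
safe_shuffle(c(306, 455, 1010))  # some permutation of the input
```

Indexing with `sample.int(length(x))` would make the explicit length check unnecessary, at the cost of being slightly less obvious to read.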

bblodfon commented 1 year ago

@mb706 thanks so much for the educative example, I will go through it!

bblodfon commented 1 year ago

The code above made me realize that my manual solution was shuffling the test set's targets as well, which, after some thought, was not what I wanted to do! (Amazing that you noticed this.)

I've tried some things to make the pipeline work, but I can't check whether the tasks are indeed shuffled during training (the backends are removed? maybe I am checking the wrong objects?). I think the second way, with the benchmark, would be better, but let me know what you think:

library('mlr3pipelines')
library('mlr3proba')
#> Loading required package: mlr3
library('paradox')

# task
task = tsk('lung')
pre = po('encode', method = 'treatment') %>>%
  po('imputelearner', lrn('regr.rpart'))
task = pre$train(task)[[1]]
#task$missings()
#task$head()

# PipeOpSurvShuffle ----
PipeOpSurvShuffle = R6::R6Class('PipeOpSurvShuffle', inherit = PipeOpTaskPreproc,
  public = list(
    initialize = function(id = 'survshuffle', param_vals = list()) {
      p = ps(replace = p_lgl(tags = 'required'))
      p$values = list(replace = FALSE)
      super$initialize(id = id, param_set = p, param_vals = param_vals,
        can_subset_cols = FALSE, task_type = 'TaskSurv'
      )
    }
  ),
  private = list(
    .train_task = function(task) {
      pvals = self$param_set$get_values()
      surv  = task$data(cols = c('time', 'status'))
      if (nrow(surv) > 1) {  # `sample` 'misbehaves' when 1st argument has length 1!
        surv$time   = sample(surv$time,   replace = pvals$replace)
        surv$status = sample(surv$status, replace = pvals$replace)
      }
      # to test if this works...
      task$cbind(surv)
    },
    .predict_task = function(task) task
  )
)

poss = PipeOpSurvShuffle$new()

task$head(4)
#>    time status age sex inst meal.cal pat.karno ph.ecog ph.karno wt.loss
#> 1:  306   TRUE  74   1    3     1175       100       1       90       7
#> 2:  455   TRUE  68   1    3     1225        90       0       90      15
#> 3: 1010  FALSE  56   1    3     1315        90       0       90      15
#> 4:  210   TRUE  57   1    5     1150        60       1       90      11
poss$train(list(task))[[1]]$head(4) # ok,different
#>    time status age sex inst meal.cal pat.karno ph.ecog ph.karno wt.loss
#> 1:  188   TRUE  74   1    3     1175       100       1       90       7
#> 2:  428   TRUE  68   1    3     1225        90       0       90      15
#> 3:  153   TRUE  56   1    3     1315        90       0       90      15
#> 4:  519   TRUE  57   1    5     1150        60       1       90      11

# Make pipeline
gr = ppl('greplicate', poss %>>% po('learner', lrn('surv.coxph')), 10)
#gr$plot()
train_task = task$clone()$filter(rows = 1:200) # don't know how else to pass this, except manually
test_task  = task$clone()$filter(rows = 201:228)

gr$train(train_task)
#> $surv.coxph_1.output
#> NULL
#> 
#> $surv.coxph_2.output
#> NULL
#> 
#> $surv.coxph_3.output
#> NULL
#> 
#> $surv.coxph_4.output
#> NULL
#> 
#> $surv.coxph_5.output
#> NULL
#> 
#> $surv.coxph_6.output
#> NULL
#> 
#> $surv.coxph_7.output
#> NULL
#> 
#> $surv.coxph_8.output
#> NULL
#> 
#> $surv.coxph_9.output
#> NULL
#> 
#> $surv.coxph_10.output
#> NULL
res = gr$predict(test_task)

scores = list()
for (i in seq_along(res)) {
  scores[[i]] = res[[i]]$score()
}
cindex = dplyr::bind_rows(scores)
cindex
#> # A tibble: 10 × 1
#>    surv.cindex
#>          <dbl>
#>  1       0.460
#>  2       0.439
#>  3       0.453
#>  4       0.554
#>  5       0.475
#>  6       0.655
#>  7       0.511
#>  8       0.468
#>  9       0.561
#> 10       0.446

# ok, got the result, let's try to see if train tasks were shuffled:
gr$state$surv.coxph_1$train_task$head(4) # backend lost?
#> Error: The backend of Task 'lung' has been removed. Set `store_backends` to `TRUE` during model fitting to conserve it.

# try with resample
# learner = as_learner(poss %>>% po('learner', lrn('surv.coxph')))
#
# rs = resample(task, learner, rsmp('holdout', ratio = 0.7),
#   store_models = TRUE, store_backends = TRUE)
# rs$score()$learner[[1]]$model$surv.coxph$train_task$head(4) # backend lost?

# try with benchmark (better way I think)
learner = as_learner(poss %>>% po('learner', lrn('surv.coxph')))
cop = po('copy', 10) # copy task 10 times
task_list = cop$train(list(task))
bm_grid = benchmark_grid(task_list, learner, rsmp('holdout', ratio = 0.7))
bm = benchmark(bm_grid, store_models = TRUE, store_backends = TRUE)
#> INFO  [14:26:09.074] [mlr3] Running benchmark with 10 resampling iterations 
#> INFO  [14:26:09.131] [mlr3] Applying learner 'survshuffle.surv.coxph' on task 'lung' (iter 1/1) 
#> INFO  [14:26:09.279] [mlr3] Applying learner 'survshuffle.surv.coxph' on task 'lung' (iter 1/1) 
#> INFO  [14:26:09.423] [mlr3] Applying learner 'survshuffle.surv.coxph' on task 'lung' (iter 1/1) 
#> INFO  [14:26:09.549] [mlr3] Applying learner 'survshuffle.surv.coxph' on task 'lung' (iter 1/1) 
#> INFO  [14:26:09.694] [mlr3] Applying learner 'survshuffle.surv.coxph' on task 'lung' (iter 1/1) 
#> INFO  [14:26:09.841] [mlr3] Applying learner 'survshuffle.surv.coxph' on task 'lung' (iter 1/1) 
#> INFO  [14:26:09.978] [mlr3] Applying learner 'survshuffle.surv.coxph' on task 'lung' (iter 1/1) 
#> INFO  [14:26:10.118] [mlr3] Applying learner 'survshuffle.surv.coxph' on task 'lung' (iter 1/1) 
#> INFO  [14:26:10.270] [mlr3] Applying learner 'survshuffle.surv.coxph' on task 'lung' (iter 1/1) 
#> INFO  [14:26:10.419] [mlr3] Applying learner 'survshuffle.surv.coxph' on task 'lung' (iter 1/1) 
#> INFO  [14:26:10.549] [mlr3] Finished benchmark

bm$score()$learner[[1]]$model$surv.coxph$train_task$head(4) # backend lost?
#> Error: The backend of Task 'lung' has been removed. Set `store_backends` to `TRUE` during model fitting to conserve it.

Created on 2022-09-23 with reprex v2.0.2

Of course, bm$score() provides the performance scores; I just want to check that they are properly calculated. I also wonder: when using a score that requires the train set, e.g. bm$score(msr('surv.cindex', weight_meth = 'G2')), would it use the shuffled train set for each task in this case?

mb706 commented 1 year ago

I think what you are looking for is gr$keep_results = TRUE. Setting a Graph's keep_results flag causes its PipeOps to store the result of their computation in the $.result slot. If you access the results of the shuffling PipeOps after training, you can see that the targets have changed. (If you then run gr$predict(...) and check $.result again, you will notice that the targets are unchanged.)

# [...]
test_task  = task$clone()$filter(rows = 201:228)

gr$keep_results = TRUE

gr$train(train_task)
# [...]
gr$pipeops$survshuffle_1$.result[[1]]$head(2)
#>    time status age sex inst meal.cal pat.karno ph.ecog ph.karno wt.loss
#> 1:  145   TRUE  74   1    3     1175       100       1       90       7
#> 2:  740   TRUE  68   1    3     1225        90       0       90      15
gr$pipeops$survshuffle_2$.result[[1]]$head(2)
#>    time status age sex inst meal.cal pat.karno ph.ecog ph.karno wt.loss
#> 1:  310   TRUE  74   1    3     1175       100       1       90       7
#> 2:   88   TRUE  68   1    3     1225        90       0       90      15
gr$pipeops$survshuffle_3$.result[[1]]$head(2)
#>    time status age sex inst meal.cal pat.karno ph.ecog ph.karno wt.loss
#> 1:  180   TRUE  74   1    3     1175       100       1       90       7
#> 2:  511   TRUE  68   1    3     1225        90       0       90      15

Using train_task does not work here, since it is only kept around to make sure that the metadata of the prediction data (names and types of columns etc.) matches the training data.

(I would think another way to make sure something is happening is to look at the performance values and check that they do not differ significantly from uninformed predictions?)
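
For a self-contained illustration of the keep_results flag described above, here is a minimal sketch that runs without mlr3proba. The regression task `tsk("mtcars")`, `po("scale")`, and `lrn("regr.rpart")` are substitutions chosen for this example only; they stand in for the survival task and the shuffling PipeOp from this thread.

```r
library("mlr3")
library("mlr3pipelines")

task = tsk("mtcars")

# A two-step graph: a preprocessing PipeOp followed by a learner.
gr = po("scale") %>>% po("learner", lrn("regr.rpart"))

# Keep intermediate results of every PipeOp in its $.result slot.
gr$keep_results = TRUE

gr$train(task)

# Output of the "scale" PipeOp during training: a Task with scaled features.
scaled_train = gr$pipeops$scale$.result[[1]]
scaled_train$head(2)

# After predicting, $.result holds the prediction-phase output instead.
gr$predict(task)
scaled_predict = gr$pipeops$scale$.result[[1]]
scaled_predict$head(2)
```

The same access pattern, gr$pipeops$<id>$.result, should apply to any PipeOp in a Graph, so replacing `scale` with the id of a custom PipeOp would show its training-time and prediction-time outputs.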