ropensci / drake

An R-focused pipeline toolkit for reproducibility and high-performance computing
https://docs.ropensci.org/drake
GNU General Public License v3.0

Forcing DAG direction #1294

Closed · edgBR closed this issue 4 years ago

edgBR commented 4 years ago

Prework

Dear community, thanks to Will I was able to complete my drake workflow, splitting the model fitting with the fable package in a way that decreased the memory consumption of my server from 220 GB to 70 GB (a pretty big success here), with the only limitation being a 50% increase in running time (from 60 minutes to 90).

Prework is available here: https://github.com/ropensci/drake/issues/1293

Description

Now I am trying to fetch all of the accuracy metrics of my models to select the best one, but this step is executed before my models run (maybe because the accuracy CSV files are already there?).

Reproducible example

The plan is as follows:

plan_branched <- drake_plan(
  life_counter_data = getLifeCounterData(environment = "PROD", 
                                         key_directory = config_parameters$LOCAL_CONFIG$DirectoryKeyCloud_RStudio, 
                                         max_forecasting_horizon = argument_parser$horizon),
  unit_metadata = getMetadata(environment = "PROD", 
                              key_directory = config_parameters$LOCAL_CONFIG$DirectoryKeyCloud_RStudio, 
                              operex_schema = config_parameters$SF_CONFIG$schema_name, db_src = c(1,2,3)),
  unit_with_recent_data = getLastData(life_counter_data),
  processed_data = featureEngineering(raw_data = life_counter_data, 
                                      metadata = unit_metadata, 
                                      recent_units = unit_with_recent_data,
                                      max_forecasting_horizon = argument_parser$horizon),
  ts_models = target(
    trainModels(
      input_data = processed_data, 
      max_forecast_horizon = argument_parser$horizon,
      max_multisession_cores = argument_parser$sessions,
      model_type = type
    ),
    transform = map(type = !!model_types)
  ),
  accuracy = target(
    accuracy_explorer(
      mode = "train", 
      models = ts_models, 
      max_forecast_horizon = argument_parser$horizon,
      directory_out = "/data1/"
    ),
    transform = map(ts_models, .id = type)
  ),
  saving = target(
    saveModels(
      models = ts_models, 
      directory_out = "/data1/", 
      max_forecasting_horizon = argument_parser$horizon, 
      max_multisession_cores = argument_parser$sessions
    ),
    transform = map(ts_models, .id = type)
  ),
  final_accuracy = target(
    bestModel(models_metrics_uri = "/data1/",
              metric_selected = "MAPE",
              final_metrics_uri = "/data1/",
              metrics_store = "local",
              max_forecast_horizon = argument_parser$horizon)
  )
)

My final accuracy function is as follows:

#' Selecting best time series model for individual units
#' @author 
#' @param models_metrics_uri 
#' @param metric_selected 
#' @param final_metrics_uri
#' @param metrics_store
#' @param max_forecast_horizon
#' @return A tibble with the best model per unit according to the selected accuracy metric.
bestModel <- function(models_metrics_uri, metric_selected, final_metrics_uri, metrics_store, max_forecast_horizon) {
  if(metrics_store == "local") {

    model_metrics_joined <- list.files(path = models_metrics_uri, pattern = "\\.csv$", full.names = TRUE) %>% 
      map_df(~fread(.)) %>% 
      filter_all(all_vars(!is.infinite(.))) %>% 
      drop_na() %>% 
      group_by(a,b,c)

    best_model_tbl <- model_metrics_joined %>% summarise(metric = min(.data[[metric_selected]]))
    print(paste0("Saving best model accuracy metrics for horizon ", max_forecast_horizon))
    accuracy_log = write_csv(x = best_model_tbl,
                             path = paste0(final_metrics_uri, "final_accuracy_metrics_horizon_", max_forecast_horizon, ".csv"))

  } else if(metrics_store =="s3") {

    #TODO max_priority

  } else if(metrics_store =="cloudwatch") {

    #TODO holding up until we decide model monitoring platform

  } else if(metrics_store == "mlflow") {

    #TODO holding up until we decide model monitoring platform, but prioritized over cloudwatch as it is available in the RStudio instance

  }
}

But my DAG looks as follows:

(screenshot of the DAG, showing final_accuracy disconnected from the model targets)

Desired result

I would like to load the accuracy metrics only after I have saved my models and computed the accuracy.

Session info

sessionInfo()
R version 4.0.0 (2020-04-24)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Ubuntu 16.04.6 LTS

Matrix products: default
BLAS:   /usr/lib/atlas-base/atlas/libblas.so.3.0
LAPACK: /usr/lib/atlas-base/atlas/liblapack.so.3.0

locale:
 [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C               LC_TIME=en_US.UTF-8        LC_COLLATE=en_US.UTF-8     LC_MONETARY=en_US.UTF-8   
 [6] LC_MESSAGES=en_US.UTF-8    LC_PAPER=en_US.UTF-8       LC_NAME=C                  LC_ADDRESS=C               LC_TELEPHONE=C            
[11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C

attached base packages: 
[1] stats     graphics  grDevices utils     datasets  methods   base      

other attached packages: 
 [1] tidyselect_1.1.0      feather_0.3.5         xts_0.12-0            zoo_1.8-8             DescTools_0.99.35     data.table_1.12.8     qs_0.22.1            
 [8] optparse_1.6.6        furrr_0.1.0           future_1.17.0         imputeTS_3.0          timetk_2.0.0          feasts_0.1.3          forecast_8.12        
[15] fable.prophet_0.1.0   Rcpp_1.0.4.6          fable_0.2.1.9000      fabletools_0.2.0.9000 tsibble_0.9.1         forcats_0.5.0         stringr_1.4.0        
[22] dplyr_1.0.0           purrr_0.3.4           readr_1.3.1           tidyr_1.1.0           tibble_3.0.1          ggplot2_3.3.2         tidyverse_1.3.0      
[29] aws.s3_0.3.21         drake_7.12.2          tictoc_1.0            ConfigParser_1.0.0    R6_2.4.1              ini_0.3.1             DBI_1.1.0            
[36] odbc_1.2.2            lubridate_1.7.9

loaded via a namespace (and not attached):  

[1] colorspace_1.4-1      ellipsis_0.3.0        class_7.3-17          base64enc_0.1-3       fs_1.4.1              rstudioapi_0.11       listenv_0.8.0          
[8] farver_2.0.3          getopt_1.20.3         bit64_0.9-7           mvtnorm_1.1-0         prodlim_2019.11.13    fansi_0.4.1           xml2_1.3.2            
[15] codetools_0.2-16      splines_4.0.0         jsonlite_1.7.0        packrat_0.5.0         broom_0.5.6           anytime_0.3.7         dbplyr_1.4.3          
[22] compiler_4.0.0        httr_1.4.1            backports_1.1.8       assertthat_0.2.1      Matrix_1.2-18         cli_2.0.2             htmltools_0.5.0       
[29] visNetwork_2.0.9      prettyunits_1.1.1     tools_4.0.0           igraph_1.2.5          gtable_0.3.0          glue_1.4.1            cellranger_1.1.0      
[36] fracdiff_1.5-1        vctrs_0.3.1           urca_1.3-0            nlme_3.1-147          lmtest_0.9-37         timeDate_3043.102     gower_0.2.1           
[43] globals_0.12.5        rvest_0.3.5           lifecycle_0.2.0       MASS_7.3-51.6         scales_1.1.0          ipred_0.9-9           aws.ec2metadata_0.2.0 
[50] hms_0.5.3             parallel_4.0.0        expm_0.999-4          yaml_2.2.1            quantmod_0.4.17       curl_4.3              aws.signature_0.5.2   
[57] rpart_4.1-15          stringi_1.4.6         tseries_0.10-47       TTR_0.23-6            filelock_1.0.2        boot_1.3-25           lava_1.6.7            
[64] storr_1.2.1           rlang_0.4.6           pkgconfig_2.0.3       distributional_0.1.0  lattice_0.20-41       htmlwidgets_1.5.1     stinepack_1.4         
[71] recipes_0.1.12        bit_1.1-15.2          magrittr_1.5          generics_0.0.2        base64url_1.4         txtq_0.2.0            pillar_1.4.4          
[78] haven_2.2.0           withr_2.2.0           survival_3.1-12       nnet_7.3-14           modelr_0.1.7          crayon_1.3.4          RApiSerialize_0.1.0   
[85] progress_1.2.2        grid_4.0.0            readxl_1.3.1          blob_1.2.1            reprex_0.3.0          digest_0.6.25         stringfish_0.12.1    
[92] munsell_0.5.0         sessioninfo_1.1.1     quadprog_1.5-8
wlandau commented 4 years ago

drake discovers dependency relationships using static code analysis. The command for final_accuracy must literally mention the symbols of any targets it depends on. The following plan should work.

library(drake)
model_types <- c("model1", "model2")
plan <- drake_plan(
  life_counter_data = getLifeCounterData(environment = "PROD", 
                                         key_directory = config_parameters$LOCAL_CONFIG$DirectoryKeyCloud_RStudio, 
                                         max_forecasting_horizon = argument_parser$horizon),
  unit_metadata = getMetadata(environment = "PROD", 
                              key_directory = config_parameters$LOCAL_CONFIG$DirectoryKeyCloud_RStudio, 
                              operex_schema = config_parameters$SF_CONFIG$schema_name, db_src = c(1,2,3)),
  unit_with_recent_data = getLastData(life_counter_data),
  processed_data = featureEngineering(raw_data = life_counter_data, 
                                      metadata = unit_metadata, 
                                      recent_units = unit_with_recent_data,
                                      max_forecasting_horizon = argument_parser$horizon),
  ts_models = target(
    trainModels(
      input_data = processed_data, 
      max_forecast_horizon = argument_parser$horizon,
      max_multisession_cores = argument_parser$sessions,
      model_type = type
    ),
    transform = map(type = !!model_types)
  ),
  accuracy = target(
    accuracy_explorer(
      mode = "train", 
      models = ts_models, 
      max_forecast_horizon = argument_parser$horizon,
      directory_out = "/data1/"
    ),
    transform = map(ts_models, .id = type)
  ),
  saving = target(
    saveModels(
      models = ts_models, 
      directory_out = "/data1/", 
      max_forecasting_horizon = argument_parser$horizon, 
      max_multisession_cores = argument_parser$sessions
    ),
    transform = map(ts_models, .id = type)
  ),
  aggregated_accuracy = target(
    # Could be dplyr::bind_rows(accuracy)
    # if the accuracy_* targets are data frames:
    list(accuracy),
    transform = combine(accuracy)
  ),
  final_accuracy = {
    # Mention the symbol "aggregated_accuracy" so final_accuracy runs last:
    aggregated_accuracy
    bestModel(models_metrics_uri = "/data1/",
              metric_selected = "MAPE",
              final_metrics_uri = "/data1/",
              metrics_store = "local",
              max_forecast_horizon = argument_parser$horizon)
  }
)

(screenshot of the corrected DAG, with final_accuracy downstream of aggregated_accuracy)
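
As a quick aside (an illustrative sketch added here, not part of the original reply): drake's deps_code() shows what the static code analysis detects in a command, which makes it easy to verify this kind of fix.

library(drake)
# A command that only mentions a directory path string detects no targets:
deps_code(quote(bestModel(models_metrics_uri = "/data1/")))
# Mentioning the symbol of an upstream target makes it a dependency:
deps_code(quote({
  aggregated_accuracy
  bestModel(models_metrics_uri = "/data1/")
}))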

wlandau commented 4 years ago

Also, please have a look at https://books.ropensci.org/drake/plans.html#how-to-choose-good-targets. Targets are R objects that drake automatically saves to and retrieves from storage, and it tracks changes to these values to keep targets up to date. If you save all your data to custom files, e.g. directory_out = "/data1/", then drake does not know how to watch your results for changes, and it will not be able to automatically rerun targets at the right times. So in your case, I recommend either returning the fitted models themselves from the targets or using dynamic files. Dynamic files may be easier if you are willing to return the individual file paths of the models and accuracy metrics you save.
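
A minimal sketch of the dynamic file approach (fit_and_save() is a hypothetical helper; the key points are that the command returns the file path and the target uses format = "file"):

library(drake)
plan <- drake_plan(
  model_file = target(
    # fit_and_save() (hypothetical) fits a model, writes it to disk,
    # and returns the file path. With format = "file", drake hashes
    # that file and reruns downstream targets when it changes.
    fit_and_save(processed_data, path = "/data1/model.qs"),
    format = "file"
  )
)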

wlandau commented 4 years ago

There's also file_in() and file_out(), which tell drake to watch for changes in files known ahead of time, but dynamic files are probably easier to think about.
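
For comparison, a sketch of the static-file approach (the paths and the compute_metrics()/pick_best() helpers are illustrative):

library(drake)
library(readr)
plan <- drake_plan(
  # file_out() declares a file this command produces...
  metrics = write_csv(compute_metrics(), file_out("/data1/metrics.csv")),
  # ...and file_in() declares a file this command reads, so drake runs
  # best_model after metrics and reruns it whenever the CSV changes.
  best_model = pick_best(read_csv(file_in("/data1/metrics.csv")))
)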

edgBR commented 4 years ago

Hi @wlandau

The reason I saved my models to files is that my workflow was crashing when I stored them as targets, but I do not know if this is normal.

wlandau-lilly commented 4 years ago

If you do decide to save models, I recommend format = "qs" because it is lighter in storage than drake's default save method.
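
In the plan above, that is just an extra argument to target() (a sketch of the ts_models target only):

ts_models = target(
  trainModels(
    input_data = processed_data,
    max_forecast_horizon = argument_parser$horizon,
    max_multisession_cores = argument_parser$sessions,
    model_type = type
  ),
  transform = map(type = !!model_types),
  format = "qs"  # store the fitted models in drake's cache via qs
)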

Do you need to store the entire model object? I am not familiar with fable, but in a lot of cases you can save data frames of strategic summaries instead of entire model objects. Some fitted models are super large, and some models have pointers that are only valid in the current R session and cannot be reloaded in a new session. (For example, Keras models cannot be saved and loaded with saveRDS() and readRDS(), so they require keras::save_model_hdf5() or format = "keras" in drake.)

The Bayesian analysis example here and here shows how to deal with these problems. Markov chain Monte Carlo generates a large number of posterior samples, so it is infeasible to save every single fitted model. Instead, the custom functions in the workflow return a data frame of posterior summaries rather than saving the entire model.

edgBR commented 4 years ago

Hi @wlandau-lilly

I am using qs for saving the binaries:

saveModels <- function(models, directory_out, max_forecasting_horizon, max_multisession_cores) {
  print("Saving the all-mighty mable")
  qsave(x = models,
        file = paste0(directory_out, attributes(models)$model, "_horizon_", max_forecasting_horizon, ".qs"),
        preset = "custom",
        shuffle_control = 15,
        algorithm = "zstd",
        nthreads = max_multisession_cores)
  # saveRDS(object = models, file = paste0(directory_out, "ts_models_horizon_", max_forecasting_horizon, ".rds"))
  print(paste0("End workflow for ", attributes(models)$model, " models with maximum forecasting horizon ", max_forecasting_horizon))
}

The problem is that fable needs the binary containing the model in order to make the forecast. Should I use format = "qs" directly in the drake plan together with file_out()?

BR /E

wlandau commented 4 years ago

So the physical model files need to be there? Nothing you can do about it?

In that case, maybe combine the model-fitting step and the forecasting step into a single target. Data in the cache will be lighter that way. Merging two targets into one is a good strategy sometimes if you find yourself running too many targets or saving too much data. See https://books.ropensci.org/drake/plans.html#how-to-choose-good-targets for a discussion of the tradeoffs.

The example at https://github.com/wlandau/drake-examples/blob/13e6edf9d6c4b60c0c57d0fc303cfba63702e9f2/stan is a similar situation. In Bayesian analysis, posterior samples eat up a lot of data, and we don't want to save everything for every single model. So we combine model-fitting and summarization into a single step and return a one-line data frame for each model. See https://github.com/wlandau/drake-examples/blob/13e6edf9d6c4b60c0c57d0fc303cfba63702e9f2/stan/R/functions.R#L62-L85 and https://github.com/wlandau/drake-examples/blob/13e6edf9d6c4b60c0c57d0fc303cfba63702e9f2/stan/R/plan.R#L16-L20.
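
In this thread's terms, the combined step might look like the following sketch (fabletools::forecast() is real; the wrapper and its arguments are illustrative):

# Fit and forecast in one target so the heavy mable never needs
# to be stored in drake's cache -- only the small forecast table is.
fitAndForecast <- function(input_data, model_type, horizon, sessions) {
  models <- trainModels(
    input_data = input_data,
    max_forecast_horizon = horizon,
    max_multisession_cores = sessions,
    model_type = model_type
  )
  # Use the fitted mable immediately, then return only a
  # lightweight tibble of forecasts.
  forecasts <- fabletools::forecast(models, h = horizon)
  tibble::as_tibble(forecasts)
}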