ropensci / drake

An R-focused pipeline toolkit for reproducibility and high-performance computing
https://docs.ropensci.org/drake
GNU General Public License v3.0
1.34k stars 128 forks source link

Combining tibbles that come from dynamic and transform maps #1327

Closed edgBR closed 3 years ago

edgBR commented 3 years ago

Prework

Hi again drake colleagues. It seems that my implementation is now working: I can train, evaluate properly on train and test data, and save my models and accuracy metrics in S3! Prework here:

Original workflow: #1293; Modified workflow to correct the DAG: #1294; First attempt to rewrite the plan using dynamic targets: #1311; Linking dynamic targets: #1314; Linking multiple dynamic targets to a target: #1321

Description

The small beast now looks like this:

image

And my plan looks as follows:

# Pipeline: fetch metadata -> build counter combinations -> pull data ->
# per-combination feature engineering -> train/test split -> train one model
# per `model_type` -> score on the test split -> save the models to S3.
new_plan_dynamic_branch_test <- drake_plan(
  unitMetadata = getMetadata(
    environment = "PROD",
    key_directory = config_parameters$LOCAL_CONFIG$DirectoryKeyCloud_RStudio,
    operex_schema = config_parameters$SF_CONFIG$schema_name, db_src = c(1, 2, 3)
  ),
  lastData = getLastData(
    environment = "PROD",
    key_directory = config_parameters$LOCAL_CONFIG$DirectoryKeyCloud_RStudio,
    schema = config_parameters$SF_CONFIG$schema_name
  ),
  finalMetadata = getApplicablePackages(
    unit_metadata = unitMetadata,
    units_with_recent_data = lastData
  ),
  counterCombination = getCountersComb(
    df_in = finalMetadata,
    environment = "PROD",
    key_directory = config_parameters$LOCAL_CONFIG$DirectoryKeyCloud_RStudio,
    schema = config_parameters$SF_CONFIG$schema_name
  ),
  # Index vector that the dynamic targets below branch over.
  counterCombinationIndex = counterCombination %>% group_indices(),
  getFullData = getLifeFullData(
    environment = "PROD",
    key_directory = config_parameters$RSTUDIO_CLOUD_CONF$KeyDir,
    max_forecasting_horizon = argument_parser$horizon,
    list_of_machines = counterCombination$machine %>% unique()
  ),
  getIndividualData = target(
    getIndividualCounterData(
      full_data = getFullData,
      combination_df = counterCombination,
      combination_df_index = counterCombinationIndex
    ),
    dynamic = cross( # Use `dynamic =` instead of `transform =`
      counterCombinationIndex # no tidy evaluation needed for dynamic branching
    )
  ),
  processingData = target(
    featureEngineering(
      raw_data = getIndividualData,
      max_forecasting_horizon = argument_parser$horizon
    ),
    dynamic = map(getIndividualData)
  ),
  training_data = target(
    timeSeriesSplitter(
      input_data = processingData,
      max_forecast_horizon = argument_parser$horizon,
      type = "train"
    ),
    dynamic = map(processingData)
  ),
  testing_data = target(
    timeSeriesSplitter(
      input_data = processingData,
      max_forecast_horizon = argument_parser$horizon,
      type = "test"
    ),
    dynamic = map(processingData)
  ),
  # Static branch per entry of `model_types`; dynamic branch per training set.
  models_training = target( ### split train/validation set
    trainModels(
      input_data = training_data,
      max_forecast_horizon = argument_parser$horizon,
      model_type = model_type,
      max_multisession_cores = argument_parser$sessions
    ),
    dynamic = map(training_data), # dynamic branching
    transform = map(model_type = !!model_types)
  ),
  accuracy = target(
    accuracy_explorer(
      mode = "test",
      models = models_training,
      max_forecast_horizon = argument_parser$horizon,
      bucket = argument_parser$outputbucket,
      bucket_folder = "/test_lifecounter3",
      testing_data = testing_data
    ),
    dynamic = map(models_training, testing_data),
    transform = map(models_training)
  ),
  saveModels = target(
    saveModelsS3(
      model = models_training,
      bucket = argument_parser$outputbucket,
      bucket_folder = "/test_3",
      max_forecasting_horizon = argument_parser$horizon
    ),
    dynamic = map(models_training),
    transform = map(model_type)
  )
)

Desired result

Now my objective is to find the best model for a specific counter combination (as I call it) and then refit that model on the whole historical data. The function that I wrote to do this is as follows:

#' Pick the model with the lowest MAPE for one counter combination
#'
#' Lists the objects stored under the S3 prefix for this machine / horizon /
#' sensor / db_src, reads every accuracy-metrics CSV found there, and returns
#' the name of the model with the smallest `MAPE`.
#'
#' @param input_data Data frame with `snsr`, `machine` and `db_src` columns.
#'   NOTE(review): assumed to hold a single counter combination, so each
#'   `unique()` below yields one value; confirm against the caller.
#' @param bucket Name of the S3 bucket to read from.
#' @param bucket_folder Folder (key prefix) inside the bucket.
#' @param max_forecasting_horizon Horizon used in the `horizon_<n>` path part.
#' @return The `.model` value(s) of the row(s) with the lowest `MAPE`; ties
#'   are kept by `top_n()`.
bestModel <- function(input_data, bucket, bucket_folder, max_forecasting_horizon) {
  snsr <- unique(input_data$snsr)
  machine <- unique(input_data$machine)
  db_src <- unique(input_data$db_src)
  print(paste0("Refitting best model for ", machine,
               " db_src ", db_src, " and counter ",
               snsr, " and horizon ", max_forecasting_horizon))

  # The original referenced an undefined `package_b_number` here. `machine`
  # is used instead because accuracy_explorer() writes its CSVs under
  # <bucket_folder>/<machine>/horizon_<h>/snsr_<snsr>/db_src_<db_src>/.
  prefix <- paste0(bucket_folder, "/",
                   machine, "/horizon_",
                   max_forecasting_horizon, "/",
                   "snsr_", snsr, "/",
                   "db_src_", db_src)
  bucket_objects <- get_bucket(bucket = bucket, prefix = prefix)

  # Collect all object keys in one pass instead of growing a vector in a loop.
  object_keys <- unname(
    vapply(bucket_objects, function(obj) obj$Key, character(1))
  )

  # `fixed = TRUE` makes the dot literal; as a regex, ".csv" matches any
  # character followed by "csv".
  metrics_keys <- object_keys[grepl(".csv", object_keys, fixed = TRUE)]
  if (length(metrics_keys) == 0) {
    stop("No accuracy metrics (.csv) found under prefix '",
         paste(prefix, collapse = "', '"),
         "' in bucket '", bucket, "'.", call. = FALSE)
  }

  accuracy_log_values <- lapply(metrics_keys, function(key) {
    s3read_using(data.table::fread,
                 bucket = bucket,
                 object = key)
  })

  accuracy_log_values <- accuracy_log_values %>% bind_rows() %>% drop_na(MAPE)
  # desc(MAPE) turns "top 1" into "smallest MAPE".
  best_row <- accuracy_log_values %>% top_n(1, wt = desc(MAPE))
  print(paste0("Best model for ", machine, " and snsr_key ",
               snsr, " and db_src ",
               db_src, " is ", best_row$.model))
  best_row$.model
}

The problem appears when I add this to my plan. I did it as follows:

# Same pipeline as before, plus a final `GetBestModel` target that calls
# bestModel() on the accuracy results.
new_plan_dynamic_branch_test <- drake_plan(
  unitMetadata = getMetadata(
    environment = "PROD",
    key_directory = config_parameters$LOCAL_CONFIG$DirectoryKeyCloud_RStudio,
    operex_schema = config_parameters$SF_CONFIG$schema_name, db_src = c(1, 2, 3)
  ),
  lastData = getLastData(
    environment = "PROD",
    key_directory = config_parameters$LOCAL_CONFIG$DirectoryKeyCloud_RStudio,
    schema = config_parameters$SF_CONFIG$schema_name
  ),
  finalMetadata = getApplicablePackages(
    unit_metadata = unitMetadata,
    units_with_recent_data = lastData
  ),
  counterCombination = getCountersComb(
    df_in = finalMetadata,
    environment = "PROD",
    key_directory = config_parameters$LOCAL_CONFIG$DirectoryKeyCloud_RStudio,
    schema = config_parameters$SF_CONFIG$schema_name
  ),
  # Index vector that the dynamic targets below branch over.
  counterCombinationIndex = counterCombination %>% group_indices(),
  getFullData = getLifeFullData(
    environment = "PROD",
    key_directory = config_parameters$RSTUDIO_CLOUD_CONF$KeyDir,
    max_forecasting_horizon = argument_parser$horizon,
    list_of_machines = counterCombination$machine %>% unique()
  ),
  getIndividualData = target(
    getIndividualCounterData(
      full_data = getFullData,
      combination_df = counterCombination,
      combination_df_index = counterCombinationIndex
    ),
    dynamic = cross( # Use `dynamic =` instead of `transform =`
      counterCombinationIndex # no tidy evaluation needed for dynamic branching
    )
  ),
  processingData = target(
    featureEngineering(
      raw_data = getIndividualData,
      max_forecasting_horizon = argument_parser$horizon
    ),
    dynamic = map(getIndividualData)
  ),
  training_data = target(
    timeSeriesSplitter(
      input_data = processingData,
      max_forecast_horizon = argument_parser$horizon,
      type = "train"
    ),
    dynamic = map(processingData)
  ),
  testing_data = target(
    timeSeriesSplitter(
      input_data = processingData,
      max_forecast_horizon = argument_parser$horizon,
      type = "test"
    ),
    dynamic = map(processingData)
  ),
  # Static branch per entry of `model_types`; dynamic branch per training set.
  models_training = target( ### split train/validation set
    trainModels(
      input_data = training_data,
      max_forecast_horizon = argument_parser$horizon,
      model_type = model_type,
      max_multisession_cores = argument_parser$sessions
    ),
    dynamic = map(training_data), # dynamic branching
    transform = map(model_type = !!model_types)
  ),
  accuracy = target(
    accuracy_explorer(
      mode = "test",
      models = models_training,
      max_forecast_horizon = argument_parser$horizon,
      bucket = argument_parser$outputbucket,
      bucket_folder = "/test_lifecounter3",
      testing_data = testing_data
    ),
    dynamic = map(models_training, testing_data),
    transform = map(models_training)
  ),
  saveModels = target(
    saveModelsS3(
      model = models_training,
      bucket = argument_parser$outputbucket,
      bucket_folder = "/test_3",
      max_forecasting_horizon = argument_parser$horizon
    ),
    dynamic = map(models_training),
    transform = map(model_type)
  ),
  # NOTE(review): `accuracy` is expanded by `transform = map(models_training)`
  # above, so the bare symbol here probably does not refer to a single target,
  # and nothing in this command depends on `saveModels` either -- confirm.
  GetBestModel = target(
    bestModel(
      # bestModel() declares `input_data`; the original `input_df =` would
      # fail with an "unused argument" error.
      input_data = accuracy,
      bucket = argument_parser$outputbucket,
      bucket_folder = "/test_3",
      max_forecasting_horizon = argument_parser$horizon
    )
  )
)

Clearly not the right approach as the target does not happen after saveModels:

image

My idea now is to return a dataframe in accuracy. I have modified the function as follows:

# Compute accuracy metrics for fitted models, upload them to S3 as a CSV and
# return them, so downstream targets receive a data frame.
#
# Arguments:
#   mode                 "train" or "test"; selects which elided branch runs.
#   models               Fitted models to evaluate (used in the elided code).
#   max_forecast_horizon Horizon used in the "horizon_<n>" part of the S3 key.
#   bucket               S3 bucket the CSV is written to.
#   bucket_folder        Folder (key prefix) inside the bucket.
#   testing_data         Optional; defaults to NULL.
#
# NOTE(review): `accuracy_metrics` is presumably created inside the elided
# branches below; if `mode` matches neither branch it would be undefined here.
accuracy_explorer <- function(mode, models, max_forecast_horizon, bucket, bucket_folder, testing_data=NULL) {
  if(mode == "train") {
   ..................
  } else if(mode=="test"){
   ...................
  }
  # Object key layout:
  #   <bucket_folder>/<machine>/horizon_<h>/snsr_<snsr>/db_src_<db_src>/<.model>_accuracy_metrics.csv
  # NOTE(review): assumes each unique() call yields a single value; otherwise
  # paste0() returns several keys -- confirm.
  s3write_using(accuracy_metrics, 
                write_csv,
                object = paste0(bucket_folder, 
                                "/",
                                unique(accuracy_metrics$machine),
                                "/horizon_",
                                max_forecast_horizon,
                                "/snsr_",
                                unique(accuracy_metrics$snsr),
                                "/db_src_",
                                unique(accuracy_metrics$db_src),
                                "/",
                                unique(accuracy_metrics$.model), 
                                "_accuracy_metrics.csv"), 
                bucket = bucket)
  # Return the metrics themselves, not just the upload side effect.
  return(accuracy_metrics)
}

So the idea now is to join all of the sub-target tibbles into one, but honestly I am a bit lost on how to do this. I looked into #685, but I don't know if it is applicable to my case, as I have dynamic and static branching mixed together.

Any suggestions?

BR /Edgar

wlandau commented 3 years ago

tibbles are great data structures for pipelines. For a single dynamic target, all the sub-targets are automatically combined.

library(drake)
library(tibble)
# Minimal example: a single dynamic target whose sub-targets each return a
# one-row tibble.
plan <- drake_plan(
  index = seq_len(3),
  # One sub-target per element of `index`.
  data1 = target(tibble(x = index), dynamic = map(index)),
  # Ordinary downstream target that uses the dynamic target as a whole.
  data2 = data1
)

make(plan)
#> ▶ target index
#> ▶ dynamic data1
#> > subtarget data1_0b3474bd
#> > subtarget data1_b2a5c9b8
#> > subtarget data1_71f311ad
#> ■ finalize data1
#> ▶ target data2

# Sub-targets automatically combined in exploratory data analysis.
readd(data1)
#> # A tibble: 3 x 1
#>       x
#>   <int>
#> 1     1
#> 2     2
#> 3     3

# Sub-targets automatically combined in downstream targets.
readd(data2)
#> # A tibble: 3 x 1
#>       x
#>   <int>
#> 1     1
#> 2     2
#> 3     3

Created on 2020-10-01 by the reprex package (v0.3.0)

For dynamic branching within static branching, you can use combine() to bring everything together.

library(drake)
library(tibble)
library(dplyr)
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union
# Dynamic branching nested inside static branching, brought back together
# with a static combine().
plan <- drake_plan(
  index_dynamic = seq_len(2),
  # Static branching creates data_mapped_1 and data_mapped_2; each of them is
  # a dynamic target with one sub-target per element of `index_dynamic`.
  data_mapped = target(
    tibble(static = index_static, dynamic = index_dynamic),
    transform = map(index_static = c(1, 2)),
    dynamic = map(index_dynamic)
  ),
  # combine() passes all the static data_mapped_* targets to bind_rows().
  data_combined = target(
    bind_rows(data_mapped),
    transform = combine(data_mapped)
  )
)

plot(plan)


make(plan)
#> ▶ target index_dynamic
#> ▶ dynamic data_mapped_1
#> > subtarget data_mapped_1_0b3474bd
#> > subtarget data_mapped_1_b2a5c9b8
#> ■ finalize data_mapped_1
#> ▶ dynamic data_mapped_2
#> > subtarget data_mapped_2_0b3474bd
#> > subtarget data_mapped_2_b2a5c9b8
#> ■ finalize data_mapped_2
#> ▶ target data_combined

# All four rows (2 static x 2 dynamic) end up in a single tibble.
readd(data_combined)
#> # A tibble: 4 x 2
#>   static dynamic
#>    <dbl>   <int>
#> 1      1       1
#> 2      1       2
#> 3      2       1
#> 4      2       2

Created on 2020-10-01 by the reprex package (v0.3.0)

Comments: