OdyOSG / ariadne

OHDSI Characterization Study tools
Other

non-scalable current implementation of treatment patterns function #1

Open A1exanderAlexeyuk opened 1 year ago

A1exanderAlexeyuk commented 1 year ago

@ablack3 @mdlavallee92

The target cohort has about 600,000 patients, and there are about 20 event cohorts (drugs). The run takes more than 4 hours without producing a result; when I interrupted the session, RAM was filled with 9 GB of data. For a cohort of 450,000 patients, it takes 3.5 hours to run.

library(dplyr, warn.conflicts = FALSE)
library(stringr)

roadmap <- yaml::read_yaml(here::here('inst/configurations/roadmap_dynamic.yml')) %>% data.frame()
cohortsMap <- yaml::read_yaml(here::here('inst/configurations/cohorts_dynamic.yml')) %>% data.frame()

getNSCLCDrugs <- function() {
  read.csv(here::here('inst/settings/CohortsToCreate.csv')) %>%
    dplyr::filter(cohort_id %/% 1000 == 4 & ! stringr::str_detect(cohort_name, '_MM')) %>%
    dplyr::pull(cohort_id)
}

for(cdm_schema in unique(roadmap$cdmSchema)[c(5)]) {  # it's optum dod
  usethis::ui_info(glue::glue('{cdm_schema} is going'))
  dirs <- list.dirs(here::here('data'),recursive = FALSE)

  .database <- roadmap %>% 
    dplyr::filter(cdmSchema == cdm_schema) %>%
    dplyr::pull(.database) %>% 
    unique()

  targetRoad <- roadmap %>% 
    dplyr::filter(cdmSchema == cdm_schema & str_detect(tolower(name), 'myeloma')) %>%
    dplyr::pull(id)

  targetIds <- cohortsMap %>% filter(id %in% targetRoad) %>% pull(igIds)

  cohortTable <- glue::glue('cohort_{cdm_schema}')

  connectionDetails <- DatabaseConnector::createConnectionDetails(
    dbms = "redshift",
    user = Sys.getenv("USER"),
    password = Sys.getenv("PASSWORD"),
    port = "5439",
    server = glue::glue("{Sys.getenv('DATABASE_SERVER')}/{.database}"),
    pathToDriver = Sys.getenv("PATH_TO_DRIVER"))

  connection <- suppressMessages(DatabaseConnector::connect(connectionDetails))

  check <- DatabaseConnector::renderTranslateQuerySql(connection = connection,
                                                      sql = '
                                                      select distinct cohort_definition_id from
                                                      insight_gateway_app.@cohortTable' ,
                                                      cohortTable = cohortTable,
                                                      snakeCaseToCamelCase = TRUE) %>%
    dplyr::pull(cohortDefinitionId)

  DatabaseConnector::disconnect(connection)
  for(targetId in c(1021)) {  # nsclc cohort about 500.000 people
    if(targetId %in% check) {
      usethis::ui_info(glue::glue('{targetId} is going'))
      cancer <- cohortsMap %>% filter(igIds == targetId) %>%
        pull(name)
      vocabularyDatabaseSchema <- cdm_schema
      cdmDatabaseSchema <- cdm_schema
      cohortDatabaseSchema <- 'insight_gateway_app'
      resultsDatabaseSchema <- 'insight_gateway_app'
      cohortTable <- glue::glue('cohort_{cdm_schema}')
      if(str_detect(cancer, 'myeloma')) {
        drugIds <- getMultipleMyelomaDrugs()  # not defined in this snippet
      } else {
        drugIds <- getNSCLCDrugs()
      }
      tp <- ariadne::define_treatment_history(targetCohortId = targetId,
                                              targetCohortName = paste0(cancer ,'_', targetId),
                                              eventCohortIds = drugIds,
                                              includeTreatments = "startDate",
                                              periodPriorToIndex = 0,
                                              minEraDuration = 0,
                                              eraCollapseSize = 30,
                                              combinationWindow = 30,
                                              minPostCombinationDuration = 30,
                                              filterTreatments = "Changes",
                                              maxPathLength = 3,
                                              minCellCount = 5,
                                              minCellMethod = "Remove",
                                              groupCombinations = 10,
                                              addNoPaths = FALSE) %>%
        ariadne::build_treatment_history() %>%
        ariadne::build_treatment_patterns()
      data.table::fwrite(tp$treatmentPathways, paste0(cdm_schema, '_', targetId, '.csv'))
    } else {
      usethis::ui_info(glue::glue('{targetId} is NULL'))
    }
  }
}

mdlavallee92 commented 1 year ago

@A1exanderAlexeyuk what are the sizes of the drug cohorts, and how many of them are there? Maybe we can create some fake data to replicate your setup.
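
A minimal sketch of what such fake data could look like, in base R. The column names follow the standard OMOP CDM cohort table (`cohort_definition_id`, `subject_id`, `cohort_start_date`, `cohort_end_date`). Everything else is an assumption made for illustration: the function name `makeFakeCohortTable`, the number of eras per person, the date ranges, and the cohort ID numbering. The real drug cohort sizes are not stated in this thread, so the numbers below are placeholders to scale up as needed.

```r
# Build a synthetic cohort table in the OMOP CDM cohort layout.
# All sizes and date ranges are made-up parameters for load testing.
makeFakeCohortTable <- function(nPersons, nEventCohorts, targetCohortId = 1L,
                                erasPerPerson = 3L, seed = 42) {
  set.seed(seed)
  # Random index date per person within roughly five years of 2015-01-01.
  indexDate <- as.Date("2015-01-01") + sample.int(1825, nPersons, replace = TRUE)

  # One target cohort row per person, with one year of follow-up.
  target <- data.frame(
    cohort_definition_id = targetCohortId,
    subject_id = seq_len(nPersons),
    cohort_start_date = indexDate,
    cohort_end_date = indexDate + 365
  )

  # `erasPerPerson` drug eras per person, each starting after the index date.
  n <- nPersons * erasPerPerson
  subject <- rep(seq_len(nPersons), each = erasPerPerson)
  start <- indexDate[subject] + sample.int(300, n, replace = TRUE)
  events <- data.frame(
    # Event cohort IDs are targetCohortId + 1 ... targetCohortId + nEventCohorts.
    cohort_definition_id = targetCohortId + sample.int(nEventCohorts, n, replace = TRUE),
    subject_id = subject,
    cohort_start_date = start,
    cohort_end_date = start + sample.int(60, n, replace = TRUE)
  )

  rbind(target, events)
}

# Small example; raise nPersons towards 600,000 to approach the reported scale.
fakeCohort <- makeFakeCohortTable(nPersons = 1000, nEventCohorts = 20)
```

The resulting data frame would still need to be uploaded to a database cohort table before the treatment pattern functions could be run against it; that step is not shown here.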

ablack3 commented 1 year ago

Hi @A1exanderAlexeyuk,

I think the short-term solution is to run this in batches. Instead of processing your entire cohort table at once, split your persons into smaller groups and run the treatment pathways extraction on each group. Then combine the results at the end.
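
A rough sketch of that batching idea in base R. The helper names `splitIntoBatches` and `runInBatches` are made up for this example, and `processBatch` is a hypothetical placeholder for whatever code runs the extraction restricted to one group of person IDs. The sketch does not show how to restrict the ariadne functions to a subset of persons.

```r
# Split a vector of person IDs into consecutive batches of at most `batchSize`.
splitIntoBatches <- function(personIds, batchSize) {
  stopifnot(batchSize >= 1)
  if (length(personIds) == 0) return(list())
  batchIndex <- ceiling(seq_along(personIds) / batchSize)
  unname(split(personIds, batchIndex))
}

# Run `processBatch` on each batch and row-bind the per-batch data frames.
# `processBatch` is a hypothetical stand-in: it takes a vector of person IDs
# and returns a data frame of results for those persons only.
runInBatches <- function(personIds, batchSize, processBatch) {
  batches <- splitIntoBatches(personIds, batchSize)
  results <- lapply(seq_along(batches), function(i) {
    message(sprintf("Batch %d of %d (%d persons)",
                    i, length(batches), length(batches[[i]])))
    processBatch(batches[[i]])
  })
  do.call(rbind, results)
}

# Toy stand-in so the sketch runs end to end: one row per person.
toyProcessBatch <- function(ids) {
  data.frame(personId = ids, nEvents = ids %% 3)
}

combined <- runInBatches(personIds = 1:10, batchSize = 4,
                         processBatch = toyProcessBatch)
```

One caveat with this approach: any aggregation that depends on the whole cohort, such as pathway counts filtered by `minCellCount`, should be applied after the batches are combined. A pathway that falls below the threshold within each batch might still exceed it in the full cohort, so filtering per batch could drop it incorrectly.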

Martin suggested that I work on the txPath package for this instead of ariadne. The goal will be for the txPath package to support large cohorts. Just wanted to provide an update.