psolymos / pbapply

Adding progress bar to '*apply' functions in R
https://peter.solymos.org/pbapply/
154 stars 6 forks source link

The pblapply is running slower and slower than expected. How should I fix it? #68

Open laleoarrow opened 3 months ago

laleoarrow commented 3 months ago

Hi, thanks for developing such a great tool! However, I ran into some problems with it when performing a large (kinda?) loop using pbapply to accelerate the process. I did add gc() and rm() function in the loop, but it doesnt help and runs slower and slower as the loop number goes.

It seems that each CPU core it used to calculate it occupied more and more RAM than it should be for one loop though. In the beginning, each thread took ~9 RAM, then it grows larger without shrinking back.

image

Maybe I understand pbapply somewhere wrong, but I could not locate the problems. So any thoughts or suggestion would be appreciated! Here is the code I use. (The code is running on a ARM Macbook Pro (2023) with 128 RAM.)

gmb_files <- list.files(path = "~/ref/mbqtl/473/473hg19", pattern = "\\hg19.tsv.gz$", full.names = T)
cl <- makeCluster(4, type = "FORK")
res_stage1s <- pblapply(gmb_files, function(gmb){ # gmb = gmb_files[1]
# res_stage1s <- lapply(gmb_files[1:3], function(gmb){ # gmb = gmb_files[1]
  gmb_id <- basename(gmb) %>% gsub("hg19.tsv.gz","",.); leo_message(paste("# Processing: ", gmb_id))
  gmb_file <- fread(gmb) %>%  mutate_at("CHR", as.integer)
  res_one_gmbs <- lapply(1:nrow(loci), function(i){
    # initial parameters
    gwas_names <- c("T1D", "Iridocyclitis")
    chr <- loci$CHR[i]
    bp <- loci$BP[i]
    win <- 500 # kb
    gene <- loci$GENE[i]
    Evidence <- ifelse(loci$Evidence[i]=="PLACO_COLOC","PC",loci$Evidence[i])
    reminder <- paste0(Evidence,"_", gmb_id, "@", loci$CHR[i],":",loci$BP[i]-win*1000/2,"-",loci$BP[i]+win*1000/2)
    # format hyprcoloc input
    hypr_loci_dat <- format_hyprc_dat(gwas_names, chr, bp, win=win, 
                                      gmb=T, gmb_file=gmb_file, gmb_id=gmb_id)
    beta_matrix <- hypr_loci_dat$beta_matrix
    se_matrix <- hypr_loci_dat$se_matrix
    snp_id = rownames(beta_matrix); leo_message(paste("\nSNP length:", length(snp_id)))
    # hyprcoloc
    binary_traits = c(1,1,0)
    hypr_result <- hyprcoloc::hyprcoloc(beta_matrix, se_matrix, 
                                        trait.names = colnames(beta_matrix), snp.id = rownames(beta_matrix),
                                        binary.outcomes = binary_traits)
    reg.res <- hypr_result$results %>% 
      mutate(index = reminder, chr = chr, bp = bp, gmb_id = gmb_id, gene = gene) %>% 
      select(index, chr, bp, gmb_id, gene, everything())
    # sensitivity
    .check_hyprcoloc_postive <- function(reg.res) {
      coloced_traits <- strsplit(reg.res$traits, ",")[[1]] %>% length()
      logi1 <- nrow(reg.res) == 1
      # logi2 <- reg.res$traits != "None"
      # logi3 <- !is.na(reg.res$posterior_prob) && reg.res$posterior_prob > 0.25 
      logi4 <- coloced_traits == 3
      logi = logi1 && logi4
      if (logi) {return(TRUE)} else {return(FALSE)}
    }
    if (.check_hyprcoloc_postive(reg.res)) {
      sen <- hyprcoloc::sensitivity.plot(beta_matrix, se_matrix,
                                         trait.names = colnames(beta_matrix), snp.id = rownames(beta_matrix),
                                         reg.thresh = c(0.5, 0.6, 0.7), align.thresh = c(0.5, 0.6, 0.7), prior.c = c(0.05, 0.02, 0.01, 0.005), 
                                         equal.thresholds = FALSE, similarity.matrix = TRUE)
      sim.mat <- sen[[2]] # for self-defined pheatmap
      save.path <- paste0("./figure/hyprcoloc/", reminder, ".pdf")
      title <- paste0(loci$Evidence[i], "@", loci$CHR[i],":",loci$BP[i]-win*1000/2,"-",loci$BP[i]+win*1000/2)
      p <- self_draw_hypr(sim.mat, title, save = T, save.path = save.path) # grid::grid.draw(p$gtable)
    } else {message(paste0("# Failed to hyprcoloc for loci: ", reminder))}
    # clean cache
    rm(list = c("gwas", "chr", "bp", "win", "gene", "Evidence", "reminder",
                "hypr_result", "beta_matrix", "se_matrix", "snp_id",
                "sen", "sim.mat", "save.path", "title", "p")); gc()
    return(reg.res)
  })
  res_one_gmb <- do.call("rbind", res_one_gmbs)
  rm(gmb_file); rm(gmb_id); gc()
  return(res_one_gmb)
}, cl = cl)
stopCluster(cl); rm(cl); gc()
psolymos commented 3 months ago

Hi @laleoarrow I am not sure what is going on, but I suspect that some of the steps you are doing does not parallelize well due to non exportable objects (see https://cran.r-project.org/web/packages/future/vignettes/future-4-non-exportable-objects.html).

Can you also try the future backend and see if that helps?

library(future)
cl <- parallel::makeCluster(n)
plan(cluster, workers = cl)
r2 <- pblapply(..., cl = "future")
parallel::stopCluster(cl)
laleoarrow commented 3 months ago

@psolymos Thx for ur prompt response! I tried to replace the original code with future backend as follows:

plan(multisession, workers = 10) # plan(sequential)
options(future.globals.maxSize = 10 * 1024^3) # i.e., 10GG; should takes ~1G for my objects in theory
res_stage1s <- pblapply(length(gmb_files):1,  cl = "future", FUN = function(j){ # apply future parallelization for outer loop
  #------------------outer loop-------------------#
  ...
  gmb_file <- fread(gmb) %>%  mutate_at("CHR", as.integer) # %>% reduce_data(loci, win=500) # load a file in outer loop
  #------------------inner loop-------------------#
  res_one_gmbs <- pblapply(1:nrow(loci), function(i){ # no parallelization for the inner loop
...

Although intuitively the ram rise slower than before, the issue persists still unfortunately and would stuck the whole process somewhere in middle.