Azure / doAzureParallel

A R package that allows users to submit parallel workloads in Azure
MIT License
107 stars 51 forks source link

Merging large (>50MB) results #284

Closed ctlamb closed 6 years ago

ctlamb commented 6 years ago

I am running boosted regression trees on ecological data. These models work locally with %do% and also in the cloud in parallel for a subset of the data (300 rows instead of 25,000). However, when I scale up the analysis to all the data (25,000 rows) and %dopar%, after 7.5 hours all nodes successfully complete the models but results fail to merge. Is there a maximum data size that the package can merge and return, and can I change this?

Screen_Shot_2018_06_28_at_7_31_39_AM

brnleehng commented 6 years ago

Hi @ctlamb

How many results are there? What VM size are you running?

The merge task is run on a single VM. The single VM memory size would be the maximum data size. If it's 4 results of ~200 mb total. It should be running fine. Can I get the job logs of the merge task?

Thanks, Brian

Thanks Brian

ctlamb commented 6 years ago

Hi Brian,

4 results at the moment. Eventually I’ll scale up to 64 results. Each of which are 50mb.

I’m running standard_F2’s

How do I get these job logs? I followed the link in the error message but that link appears to be broken. I found the troubleshooting files on github manually, should I use setVerbose(TRUE) and or setAutoDeleteJob(FALSE)?

Appreciate the help. This package and resource is hopefully going to solve some major computing barriers im facing in my PhD.

brnleehng commented 6 years ago

I tried reproducing the this scenario. I ran 10 tasks with outputs of 50 mb, using Standard_F2s. It seemed too work fine.

What kind of outputs are you using for the model? It could be that one of the models failed and when the merge task tries to combine it. It failed.

setChunkSize(5)
foreach(i = 1:50) %dopar% {
  download.file(url='https://azureparallelr.blob.core.windows.net/datasets/JEOPARDY_CSV.csv',
                        destfile = "./JEOPARDY_CSV.csv")
  file <- read.csv("./JEOPARDY_CSV.csv")
  return(file)
}

To get the file logs:

# Standard output and error text files
getJobFile("job20180628205236", "merge", "stdout.txt", verbose = TRUE)
getJobFile("job20180628205236", "merge", "stderr.txt", verbose = TRUE)

# Getting the log files on the R process
getJobFile("job20180628205236", "merge", "wd/merge.txt", verbose = TRUE)
getJobFile("job20180628205236", "merge", "wd/doParallel.txt", verbose = TRUE)

I'll have to update the documentation on getting log files as they seemed to be outdated.

Thanks, Brian

ctlamb commented 6 years ago

OK, process of elimination. I’ll run that code on my end too to double check.

I’ll also create a reproducible boosted regression tree model and see if I can replicate the error then pass along with the error messages.

I’m going to be away from my computer for a week but will try to do this one evening. Thanks again.

ctlamb commented 6 years ago

Back at it.

I can confirm that I can run your Jeopardy download successfully.

So, moving forward, I created a reproducible example, which also recreated the error I was having. This takes a few hours to run.


###MAKE DATA
N <- 190000
X1 <- runif(N)
X2 <- 2*runif(N)
X3 <- ordered(sample(letters[1:4],N,replace=TRUE),levels=letters[4:1])
X4 <- factor(sample(letters[1:6],N,replace=TRUE))
X5 <- factor(sample(letters[1:3],N,replace=TRUE))
X6 <- 3*runif(N) 
X7 <- 31*runif(N) 
X8 <- 13*runif(N) 
mu <- c(-1,0,1,2)[as.numeric(X3)]

SNR <- 10 # signal-to-noise ratio
Y <- X1**1.5 + 2 * (X2**.5) + mu
sigma <- sqrt(var(Y)/SNR)
Y <- Y + rnorm(N,0,sigma)

data <- data.frame(Y=Y,X1=X1,X2=X2,X3=X3,X4=X4,X5=X5,X6=X6,X7=X7,X8=X8)

##SETUP PARAMATERS
param <- expand.grid(c(4,8),c(0.001,0.01))

##RUN MODEL
test <- foreach(i = 1:4,.packages = c("dismo", "gbm")) %dopar% {
   ##run model
   a <- gbm.step(data, gbm.x=c(2:9), gbm.y=1, tree.complexity=param[i,1],learning.rate=param[i,2],bag.fraction=0.65, family="gaussian", plot.main = FALSE, verbose=FALSE, silent=TRUE)
    return(a)
  }

This returns the following error for me:


Submitting tasks (4/4)
Submitting merge task. . .
Job Preparation Status: Package(s) being installed.............
Waiting for tasks to complete. . .
| Progress: 100.00% (4/4) | Running: 0 | Queued: 0 | Completed: 4 | Failed: 0 |
Tasks have completed. Merging results................An error has occurred in the merge task of the job 'job20180705170315'. Error handling is set to 'stop' and has proceeded to terminate the job. The user will have to handle deleting the job. If this is not the correct behavior, change the errorhandling property to 'pass'  or 'remove' in the foreach object. Use the 'getJobFile' function to obtain the logs. For more information about getting job logs, follow this link: https://github.com/Azure/doAzureParallel/blob/master/docs/40-troubleshooting.md#viewing-files-directly-from-compute-nodeError in e$fun(obj, substitute(ex), parent.frame(), e$data) : 
  object 'results' not found

I tried to get the job files but they seem to have been deleted

getJobFile("job20180705170315", "merge", "stdout.txt", verbose = TRUE)
$odata.metadata
[1] "https://doazpbatchkpas.westus.batch.azure.com/$metadata#Microsoft.Azure.Batch.Protocol.Entities.Container.errors/@Element"

$code
[1] "JobNotFound"

$message
$message$lang
[1] "en-US"

$message$value
[1] "The specified job does not exist.\nRequestId:149d0358-3b88-4a2f-9d38-3cab899d90bb\nTime:2018-07-05T22:51:09.1557593Z"
brnleehng commented 6 years ago

Hi @ctlamb

Turn off job auto deletion

setAutoDeleteJob(FALSE)

I was able to reproduce the issue. It appears in the merge task, there was a failure. We run a doParallel foreach loop in the merge task to read multiple files and merge. It appears the merge task couldn't read all of the files at once. I will continuing investigating on why.

I would recommend disabling the merge task (https://github.com/Azure/doAzureParallel/blob/master/docs/52-azure-foreach-options.md#bypassing-merge-task) since the outputs are coming out as 250mbs. This would mean you would have to iterate through the results yourself.

Logs in the doParallel file (Merge task)

Error in serialize(data, node$con) : ignoring SIGPIPE signal
Calls: <Anonymous> ... doTryCatch -> sendData -> sendData.SOCKnode -> serialize
Execution halted

Logs in the R session for merge task

No traceback available 
<simpleError in unserialize(socklist[[n]]): error reading from connection>
ctlamb commented 6 years ago

Thanks @brnleehng

OK I'v turned off auto deletion and added the argument to halt the merging process


test <- foreach(i = 1:4,.packages = c("dismo", "gbm"), .options.azure = list(enableMerge = FALSE)) %dopar% {
   ##run model
   a <- gbm.step(data, gbm.x=c(2:9), gbm.y=1, tree.complexity=param[i,1],learning.rate=param[i,2],bag.fraction=0.65, family="gaussian", plot.main = FALSE, verbose=FALSE, silent=TRUE)
    return(a)
  }

What I get when I run this now is

Error in storageClient$blobOperations$uploadBlob(id, paste0(getwd(), "/",  : 
  could not find function "uploadChunk"

An error I've had before, which I solved by reducing my foreach statement from: foreach(i = 1:4,.packages = c("dismo", "gbm", "tidyverse"), .errorhandling="pass", .verbose=TRUE) %dopar% {

To

foreach(i = 1:4,.packages = c("dismo", "gbm", "tidyverse")) %dopar% {

Seems these extra arguments in the foreach cause an issue, any thoughts?

brnleehng commented 6 years ago

By default, the foreach package will attempt to export variables that it will use in the foreach iteration. If the export environment becomes large, it will need to split it into chunks for each upload in Azure Storage.

The uploadChunk has an issue with authentication. Will be working on the fix.

https://github.com/Azure/rAzureBatch/issues/58

ctlamb commented 6 years ago

Thanks, Brian. So should I sit tight for now, or is there a way I can get the example I provided above to work in the meantime?

brnleehng commented 6 years ago

Hi @ctlamb

Yes, I'm still working on a fix. There was a regression when I converted rAzureBatch package to use R6 classes. I'll update this thread once the fix is done.

Thanks, Brian

ctlamb commented 6 years ago

OK thanks, Brian. Appreciate it

Cheers, Clayton

brnleehng commented 6 years ago

Hi @ctlamb

I've updated rAzureBatch package. Let me know if you are running into any other issues. https://github.com/Azure/rAzureBatch/pull/59

Thanks, Brian

ctlamb commented 6 years ago

Awesome, thanks @brnleehng.

I've re-ran my example above, which now succeeds to run without throwing the "upload chunk" error, but the same merge error still pops up, even with .options.azure = list(enableMerge = FALSE) enabled. Any thoughts?

> ##RUN MODEL
> test <- foreach(i = 1:4,.packages = c("dismo", "gbm"), .options.azure = list(enableMerge = FALSE)) %dopar% {
+    ##run model
+    a <- gbm.step(data, gbm.x=c(2:9), gbm.y=1, tree.complexity=param[i,1],learning.rate=param[i,2],bag.fraction=0.65, family="gaussian", plot.main = FALSE, verbose=FALSE, silent=TRUE)
+     return(a)
+   }
=========================================================================================================================================================================
Id: job20180713164810
chunkSize: 1
enableCloudCombine: TRUE
packages: 
    dismo; gbm; 
errorHandling: stop
wait: TRUE
autoDeleteJob: FALSE
=========================================================================================================================================================================
Submitting tasks (4/4)
Submitting merge task. . .
Job Preparation Status: Package(s) being installed.............
Waiting for tasks to complete. . .
| Progress: 100.00% (4/4) | Running: 0 | Queued: 0 | Completed: 4 | Failed: 0 |
Tasks have completed. Merging results...................An error has occurred in the merge task of the job 'job20180713164810'. Error handling is set to 'stop' and has proceeded to terminate the job. The user will have to handle deleting the job. If this is not the correct behavior, change the errorhandling property to 'pass'  or 'remove' in the foreach object. Use the 'getJobFile' function to obtain the logs. For more information about getting job logs, follow this link: https://github.com/Azure/doAzureParallel/blob/master/docs/40-troubleshooting.md#viewing-files-directly-from-compute-nodeError in e$fun(obj, substitute(ex), parent.frame(), e$data) : 
  object 'results' not found
brnleehng commented 6 years ago

Hi @ctlamb

I realized the Azure foreach options documentation are incorrect. The enableMerge flag was changed to enableCloudCombine. (https://github.com/Azure/doAzureParallel/pull/288)

I'm currently running the same model. I'll confirm if the job runs successfully or not.

Brian

ctlamb commented 6 years ago

Thanks @brnleehng,

If you have any insight in how to manually merge after too that would be helpful. I don’t have a sense what the non-merged output will look like, but the standard merged output works well for my purposes so I’m sure I can manually replicate that.

ctlamb commented 6 years ago

Hi @brnleehng just following up to see how the model ran?

brnleehng commented 6 years ago

@ctlamb

Yes, the model ran successfully. We will attempt to combine results local with the getJobResult (Note: there could be issues of running out of memory when reading the files in R).

getJobResult("job20180717191258")

Brian

ctlamb commented 6 years ago

Thanks @brnleehng. Really appreciate your persistence in helping to resolve this issue. I'm running models now and will attempt to combine with the getJobResult() you mention above. Do you see anyway to fix the merge task or will doAzureParallel always throw errors when merging these types of results?

As a side note, I wonder if I could just have the VM's upload the results somewhere in a public repository where I could snag them.

ctlamb commented 6 years ago

@brnleehng, an update:

The models ran as usual and failed in the same spot:

| Progress: 100.00% (4/4) | Running: 0 | Queued: 0 | Completed: 4 | Failed: 0 |
Tasks have completed. Merging results....................An error has occurred in the merge task of the job 'job20180718050337'. Error handling is set to 'stop' and has proceeded to terminate the job. The user will have to handle deleting the job. If this is not the correct behavior, change the errorhandling property to 'pass'  or 'remove' in the foreach object. Use the 'getJobFile' function to obtain the logs. For more information about getting job logs, follow this link: https://github.com/Azure/doAzureParallel/blob/master/docs/40-troubleshooting.md#viewing-files-directly-from-compute-nodeError in e$fun(obj, substitute(ex), parent.frame(), e$data) : 
  object 'results' not found

I ran getJobResult() after the model finished, but I received an error message:

> getJobResult("job20180718050337")
Getting job results...
Error in getJobResult("job20180718050337") : 
  job job20180718050337 has failed tasks and error handling is set to 'stop', no result will be available
brnleehng commented 6 years ago

Hi @ctlamb,

It looks like you have the enableCloudCombine flag set to TRUE which is returning the original error where parallel reads are causing the merge task to fail. You will be able to use getJobResult function to manually merge all of the results locally.

Also set your auto delete job to FALSE or else getJobResult will not find the results because we delete the folder.

setAutoDeleteJob(FALSE)

The VMs will always push your results into a Azure storage container (With the same name as the job id). We've created some built-in Azure storage container management functions. For more information, https://github.com/Azure/doAzureParallel/blob/master/docs/73-managing-storage.md

This workflow to download all your results and merge your results yourself.

Example:

results <- list()
files <- listStorageFiles("job20180718180611")

View(files)

getStorageFile(container = "job20180718180611", blobPath = "results/4-result.rds", downloadPath = "4-result.rds")

results[[1]] <- readRDS("4-result.rds")

Let me know if you have any more questions

Thanks! Brian

ctlamb commented 6 years ago

@brnleehng this all worked. I can't thank you enough!!

brnleehng commented 6 years ago

Awesome to hear that! Feel free to open another issue if you run into any other problems