DOI-USGS / ds-pipelines-targets-example-wqp

An example targets pipeline for pulling data from the Water Quality Portal (WQP)

Force a retry when a query hangs? #77

Closed lindsayplatt closed 1 year ago

lindsayplatt commented 2 years ago

I've had a couple of instances where the national pull pipeline I am working on gets stuck on a single query (I know it is stuck and not just taking a while because the query is for only 1 site and 1 parameter, and we have waited upwards of 1 hour). This has happened quite a few times, and simply killing and restarting the pipeline gets me past the issue.

I am wondering if we need to implement our own timeout of sorts in the pull_data() function that initiates a retry when something gets stuck for too long. I have been playing around with code like this but haven't yet figured something out that does what I want:

# When all of the retries take > timeout, the entire retry command throws an error and stops
# I want something where the timeout would trigger a retry, so I am nesting these

# In this example, I am trying to get the outside retry to retry when the timeout happens
# I can't seem to get it to retry at all
n <- 0
retry::retry({
  n <- n + 1
  retry::retry({
    Sys.sleep(3) # this is my simulated download
  }, when = "Error:", timeout = n)
}, when = "Error:", max_tries = 5)
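
For comparison, the behavior described above (a per-attempt time limit whose expiry triggers another attempt) can be sketched in base R without retry. This is a minimal sketch, not the pipeline's implementation: `run_once()`, `retry_on_timeout()`, and `simulated_download()` are hypothetical names, and the sleep times are shortened so the example runs quickly. It relies on `setTimeLimit()`, which is checked during R code and `Sys.sleep()` but only at author-chosen points in compiled code, so whether it would interrupt a genuinely hung network call is an open assumption.

```r
# Hypothetical helper: run fn() once under an elapsed-time limit.
run_once <- function(fn, timeout_seconds) {
  setTimeLimit(elapsed = timeout_seconds, transient = TRUE)
  # Always lift the limit again, whether fn() succeeds or errors.
  on.exit(setTimeLimit(elapsed = Inf, transient = FALSE), add = TRUE)
  fn()
}

# Hypothetical helper: retry fn() on any error, including the
# "reached elapsed time limit" error, up to max_tries attempts.
retry_on_timeout <- function(fn, timeout_seconds, max_tries = 5) {
  for (attempt in seq_len(max_tries)) {
    result <- tryCatch(
      list(ok = TRUE, value = run_once(fn, timeout_seconds)),
      error = function(e) list(ok = FALSE, error = e)
    )
    if (result$ok) return(result$value)
    message("Attempt ", attempt, " failed: ", conditionMessage(result$error))
  }
  stop("No attempt succeeded within ", max_tries, " tries")
}

# Simulated download: "hangs" (sleeps 1 s) on the first two calls,
# then returns quickly.
calls <- 0
simulated_download <- function() {
  calls <<- calls + 1
  Sys.sleep(if (calls <= 2) 1 else 0.01)
  2 + 2
}

value <- retry_on_timeout(simulated_download, timeout_seconds = 0.2)
```

Here the first two attempts are cut off by the time limit and the third returns. Each attempt gets its own time limit, so one slow attempt does not use up the time allowed for the next.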

lekoenig commented 2 years ago

@lindsayplatt are you getting an error message when this happens (e.g. 404 or 500 errors), or is the build just stalling out? I'm guessing the latter, but wanted to check.

lindsayplatt commented 2 years ago

I haven't found an actual error, just seeing it not making any progress for some time.

lekoenig commented 2 years ago

@lindsayplatt I tried your reprex and think the issue is that the timeout is always exceeded. If we add some stochasticity to the n assignment the wrapped retry approach seems to work, although it sometimes doesn't succeed within max_tries:

> n <- 0
> retry::retry({
+     n <- rnorm(1, mean = 2, sd = 0.25) + 1
+     retry::retry({
+         Sys.sleep(3) # this is my simulated download
+     }, when = "Error:", timeout = n)
+ }, when = "Error:", max_tries = 5)
Error in `Sys.sleep()`:
! reached elapsed time limit
Run `rlang::last_error()` to see where the error occurred.
> 
> n <- 0
> retry::retry({
+     n <- rnorm(1, mean = 2, sd = 0.25) + 1
+     retry::retry({
+         Sys.sleep(3) # this is my simulated download
+     }, when = "Error:", timeout = n)
+ }, when = "Error:", max_tries = 5)
> 
> n
[1] 3.475201
>
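
One thing worth checking in the output above is the text of the timeout error itself. The base-R sketch below reproduces the same kind of time-limit error with `setTimeLimit()` and tests it against the `"Error:"` pattern. Whether retry's `when` argument is matched against this condition message is an assumption here, not something confirmed in this thread. If it is, a pattern of `"Error:"` would not match the timeout, and the outer call would not retry.

```r
# Trigger an elapsed-time-limit error and capture its message.
msg <- tryCatch(
  {
    setTimeLimit(elapsed = 0.2, transient = TRUE)
    Sys.sleep(1) # simulated download that outlasts the limit
    "finished"
  },
  error = function(e) conditionMessage(e),
  finally = setTimeLimit(elapsed = Inf, transient = FALSE)
)

print(msg)

# Does the message contain the pattern used for `when` in the reprex?
matches_error_prefix <- grepl("Error:", msg)
# Does it contain the text shown in the console output above?
matches_time_limit <- grepl("elapsed time limit", msg)
```

The "Error in ...:" prefix seen at the console is added when the error is printed and is not part of the condition message, so under the assumption above a pattern such as `"elapsed time limit"` may be closer to what the nested approach needs.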

Before I noticed this (👆) about the wrapped retry approach, I briefly tried using helpers from purrr for this use case. First, I defined a pseudo-download function that mimics what we would have in fetch_wqp_data():

# x is the target "download time" that it takes to complete our function and
# calculate 2+2. This is meant to be a placeholder for the dataRetrieval call.
# Because of our use of rnorm(), the actual download time will sometimes be
# greater than x and sometimes less.
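download_data <- function(x){
  # Sys.sleep() returns NULL, so there is nothing useful to assign;
  # max(0, ...) guards against a negative draw from rnorm() when x is small.
  Sys.sleep(max(0, rnorm(1, mean = x, sd = 0.25)))
  2 + 2
}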

# pull_data() is the function we would define and call in fetch_wqp_data(),
# currently line 245 in 2_download/fetch_wqp_data.R:
pull_data <- function(actual_download_time, timeout_seconds){
  retry::retry(
    expr = download_data(actual_download_time), # dataRetrieval::readWQP() goes here
    when = "Error:",
    timeout = timeout_seconds
  )
}

The code below adds a dependency to our fetch_wqp_data function but might be useful if we want to get fancy and implement delays (fixed or exponential backoff) between attempts. Otherwise, I think we can stick to using retry.

# max_tries must exist before insistently() is called, because the
# rate argument is evaluated when the modified function is created.
max_tries <- 5

# The code below defines a *modified* function that attempts
# to run pull_data() up to the maximum number of attempts,
# waiting 1 second between attempts.
pull_data_w_retries <- purrr::insistently(
  f = pull_data,
  rate = purrr::rate_delay(pause = 1, max_times = max_tries)
)

pull_data_w_retries(actual_download_time = 3, timeout_seconds = 2.9)
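
The fixed delay above could be swapped for exponential backoff. purrr provides `rate_backoff()` for that purpose; the base-R sketch below only illustrates the idea of doubling the wait after each failed attempt. `backoff_delay()` and `retry_with_backoff()` are hypothetical names, and the base and cap values are arbitrary assumptions.

```r
# Hypothetical helper: seconds to wait after failed attempt number `attempt`,
# doubling from pause_base and never exceeding pause_cap.
backoff_delay <- function(attempt, pause_base = 1, pause_cap = 60) {
  min(pause_cap, pause_base * 2^(attempt - 1))
}

# Hypothetical wrapper: call fn() until it succeeds or max_tries is reached,
# sleeping for a growing interval between failed attempts.
retry_with_backoff <- function(fn, max_tries = 5, pause_base = 1, pause_cap = 60) {
  for (attempt in seq_len(max_tries)) {
    result <- tryCatch(
      list(ok = TRUE, value = fn()),
      error = function(e) list(ok = FALSE, error = e)
    )
    if (result$ok) return(result$value)
    if (attempt < max_tries) {
      Sys.sleep(backoff_delay(attempt, pause_base, pause_cap))
    }
  }
  stop("All ", max_tries, " attempts failed; last error: ",
       conditionMessage(result$error))
}

# Delays after attempts 1 to 5 with the default settings: 1 2 4 8 16
delays <- sapply(1:5, backoff_delay)
```

Backoff of this kind gives a struggling service progressively more time to recover, at the cost of a longer worst-case wait than a fixed 1-second delay.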