ropensci / drake

An R-focused pipeline toolkit for reproducibility and high-performance computing
https://docs.ropensci.org/drake
GNU General Public License v3.0
1.34k stars 129 forks source link

Reproducible remote data sources #252

Closed noamross closed 6 years ago

noamross commented 6 years ago

A frequent challenge to both efficient, reproducible, and compact workflows is acquiring data from external sources. For instance, on a current project, I download Facebook's pre-trained fasttext word embedding model which is ~6GB. This takes a while. I don't want to include this file in the repository, instead putting code in the repo so collaborators get it as part of the workflow. I may or may want to update this when the remote sources changes. If I do, unfortunately, I need to download the whole thing to hash the file and check if it is different than before.

However, for many cases, there are strong heuristics that would allow me to check if the remote file is updated prior deciding whether to download. I think drake, or a small helper package that drake would use, seems a great home for functions to use these heuristics.

Some heuristics may be available in a file's content header which can be queried via a HEAD() call without downloading a file. These include:

If the file is on an FTP server, the modified date and size should be available as well.

Furthermore, based on parsing the URL itself, one could ascertain information on the file state for many important sources.:

So, if drake detects a URL in a work plan, like this

drake_plan(
     'crawl-300d-2M.vec.zip' = curl::curl_download("https://s3-us-west-1.amazonaws.com/fasttext-vectors/crawl-300d-2M.vec.zip")
 )

It could run the url against something like spyglass::check_remote_file(url, spyobj), where spyobj holds all the available heuristic checks. Checking against this object would tell you if the file has changed, and also what heuristics it is based on. drake could keep spyobj in the cache for the next lookup. drake users could decide on heuristics they would accept as a work plan option (hash or date-modified + size probably being the default).

(spyglass package name comes free.)

OK, maybe this is a bit out of scope and we should build this package separately, but interested in your thoughts!

Tagging @sckott and @willpearse based on their suppdata package and related DOI-parsing packages

wlandau commented 6 years ago

@noamross this is such an important point. As you say, drake may not be the place to build in this functionality, but I think it should at least have more guidance. The best practices guide and the cranlogs example should talk about remote data, and there could even be a whole separate vignette on remote data and big data.

I think I have a solution you can use right now. Does the code below cover it? The general principle applies to all remote data, and the download trigger could vary: you could pick a DOI, a time stamp, a hash, or something else. (Related: see triggers in workflow plans.) Also, how do you get a hash from HEAD()? Below, I use the time stamp, but the hash would be much better.

# Goal: get some cran download logs
# and only do a download when the remote time stamp changes.

library(drake)
library(curl)
library(httr)
library(R.utils)

url <- "http://cran-logs.rstudio.com/2018/2018-02-09-r.csv.gz"
query <- HEAD(url)

# The timestamp is an imported object.
# I use it as a dependency of the target.
# Every time you run this R script, we get a new timestamp.
# We will download from the url if and only if:
# - The timestamp changed.
# - The URL changed.
# - The code changed nontrivially.
# - We don't already have the target.
timestamp <- query$headers[["last-modified"]]
timestamp

## [1] "Mon, 12 Feb 2018 16:34:48 GMT"

# The ... is for other dependencies that should trigger new downloads.
get_logs <- function(url, ...){
  curl_download(url, "logs.csv.gz")       # Get a big file.
  gunzip("logs.csv.gz", overwrite = TRUE) # Unzip it.
  out <- read.csv("logs.csv", nrows = 4)  # Extract the data you need.
  unlink(c("logs.csv.gz", "logs.csv"))    # Remove the big files
  out                                     # Value of the target.
}

cranlogs_plan <- drake_plan(
  # The timestamp is a dependency, so it triggers a new download.
  # You could just put the body of get_logs() here,
  # but it makes it harder to read.
  logs = get_logs(url, timestamp)
)

# Let's see what the graph looks like.
config <- drake_config(cranlogs_plan)
vis_drake_graph(config)

dl1

make(cranlogs_plan)

## ...
## target logs

readd(logs)

## date     time     size version  os country ip_id
## 1 2018-02-09 13:01:13 82375220   3.4.3 win      RO     1
## 2 2018-02-09 13:02:06 74286541   3.3.3 win      US     2
## 3 2018-02-09 13:02:10 82375216   3.4.3 win      US     3
## 4 2018-02-09 13:03:30 82375220   3.4.3 win      IS     4

# The logs are up to date now.
vis_drake_graph(config)

dl2

# But what if our timestamp changes?
timestamp <- "new day"
outdated(config)

## [1] "logs"
# Fun fact: you could also use the timestamp
# as a dependency of get_logs() and `logs` will
# react the same way.

# The ... is for other dependencies that should trigger new downloads.
get_logs <- function(url){
  timestamp                               # Now a dependency of get_logs()
  curl_download(url, "logs.csv.gz")       # Get a big file.
  gunzip("logs.csv.gz", overwrite = TRUE) # Unzip it.
  out <- read.csv("logs.csv", nrows = 4)  # Extract the data you need.
  unlink(c("logs.csv.gz", "logs.csv"))    # Remove the big files
  out                                     # Value of the target.
}

cranlogs_plan <- drake_plan(
  # The timestamp is a dependency, so it triggers a new download.
  # You could just put the body of get_logs() here,
  # but it makes it harder to read.
  logs = get_logs(url)
)

config <- drake_config(cranlogs_plan)
vis_drake_graph(config) # The `timestamp` import points to `get_logs` now.

od

outdated(config)

## [1] "logs"

make(cranlogs_plan)

## target logs

outdated(config)

## character(0)

make(cranlogs_plan)

## All targets are already up to date.

# Drake knows that the new timestamp affects get_logs,
# which puts all downstream targets into question.
timestamp <- "yet another day"
outdated(config)

## [1] "logs"

make(cranlogs_plan)

## target logs
wlandau commented 6 years ago

Rather than use timestamp as an imported object, you could also put it in the workflow plan and assign the "always" trigger to make sure it runs every time.

cranlogs_plan <- drake_plan(
  timestamp = HEAD(url)$headers[["last-modified"]],
  logs = get_logs(url, timestamp),
  strings_in_dots = "literals"
) 
cranlogs_plan$trigger = c("always", "any")
cranlogs_plan

##      target                              command trigger
## 1 timestamp HEAD(url)$headers[["last-modified"]]  always
## 2      logs             get_logs(url, timestamp)     any

triggers()

## [1] "always"  "any"     "command" "depends" "file"    "missing"
noamross commented 6 years ago

Thanks your response, Will. I really like the last iteration here, and I'll think about how it could be simplified with a function in another package like spyglass::remote_file_status().

wlandau commented 6 years ago

I'm glad I could help, and I'm glad you brought it up. Drake reacts to things, but you have to tell it what to react to. This can be difficult in the current landscape of tools, and a new spyglass package would certainly make it easier to create the right triggers.

As I dug into HEAD() to try (unsuccessfully) to find the hash you mentioned, I was reminded of #6. For a while, I was looking for a tool that could fingerprint a package just for the sake of detecting changes in functionality, and I do not think such a tool exists. Package versions are too lax, and GitHub hashes are too conservative and not always available. This approach would have ended up making drake projects too brittle, so I decided to just recommend packrat instead.

willpearse commented 6 years ago

I've not got much to add other than this looks like a useful thing to have. I'm not quite sure how it would integrate with suppdata, but....

In suppdata any attempt to grab a file from a DOI returns a character with a local path to that file, and that character can (and often does!) have attributes. I can see last-modified dates and/or hashes, where they existed, being useful if you wanted to use spyglass downstream.

I'd also love to use spyglass in suppdata, of course, which (from the sounds of it) wouldn't be too tricky to do. I probably wouldn't make it the default in suppdata for the sake of making things easy as possible for beginners, but I can obviously see it being something people would want to use eventually!

Nice idea!!!

sckott commented 6 years ago

Another option is an etag https://en.wikipedia.org/wiki/HTTP_ETag if provided by the server

Data at a DOI should be fixed, but may be updated if the DOI is versioned

I don't think that's true. A DOI is only a persistent URI itself, and it can resolve to different URIs and I don't think the files at those URIs are necessarily unchanged


There's also cases in which remote files are very very unlikely to ever change, probably any in FTP servers such as all those at least US federal govt FTP servers that only change in that they add new files.

wlandau commented 6 years ago

Thankfully, HEAD() already gives us etags.

library(httr)
url <- "http://cran-logs.rstudio.com/2018/2018-02-09-r.csv.gz"
query <- HEAD(url)
query$headers$etag

## [1] "\"709379826e84e42f699b055f914313d7\""

But the apparent lack of a standard concerns me. From the wiki:

The method by which ETags are generated has never been specified in the HTTP specification.

Common methods of ETag generation include using a collision-resistant hash function of the resource's content, a hash of the last modification timestamp, or even just a revision number.

noamross commented 6 years ago

Well, if we cache the URL and the etag, then we at least avoid collisions between different sources. We could also have a minimum etag character count or something to avoid simple revision numbers.

wlandau commented 6 years ago

Finally got around to writing an example in the best practices guide, also referenced from the cranlogs example.

tiernanmartin commented 6 years ago

Thanks for documenting this, @wlandau !

I'm currently working on a project where some of the datasets need to be frequently edited by collaborators and are stored in Google Drive. I'm not sure if any other drake users are experimenting with the googledrive package, but I found the following pattern helpful for making sure that I only download data that has been recently modified:

library(googledrive)
library(drake)
library(tidyverse)

drive_get_datetime_modified <- function(dr_id_string){

  dr_id <- as_id(dr_id_string)

  datetime_modified <- dr_id %>% 
    drive_get %>% 
    mutate(modified = lubridate::as_datetime(map_chr(drive_resource, "modifiedTime"))) %>% 
    pull

  return(datetime_modified)
}

load_data <- function(trigger, dr_id, path){

  drive_download(dr_id, path)

  dat <- read_csv(path)

  return(dat)
}

chicken_csv <- drive_upload(
  drive_example("chicken.csv"),
  "chicken-upload.csv"
)

plan <- drake_plan(
  dataset_a_id = as_id("drive-id-string-REDACTED"),
  dataset_a_trigger = drive_get_datetime_modified(dataset_a_id),
  dataset_a = load_data(dataset_a_trigger, dataset_a_id, "chicken-upload.csv"),
  strings_in_dots = "literals"
) %>% 
  mutate(trigger = if_else(str_detect(target,"trigger"), "always", drake::default_trigger()))

The plan contains a trigger column that makes sure that targets whose names contain "trigger" are always run:

plan

# # A tibble: 3 x 3
#   target            command                                                              trigger
#   <chr>             <chr>                                                                <chr>  
# 1 dataset_a_id      "as_id(\"drive-id-string-REDACTED\")"                       any    
# 2 dataset_a_trigger drive_get_datetime_modified(dataset_a_id)                            always 
# 3 dataset_a         "load_data(dataset_a_trigger, dataset_a_id, \"chicken-upload.csv\")" any    

Running the plan twice in a row works as expected: the data is downloaded the first time but not the second (unless the file is modified in Google Drive):


make(plan)

# cache C:\Users\UrbanDesigner\Documents\tmp\test_proj\.drake
# connect 4 imports: chicken_csv, load_data, drive_get_datetime_modified, plan
# connect 3 targets: dataset_a_id, dataset_a_trigger, dataset_a
# check 9 items: as_id, drive_download, drive_get, lubridate::as_datetime, map_chr, mutate, pull, rea...
# check 2 items: drive_get_datetime_modified, load_data
# check 1 item: dataset_a_id
# target dataset_a_id
# check 1 item: dataset_a_trigger
# target dataset_a_trigger: trigger "always"
# check 1 item: dataset_a
# target dataset_a
# Target dataset_a messages:
#   File downloaded:
#   * chicken-upload.csv
# Saved locally as:
#   * chicken-upload.csv
#   Parsed with column specification:
# cols(
#   chicken = col_character(),
#   breed = col_character(),
#   sex = col_character(),
#   motto = col_character()
# )
# Used non-default triggers. Some targets may be not be up to date.

make(plan)

# cache C:/Users/UrbanDesigner/Documents/tmp/test_proj/.drake
# Unloading targets from environment:
#   dataset_a_trigger
#   dataset_a_id
#   dataset_a
# connect 4 imports: chicken_csv, load_data, drive_get_datetime_modified, plan
# connect 3 targets: dataset_a_id, dataset_a_trigger, dataset_a
# check 9 items: as_id, drive_download, drive_get, lubridate::as_datetime, map_chr, mutate, pull, rea...
# check 2 items: drive_get_datetime_modified, load_data
# check 1 item: dataset_a_id
# check 1 item: dataset_a_trigger
# load 1 item: dataset_a_id
# target dataset_a_trigger: trigger "always"
# check 1 item: dataset_a
# Used non-default triggers. Some targets may be not be up to date.

I just thought I'd share this in case others are doing something similar.

wlandau commented 6 years ago

Very cool, @tiernanmartin! Thank you for sharing. I like how you set up the triggers with one clever mutate() call. Really slick.

ldecicco-USGS commented 5 years ago

I was gleeful when I found this Issue since I needed to do the same thing. Please correct me if I'm wrong: is a more "modern" drake workflow (for others coming to this issue from google):

library(drake)
library(tidyverse)
library(googledrive)

drive_get_datetime_modified <- function(dr_id_string){

  dr_id <- as_id(dr_id_string)

  datetime_modified <- dr_id %>% 
    drive_get %>% 
    mutate(modified = lubridate::as_datetime(purrr::map_chr(drive_resource, "modifiedTime"))) %>% 
    pull

  return(datetime_modified)
}

drive_download_gd <- function(dr_id, path){

  drive_download(dr_id, path)

  if(file.exists(path)){
    return(path)
  } else {
    return(NULL)
  }

}

data_setup_plan <- drake_plan(

  dataset_a_id = as_id("drive-id-string-REDACTED"),
  dataset_a_trigger = drive_get_datetime_modified(dataset_a_id),
  dataset_a_path = target(command = drive_download_gd(dataset_a_id, path = "data/raw/dataset_a.xlsx"),
                              trigger = trigger(change = dataset_a_trigger)),
  dataset_a = readxl::read_excel(dataset_a_path,
                         sheet = "My sheet"),
  strings_in_dots = "literals"

)

make(data_setup_plan)
wlandau commented 5 years ago

I like where this is going, and your code does use drake's latest trigger API. A couple suggestions:

data_setup_plan <- drake_plan(
  latest_dataset_id = target(
    command = get_latest_id("drive-id-prefix"),
    trigger = trigger(condition = TRUE) # If the ID is going to change, we need to check it every time.
  )
  dataset_a_path = target(
    command = drive_download_gd(latest_dataset_id, path = "data/raw/dataset_a.xlsx"),
    trigger = trigger(change = drive_get_datetime_modified(latest_dataset_id))
  ),
  ...
)

Alternatively, the ID and modification time could just be imported objects that you pre-compute before every make().

latest_dataset_id <- get_latest_id("drive-id-prefix")
time_stamp <- drive_get_datetime_modified(latest_dataset_id)

data_setup_plan <- drake_plan(
  dataset_a_path = drive_download_gd(
    latest_dataset_id,
    time_stamp = time_stamp, # To make sure time_stamp is a dependency so changes trigger downstream builds
    path = "data/raw/dataset_a.xlsx"
  ),
  ...
)

make(data_setup_plan)

or equivalently:

latest_dataset_id <- get_latest_id("drive-id-prefix")
time_stamp <- drive_get_datetime_modified(latest_dataset_id)

data_setup_plan <- drake_plan(
  dataset_a_path = target(
    command = drive_download_gd(
      latest_dataset_id,
      path = "data/raw/dataset_a.xlsx"
    ),
    trigger = trigger(change = time_stamp)
  ),
  ...
)

make(data_setup_plan)

Since you need the ID and the time stamp at the beginning of the pipeline, it is entirely up to you whether they are targets or pre-computed imported objects. If putting everything in the plan increases confidence in reproducibility, a that's a good reason to make them targets. On the other hand, if you are on a cluster and you run make(parallelism = "future"), you may want to avoid the extra overhead of submitting transient workers just to run tiny jobs for dataset_a_id and dataset_a_trigger.

ldecicco-USGS commented 5 years ago

I removed a bit of that last comment because I had it wrong.

I'm unable to convince the workplan to re-download the file with the script above.

I just tested by uploading a new version of the "dataset_a" file to Google Drive. When I run in R, I do get the newly updated time:

dataset_a_trigger = drive_get_datetime_modified(dataset_a_id)
> dataset_a_trigger 
[1] "2018-12-14 21:07:07 UTC"

But if I do it via drake:

make(data_setup_plan)
Unloading targets from environment:
  dataset_a_id
  dataset_a_trigger
All targets are already up to date.
> readd(dataset_a_trigger)
[1] "2018-12-14 19:13:14 UTC"

Which was the earlier modified time....so the file doesn't feel the need to re-download.

wlandau commented 5 years ago

If we are talking about the data_setup_plan from https://github.com/ropensci/drake/issues/252#issuecomment-447438909, then yes, that makes sense. In you original data_setup_plan, there is a target already called dataset_a_trigger. So when you call make(data_setup_plan), drake first unloads the dataset_a_trigger object from your environment (the one with the value of "2018-12-14 21:07:07 UTC") because the name "dataset_a_trigger" conflicts with a target name in the plan. Then, make() reaches the dataset_a_trigger target (previously built with a value of "2018-12-14 19:13:14 UTC") and it thinks the target is already up to date because there is no trigger telling it to check Google Drive.

wlandau commented 5 years ago

So to use dataset_a_trigger as an imported object instead, we must remove the dataset_a_trigger target in data_setup_plan.

ldecicco-USGS commented 5 years ago

OK, I think I'm getting things moving..... This is what I've got that seems to be working.

library(drake)
library(tidyverse)
library(googledrive)

drive_get_datetime_modified <- function(dr_id){
  datetime_modified <- dr_id %>% 
    drive_get %>% 
    mutate(modified = lubridate::as_datetime(purrr::map_chr(drive_resource, "modifiedTime"))) %>% 
    pull

  return(datetime_modified)
}

drive_download_gd <- function(dr_id, path, time_stamp){
  drive_download(dr_id, path, overwrite = TRUE)

  if(file.exists(path)){
    return(path)
  } else {
    return(NULL)
  }  
}

dataset_a_id = as_id("1XQAlkUCvzHjBBoIbxVY1E6bsXxG3wSJj")
last_modified_dataset_a <- drive_get_datetime_modified(dataset_a_id)

data_setup_plan <- drake_plan(

  a_download = target(command = drive_download_gd(dataset_a_id,
                                                      path = file_out("data/raw/dataset_a.xlsx"),
                                                      time_stamp = last_modified_dataset_a),
                          trigger = trigger(change = last_modified_dataset_a)),
  dataset_a = readxl::read_excel(a_download, sheet = "some_sheet"),
  strings_in_dots = "literals"
)

make(data_setup_plan) 

Seems to be working great. A few things I don't understand...

  1. Should I be wrapping the "path" argument with file_out in the drive_download_gd target?
  2. Why can't I wrap the path in the read_excel function with file_in? When I do, it gives me the error:
    target dataset_a
    fail dataset_a
    Error: Target `dataset_a` failed. Call `diagnose(dataset_a)` for details. Error message:
    object 'a_download' not found

    (but if I run it as I have it above, it works fine)

  3. Just to make sure I understand, you recommend taking small "target"s out and have them run outside the plan (pre-computed imported objects) to optimize HTC/parallel runs, correct? If I'm not running stuff in parallel, then it's better for reproduciblity, correct? Is there more overhead to worry about on the targets if it's all linear?

Thanks!

wlandau commented 5 years ago

OK, I think I'm getting things moving..... This is what I've got that seems to be working.

Awesome!

A couple initial comments on your code:

Should I be wrapping the "path" argument with file_out in the drive_download_gd target?

Yes, that part of your code looks great.

Why can't I wrap the path in the read_excel function with file_in? When I do, it gives me the error...

Ah, this is interesting and tricky. Are you talking about something like file_in(download_a)? Unfortunately, drake requires that you use the literal path: file_in("data/raw/dataset_a.xlsx"). The reason is that drake needs to resolve the full dependency structure, including all your files, before it starts to build targets, and it cannot predict the value of download_a in advance because it does not run your code in advance. Relevant: https://github.com/ropensci/drake/issues/353, https://stackoverflow.com/questions/50725436/r-drake-file-out-name-with-variable.

In this respect, you have two main options for your plan:

data_setup_plan <- drake_plan(
  a_download = drive_download_gd(
    dataset_a_id,
    path = file_out("data/raw/dataset_a.xlsx"),
    time_stamp = last_modified_dataset_a
  ),
  dataset_a = readxl::read_excel(a_download, sheet = "some_sheet"),
  strings_in_dots = "literals"
)

or

data_setup_plan <- drake_plan(
  a_download = drive_download_gd(
    dataset_a_id,
    path = file_out("data/raw/dataset_a.xlsx"),
    time_stamp = last_modified_dataset_a
  ),
  dataset_a = readxl::read_excel(
    file_in("data/raw/dataset_a.xlsx"),
    sheet = "some_sheet"
  ),
  strings_in_dots = "literals"
)

Both options should work in your case, but I recommend the latter because it makes the dependency relationships more explicit. Plus, the graph from vis_drake_graph(show_output_files = TRUE) is more useful.

Just to make sure I understand, you recommend taking small "target"s out and have them run outside the plan (pre-computed imported objects) to optimize HTC/parallel runs, correct? If I'm not running stuff in parallel, then it's better for reproduciblity, correct? Is there more overhead to worry about on the targets if it's all linear?

Correct on all counts. Converting small targets to imports usually reduces HPC-related overhead, and converting imports to targets is more pedantic in the reproducibility sense.

On the machines I work with, overhead starts to get a little irritating when you go from 100 to 1000 targets, and it's a bit more serious when you go from 1000 to 10000. But other than that, if you are not working with HPC systems, then moving a handful of objects in or out of the plan should not make much of a difference.

The linearity of the build sequence is just determined by the dependency structure, and in this case, it is the same regardless of whether the time stamp is a target or an import.