ropensci / targets

Function-oriented Make-like declarative workflows for R
https://docs.ropensci.org/targets/
Other
927 stars 73 forks source link

Seamless integration with AWS S3 buckets #154

Closed wlandau closed 4 years ago

wlandau commented 4 years ago

Prework

After playing around with Metaflow's sandbox, I think there are two aspects of Metaflow-like cloud/HPC that we want in targets.

  1. AWS Batch as an HPC scheduler, which could happen through #152, https://github.com/mschubert/clustermq/issues/102, or https://github.com/HenrikBengtsson/future/issues/415. Fortunately, the existing tight integration traditional HPC systems like SLURM gives targets a head start here.
  2. Seamless data storage in S3 buckets.

This issue is about (2).

How Metaflow does it

I have yet to learn how to set up a serious batch computing environment on AWS. But locally, all one needs to do is set a bunch of environment variables in ~/.metaflow/config.yson. Then, any flow called locally will automatically store all the data to S3 and retrieve it when needed, regardless of whether the steps use decorators for AWS Batch. As far as I can tell, the data never needs to touch down locally. I am not sure how much of this behavior or these env vars differ outside the sandbox, but I suspect things are similar enough.

{
    "METAFLOW_AWS_SANDBOX_API_KEY": "***",
    "METAFLOW_AWS_SANDBOX_ENABLED": true,
    "METAFLOW_AWS_SANDBOX_INTERNAL_SERVICE_URL": "***",
    "METAFLOW_AWS_SANDBOX_REGION": "***",
    "METAFLOW_AWS_SANDBOX_STACK_NAME": "***",
    "METAFLOW_BATCH_CONTAINER_REGISTRY": "***",
    "METAFLOW_BATCH_JOB_QUEUE": "***",
    "METAFLOW_DATASTORE_SYSROOT_S3": "***",
    "METAFLOW_DEFAULT_DATASTORE": "s3",
    "METAFLOW_DEFAULT_METADATA": "service",
    "METAFLOW_ECS_S3_ACCESS_IAM_ROLE": "***",
    "METAFLOW_EVENTS_SFN_ACCESS_IAM_ROLE": "***",
    "METAFLOW_SERVICE_URL": "***",
    "METAFLOW_SFN_DYNAMO_DB_TABLE": "***",
    "METAFLOW_SFN_IAM_ROLE": "***"
}

Proposal for targets: more formats

In targets, storage classes such as fst and keras support custom methods for saving, loading, hashing, and serialization. I think it would be straightforward and elegant to write AWS versions of most of these. If we export the S3 generics, offload store_formats() to individual methods, and use S3 dispatch in the constructors of subclasses, we could even put these new methods in a new package (say, targets.aws.s3). Because the number of extra exports is so small, I doubt we will run into the same problem as https://github.com/wlandau/targets/issues/148#issuecomment-688295596.

If we're talking about just S3 storage, I think this approach will be far smoother than https://github.com/wlandau/tarchetypes/issues/8 or https://github.com/wlandau/tarchetypes/issues/11 in large pipelines. It also opens up possibilities for remote storage interaction. cc @mdneuzerling, @MilesMcBain, @noamross.

wlandau commented 4 years ago

To clarify, the idea is to simply use targets like normal except with different format settings. So instead of this:

# _targets.R
library(targets)
tar_pipeline(
  tar_target(raw_data_file, "data/raw_data.csv", format = "file" ),
  tar_target(raw_data,  read_csv(raw_data_file, col_types = cols())),
  tar_target(
    data,
    raw_data %>%
      mutate(Ozone = replace_na(Ozone, mean(Ozone, na.rm = TRUE)))
  ),
  tar_target(hist, create_plot(data)),
  tar_target(fit, biglm(Ozone ~ Wind + Temp, data))
)

we just write this:

# _targets.R
library(targets)
library(targets.aws.s3)
tar_pipeline(
  tar_target(raw_data_file, "data/raw_data.csv", format = "file" ),
  tar_target(raw_data,  read_csv(raw_data_file, col_types = cols()), format = "aws_fst_tbl"),
  tar_target(
    data,
    raw_data %>%
      mutate(Ozone = replace_na(Ozone, mean(Ozone, na.rm = TRUE))),
    format = "aws_fst_tbl"
  ),
  tar_target(hist, create_plot(data), format = "aws_qs"),
  tar_target(fit, biglm(Ozone ~ Wind + Temp, data), format = "aws_qs")
)

or even just this:

# _targets.R
library(targets)
library(targets.aws.s3)
tar_option_set(format = "aws_qs")
tar_pipeline(
  tar_target(raw_data_file, "data/raw_data.csv", format = "file" ),
  tar_target(raw_data,  read_csv(raw_data_file, col_types = cols())),
  tar_target(
    data,
    raw_data %>%
      mutate(Ozone = replace_na(Ozone, mean(Ozone, na.rm = TRUE)))
  ),
  tar_target(hist, create_plot(data)),
  tar_target(fit, biglm(Ozone ~ Wind + Temp, data))
)

and the data will go to a bucket (lots of paws magic in the backend). Combined with storage = "remote" and retrieval = "remote" on a cluster or #152, the data need not arrive at one's local machine.

wlandau commented 4 years ago

...we could even put these new methods in a new package (say, targets.aws.s3). Because the number of extra exports is so small, I doubt we will run into the same problem as https://github.com/wlandau/targets/issues/148#issuecomment-688295596.

I take that back. We would need to export methods from the file class, as well as assertion functions, and that's already too exposed. But the good news is that formats are cheap: modular and not a whole lot of code.

wlandau commented 4 years ago

But I still went through most of the prework, using S3 dispatch to break apart store_formats() and for store subclass constructors. This will make it easier to develop and maintain a large number of new formats.

wlandau commented 4 years ago

I am now comfortable with the relevant parts of the AWS console and CLI for S3. Next to learn: AWS S3 web API and regular curl, closely followed by R curl. I am almost positive this whole feature set is just a simple matter of figuring out the right calls to PUT, HEAD, and GET. I think we can use curl directly for this without having to go through httr or paws.

wlandau commented 4 years ago

Also, the "eventual" part of AWS eventual consistency means that if I overwrite a target, there may be a delay until the new target becomes available: https://stackoverflow.com/questions/64073793/etag-availability-guarantees-for-aws-s3-objects/64079706#64079706. So I think we should just hash locally, stick the hash in the metadata, and poll HEAD until the bucket has the right value.

wlandau commented 4 years ago

On second thought, direct use of curl gets a bit too involved. And to upload an object with paws, it looks like we need to readBin() first and handle multipart uploads differently. aws.s3::put_object() looks better suited to the task.

wlandau commented 4 years ago

Uploads are super simple, and multipart = TRUE seems to work even with small files.

aws.s3::put_object(
  file = "object-local-file-path",
  object = "object-key",
  bucket = "bucket-name",
  multipart = TRUE,
  headers = c("x-amz-meta-targets-hash" = "custom-hash")
)
wlandau commented 4 years ago

Getting the custom hash:

tryCatch({
  x <- suppressMessages(aws.s3::head_object(
    object = "object-key",
    bucket = "bucket-name"
  ))
  attr(x, "x-amz-meta-targets-hash")
}, error = function(e) NA_character_)
wlandau commented 4 years ago

And finally, downloading an S3 object:

aws.s3::save_object(
  object = "object-name"
  bucket = "bucket-name",
  file = "file-path"
)

Seems to all work super smoothly.

wlandau commented 4 years ago

Got a sketch in https://github.com/wlandau/targets/tree/154. It fits the existing internals reasonably well. And because of double inheritance (e.g. from classes tar_aws_s3 and tar_rds) it should stay reasonably concise.