mschubert / clustermq

R package to send function calls as jobs on LSF, SGE, Slurm, PBS/Torque, or each via SSH
https://mschubert.github.io/clustermq/
Apache License 2.0

AWS Batch scheduler #208

Closed: wlandau closed this issue 3 years ago

wlandau commented 3 years ago

I propose AWS Batch as a new clustermq scheduler. Batch has become extremely popular, especially as traditional HPC is waning. I have a strong personal interest in making Batch integrate nicely with R (ref: https://github.com/wlandau/targets/issues/152, https://github.com/wlandau/tarchetypes/issues/8, https://wlandau.github.io/targets-manual/cloud.html, https://github.com/mschubert/clustermq/issues/102) and I am eager to help on the implementation side.

Batch is super easy to set up through the AWS web console, and I think it would fit nicely into clustermq's existing interface: options(clustermq.scheduler = "aws_batch") and options(clustermq.template = "batch.tmpl"), where batch.tmpl contains an AWS API call with the compute environment, job queue, job definition, and key pair. I think we could use curl directly instead of the much larger and rapidly developing paws package. I think this direct approach could be far more seamless and parsimonious than the existing SSH connector with multiple hosts.
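
As a rough sketch of the user-facing side (nothing below is implemented; the scheduler name and template contents are assumptions about how it could look):

# Hypothetical configuration if an "aws_batch" scheduler existed.
options(
  clustermq.scheduler = "aws_batch",  # assumed scheduler name, not implemented
  clustermq.template  = "batch.tmpl"  # would point at the compute environment,
                                      # job queue, job definition, and key pair
)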

wlandau commented 3 years ago

I think we could use curl directly instead of the much larger and rapidly developing paws package.

Starting to question this, actually. Now that I have gone through an AWS Batch tutorial and successfully run some R code in a job, I appreciate how straightforward the paws functionality is. I think I had initially expected paws to provide documentation that really is, and should be, AWS's responsibility.

mschubert commented 3 years ago

AWS is definitely on the list of things I want to support, so thank you for raising this! In principle, it should be straightforward to support as long as you can SSH into your compute container and set up a reverse tunnel (I think).

That said, I've got zero experience with it, so I don't even know which steps I need to consider and I'm afraid I won't be able to read up on this very soon.

Should we coordinate our roadmaps a bit with yours and @HenrikBengtsson's?

wlandau commented 3 years ago

Awesome! I think all of us care about this, and I would love to coordinate. Also cc @davidkretch.

I have been experimenting some with Batch through the console, and thanks to you I can now send jobs to workers inside a Docker container via SSH. SSH into Batch jobs sounds trickier (ref: https://github.com/paws-r/paws/issues/330, https://stackoverflow.com/questions/64342893/ssh-into-aws-batch-jobs). However, the user guide repeatedly mentions the possibility of SSH for Batch container instances, so I am not convinced that it is impossible.

That said, I've got zero experience with it, so I don't even know which steps I need to consider and I'm afraid I won't be able to read up on this very soon.

No worries, I think I have the least general knowledge here.

wlandau commented 3 years ago

Here's another idea: for the moment, given the unexpected difficulty of tunneling into AWS Batch jobs, why not drop one level lower and work with EC2 instances directly? From https://gist.github.com/DavisVaughan/865d95cf0101c24df27b37f4047dd2e5, EC2 seems easier for us than Batch, and tackling the former first may help us work up to the latter later.

wlandau commented 3 years ago

There's a lot we can do to improve on https://gist.github.com/DavisVaughan/865d95cf0101c24df27b37f4047dd2e5, such as

  1. Abstracting away all the config. I don't want to have to micromanage individual IP addresses or package installation. Ideally, the user should just need to specify an instance type, pem key file, and AMI name in future::plan() or options().
  2. Setting up instances at the last minute and tearing them down ASAP. For targets/drake workflows, I don't want to have to call makeClusterPSOCK() and stopCluster(). It's a lot of micromanagement, and it's not parsimonious. If we set up and tear down intelligently, targets can avoid submitting jobs at all if all the targets are already up to date. targets and drake already do this with traditional HPC to avoid submitting superfluous jobs. (The manual flow we would be abstracting away is sketched below.)
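
A minimal sketch of that manual flow, assuming an AMI with R preinstalled, a registered key pair, and default networking (the AMI ID, key pair name, and SSH user below are placeholders):

# Manual EC2 flow we would want to abstract away; all identifiers are placeholders.
ec2 <- paws::ec2()

# Launch one small instance from an AMI that already has R installed.
run <- ec2$run_instances(
  ImageId      = "ami-0123456789abcdef0",
  InstanceType = "t3.micro",
  KeyName      = "my-key-pair",
  MinCount     = 1,
  MaxCount     = 1
)
id <- run$Instances[[1]]$InstanceId

# Crude wait for the instance to come up, then look up its public address.
Sys.sleep(60)
desc <- ec2$describe_instances(InstanceIds = list(id))
ip <- desc$Reservations[[1]]$Instances[[1]]$PublicIpAddress

# Start an R worker over SSH.
cl <- parallelly::makeClusterPSOCK(
  ip,
  user    = "ubuntu",
  rshopts = c("-i", "~/.ssh/my-key-pair.pem", "-o", "StrictHostKeyChecking=no")
)

# ... run the workload ...

# Tear everything down again.
parallel::stopCluster(cl)
ec2$terminate_instances(InstanceIds = list(id))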

wlandau commented 3 years ago

For the kinds of workflows we deal with, the only added value I see in Batch relative to EC2 is cost-optimized resource provisioning, e.g. waiting for spot instances to get cheap before submitting jobs. I am not sure we can do that with EC2 alone. (That, and the ability to automatically connect to S3, which targets will need, but we can work around that by shipping the right AWS environment variables along with the R jobs.)

wlandau commented 3 years ago

It is worth noting that in the AWS Batch console, I cannot select the t2.micro or t3.micro instance types. I want to stick to these instance types as much as possible in development and testing because all AWS free tier accounts come with 750 free hours per month with t2.micro and t3.micro for the first year.

davidkretch commented 3 years ago

Hi @wlandau:

With respect to testing on AWS, I highly recommend applying for the AWS Open Source promotional credits program, which is described more here. We got this for the Paws package.

I haven't put nearly as much thought into this, but if R is going to be in control of starting and stopping instances, then I agree that Batch doesn't get you much extra. With respect to Batch, I think some of its pros and cons are:

Pros

Cons

To use spot pricing with EC2, you could look up current spot prices with the DescribeSpotPriceHistory API call, but I think supporting spot instances would likely be a lot of work, since it would also have to handle things like restarting jobs when instances are stopped due to changing prices.
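
For reference, here is roughly what that lookup could look like via paws (a sketch only; the instance type and product description are arbitrary examples, and the exact argument names should be checked against the paws docs):

# Sketch: query recent spot prices for one instance type through paws.
ec2 <- paws::ec2()
history <- ec2$describe_spot_price_history(
  InstanceTypes       = list("m5.large"),
  ProductDescriptions = list("Linux/UNIX"),
  StartTime           = Sys.time() - 60 * 60,  # roughly the last hour
  MaxResults          = 20
)
# Each entry carries AvailabilityZone, SpotPrice (a string), and Timestamp.
prices <- vapply(history$SpotPriceHistory,
                 function(x) as.numeric(x$SpotPrice), numeric(1))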

wlandau commented 3 years ago

Thanks for the advice. Your assessment is helpful, and I had no idea about the promotional credit.

wlandau commented 3 years ago

How does the clustermq main process refer to the workers on traditional HPC? How exactly does it get the IPs / hostnames or whatever else it needs?

mschubert commented 3 years ago

We don't know workers' node or IP address because the scheduler will assign them to nodes. Instead, each worker connects back to the main process (which is listening on the port corresponding to the ID field), either directly or via an SSH tunnel.

So each worker needs to know either the host+port of the main process or of the SSH tunnel.
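
Concretely, each worker ends up evaluating something like the following, where the address is a ZeroMQ TCP endpoint (the host name, port, and auth value here are made up for illustration):

# What a worker effectively runs on the compute node; host, port, and the
# auth token are made-up examples of what the template fills in.
Sys.setenv(CMQ_AUTH = "abc123")
clustermq:::worker("tcp://login-node.example.org:6001")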

wlandau commented 3 years ago

Awesome! Sounds like my whole quixotic pursuit of worker IPs is moot! This makes me more optimistic about the potential to work through the scheduling software of Batch itself. I bet paws can handle it, but if not, AWS CLI seems reasonable as a system dependency.

By the way, I asked about ZeroMQ compatibility with Batch and got a great answer here. I do not have the expertise to understand all the cloud-specific network barriers, but I take it as more evidence that a new Batch QSys class is possible.

wlandau commented 3 years ago

I think we will need to handle AWS Batch differently from the other schedulers. Instead of traditional template files, Batch uses JSON strings called "job definitions". In the AWS CLI, users can pass job definitions to aws batch submit-job with the --cli-input-json flag.

But for our purposes, I think we should require the user to create a job definition in advance through the AWS web console or other means, then pass the job definition name to paws::batch()$submit_job() (example here). The command of the job should always be something like CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'. For customization, we could translate the template argument of Q() etc. into job definition parameters.

@mschubert, what do you think? If this sounds reasonable, what help would be most useful?

wlandau commented 3 years ago

Array jobs in AWS Batch are straightforward with paws. The following call submits an array of 3 jobs where each job prints its array index to the CloudWatch log.

# Submit an array of 3 jobs; jobDefinition and jobQueue were created beforehand in the console.
paws::batch()$submit_job(
  jobDefinition = "job-definition",
  jobName = "example-job-array",
  jobQueue = "job-queue",
  arrayProperties = list(size = 3)
)

Prework:

  1. First, I configured AWS locally: I set AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_DEFAULT_REGION in ~/.Renviron, and I made sure AWS_DEFAULT_REGION agreed with the region selected in the AWS web console.
  2. Then, I uploaded a Dockerfile to https://github.com/wlandau/cmq-docker and linked the repo to Docker Hub to build the image.
  3. Next, I created the simplest possible computing environment and job queue using the defaults in the AWS Batch web console.
  4. Lastly, I created a job definition using the image wlandau/cmq-docker and the command Rscript -e 'print(Sys.getenv("AWS_BATCH_JOB_ARRAY_INDEX"))'. For clustermq, I think we could simply replace the command with CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'. Here is what the job definition looks like when printed out using the AWS CLI.
$ aws batch describe-job-definitions
{
    "jobDefinitions": [
        {
            "jobDefinitionName": "job-definition",
            "jobDefinitionArn": "arn:aws:batch:us-east-1:912265024257:job-definition/job-definition:3",
            "revision": 3,
            "status": "ACTIVE",
            "type": "container",
            "parameters": {},
            "containerProperties": {
                "image": "wlandau/cmq-docker",
                "vcpus": 2,
                "memory": 2048,
                "command": [
                    "Rscript",
                    "-e",
                    "print(Sys.getenv('AWS_BATCH_JOB_ARRAY_INDEX'))"
                ],
                "volumes": [],
                "environment": [],
                "mountPoints": [],
                "ulimits": [],
                "resourceRequirements": [],
                "linuxParameters": {
                    "devices": []
                }
            }
        }
    ]
}

So I think that sketches out how to submit array jobs to AWS Batch from R. If that looks good, is there anything else I can do to help get an implementation going?

mschubert commented 3 years ago

This looks great, thank you so much! :+1:

I still see a few issues before this can be implemented:

  1. blocking: We still need a way to route TCP/IP between local clustermq and AWS. Reverse tunneling only works if the local process establishes the SSH connection; for tunneling from remote to local, the local machine would need to accept incoming connections. (Note that we don't need this for HPC because the submitting and worker nodes are on the same network.)
  2. nice-to-have: I don't see an easy way to have a concept of common_data that is transferred to AWS only once and then distributed between workers.

wlandau commented 3 years ago

  1. I have tried to look into connection issues in general, but this is new territory for me, and my results are mixed. The "Create a Key Pair" section of the Batch user guide claims that all you need is the usual PEM key that everyone uses for EC2 instances. Users should set that up in advance while creating the compute environment. I asked around for more details and got mixed results here (not very helpful) and here (seemed helpful, but the low-level details went over my head). @davidkretch suggested a way to get the IP address of an instance, but from your subsequent comments, it sounds like this may not be necessary.
  2. drake and targets rely on common data, so I have a strong personal interest in making that happen. What would it take? What is different between common data and ordinary ZeroMQ messages? I had assumed that once the worker is running and can ping-pong with the local process, we would be home free.

wlandau commented 3 years ago

Other details: we want to override the command in the job definition so the user does not have to write clustermq:::worker() manually, and we want to pass CMQ_AUTH. The following submit_job() call demonstrates how to do both; it worked when I tested it.

paws::batch()$submit_job(
  jobDefinition = "job-definition",
  jobName = "job-array",
  jobQueue = "job-queue",
  arrayProperties = list(size = 2),
  containerOverrides = list(
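    # Override the default command from the job definition.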
    command = list(
      "R",
      "--no-save",
      "--no-restore",
      "-e",
      "print(Sys.getenv('CMQ_AUTH'))"
    ),
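    # Pass CMQ_AUTH to the worker as an environment variable.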
    environment = list(
      list(
        name = "CMQ_AUTH",
        value = "auth_value"
      )
    )
  )
)

wlandau commented 3 years ago

I don't see an easy way to have a concept of common_data that is transferred to AWS only once and then distributed between workers

Ah, I misread this. We’re just talking about making the broadcast more efficient. How does this work in traditional HPC?

mschubert commented 3 years ago

HPC is on a local network (usually >= 10 Gbps), so it doesn't matter there. End-user internet to AWS is much slower. SSH+HPC caches the common data on the HPC head node.

Looks like we can use SSH for EC2: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/AccessingInstancesLinux.html (not sure about Lambda/Batch).

In this case, we could start the container and then use the SSH connector.

edit: implementation could be through bastion hosts, with SSH+AWS working akin to SSH+HPC. Not sure if/how much this makes it more expensive.
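
In other words, something along these lines, reusing the existing SSH connector with an EC2 instance (or bastion host) in place of an HPC login node; the host name below is a placeholder, and the remote end would need R, clustermq, and matching package versions installed:

options(
  clustermq.scheduler = "ssh",
  clustermq.ssh.host  = "ubuntu@ec2-xx-xx-xx-xx.compute-1.amazonaws.com",  # placeholder
  clustermq.ssh.log   = "~/cmq_ssh.log"  # optional log file on the remote side
)
# The remote host's own clustermq configuration then decides how workers are
# started there (e.g. multicore, or whatever scheduler that machine has).
clustermq::Q(function(x) x * 2, x = 1:3, n_jobs = 1)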

HenrikBengtsson commented 3 years ago

Sorry for radio silence and all. Struggling with time. I think it was an awesome idea to have a call on this (proposed in some thread, I think).

Drive by comment regarding: "note that we don't need this for HPC because the submitting and worker nodes are on the same network". Some HPC environments are pretty locked down, with compute nodes that have no internet access and no SSH clients or daemons installed. That is useful to keep in mind when designing things, i.e. give people in such environments a fallback solution to work with.

wlandau commented 3 years ago

Awesome! I will propose an hour on Google Meet. I attempted to capture some of our ideas in https://github.com/wlandau/r-cloud-ideas at a high level.

mschubert commented 3 years ago

Drive by comment regarding

I agree in principle, but we never require internet access of workers or SSH access between HPC compute nodes. If, however, a user wants the convenience of submitting remotely via SSH, then of course this is a requirement.

The comment above was for caching data on the remote end, which only makes sense if the connection of local>remote is much slower than remote>workers.

wlandau commented 3 years ago

This answer claims reverse tunneling should be possible if the compute environment is unmanaged. The poster recommends using the metadata endpoint to find out which EC2 instance a worker is running on. @davidkretch, is that the same as https://github.com/paws-r/paws/issues/330#issuecomment-711440381?

davidkretch commented 3 years ago

@wlandau The metadata endpoint would be much, much easier than the example code I created. The metadata endpoint is a local web API on the instance/container that you can query to get info about the machine you're running on. Paws doesn't have any publicly exposed functions for accessing them at the moment but it's pretty easy (example here).
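
For example, from R running inside an EC2 instance, the plain lookup is just an HTTP GET on a link-local address; note this is the EC2 flavor, and Batch/ECS containers expose a different, ECS-style metadata endpoint:

# Query the EC2 instance metadata service from inside the instance.
# This assumes IMDSv1 is enabled; IMDSv2 additionally requires a session token.
public_ip <- readLines(
  "http://169.254.169.254/latest/meta-data/public-ipv4",
  warn = FALSE
)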

Is it true that if the worker is reverse SSH tunneling to your R session, it would need to know your IP, and it's less critical that you know the worker nodes' IPs?

mschubert commented 3 years ago

Is it true that

Not quite: The SSH connection is established from the local session to AWS (or remote HPC), which attaches a reverse tunnel to the same connection (so we need to be able to access the remote session). The result is that the remote session can access this tunnel to connect to the local process.

davidkretch commented 3 years ago

Ah, I understand. In that case (local to worker), the metadata endpoint would be unhelpful here, because it is only accessible from the worker, but we'll need to know that information in our local session.

HenrikBengtsson commented 3 years ago

To clarify further, the remote session thinks it's connected to a port on localhost; it just happens to be tunneled back to a port on your local computer via that SSH reverse tunnel.

Here's an illustration of how parallelly SSHes into a remote machine with a reverse tunnel, so that localhost:12345 on the remote machine will talk to localhost:12345 on your local computer:

> cl <- parallelly::makeClusterPSOCK("remote.server.org", port = 12345L, revtunnel = TRUE, dryrun=TRUE)
----------------------------------------------------------------------
Manually, (i) login into external machine 'remote.server.org':

  '/usr/bin/ssh' -R 12345:localhost:12345 remote.server.org

and (ii) start worker #1 from there:

  'Rscript' --default-packages=datasets,utils,grDevices,graphics,stats,methods -e 'workRSOCK <- tryCatch(parallel:::.slaveRSOCK, error=function(e) parallel:::.workRSOCK); workRSOCK()' MASTER=localhost PORT=12345 OUT=/dev/null TIMEOUT=2592000 XDR=FALSE

Alternatively, start worker #1 from the local machine by combining both steps in a single call:

  '/usr/bin/ssh' -R 12345:localhost:12345 remote.server.org "'Rscript' --default-packages=datasets,utils,grDevices,graphics,stats,methods -e 'workRSOCK <- tryCatch(parallel:::.slaveRSOCK, error=function(e) parallel:::.workRSOCK); workRSOCK()' MASTER=localhost PORT=12345 OUT=/dev/null TIMEOUT=2592000 XDR=FALSE"

wlandau commented 3 years ago

Maybe I am getting too far ahead here, but would it make sense to implement all this ZeroMQ + R + cloud functionality in an entirely new package of its own? (Say, cloudmq?) The more I learn about cloud services, the more I realize how different they are from traditional clusters, in the interface (no template file), the requirements (Docker container), and the assumptions (no common local network). I worry that an AWS Batch QSys subclass in clustermq may need to fight an uphill battle against assumptions that worked well for clusters but not for the cloud.

wlandau commented 3 years ago

Then again, there is a lot more to clustermq than just the backends, so maybe the duplication would be unwise. @mschubert, clearly you know best.

mschubert commented 3 years ago

My current impression is:

wlandau commented 3 years ago

I'm not sure what we gain with Batch instead of more "traditional" instances

Automatic spot pricing and Docker image support seem compelling (https://github.com/mschubert/clustermq/issues/208#issuecomment-711440146). And if we pursue Lambda and Fargate in similar ways, we might see shorter worker startup times.

wlandau commented 3 years ago

Another thought: persistent clustermq workers in the cloud may already be straightforward with AWS-backed self-hosted runners in GitHub Actions. https://github.com/wlandau/targets-minimal#continuous-deployment is a proof of concept of targets + continuous deployment, and the principles apply to clustermq as well. In that example, it would be trivially easy to turn on clustermq multicore parallelism and AWS S3 storage to avoid storing big data on GitHub. All we would need is a beefy multicore runner backed by AWS. Has anyone here used AWS-backed runners?

wlandau commented 3 years ago

I am thinking GHA runners could just make it easier to get R code on the cloud, provided the setup and teardown happens automatically.

davidkretch commented 3 years ago

It didn't look to me like GitHub Actions self-hosted runners do automatic setup/teardown. They appear to use an agent that runs on your infrastructure, polls GitHub, and waits for work. If you already have compute infrastructure, I think you could also hypothetically have clustermq talk to it directly rather than going through GHA.

In terms of clustermq, which assumes an SSH connection, I think the only real options on AWS are

  1. EC2 or Elastic Container Service -- entirely custom, high complexity, but definitely possible.
  2. Batch -- may be possible, but I am unsure because its compute model is somewhat different from the one clustermq assumes.

All other options I can think of would prohibit SSH connections and would need an S3 or other intermediary for delivering results.

The two open questions I have about Batch are:

I am currently slowly working on a non-clustermq approach using Lambda, but will look into the Batch approach next, hopefully later this month.

mschubert commented 3 years ago

Thanks Will & David!

I'm not sure if routing AWS access via GHA would simplify or complicate things. I'm definitely still interested in exploring an SSH-based approach (but also trying to wrap up a science project over here, so unfortunately not much spare time right now)