broadinstitute / cromwell

Scientific workflow engine designed for simplicity & scalability. Trivially transition from one-off use cases to massive-scale production environments
http://cromwell.readthedocs.io/
BSD 3-Clause "New" or "Revised" License
995 stars 360 forks

Too Many Requests with AWS Batch backend #4303

Open multimeric opened 6 years ago

multimeric commented 6 years ago

When running certain highly parallel WDL workflows, I'm getting the error cromwell.core.CromwellFatalException: software.amazon.awssdk.services.batch.model.BatchException: Too Many Requests (Service: null; Status Code: 429; Request ID: cffe6e45-d66c-11e8-a1df-05402551b0ba). The specific case where this happens is in the gatk3-data-processing workflow, when running the ApplyBQSR task, which is run in parallel over some calculated intervals.

The full error trace I get is:

2018-10-23 02:39:07,631 cromwell-system-akka.dispatchers.backend-dispatcher-53345 ERROR - AwsBatchAsyncBackendJobExecutionActor [UUID(6d97fef4)GPPW.ApplyBQSR:15:1]: Error attempting to Execute
software.amazon.awssdk.services.batch.model.BatchException: Too Many Requests (Service: null; Status Code: 429; Request ID: cfc6e34e-d66c-11e8-be0b-dd778498cf15)
        at software.amazon.awssdk.core.http.pipeline.stages.HandleResponseStage.handleErrorResponse(HandleResponseStage.java:114)
        at software.amazon.awssdk.core.http.pipeline.stages.HandleResponseStage.handleResponse(HandleResponseStage.java:72)
        at software.amazon.awssdk.core.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:57)
        at software.amazon.awssdk.core.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:40)
        at software.amazon.awssdk.core.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:239)
        at software.amazon.awssdk.core.http.pipeline.stages.TimerExceptionHandlingStage.execute(TimerExceptionHandlingStage.java:40)
        at software.amazon.awssdk.core.http.pipeline.stages.TimerExceptionHandlingStage.execute(TimerExceptionHandlingStage.java:30)
        at software.amazon.awssdk.core.http.pipeline.stages.RetryableStage$RetryExecutor.doExecute(RetryableStage.java:139)
        at software.amazon.awssdk.core.http.pipeline.stages.RetryableStage$RetryExecutor.execute(RetryableStage.java:105)
        at software.amazon.awssdk.core.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:66)
        at software.amazon.awssdk.core.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:47)
        at software.amazon.awssdk.core.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:239)
        at software.amazon.awssdk.core.http.StreamManagingStage.execute(StreamManagingStage.java:56)
        at software.amazon.awssdk.core.http.StreamManagingStage.execute(StreamManagingStage.java:42)
        at software.amazon.awssdk.core.http.pipeline.stages.ClientExecutionTimedStage.executeWithTimer(ClientExecutionTimedStage.java:71)
        at software.amazon.awssdk.core.http.pipeline.stages.ClientExecutionTimedStage.execute(ClientExecutionTimedStage.java:55)
        at software.amazon.awssdk.core.http.pipeline.stages.ClientExecutionTimedStage.execute(ClientExecutionTimedStage.java:39)
        at software.amazon.awssdk.core.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:239)
        at software.amazon.awssdk.core.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:239)
        at software.amazon.awssdk.core.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:35)
        at software.amazon.awssdk.core.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:24)
        at software.amazon.awssdk.core.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:281)
        at software.amazon.awssdk.core.client.SyncClientHandlerImpl.doInvoke(SyncClientHandlerImpl.java:149)
        at software.amazon.awssdk.core.client.SyncClientHandlerImpl.invoke(SyncClientHandlerImpl.java:131)
        at software.amazon.awssdk.core.client.SyncClientHandlerImpl.execute(SyncClientHandlerImpl.java:100)
        at software.amazon.awssdk.core.client.SyncClientHandlerImpl.execute(SyncClientHandlerImpl.java:76)
        at software.amazon.awssdk.core.client.SdkClientHandler.execute(SdkClientHandler.java:45)
        at software.amazon.awssdk.services.batch.DefaultBatchClient.registerJobDefinition(DefaultBatchClient.java:644)
        at cromwell.backend.impl.aws.AwsBatchJob.$anonfun$createDefinition$2(AwsBatchJob.scala:198)
        at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:85)
        at cats.effect.internals.IORunLoop$.startCancelable(IORunLoop.scala:41)
        at cats.effect.internals.IOBracket$BracketStart.run(IOBracket.scala:74)
        at cats.effect.internals.Trampoline.cats$effect$internals$Trampoline$$immediateLoop(Trampoline.scala:70)
        at cats.effect.internals.Trampoline.startLoop(Trampoline.scala:36)
        at cats.effect.internals.TrampolineEC$JVMTrampoline.super$startLoop(TrampolineEC.scala:93)
        at cats.effect.internals.TrampolineEC$JVMTrampoline.$anonfun$startLoop$1(TrampolineEC.scala:93)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
        at cats.effect.internals.TrampolineEC$JVMTrampoline.startLoop(TrampolineEC.scala:93)
        at cats.effect.internals.Trampoline.execute(Trampoline.scala:43) 
        at cats.effect.internals.TrampolineEC.execute(TrampolineEC.scala:44)
        at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:60)
        at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:41)
        at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:134)
        at cats.effect.internals.IORunLoop$.start(IORunLoop.scala:34)
        at cats.effect.internals.IOBracket$.$anonfun$apply$1(IOBracket.scala:36)
        at cats.effect.internals.IOBracket$.$anonfun$apply$1$adapted(IOBracket.scala:33)
        at cats.effect.internals.IORunLoop$RestartCallback.start(IORunLoop.scala:328)
        at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:117)
        at cats.effect.internals.IORunLoop$.start(IORunLoop.scala:34)
        at cats.effect.IO.unsafeRunAsync(IO.scala:258)
        at cats.effect.IO.unsafeToFuture(IO.scala:345)
        at cromwell.backend.impl.aws.AwsBatchAsyncBackendJobExecutionActor.executeAsync(AwsBatchAsyncBackendJobExecutionActor.scala:342)
        at cromwell.backend.standard.StandardAsyncExecutionActor.executeOrRecover(StandardAsyncExecutionActor.scala:943)
        at cromwell.backend.standard.StandardAsyncExecutionActor.executeOrRecover$(StandardAsyncExecutionActor.scala:935)
        at cromwell.backend.impl.aws.AwsBatchAsyncBackendJobExecutionActor.executeOrRecover(AwsBatchAsyncBackendJobExecutionActor.scala:74)
        at cromwell.backend.async.AsyncBackendJobExecutionActor.$anonfun$robustExecuteOrRecover$1(AsyncBackendJobExecutionActor.scala:65)
        at cromwell.core.retry.Retry$.withRetry(Retry.scala:38)
        at cromwell.backend.async.AsyncBackendJobExecutionActor.withRetry(AsyncBackendJobExecutionActor.scala:61)
        at cromwell.backend.async.AsyncBackendJobExecutionActor.cromwell$backend$async$AsyncBackendJobExecutionActor$$robustExecuteOrRecover(AsyncBackendJobExecutionActor.scala:65)
        at cromwell.backend.async.AsyncBackendJobExecutionActor$$anonfun$receive$1.applyOrElse(AsyncBackendJobExecutionActor.scala:88)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at akka.actor.Actor.aroundReceive(Actor.scala:517)
        at akka.actor.Actor.aroundReceive$(Actor.scala:515)
        at cromwell.backend.impl.aws.AwsBatchAsyncBackendJobExecutionActor.aroundReceive(AwsBatchAsyncBackendJobExecutionActor.scala:74)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:588)
        at akka.actor.ActorCell.invoke(ActorCell.scala:557)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I understand why this is happening - Cromwell is simply sending too many requests to AWS Batch over a short space of time. However, the limit is apparently 500 requests per second (https://forums.aws.amazon.com/thread.jspa?messageID=708581), so perhaps Cromwell is doing something unusual.

In the short term, I think the best solution is to catch this exception, sleep for a while, and then continue sending requests.
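The catch-sleep-retry idea can be sketched as follows. This is an illustrative Python sketch, not Cromwell code; `ThrottlingError` and the retried call are stand-ins for the AWS SDK exception and whichever request actually returns the 429.

```python
import random
import time

class ThrottlingError(Exception):
    """Stand-in for an HTTP 429 'Too Many Requests' response."""

def with_backoff(call, max_attempts=5, base_delay=1.0):
    """Retry `call` with exponential backoff plus jitter on throttling errors."""
    for attempt in range(max_attempts):
        try:
            return call()
        except ThrottlingError:
            if attempt == max_attempts - 1:
                raise  # out of attempts, propagate the 429
            # Exponential backoff: base, 2*base, 4*base, ... plus jitter so
            # many parallel actors don't all retry at the same instant.
            delay = base_delay * (2 ** attempt) + random.uniform(0, base_delay)
            time.sleep(delay)
```

The jitter matters here: a highly parallel scatter means many actors hit the 429 at once, and without jitter they would all retry in lockstep and get throttled again.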

wleepang commented 6 years ago

When Cromwell submits a job to AWS Batch it actually makes about 3-4 API requests, so hitting the API request limit can happen quickly.

The following specification in the application conf is a workaround:

concurrent-job-limit = 16

Is this not working for you?

multimeric commented 6 years ago

Good call. I'll try that setting and see what happens

multimeric commented 6 years ago

Actually it seems that the config I was using already had that line in it, set to 16. I've reduced the concurrent-job-limit to 8, and I'll see how it goes

seandavi commented 6 years ago

I closed #4355 as a duplicate, not because it was resolved.

multimeric commented 6 years ago

So, setting concurrent-job-limit to 8 did resolve my issue (and I hit another, unrelated issue instead).

However, I don't believe this is the ideal way to resolve this. I (and, I assume, most other users) want maximum concurrency for my jobs; we just want to avoid this error. If we caught this Too Many Requests error and just waited a few seconds before retrying the requests, it would surely resolve this issue in a cleaner way.

seandavi commented 6 years ago

I agree, @TMiguelT.

mcroken commented 6 years ago

This sounds like a problem with the backend dispatcher. Would it make sense to add a sleep statement in the config?

seandavi commented 6 years ago

Naive question, but where would one add such a sleep statement? Is there an example that we could refer to?

multimeric commented 6 years ago

It would have to be added to Cromwell itself. There's nothing in the config or in the WDL language that could solve this.

seandavi commented 6 years ago

@TMiguelT, that was what I thought--just double-checking.

multimeric commented 6 years ago

I mean, if WDL had some kind of sleep function, you could put it into a scatter block so that each scatter shard started at a different time, but really this kind of backend-specific behaviour should be handled by Cromwell, not WDL. If you found a way to make it work, your WDL would be heavily tied to the AWS backend and wouldn't work as well on other backends.

mcroken commented 6 years ago

A Thread.sleep call would need to be added to whichever actor(s) actually submit messages to the API. This doesn't strike me as too onerous for the developers, but you're right, it's definitely part of the Scala code and not the config files. Some minimal exception catching is also called for.

Rather than throttling concurrent jobs it probably makes more sense to limit the number and frequency of concurrent workflow submissions:

system {

  # Cromwell will cap the number of running workflows at N
  max-concurrent-workflows = 5000 # No practical limit on the number of total workflows

  # Cromwell will launch up to N submitted workflows at a time, regardless of how many open workflow slots exist
  max-workflow-launch-count = 4 # Too conservative?

  # Number of seconds between workflow launches
  new-workflow-poll-rate = 5 # Too conservative?
}

This should stagger submissions without limiting the total amount of work being done.

The number of threads available to the backend-dispatcher also appears to be settable. You could create an artificial bottleneck there to protect AWS's API.
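To get a feel for how the two launch settings interact: the effective launch rate is at most max-workflow-launch-count workflows per new-workflow-poll-rate seconds. A rough worst-case calculation (illustrative only, not Cromwell code):

```python
import math

def time_to_launch_all(n_workflows, launch_count=4, poll_rate_s=5):
    """Worst-case seconds to start n_workflows when at most `launch_count`
    workflows are launched every `poll_rate_s` seconds."""
    batches = math.ceil(n_workflows / launch_count)
    return batches * poll_rate_s

# With the conservative values above, 5000 workflows need up to
# 1250 poll cycles * 5 s = 6250 s (~1.7 hours) just to get launched.
print(time_to_launch_all(5000))
```

So the staggering trades startup latency for API safety; the values can be loosened once you know your account's actual request limit.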

geoffjentry commented 6 years ago

Hi all - surfacing for a moment to agree that this is absolutely in the purview of the backend in terms of the correct solution. That's not to say that there aren't hacks one can do in the interim to get things working until we loop back around to this.

delocalizer commented 6 years ago

If the AWS backend uses ioActor this may already be covered in configuration?

system {
  io {
    # Global Throttling - This is mostly useful for GCS and can be adjusted to match
    # the quota available on the GCS API
    #number-of-requests = 100000
    #per = 100 seconds

    # Number of times an I/O operation should be attempted before giving up and failing it.
    #number-of-attempts = 5
  }
}
multimeric commented 6 years ago

Ah, good call! I found the documentation for it: https://cromwell.readthedocs.io/en/develop/Configuring/#io.

I'll have to test if it applies to AWS. Hopefully it does

seandavi commented 6 years ago

@delocalizer, I think the equivalent of GCS (google cloud storage) on AWS is s3. That is a different API than the one in question here--AWS Batch. Therefore, I don't think the IO settings are going to be helpful in dealing with the issue of Batch API limits.

@geoffjentry, thanks for "surfacing" your comments.

delocalizer commented 6 years ago

@seandavi I know that GCS != S3, but when I had a brief look at the source where that io configuration block is used, number-of-requests sets a throttle on a fairly low-level actor that might at least be used by the AWS Batch backend... I haven't looked at the implementation in detail. I'll do that tomorrow.

seandavi commented 6 years ago

Ahhh, I see! Sorry to be pessimistic (without reading the source first)! I'm certainly interested to see what you find.

delocalizer commented 6 years ago

@seandavi realistic, it turns out :) looks like API calls to Batch don't go via that.

A pessimist is a man who has been compelled to live with an optimist

Horneth commented 6 years ago

Yes, the I/O actor really is a "Filesystem I/O Actor" and does not handle backend API calls. The PAPI backend has a custom actor for that (PipelinesApiManager, I think), but it's not (yet) available to all backends, unfortunately.

wleepang commented 5 years ago

Just curious if putting a sleep here or here are viable solutions until something more generic in the core of Cromwell can be implemented.

alartin commented 5 years ago

> Just curious if putting a sleep here or here are viable solutions until something more generic in the core of Cromwell can be implemented.

Any update and interim solution available? I am curious if there is an alternative like python aiohttp concurrent requests number limit rather than sleep.

alartin commented 5 years ago

@wleepang I am a little bit confused by concurrent-job-limit. Does this option limit the number of concurrent AWS Batch compute jobs, or the number of concurrent API calls to AWS Batch? These are totally different things.

Say I have 1000 samples and one job per sample. I would expect 1000 concurrent AWS Batch jobs (an array job of size 1000); if it can't handle even 16 samples concurrently, batch mode makes little sense. If it means the latter, then even with a limit of 16 API calls per second you could still submit an array job in a single API call that runs 1000 sample jobs under the hood, since the number of concurrent API calls is not equal to (and actually not related to) the number of concurrent compute jobs launched by AWS Batch, and a single API call can submit, or query the status of, every job in an array job.

One more question: my understanding is that the AWS Batch backend converts scatter jobs into an AWS Batch array job, is that right?

TimurIs commented 5 years ago

Simple workaround - add the following block into the configuration file:

system {
  job-rate-control {
    jobs = 1
    per = 1 second
  }
}

Drastically improved the situation for me.
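For context, job-rate-control behaves like a simple rate limiter: at most `jobs` job starts per `per` interval. A minimal sliding-window sketch of the same idea (illustrative Python, not Cromwell's actual implementation):

```python
import time

class JobRateControl:
    """Allow at most `jobs` starts per `per_seconds` window,
    mimicking the semantics of Cromwell's system.job-rate-control."""

    def __init__(self, jobs=1, per_seconds=1.0):
        self.jobs = jobs
        self.per_seconds = per_seconds
        self.started = []  # timestamps of starts within the current window

    def acquire(self):
        """Block until another job may start, then record the start."""
        while True:
            now = time.monotonic()
            # Drop starts that have fallen out of the sliding window.
            self.started = [t for t in self.started
                            if now - t < self.per_seconds]
            if len(self.started) < self.jobs:
                self.started.append(now)
                return
            # Wait until the oldest recorded start expires.
            time.sleep(max(0.0, self.per_seconds - (now - self.started[0])))
```

With jobs = 1, per = 1 second, a scatter of 1000 shards takes roughly 1000 seconds just to submit, which trades startup latency for staying under the Batch API limit.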

multimeric commented 5 years ago

That looks awfully promising, will try it out!

wleepang commented 5 years ago

I tested @TimurIs's workaround yesterday. It has the desired effect of throttling API requests: there were no API request errors reported in the logs. I would recommend this as the solution for this issue.

wleepang commented 5 years ago

@alartin - the concurrent-job-limit limits how many jobs will be in a "runnable / running" state at a time. It also has the side effect of limiting how many jobs are submitted when the workflow starts.

Scatter jobs do not currently map to AWS Batch Array jobs. It would definitely be a good thing to implement and it would also be an effective way to avoid API request limits.
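For reference, a single AWS Batch array-job submission covers many shards with one API call, which is why the mapping would sidestep the request limit. A hedged sketch of what such a request looks like (the job name, queue, and definition are placeholders; Cromwell does not currently build requests this way):

```python
def array_job_request(name, queue, job_definition, size):
    """Build SubmitJob parameters for an AWS Batch array job.
    One such call replaces `size` individual SubmitJob calls; each
    child job receives its shard index in the
    AWS_BATCH_JOB_ARRAY_INDEX environment variable."""
    return {
        "jobName": name,                      # placeholder name
        "jobQueue": queue,                    # placeholder queue
        "jobDefinition": job_definition,      # placeholder definition
        "arrayProperties": {"size": size},
    }

# With boto3 this would be submitted as:
#   boto3.client("batch").submit_job(**array_job_request(...))
req = array_job_request("apply-bqsr-scatter", "my-queue", "my-jobdef", 1000)
```

A 1000-shard scatter thus costs one submit call instead of roughly 1000 (or 3000-4000, given the 3-4 requests per job mentioned earlier).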

seandavi commented 5 years ago

What kind of concurrency (what concurrent-job-limit) have you been able to achieve, @wleepang and @TimurIs, using the job-rate-control setting? I'm just curious.

wleepang commented 5 years ago

@seandavi - implementing the config suggested by @TimurIs and removing the specification of concurrent-job-limit, I was able to run the following workflow without issue.

task t {
  Int id
  command { echo "scatter index = ${id}" }
  runtime {
    docker: "ubuntu:latest"
    cpu: 1
    memory: "512MB"
  }
  output { String out = read_string(stdout()) }
}

workflow w {
  Array[Int] arr = range(1000)
  scatter(i in arr) { call t { input: id = i } }
  output { Array[String] t_out = t.out }
}

Approximate numbers:

wleepang commented 5 years ago

@TimurIs - out of curiosity, where did you find that configuration option? I don't see it documented in the example conf.

TimurIs commented 5 years ago

> @TimurIs - out of curiosity, where did you find that configuration option? I don't see it documented in the example conf.

Examined the source code. Wanted to add a manual delay in the code, but, just by luck, found the reference to this config option

seandavi commented 5 years ago

Thanks, @wleepang, for the numbers and simple test code. @TimurIs, nice catch and thanks for sharing!

wleepang commented 5 years ago

Quick update. I tweaked the config to be:

system {
  job-rate-control {
    jobs = 1
    per = 2 second
  }
}

and ran the test workflow above. I saw maximum concurrency - i.e. Batch requested the full number of vCPUs set in my compute environment (100). About 500 jobs succeeded before Cromwell threw an OOM exception. No Batch API Request Limit exceptions were encountered.

Horneth commented 5 years ago

That's a cool use of this configuration to work around this issue!

I just want to give some context around it. This rate control was originally put in place to protect Cromwell against excessive load or a very large spike of jobs becoming runnable in a short period of time. Through this mechanism Cromwell can also stop starting new jobs altogether when under too heavy load. While this achieves the desired effect of rate limiting how many submit requests are sent to AWS Batch in a period of time, I think a better medium-term fix is to implement something similar to the PipelinesApiRequestManager for the AWS backend. The reason is that this mechanism acts at a coarse level of granularity, which might have undesired side effects:

1) It is a system-wide configuration, meaning that in a multi-backend Cromwell it might be too constraining for some backends and not enough for others.
2) It also rate limits starting jobs that might actually be call cached and incur 0 requests to AWS Batch, making it too conservative.
3) It only helps rate limit the number of job creation requests to Batch. Once a job is started, Cromwell issues status requests to monitor it, and those aren't throttled.

Not to say that this is a bad workaround, I think it's a good workaround, but still a workaround :)
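For the curious, the PipelinesApiRequestManager-style approach mentioned above funnels every backend API call through one queue drained by a single throttled dispatcher, so per-job actors never talk to the API directly. A minimal sketch of that pattern (illustrative Python; Cromwell's actual implementation is an Akka actor in Scala):

```python
import queue
import threading

def run_request_manager(requests_per_second, work_queue, results):
    """Drain `work_queue`, executing at most `requests_per_second`
    callables per second, so producers never hit the API limit."""
    import time
    interval = 1.0 / requests_per_second
    while True:
        item = work_queue.get()
        if item is None:            # sentinel: shut down
            return
        results.append(item())      # the actual (mock) API call
        time.sleep(interval)        # global throttle for all producers

work_queue = queue.Queue()
results = []
manager = threading.Thread(
    target=run_request_manager, args=(100, work_queue, results))
manager.start()

# Any number of job actors can enqueue work without tracking rates themselves.
for i in range(5):
    work_queue.put(lambda i=i: f"job-{i}")
work_queue.put(None)
manager.join()
```

Because the throttle lives in one place, it can cover submit, status-poll, and abort requests alike, addressing point 3 above.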

alartin commented 5 years ago

I think in general the number of concurrent jobs is determined by both the client side (Cromwell) and the server side (AWS Batch). In Cromwell, there should be a rate limit on API calls (whether job submission or job status queries) to avoid effectively DDoSing the server side. The server side also enforces a rate limit on concurrent API calls; if that limit is exceeded, the server may refuse to serve, so it is important not to set the client-side rate limit above the server-side one. On the server side, if the concurrent jobs require more resources than the limit allows, such as CPUs and memory (the compute environment in AWS Batch), it is the server's responsibility to queue the jobs and launch them later when resources become available, rather than throwing errors, unless the queue expires (say, resources are still not available one week later). IMHO, the AWS Batch backend should implement scatter jobs as array jobs, which support submitting multiple jobs and querying their status in a single API call; otherwise it is too easy to exceed the AWS Batch rate limit.

jobs submission by user --> cromwell (rate limit config) --> aws batch gateway (rate limit config) --> aws batch compute env (resource limit)

geoffjentry commented 5 years ago

@alartin You're spot on in regards to array jobs. Unfortunately the internal design of Cromwell makes it difficult to do this, it's come up before for HPC backends as well. Something we should address some day but it'd be a fairly major undertaking

alartin commented 5 years ago

> @alartin You're spot on in regards to array jobs. Unfortunately the internal design of Cromwell makes it difficult to do this, it's come up before for HPC backends as well. Something we should address some day but it'd be a fairly major undertaking

Hi @geoffjentry I wonder if there is any guide for developers, especially for backend implementations. Or is there a doc/slide deck available for that? I am willing to implement a backend for another public cloud vendor and am curious how to get started quickly.

geoffjentry commented 5 years ago

Hi @alartin - unfortunately not at the moment. There used to be one but it is so horribly out of date that it's not even worth looking at. That said, we can certainly help you in the endeavor and there are a couple of examples to look at (both the AWS and Alicloud backends were created by 3rd parties). If this is something you're interested in doing, can you email me directly (user jgentry at broadinstitute.com) and we can discuss further.

seandavi commented 5 years ago

In case there is interest, the upcoming Bio Hackathon at NIH might represent an opportunity to jumpstart the process. NO PRESSURE, just mentioning it. Deliverables do not necessarily need to be limited to software, but tutorials, documentation, applications, etc., are all possibilities. https://www.surveymonkey.com/r/Feb2019_NCBI_hackathon

cjllanwarne commented 5 years ago

See https://github.com/broadinstitute/cromwell/pull/4817 for a fix that hopefully soothes this problem by catching the 429s on status polls and hooking them into some existing status poll retry logic.

It's not a panacea so I don't think we can close this issue yet, but alongside the job start throttling mentioned in previous comments, it seems to make the problem a lot less noticeable - at least in our CI environment.

dazza-codes commented 5 years ago

See also https://github.com/boto/botocore/pull/1307