GoogleCloudDataproc / hadoop-connectors

Libraries and tools for interoperability between Hadoop-related open-source software and Google Cloud Platform.
Apache License 2.0

Rate limit of GCS connector #10

Closed oren-yowza closed 9 years ago

oren-yowza commented 9 years ago

Hi, I'm using Spark on a Google Compute Engine cluster with the Google Cloud Storage connector (instead of HDFS, as recommended), and get a lot of "rate limit" errors, as follows:

java.io.IOException: Error inserting: bucket: *****, object: *****
  at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.wrapException(GoogleCloudStorageImpl.java:1600)
  at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl$3.run(GoogleCloudStorageImpl.java:475)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 429 Too Many Requests
{
  "code" : 429,
  "errors" : [ {
    "domain" : "usageLimits",
    "message" : "The total number of changes to the object ***** exceeds the rate limit. Please reduce the rate of create, update, and delete requests.",
    "reason" : "rateLimitExceeded"
  } ],
  "message" : "The total number of changes to the object ***** exceeds the rate limit. Please reduce the rate of create, update, and delete requests."
}
  at com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:145)
  at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
  at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
  at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:432)
  at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
  at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
  at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl$3.run(GoogleCloudStorageImpl.java:472)
  ... 3 more

Is there a way to control the read/write rate of the GCS connector?

dennishuo commented 9 years ago

This should be fixed now with https://github.com/GoogleCloudPlatform/bigdata-interop/commit/141b1efab9ef23b6b5f5910d8206fcbc228d2ed7

To test, just run:

git clone https://github.com/GoogleCloudPlatform/bigdata-interop.git
cd bigdata-interop
mvn -P hadoop1 package
# Or, for Hadoop 2
mvn -P hadoop2 package

And you should find the file gcs/target/gcs-connector-*-shaded.jar available for use. To plug it into bdutil, simply gsutil cp gcs/target/gcs-connector-*-shaded.jar gs://<your-bucket>/some-path/ and then edit bdutil/bdutil_env.sh for Hadoop 1 or bdutil/hadoop2_env.sh to change:

GCS_CONNECTOR_JAR='https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-1.4.1-hadoop2.jar'

to instead point at your gs://<your-bucket>/some-path/ path; bdutil automatically detects that you're using a gs://-prefixed URI and will do the right thing during deployment.
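For example, the edited line would look something like this (the bucket, path, and jar name are just placeholders for whatever you uploaded):

GCS_CONNECTOR_JAR='gs://<your-bucket>/some-path/gcs-connector-<version>-shaded.jar'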

Please let us know if it fixes the issue for you!

oren-yowza commented 9 years ago

It looks good now, I don't get those errors anymore, thanks!

Though I'm curious about your solution - I expected to see some exponential backoff or rate limiting of the API requests. Instead, I saw that you check whether the write operation actually succeeded even when it got an error. Is this always the case? Does GCS always write the object when it answers with rateLimit errors, so there's no need for exponential backoff retries?

dennishuo commented 9 years ago

Right, we considered whether exponential backoff would be appropriate; we already plug in a lower-level retry initializer that performs exponential backoff on 5xx errors. One consideration is that the initial backoffs for 5xx errors are quite short, since under normal circumstances those errors just mean that re-sending the request and getting routed to a fresh frontend should fix the problem. The rate-limit buckets behind these particular 429 errors, on the other hand, can be on the order of seconds, so there would be several retries that fail with a 429 again, ultimately slowing down the startup of Spark jobs.
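For reference, that lower-level 5xx handling is roughly of this shape - just an illustrative sketch using the google-http-client backoff classes, not the connector's actual code, and the interval values are made-up examples:

import java.io.IOException;

import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.util.ExponentialBackOff;

// Illustrative sketch only; interval values are invented for the example.
public class BackOffOn5xxInitializer implements HttpRequestInitializer {
  @Override
  public void initialize(HttpRequest request) throws IOException {
    ExponentialBackOff backOff = new ExponentialBackOff.Builder()
        .setInitialIntervalMillis(250)   // short first retry: a fresh frontend usually fixes a 5xx
        .setMultiplier(2.0)
        .setMaxIntervalMillis(10000)
        .build();
    HttpBackOffUnsuccessfulResponseHandler handler =
        new HttpBackOffUnsuccessfulResponseHandler(backOff);
    // Back off only on 5xx server errors; the 429 case is handled differently (see below).
    handler.setBackOffRequired(
        HttpBackOffUnsuccessfulResponseHandler.BackOffRequired.ON_SERVER_ERROR);
    request.setUnsuccessfulResponseHandler(handler);
  }
}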

In this case, it's not quite that the original GCS write succeeded (in fact, on a rateLimit error we should expect the request that errored out not to have been written); rather, other concurrent writers, possibly on other machines, are what caused the object to be written.

In this particular case, where the "empty objects" are directory placeholders, that means we can optimize away the need to retry over the course of several seconds, since hitting the rate limit likely means another writer has already done our job for us (which is also why we have to check the metadata, for the cases where createEmptyObjects is used with more advanced metadata).
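Roughly, the handling looks like this - a minimal sketch of the idea, where insertEmptyObject, fetchObjectMetadata, and hasExpectedMetadata are hypothetical placeholders rather than the connector's real methods:

import java.io.IOException;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.services.storage.model.StorageObject;

// Sketch of the "check instead of backing off" idea; helper names are hypothetical.
void createEmptyObjectIdempotently(String bucket, String objectName) throws IOException {
  try {
    insertEmptyObject(bucket, objectName);  // the create request that may hit the rate limit
  } catch (GoogleJsonResponseException e) {
    if (e.getStatusCode() == 429) {
      // Rate-limited: a concurrent writer probably created the same placeholder object
      // already, so check its existence/metadata instead of backing off for seconds.
      StorageObject existing = fetchObjectMetadata(bucket, objectName);
      if (existing != null && hasExpectedMetadata(existing)) {
        return;  // another writer did our job for us - treat as success
      }
    }
    throw e;  // anything else (or a missing/mismatched object) is a genuine failure
  }
}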