awslabs / emr-dynamodb-connector

Implementations of open source Apache Hadoop/Hive interfaces which allow for ingesting data from Amazon DynamoDB
Apache License 2.0

dynamodb.throughput.write.percent not being respected #51

Open tdoshea90 opened 7 years ago

tdoshea90 commented 7 years ago

Hello, I am using the connector package with the configuration below, but the write capacity and percent are not being respected for some reason. https://imgur.com/a/Ht2yw (attachments aren't working for some reason on GitHub today). For the first 5 minutes after the job starts, the consumed capacity is consistently 2x the provisioned write capacity; it then levels out to the provisioned write capacity.

I am running this job in us-west-2 but writing to us-east-1

Random specs: Release label: emr-5.9.0, Hadoop distribution: Amazon 2.7.3, Applications: Spark 2.2.0, Hive 2.3.0, Hue 4.0.1, Zeppelin 0.7.2, Ganglia 3.7.2

      private JobConf setDynamoWriteJobConf(JavaSparkContext sc) {
          JobConf jobConf = new JobConf(sc.hadoopConfiguration());

          jobConf.set("dynamodb.servicename", "dynamodb");
          jobConf.set("dynamodb.output.tableName", "test");
          jobConf.set("dynamodb.endpoint", "dynamodb.us-east-1.amazonaws.com");
          jobConf.set("dynamodb.regionid", "us-east-1");
          jobConf.set("dynamodb.throughput.write", "500");
          jobConf.set("dynamodb.throughput.write.percent", ".5");
          jobConf.set("dynamodb.version", "2012-08-10");

          jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat");
          jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat");

          return jobConf;
      }

write command:

          JavaPairRDD<Text, DynamoDBItemWritable> writeToDynamo = dataset
                  .javaRDD()
                  .map(createDdbWritable())
                  .mapToPair(item -> new Tuple2<>(new Text(""), item));

          writeToDynamo.saveAsHadoopDataset(setDynamoWriteJobConf(sc));

Any explanation on why this could be occurring?

taklwu commented 6 years ago

Hi there, we need more information from you. Have you seen this multiple times since Oct 24? Also, you mentioned writing from us-west-2 to us-east-1: do you see this only for cross-region writes, or do within-region writes have the same issue?

taklwu commented 6 years ago

After another look into the writer logic, I confirmed that dynamodb.throughput.write is not used to control how much data can or must be written to DynamoDB (that information is obtained from the DynamoDB table configuration). Instead, dynamodb.throughput.write.percent is used to calculate a maximum permissibleWritesPerSecond rather than a fixed throughput per second, which in your graph is at some points 0. I'm wondering whether it throws any exceptions in the logs; if so, can you share them here?

For more about the logic of permissibleWritesPerSecond, see https://github.com/awslabs/emr-dynamodb-connector/blob/fd4d241a858c3efb04fbf8bc383332265b7d012a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/write/AbstractDynamoDBRecordWriter.java#L96

  @Override
  public void write(K key, V value) throws IOException {
    if (value == null) {
      throw new RuntimeException("Null record encountered. At least the key columns must be "
          + "specified.");
    }

    // Throttle: if the per-second write budget is spent, refresh the target
    // rate and wait out the rest of the 1-second interval.
    verifyInterval();
    if (progressable != null) {
      progressable.progress();
    }

    DynamoDBItemWritable item = convertValueToDynamoDBItem(key, value);
    // The remaining budget (permissibleWritesPerSecond - writesPerSecond) caps
    // how many items this batch may flush.
    BatchWriteItemResult result = client.putBatch(tableName, item.getItem(),
        permissibleWritesPerSecond - writesPerSecond, reporter);

    batchSize++;
    totalItemsWritten++;

    if (result != null) {
      if (result.getConsumedCapacity() != null) {
        for (ConsumedCapacity consumedCapacity : result.getConsumedCapacity()) {
          double consumedUnits = consumedCapacity.getCapacityUnits();
          totalIOPSConsumed += consumedUnits;
        }
      }

      // Items DynamoDB did not process remain in the batch and are retried.
      int unprocessedItems = 0;
      for (List<WriteRequest> requests : result.getUnprocessedItems().values()) {
        unprocessedItems += requests.size();
      }
      writesPerSecond += batchSize - unprocessedItems;
      batchSize = unprocessedItems;
    }
  }

  private void verifyInterval() {
    if (writesPerSecond >= permissibleWritesPerSecond) {
      // Feed the controller what was actually written and consumed, then pick
      // up the new per-second target.
      if (writesPerSecond > 0) {
        iopsController.update(writesPerSecond, totalIOPSConsumed);
      }
      permissibleWritesPerSecond = iopsController.getTargetItemsPerSecond();

      if (totalItemsWritten > nextPrintCount) {
        log.info("Total items written: " + totalItemsWritten);
        log.info("New writes per second: " + permissibleWritesPerSecond);
        nextPrintCount += PRINT_COUNT_INCREMENT;
      }

      // Sleep out the remainder of the 1-second interval, then reset counters.
      DynamoDBUtil.verifyInterval(intervalBeginTime, 1000L);
      intervalBeginTime = new DateTime(DateTimeZone.UTC).getMillis();
      totalIOPSConsumed = 0;
      writesPerSecond = 0;
    }
  }

https://github.com/awslabs/emr-dynamodb-connector/blob/d1a244018afe7d7e6751c901097bca72d7eb5a01/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/IopsController.java#L69

  public void update(long itemsPerSecond, double iopsConsumed) {
    DateTime currentTime = getCurrentTime();
    if (currentTime.isAfter(lastUpdateTime.plus(iopsUpdateDuration))) {
      lastUpdateTime = currentTime;
      targetIops = iopsCalculator.calculateTargetIops();
    }

    if (itemsPerSecond != 0 && iopsConsumed != 0) {
      double itemsPerIO = itemsPerSecond / iopsConsumed;
      targetItemsPerSecond = calculateTargetItemsPerSecond(operationType, targetIops, itemsPerIO);
    }
  }
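
To make the arithmetic concrete, here is a minimal sketch of how the per-task write budget comes together, assuming a provisioned table. Every name and number below is an illustrative assumption, not the connector's actual code; the real division across tasks happens in WriteIopsCalculator.

    // Illustrative sketch only; all values are example assumptions.
    long tableWriteCapacity = 500;   // WCU read from the table description,
                                     // not from dynamodb.throughput.write
    double writePercent = 0.5;       // dynamodb.throughput.write.percent
    int totalMapTasks = 4;           // concurrent writer tasks assumed for the job

    // Each writer task targets its share of the permitted IOPS.
    long targetIopsPerTask =
        (long) (tableWriteCapacity * writePercent / totalMapTasks);   // 62

    // permissibleWritesPerSecond counts items, not capacity units: items over
    // 1 KB consume several WCUs, so the measured items-per-IO ratio scales it.
    double itemsPerIO = 1.0;         // measured at runtime from consumed capacity
    long permissibleWritesPerSecond =
        (long) (targetIopsPerTask * itemsPerIO);                      // 62 items/s per task

Since the budget is computed per task, cluster-wide consumption is roughly this figure multiplied by the number of writer tasks actually running, which is the thread the later comments pull on.
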
snowlover173 commented 5 years ago

Just to add more info to this ticket, since people might search for the same question. Based on my experience with the EMR-DynamoDB-connector:

jobConf.set("dynamodb.throughput.write", value) doesn't have any impact on the calculations. The only thing you need to set is the percent, like this: jobConf.set("dynamodb.throughput.write.percent", "0.5");

However, that doesn't mean the job is going to consume 50% of the table's WCU. If you check your cluster's logs for the calculations, the number of tasks, and the throughput value assigned to each task, you can see that, since this is a Spark job, the result depends on the number and type of instances, which determine how much memory is available to YARN. It also depends on the mapper and reducer counts and the memory defined in mapred-site.xml.

Therefore, just setting that number won't get you exactly what you want. If you need to know what is actually happening, you need to check the logs of the relevant container in your cluster. I think the Spark version of the EMR-DynamoDB-connector is better suited to simply connecting the two services (EMR and DDB) than to fine-tuning the percentage.

dhirennavani commented 5 years ago

I faced a similar issue. You can actually get the number of concurrent tasks: the total number of concurrent tasks is equal to the number of partitions of the Spark RDD. That's how we calculate the percentage. This is a workaround.

val numTasks = dynamoDBRDD.getNumPartitions

ddbConf.setNumMapTasks(1)
ddbConf.set(DynamoDBConstants.THROUGHPUT_WRITE_PERCENT, (npxJobArgs.getFromArgs(NPXJobArgsEnum.DDB_WRITE_PERCENT).toDouble / numTasks).toString)

I propose a fix whereby users have the option to pass the number of tasks as an input, from which the writeIOPS can be calculated. If that sounds good to the maintainers of the repo, I can work towards that.
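
For anyone wiring this up through the Java API instead, here is a roughly equivalent sketch of the same workaround. The 0.5 target is illustrative, and setDynamoWriteJobConf / writeToDynamo are reused from the snippets in the original question.

    import org.apache.hadoop.dynamodb.DynamoDBConstants;
    import org.apache.hadoop.mapred.JobConf;

    // Divide the desired cluster-wide percent by the real number of concurrent
    // Spark tasks, since the connector computes its write budget per task.
    double desiredWritePercent = 0.5;                 // illustrative target
    int numTasks = writeToDynamo.getNumPartitions();  // actual writer task count
    JobConf ddbConf = setDynamoWriteJobConf(sc);
    ddbConf.setNumMapTasks(1);                        // make the calculator assume one task
    ddbConf.set(DynamoDBConstants.THROUGHPUT_WRITE_PERCENT,
        String.valueOf(desiredWritePercent / numTasks));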

nekshan commented 1 year ago

Faced a similar issue, and digging into the logs I found that maxParallelTasks is being set to 1, which in turn is because of "The calculated max number of concurrent map tasks is less than 1. Use 1 instead". https://github.com/awslabs/emr-dynamodb-connector/blob/42ba850a0f3472fecf82bf9da3a4afe2f063132b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/util/TaskCalculator.java#L87 https://github.com/awslabs/emr-dynamodb-connector/blob/master/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/write/WriteIopsCalculator.java#L69

Haven't dug deeper yet, but the following workaround works:

ddbConf.set(DynamoDBConstants.THROUGHPUT_WRITE_PERCENT, (0.9 / ddbConf.getNumMapTasks()).toString)
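
For context on why maxParallelTasks can collapse to 1, a rough sketch of the shape of the estimate TaskCalculator makes; the numbers and the simplified formula are assumptions, since the real implementation reads node counts and memory from YARN.

    // Rough sketch, not TaskCalculator's actual code: if a single mapper asks
    // for more memory than a node offers, tasks-per-node rounds down to 0 and
    // the calculator falls back to 1, hence the log line quoted above.
    int nodes = 3;                    // assumed worker node count
    long nodeMemoryMb = 5632;         // assumed YARN memory per node
    long mapperMemoryMb = 6144;       // assumed memory request per map task
    int tasksPerNode = (int) (nodeMemoryMb / mapperMemoryMb);   // 0 here
    int maxParallelTasks = Math.max(nodes * tasksPerNode, 1);   // "Use 1 instead"

With maxParallelTasks pinned at 1 while many Spark tasks are actually writing, each task behaves as if it owns the entire write budget, which is why dividing the percent by the real task count, as in the workarounds above, keeps consumption near the intended level.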