audienceproject / spark-dynamodb

Plug-and-play implementation of an Apache Spark custom data source for AWS DynamoDB.
Apache License 2.0

Added option for choosing absolute (cumulative) read/write throughput; fixed a parallelism issue while writing. #66

Open LalithSrinivas opened 4 years ago

LalithSrinivas commented 4 years ago

What?
Add support for DynamoDB batch writes using an absolute RCU and WCU limit set by the Spark job, alongside the existing target-capacity-based batch writes. We also added an option to increase or decrease the parallelism while writing data to DynamoDB by setting the number of partitions.

Why?
Supporting absolute RCU and WCU limits gives the user more flexibility in how the job is run. It also removes the manual work of calculating the target-capacity percentage from the provisioned RCU and WCU.

Other changes
We also noticed that retries when writing a DynamoDB record are unbounded, with no hard limit. We added an option that lets the user cap the number of retries via the maxRetries parameter.

How? (see the sketches after the list below)

  1. By passing the "absWrite" and "absRead" options, the user can define a cumulative write and read throughput. If the parameters map contains the "abswrite" or "absread" keys, their values are used as the cumulative throughput limit. Otherwise they default to -1 and, as before, the throughput is fetched from the table properties. (from line 69, TableConnector.scala)

  2. If a user-defined data frame is distributed across more (or fewer) partitions than the value of the defaultParallelism parameter, write capacity units will be over- or under-consumed. To handle this, the parallelism factor of the user-defined data frame must be determined from the number of tasks running, which equals (number of stages) * (number of partitions of the data frame). Since the write is a single stage, the number of tasks equals the number of partitions of the data frame. Hence the new numInputDFPartitions parameter. (lines 57 and 85, TableConnector.scala)

  3. The infinite recursion case in the handleBatchWriteResponse method is handled by adding a maximum-retries constraint. The limit can be set by the user via the maxRetries parameter.
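A minimal usage sketch of how a job might pass these options, assuming the option names described above ("absRead", "absWrite", "numInputDFPartitions", "maxRetries") and the library's existing implicits; table names are placeholders:

```scala
import com.audienceproject.spark.dynamodb.implicits._
import org.apache.spark.sql.SparkSession

object AbsoluteThroughputExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("spark-dynamodb-abs-throughput")
      .getOrCreate()

    // Read with a cumulative read-throughput cap instead of a targetCapacity percentage.
    val df = spark.read
      .option("absRead", "400")              // cap reads at 400 RCU across all tasks (option added by this PR)
      .dynamodb("SourceTable")

    // Write with a cumulative write-throughput cap, an explicit partition count,
    // and a bound on batch-write retries.
    df.repartition(8)
      .write
      .option("absWrite", "200")             // cap writes at 200 WCU across all tasks
      .option("numInputDFPartitions", "8")   // tell the connector how many partitions the DataFrame has
      .option("maxRetries", "5")             // stop retrying unprocessed batch items after 5 attempts
      .dynamodb("TargetTable")

    spark.stop()
  }
}
```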
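The idea behind items 1 and 2 can be summarised with the following sketch; method and variable names are illustrative and not copied verbatim from TableConnector.scala:

```scala
// Resolve the cumulative read/write budget: prefer the absolute limits if supplied,
// otherwise fall back to the table's provisioned capacity scaled by targetCapacity.
private def resolveThroughput(parameters: Map[String, String],
                              tableReadCapacity: Double,
                              tableWriteCapacity: Double): (Double, Double) = {
  val absRead = parameters.get("absread").map(_.toDouble).getOrElse(-1d)
  val absWrite = parameters.get("abswrite").map(_.toDouble).getOrElse(-1d)
  val targetCapacity = parameters.getOrElse("targetcapacity", "1").toDouble
  val readBudget = if (absRead > 0) absRead else tableReadCapacity * targetCapacity
  val writeBudget = if (absWrite > 0) absWrite else tableWriteCapacity * targetCapacity
  (readBudget, writeBudget)
}

// Per-task write rate: divide the cumulative budget by the number of concurrent writer
// tasks, which for a single-stage write equals the DataFrame's partition count
// (numInputDFPartitions) rather than defaultParallelism.
private def perPartitionWriteRate(writeBudget: Double, numInputDFPartitions: Int): Double =
  writeBudget / math.max(numInputDFPartitions, 1)
```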
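For item 3, a bounded-retry loop could look like the sketch below. The actual handleBatchWriteResponse in TableConnector.scala is recursive and uses the library's own client wiring, so this is only an illustration of the maxRetries constraint using the AWS SDK v1 batch-write API:

```scala
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB
import com.amazonaws.services.dynamodbv2.model.{BatchWriteItemRequest, BatchWriteItemResult}

// Retry unprocessed items from a batch write, but give up after maxRetries attempts
// instead of looping forever.
def writeBatchWithRetries(client: AmazonDynamoDB,
                          request: BatchWriteItemRequest,
                          maxRetries: Int): Unit = {
  var attempt = 0
  var result: BatchWriteItemResult = client.batchWriteItem(request)
  // DynamoDB returns unprocessed items when throughput is exceeded; retry only those.
  while (!result.getUnprocessedItems.isEmpty && attempt < maxRetries) {
    attempt += 1
    Thread.sleep(100L * attempt) // simple linear backoff before retrying
    result = client.batchWriteItem(new BatchWriteItemRequest(result.getUnprocessedItems))
  }
  if (!result.getUnprocessedItems.isEmpty)
    throw new RuntimeException(s"Batch write still has unprocessed items after $maxRetries retries")
}
```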