Purpose:
Limit the RU consumption during bulk operations with OLTP Spark connector, so that
• available RUs on the container / database can be shared across processes by reserving only a given % of RUs for bulk operations
• minimize throttling during bulk operations there by reducing the overhead of retries
Context:
The spark dataframe that is used for bulk operations can have one or more spark partitions and determines the number of parallel spark tasks to import the documents to cosmos db container. The documents in each spark partition are in turn split across the cosmos db physical partitions based on the hash value of the defined partition key. The documents corresponding to each cosmos db physical partition are bucketed into minibatches and imported in parallel by the underlying spark tasks. In summary, each spark partition / task ingests into all cosmos db physical partitions in parallel at any given point of time, with a given fairly equally distributed dataset.
Details:
The 2 levers that are used to limit the RU consumption are
• Mini Batch size
• Sleep duration between mini batch imports
Below are the high level steps that are being used to dynamically control the above 2 levers during each minibatch import, so as to align the total RU consumption with the provided WriteThroughputBudget.
• An initial general case mini batch size is calculated based on the provided writeThroughputBudget, # of spark partitions and the baseRUConsumption
• An initial one-time bulk import is performed with this batch size and the RU consumption for the above batch import is collected. The collected RU consumption will take into consideration the indexed vs non-indexed target container, size of the imported docs etc:
• A miniBatchSizeAdjustmentFactor is calculated based on the above RU consumption and the minibatch size is adjusted based on this.
• Based on the Elapsed time and the consumed RU for each batch import, a sleep duration is calculated to limit the RU consumption per second and is used to pause the thread prior to the next batch import.
A new config “maxIngestionTaskParallelism” is added that may be used in rare BulkImport cases where there is not enough cores available on spark cluster as there are spark partitions. For eg:, if the ingestion DF has 100 spark partitions and the spark cluster has only 32 cores there by limiting it to have max parallelism of 32 spark tasks, then “maxIngestionTaskParallelism” can be set to 32 for the calculations to take into account the max number of parallel tasks.
Experiments:
Testing has been performed against indexed / non-indexed, manual / auto-scale containers with varying # of spark partitions, # of cosmos db partitions and the actual RU consumption have been in the +/- 10% ranges of the provided WriteThroughputRUBudget.
Purpose: Limit the RU consumption during bulk operations with OLTP Spark connector, so that • available RUs on the container / database can be shared across processes by reserving only a given % of RUs for bulk operations • minimize throttling during bulk operations there by reducing the overhead of retries
Context: The spark dataframe that is used for bulk operations can have one or more spark partitions and determines the number of parallel spark tasks to import the documents to cosmos db container. The documents in each spark partition are in turn split across the cosmos db physical partitions based on the hash value of the defined partition key. The documents corresponding to each cosmos db physical partition are bucketed into minibatches and imported in parallel by the underlying spark tasks. In summary, each spark partition / task ingests into all cosmos db physical partitions in parallel at any given point of time, with a given fairly equally distributed dataset.
Details: The 2 levers that are used to limit the RU consumption are • Mini Batch size • Sleep duration between mini batch imports
Below are the high level steps that are being used to dynamically control the above 2 levers during each minibatch import, so as to align the total RU consumption with the provided WriteThroughputBudget. • An initial general case mini batch size is calculated based on the provided writeThroughputBudget, # of spark partitions and the baseRUConsumption
• An initial one-time bulk import is performed with this batch size and the RU consumption for the above batch import is collected. The collected RU consumption will take into consideration the indexed vs non-indexed target container, size of the imported docs etc:
• A miniBatchSizeAdjustmentFactor is calculated based on the above RU consumption and the minibatch size is adjusted based on this.
• Based on the Elapsed time and the consumed RU for each batch import, a sleep duration is calculated to limit the RU consumption per second and is used to pause the thread prior to the next batch import.
A new config “maxIngestionTaskParallelism” is added that may be used in rare BulkImport cases where there is not enough cores available on spark cluster as there are spark partitions. For eg:, if the ingestion DF has 100 spark partitions and the spark cluster has only 32 cores there by limiting it to have max parallelism of 32 spark tasks, then “maxIngestionTaskParallelism” can be set to 32 for the calculations to take into account the max number of parallel tasks.
Experiments: Testing has been performed against indexed / non-indexed, manual / auto-scale containers with varying # of spark partitions, # of cosmos db partitions and the actual RU consumption have been in the +/- 10% ranges of the provided WriteThroughputRUBudget.