The Azure Cosmos DB BulkExecutor library for Java acts as an extension library to the Cosmos DB Java SDK and provides developers out-of-the-box functionality to perform bulk operations in Azure Cosmos DB.
This project includes samples, documentation, and performance tips for consuming the BulkExecutor library. You can download the official public Maven package from here.
For example, using Maven, you can add the following dependency to your Maven POM file:
```xml
<dependency>
  <groupId>com.microsoft.azure</groupId>
  <artifactId>documentdb-bulkexecutor</artifactId>
  <version>2.0.0</version>
</dependency>
```
```java
/**
 * Use the instance of {@link DocumentClient} to perform bulk operations in the target
 * {@link DocumentCollection} instance at the specified allocated throughput.
 * @param client an instance of {@link DocumentClient}
 * @param databaseName name of the database
 * @param collectionName name of the collection
 * @param partitionKeyDef specifies the {@link PartitionKeyDefinition} of the collection
 * @param offerThroughput specifies the throughput you want to allocate for bulk operations out of the collection's total throughput
 * @return an instance of {@link Builder}
 */
public Builder from(DocumentClient client,
    String databaseName,
    String collectionName,
    PartitionKeyDefinition partitionKeyDef,
    int offerThroughput)

/**
 * Use the given size to configure the maximum mini-batch size (specific to the bulk import API).
 * If not specified, the default value of 200 KB is used.
 * @param size specifies the maximum size of a mini-batch used in the bulk import API
 * @return {@link Builder}
 */
public Builder withMaxMiniBatchSize(int size)

/**
 * Use the given count to configure the maximum update mini-batch count (specific to the bulk update API).
 * If not specified, the default value of 500 is used.
 * @param count specifies the maximum count of update items in a mini-batch used in the bulk update API
 * @return {@link Builder}
 */
public Builder withMaxUpdateMiniBatchCount(int count)

/**
 * Use the given retry options to apply to the {@link DocumentClient} used in the initialization of {@link DocumentBulkExecutor}.
 * @param options an instance of {@link RetryOptions}
 * @return {@link Builder}
 */
public Builder withInitializationRetryOptions(RetryOptions options)

/**
 * Instantiates {@link DocumentBulkExecutor} given the configured {@link Builder}.
 * @return the newly instantiated instance of {@link DocumentBulkExecutor}
 * @throws Exception if there is any failure
 */
public DocumentBulkExecutor build() throws Exception
```
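For instance, the optional settings above can be chained on the builder before calling build(). The following is a minimal sketch; the specific size and count values are illustrative rather than recommendations, and the size is assumed to be specified in bytes:

```java
// A sketch of chaining the optional builder settings; client, DATABASE_NAME,
// COLLECTION_NAME, collection, and offerThroughput are assumed to be defined
// as in the initialization snippets below.
RetryOptions initRetryOptions = new RetryOptions();
initRetryOptions.setMaxRetryWaitTimeInSeconds(30);
initRetryOptions.setMaxRetryAttemptsOnThrottledRequests(9);

DocumentBulkExecutor bulkExecutor = DocumentBulkExecutor.builder()
    .from(client, DATABASE_NAME, COLLECTION_NAME, collection.getPartitionKey(), offerThroughput)
    .withMaxMiniBatchSize(220 * 1024)          // illustrative mini-batch size (assumed to be in bytes)
    .withMaxUpdateMiniBatchCount(1000)         // illustrative update mini-batch count
    .withInitializationRetryOptions(initRetryOptions)
    .build();
```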
The bulk import API accepts a collection of JSON-serialized documents:
```java
public BulkImportResponse importAll(
    Collection<String> documents,
    boolean isUpsert,
    boolean disableAutomaticIdGeneration,
    Integer maxConcurrencyPerPartitionRange) throws DocumentClientException;
```
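For illustration, the documents collection might be populated with JSON strings like the following; apart from id and the profileid partition key used by the samples below, the field names are hypothetical:

```java
// Build a batch of JSON-serialized documents to pass to importAll; the
// "profileid" partition key matches the sample collection used below, and
// the other field names are hypothetical.
Collection<String> documents = new ArrayList<>();
for (int i = 0; i < 100000; i++) {
    documents.add(String.format(
        "{\"id\":\"doc-%d\",\"profileid\":\"profile-%d\",\"value\":%d}",
        i, i % 1000, i));
}
```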
The result of the bulk import API call exposes the following getters:
```java
public int getNumberOfDocumentsImported();
public double getTotalRequestUnitsConsumed();
public Duration getTotalTimeTaken();
public List<Exception> getErrors();
public List<Object> getBadInputDocuments();
```
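A typical pattern is to inspect the response before submitting the next batch. A sketch using only the getters above, assuming bulkImportResponse is the response returned by importAll (as in the sample below) and documents is the collection that was passed in:

```java
// Check for partial failures and log aggregate statistics.
if (bulkImportResponse.getNumberOfDocumentsImported() < documents.size()) {
    for (Exception e : bulkImportResponse.getErrors()) {
        System.err.println("Import error: " + e.getMessage());
    }
    for (Object badDocument : bulkImportResponse.getBadInputDocuments()) {
        System.err.println("Bad input document: " + badDocument);
    }
}
System.out.printf("Imported %d documents in %s, consuming %.0f RUs%n",
    bulkImportResponse.getNumberOfDocumentsImported(),
    bulkImportResponse.getTotalTimeTaken(),
    bulkImportResponse.getTotalRequestUnitsConsumed());
```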
* Initialize DocumentClient
```java
ConnectionPolicy connectionPolicy = new ConnectionPolicy();
connectionPolicy.setMaxPoolSize(1000);
DocumentClient client = new DocumentClient(
    HOST,
    MASTER_KEY,
    connectionPolicy,
    ConsistencyLevel.Session);
```
* Initialize DocumentBulkExecutor with high retry option values for the client SDK, and then set them to 0 to pass congestion control to DocumentBulkExecutor for its lifetime
```java
// Set client's retry options high for initialization
client.getConnectionPolicy().getRetryOptions().setMaxRetryWaitTimeInSeconds(30);
client.getConnectionPolicy().getRetryOptions().setMaxRetryAttemptsOnThrottledRequests(9);

// Builder pattern
Builder bulkExecutorBuilder = DocumentBulkExecutor.builder().from(
    client,
    DATABASE_NAME,
    COLLECTION_NAME,
    collection.getPartitionKey(),
    offerThroughput); // throughput you want to allocate for bulk import out of the collection's total throughput

// Instantiate DocumentBulkExecutor
DocumentBulkExecutor bulkExecutor = bulkExecutorBuilder.build();

// Set retries to 0 to pass complete control to bulk executor
client.getConnectionPolicy().getRetryOptions().setMaxRetryWaitTimeInSeconds(0);
client.getConnectionPolicy().getRetryOptions().setMaxRetryAttemptsOnThrottledRequests(0);
```
* Call importAll API
```java
BulkImportResponse bulkImportResponse = bulkExecutor.importAll(documents, false, true, null);
```
You can find the complete sample command-line tool consuming the bulk import API here.
To build the command-line tool from source (the jar can be found in the target folder):
```
mvn clean package
```
Here is a sample command-line invocation for bulk import:
```
java -Xmx12G -jar bulkexecutor-sample-1.0-SNAPSHOT-jar-with-dependencies.jar -serviceEndpoint *** -masterKey *** -databaseId bulkImportDb -collectionId bulkImportColl -operation import -shouldCreateCollection -collectionThroughput 1000000 -partitionKey /profileid -maxConnectionPoolSize 6000 -numberOfDocumentsForEachCheckpoint 1000000 -numberOfCheckpoints 10
```
When the sample command-line tool is run to bulk import 10 million documents of ~1 KB each, on a Standard D16s v3 Azure Ubuntu VM in East US against a Cosmos DB collection in East US with 1 million RU/s of allocated throughput (with numberOfDocumentsForEachCheckpoint set to 1000000 and numberOfCheckpoints set to 10), we observe the following bulk import performance:
```
Total number of documents inserted : 10000000
Average RUs/second                 : 628386
Average inserts/second             : 108340
```
As seen, we observe a more than 10x improvement in write throughput using the bulk import API, with out-of-the-box handling of throttling, timeouts, and transient exceptions. This also enables easy scale-out: adding additional DocumentBulkExecutor client instances on individual VMs yields even greater write throughput.
The bulk update (a.k.a. patch) API accepts a collection of update items. Each update item specifies the list of field update operations to be performed on a document identified by an id and partition key value.
```java
public BulkUpdateResponse updateAll(
    Collection<UpdateItem> updateItems,
    Integer maxConcurrencyPerPartitionRange) throws DocumentClientException;
```
Definition of UpdateItem:
```java
public class UpdateItem
{
    private String id;
    private Object partitionKeyValue;
    private List<UpdateOperationBase> updateOperations;

    public UpdateItem(String id, Object partitionKeyValue, List<UpdateOperationBase> list)
    {
        this.id = id;
        this.partitionKeyValue = partitionKeyValue;
        this.updateOperations = list;
    }

    public String getId()
    {
        return this.id;
    }

    public Object getPartitionKeyValue()
    {
        return this.partitionKeyValue;
    }

    public List<UpdateOperationBase> getUpdateOperations()
    {
        return this.updateOperations;
    }
}
```
Supports incrementing any numeric document field by a specific value:
```java
public class IncUpdateOperation
{
    public IncUpdateOperation(String field, Double value)
}
```
Supports setting any document field to a specific value:
```java
public class SetUpdateOperation<TValue>
{
    public SetUpdateOperation(String field, TValue value)
}
```
Supports removing a specific document field along with all of its child fields:
```java
public class UnsetUpdateOperation
{
    public UnsetUpdateOperation(String field)
}
```
Supports appending an array of values to a document field which contains an array:
```java
public class PushUpdateOperation
{
    public PushUpdateOperation(String field, Object[] value)
}
```
Supports removing a specific value (if present) from a document field which contains an array:
```java
public class RemoveUpdateOperation<TValue>
{
    public RemoveUpdateOperation(String field, TValue value)
}
```
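For example, several of these operation types can be combined in a single UpdateItem; a sketch with hypothetical field names and values:

```java
// Combine several update operation types in one UpdateItem;
// the field names and values here are hypothetical.
List<UpdateOperationBase> operations = new ArrayList<>();
operations.add(new IncUpdateOperation("loginCount", 1.0));                        // increment a numeric field
operations.add(new SetUpdateOperation<>("status", "active"));                     // set a field
operations.add(new PushUpdateOperation("tags", new Object[] { "vip", "beta" })); // append values to an array field
operations.add(new RemoveUpdateOperation<>("tags", "trial"));                     // remove a value from an array field
operations.add(new UnsetUpdateOperation("legacyField"));                          // remove a field and its children

UpdateItem item = new UpdateItem("doc-42", "profile-42", operations);
```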
Note: For nested fields, use '.' as the nesting separator. For example, if you wish to set the '/address/city' field to 'Seattle', express it as shown:
```java
SetUpdateOperation<String> nestedPropertySetUpdate = new SetUpdateOperation<String>("address.city", "Seattle");
```
The result of the bulk update API call exposes the following getters:
```java
public int getNumberOfDocumentsUpdated();
public double getTotalRequestUnitsConsumed();
public Duration getTotalTimeTaken();
public List<Exception> getErrors();
```
* Initialize DocumentClient
```java
ConnectionPolicy connectionPolicy = new ConnectionPolicy();
connectionPolicy.setMaxPoolSize(1000);
DocumentClient client = new DocumentClient(
    HOST,
    MASTER_KEY,
    connectionPolicy,
    ConsistencyLevel.Session);
```
* Initialize DocumentBulkExecutor with high retry option values for the client SDK, and then set them to 0 to pass congestion control to DocumentBulkExecutor for its lifetime
```java
// Set client's retry options high for initialization
client.getConnectionPolicy().getRetryOptions().setMaxRetryWaitTimeInSeconds(30);
client.getConnectionPolicy().getRetryOptions().setMaxRetryAttemptsOnThrottledRequests(9);

// Builder pattern
Builder bulkExecutorBuilder = DocumentBulkExecutor.builder().from(
    client,
    DATABASE_NAME,
    COLLECTION_NAME,
    collection.getPartitionKey(),
    offerThroughput); // throughput you want to allocate for bulk update out of the collection's total throughput

// Instantiate DocumentBulkExecutor
DocumentBulkExecutor bulkExecutor = bulkExecutorBuilder.build();

// Set retries to 0 to pass complete control to bulk executor
client.getConnectionPolicy().getRetryOptions().setMaxRetryWaitTimeInSeconds(0);
client.getConnectionPolicy().getRetryOptions().setMaxRetryAttemptsOnThrottledRequests(0);
```
* Define the update items along with the corresponding field update operations
```java
SetUpdateOperation<String> nameUpdate = new SetUpdateOperation<>("Name", "UpdatedDocValue");
UnsetUpdateOperation descriptionUpdate = new UnsetUpdateOperation("description");

ArrayList<UpdateOperationBase> updateOperations = new ArrayList<>();
updateOperations.add(nameUpdate);
updateOperations.add(descriptionUpdate);

List<UpdateItem> updateItems = new ArrayList<>(cfg.getNumberOfDocumentsForEachCheckpoint());
IntStream.range(0, cfg.getNumberOfDocumentsForEachCheckpoint()).mapToObj(j -> {
    return new UpdateItem(Long.toString(prefix + j), Long.toString(prefix + j), updateOperations);
}).collect(Collectors.toCollection(() -> updateItems));
```
* Call updateAll API
```java
BulkUpdateResponse bulkUpdateResponse = bulkExecutor.updateAll(updateItems, null);
```
You can find the complete sample command-line tool consuming the bulk update API here.
To build the command-line tool from source (the jar can be found in the target folder):
```
mvn clean package
```
Here is a sample command-line invocation for bulk update:
```
java -Xmx12G -jar bulkexecutor-sample-1.0-SNAPSHOT-jar-with-dependencies.jar -serviceEndpoint *** -masterKey *** -databaseId bulkUpdateDb -collectionId bulkUpdateColl -operation update -collectionThroughput 1000000 -partitionKey /profileid -maxConnectionPoolSize 6000 -numberOfDocumentsForEachCheckpoint 1000000 -numberOfCheckpoints 10
```
Prior to running the above bulk update, ensure the sample documents have been imported using:
```
java -Xmx12G -jar bulkexecutor-sample-1.0-SNAPSHOT-jar-with-dependencies.jar -serviceEndpoint *** -masterKey *** -databaseId bulkUpdateDb -collectionId bulkUpdateColl -operation import -shouldCreateCollection -collectionThroughput 1000000 -partitionKey /profileid -maxConnectionPoolSize 6000 -numberOfDocumentsForEachCheckpoint 1000000 -numberOfCheckpoints 10
```
When the sample command-line tool is run to bulk update 10 million documents, on a Standard D16s v3 Azure Ubuntu VM in East US against a Cosmos DB collection in East US with 1 million RU/s of allocated throughput (with numberOfDocumentsForEachCheckpoint set to 1000000 and numberOfCheckpoints set to 10), we observe the following bulk update performance:
```
Total number of documents updated : 10000000
Average RUs/second                : 564108
Average updates/second            : 61244
```
The bulk update API is designed similarly to the bulk import API; see the implementation details of the bulk import API for more information.
The bulk delete API accepts a query; the documents returned by the query are deleted from the collection:
```java
public BulkDeleteResponse deleteAll(
    String query,
    RequestOptions requestOptions) throws DocumentClientException;

public BulkDeleteResponse deleteAll(
    String query) throws DocumentClientException;
```
The result of the bulk delete API call exposes the following getters:
```java
public int getNumberOfDocumentsDeleted();
public double getTotalRequestUnitsConsumed();
public Duration getTotalTimeTaken();
```
* Initialize DocumentClient
```java
ConnectionPolicy connectionPolicy = new ConnectionPolicy();
connectionPolicy.setMaxPoolSize(1000);
DocumentClient client = new DocumentClient(
    HOST,
    MASTER_KEY,
    connectionPolicy,
    ConsistencyLevel.Session);
```
* Initialize DocumentBulkExecutor with high retry option values for the client SDK, and then set them to 0 to pass congestion control to DocumentBulkExecutor for its lifetime
```java
// Set client's retry options high for initialization
client.getConnectionPolicy().getRetryOptions().setMaxRetryWaitTimeInSeconds(30);
client.getConnectionPolicy().getRetryOptions().setMaxRetryAttemptsOnThrottledRequests(9);

// Builder pattern
Builder bulkExecutorBuilder = DocumentBulkExecutor.builder().from(
    client,
    DATABASE_NAME,
    COLLECTION_NAME,
    collection.getPartitionKey(),
    offerThroughput); // throughput you want to allocate for bulk delete out of the collection's total throughput

// Instantiate DocumentBulkExecutor
DocumentBulkExecutor bulkExecutor = bulkExecutorBuilder.build();

// Set retries to 0 to pass complete control to bulk executor
client.getConnectionPolicy().getRetryOptions().setMaxRetryWaitTimeInSeconds(0);
client.getConnectionPolicy().getRetryOptions().setMaxRetryAttemptsOnThrottledRequests(0);
```
* Call deleteAll API
```java
String query = "select * from c where c.partitionKey = \"partitionKey-27\" and c.property = \"propertyValue\"";

// If the partition key is one of the filters used in the query, make sure to also include RequestOptions with the partition key set
RequestOptions requestOptions = new RequestOptions();
requestOptions.setPartitionKey(new PartitionKey("partitionKey-27"));

BulkDeleteResponse bulkDeleteResponse = bulkExecutor.deleteAll(query, requestOptions);
```
You can find the complete sample command-line tool consuming the bulk delete API here.
The command-line configurations that can be passed are defined in CmdLineConfiguration here.
To build the command-line tool from source (the jar can be found in the target folder):
```
mvn clean package
```
Here is a sample command-line invocation for bulk delete:
```
java -Xmx12G -jar bulkexecutor-sample-1.0-SNAPSHOT-jar-with-dependencies.jar -serviceEndpoint *** -masterKey *** -databaseId bulkDeleteDb -collectionId bulkDeleteColl -operation delete -partitionKey /profileid -maxConnectionPoolSize 6000
```
While the deleteAll API can be called both with and without RequestOptions, if the query contains a partition key filter, including RequestOptions (with the PartitionKey value set) in the call is more RU-efficient.
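For example, a query without a partition key filter can use the single-argument overload; a hedged sketch (the status field is hypothetical, and such a delete applies to matching documents across the whole collection rather than a single partition key value):

```java
// No partition key filter in the query, so no RequestOptions are passed;
// the "status" field is hypothetical.
BulkDeleteResponse crossPartitionDeleteResponse =
    bulkExecutor.deleteAll("select * from c where c.status = \"expired\"");
```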
This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.microsoft.com.
When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.
This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.
To give feedback and/or report an issue, open a GitHub Issue.
Microsoft and any contributors grant you a license to the Microsoft documentation and other content in this repository under the Creative Commons Attribution 4.0 International Public License (see the LICENSE file), and grant you a license to any code in the repository under the MIT License (see the LICENSE-CODE file).
Microsoft, Windows, Microsoft Azure and/or other Microsoft products and services referenced in the documentation may be either trademarks or registered trademarks of Microsoft in the United States and/or other countries. The licenses for this project do not grant you rights to use any Microsoft names, logos, or trademarks. Microsoft's general trademark guidelines can be found at http://go.microsoft.com/fwlink/?LinkID=254653.
Privacy information can be found at https://privacy.microsoft.com/en-us/
Microsoft and any contributors reserve all other rights, whether under their respective copyrights, patents, or trademarks, whether by implication, estoppel, or otherwise.