The Azure Cosmos DB BulkExecutor library for .NET acts as an extension library to the Cosmos DB .NET SDK and provides developers with out-of-the-box functionality to perform bulk operations in Azure Cosmos DB.
This project includes samples, documentation, and performance tips for consuming the BulkExecutor library. You can download the official public NuGet package from here.
We provide two overloads of the bulk import API: one accepts a list of JSON-serialized documents, and the other a list of deserialized POCO documents.
With a list of JSON-serialized documents:
```csharp
Task<BulkImportResponse> BulkImportAsync(
    IEnumerable<string> documents,
    bool enableUpsert = false,
    bool disableAutomaticIdGeneration = true,
    int? maxConcurrencyPerPartitionKeyRange = null,
    int? maxInMemorySortingBatchSize = null,
    CancellationToken cancellationToken = default(CancellationToken));
```
With a list of deserialized POCO documents:
```csharp
Task<BulkImportResponse> BulkImportAsync(
    IEnumerable<object> documents,
    bool enableUpsert = false,
    bool disableAutomaticIdGeneration = true,
    int? maxConcurrencyPerPartitionKeyRange = null,
    int? maxInMemorySortingBatchSize = null,
    CancellationToken cancellationToken = default(CancellationToken));
```
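For illustration, here is a minimal sketch of calling the string overload. It assumes a `bulkExecutor` initialized as shown in the steps below and a collection partitioned on the hypothetical path `/partitionKeyField`:

```csharp
// Minimal sketch: bulk import JSON-serialized documents.
// Assumes an initialized 'bulkExecutor' (see below) and a collection
// partitioned on the hypothetical path '/partitionKeyField'.
List<string> jsonDocuments = new List<string>();
for (int i = 0; i < 1000; i++)
{
    jsonDocuments.Add(
        $"{{ \"id\": \"{i}\", \"partitionKeyField\": \"{i}\", \"value\": {i} }}");
}

BulkImportResponse response = await bulkExecutor.BulkImportAsync(
    documents: jsonDocuments,
    enableUpsert: false);
```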
The result of the bulk import API call is a BulkImportResponse that includes the following attributes:
* NumberOfDocumentsImported (long) - the total number of documents successfully imported.
* TotalRequestUnitsConsumed (double) - the total request units (RU) consumed by the operation.
* TotalTimeTaken (TimeSpan) - the end-to-end time taken by the bulk import.
* BadInputDocuments (List&lt;object&gt;) - the list of malformed documents that could not be imported.
* Initialize DocumentClient with Direct connection mode over TCP
```csharp
ConnectionPolicy connectionPolicy = new ConnectionPolicy
{
    ConnectionMode = ConnectionMode.Direct,
    ConnectionProtocol = Protocol.Tcp
};
DocumentClient client = new DocumentClient(
    new Uri(endpointUrl),
    authorizationKey,
    connectionPolicy);
```
* Initialize BulkExecutor with high retry option values for the client SDK, and then set them to 0 to hand congestion control to BulkExecutor for its lifetime
```csharp
// Set retry options high during initialization (default values).
client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;

IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
await bulkExecutor.InitializeAsync();

// Set retries to 0 to pass complete control to bulk executor.
client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
```
* Call BulkImportAsync API
```csharp
BulkImportResponse bulkImportResponse = await bulkExecutor.BulkImportAsync(
    documents: documentsToImportInBatch,
    enableUpsert: true,
    disableAutomaticIdGeneration: true,
    maxConcurrencyPerPartitionKeyRange: null,
    maxInMemorySortingBatchSize: null,
    cancellationToken: token);
```
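The returned response can then be inspected; for example, a small sketch reporting throughput using the response attributes listed above:

```csharp
// Report import throughput using the BulkImportResponse attributes.
Console.WriteLine(String.Format(
    "Imported {0} docs @ {1} writes/s, {2} RU/s in {3} sec",
    bulkImportResponse.NumberOfDocumentsImported,
    Math.Round(bulkImportResponse.NumberOfDocumentsImported / bulkImportResponse.TotalTimeTaken.TotalSeconds),
    Math.Round(bulkImportResponse.TotalRequestUnitsConsumed / bulkImportResponse.TotalTimeTaken.TotalSeconds),
    bulkImportResponse.TotalTimeTaken.TotalSeconds));
```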
You can find the complete sample application consuming the bulk import API here; it generates random documents that are then bulk imported into an Azure Cosmos DB collection. You can configure the application settings in appSettings here.
You can download the Microsoft.Azure.CosmosDB.BulkExecutor NuGet package from here.
Let us compare the performance of the bulk import sample application against a multi-threaded application that uses point writes (the CreateDocumentAsync API in DocumentClient).
Both applications are run on a standard DS16 v3 Azure VM in East US against a Cosmos DB collection in East US with 1 million RU/s of allocated throughput.
The bulk import sample is executed with NumberOfDocumentsToImport set to 25 million and NumberOfBatches set to 25 (in App.config), using default parameters for the bulk import API. The multi-threaded point-write application is configured with DegreeOfParallelism set to 2000 (spawning 2000 concurrent tasks), which maxes out the VM's CPU.
We observe the following performance for ingestion of 25 million (~1KB) documents into a 1 million RU/s Cosmos DB collection:
| | Time taken (sec) | Writes/second | RU/s consumed |
|---|---|---|---|
| Bulk import API | 262 | 95528 | 494186 |
| Multi-threaded point write | 2431 | 10280 | 72481 |
As seen above, the bulk import API delivers a more than 9x improvement in write throughput while providing out-of-the-box, efficient handling of throttling, timeouts, and transient exceptions. It also allows easier scale-out: adding additional BulkExecutor client instances on individual VMs achieves even greater write throughput.
When the bulk import API is triggered with a batch of documents, the client first shuffles them into buckets corresponding to their target Cosmos DB partition key ranges. Within each partition key range bucket, the documents are broken down into mini-batches, and each mini-batch of documents acts as a payload that is committed transactionally.
We have built in optimizations for the concurrent execution of these mini-batches both within and across partition key ranges to maximally utilize the allocated collection throughput. We have designed an AIMD-style congestion control mechanism for each Cosmos DB partition key range to efficiently handle throttling and timeouts.
These client-side optimizations augment server-side features specific to the BulkExecutor library which together make maximal consumption of available throughput possible.
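To illustrate the AIMD idea (a hypothetical sketch, not the library's actual internals): the degree of concurrency for a partition key range grows additively while mini-batches succeed, and shrinks multiplicatively when the service throttles.

```csharp
// Hypothetical AIMD-style controller for one partition key range.
// Illustration only; the library's internal implementation differs.
class AimdCongestionController
{
    private int degreeOfConcurrency = 1;

    public int DegreeOfConcurrency => degreeOfConcurrency;

    // A mini-batch committed without throttling: additive increase.
    public void OnSuccess() => degreeOfConcurrency += 1;

    // The service returned 429 (request rate too large): multiplicative decrease.
    public void OnThrottle() => degreeOfConcurrency = Math.Max(1, degreeOfConcurrency / 2);
}
```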
The bulk update (a.k.a. patch) API accepts a list of update items. Each update item specifies the list of field update operations to be performed on a document identified by an id and a partition key value.
```csharp
Task<BulkUpdateResponse> BulkUpdateAsync(
    IEnumerable<UpdateItem> updateItems,
    int? maxConcurrencyPerPartitionKeyRange = null,
    int? maxInMemorySortingBatchSize = null,
    CancellationToken cancellationToken = default(CancellationToken));
```
Definition of UpdateItem
```csharp
class UpdateItem
{
    public string Id { get; private set; }

    public string PartitionKey { get; private set; }

    public IEnumerable<UpdateOperation> UpdateOperations { get; private set; }

    public UpdateItem(
        string id,
        string partitionKey,
        IEnumerable<UpdateOperation> updateOperations)
    {
        this.Id = id;
        this.PartitionKey = partitionKey;
        this.UpdateOperations = updateOperations;
    }
}
```
Supports incrementing any numeric document field by a specific value
```csharp
class IncUpdateOperation<TValue>
{
    IncUpdateOperation(string field, TValue value)
}
```
Supports setting any document field to a specific value
```csharp
class SetUpdateOperation<TValue>
{
    SetUpdateOperation(string field, TValue value)
}
```
Supports removing a specific document field along with all children fields
```csharp
class UnsetUpdateOperation
{
    UnsetUpdateOperation(string field)
}
```
Supports appending an array of values to a document field which contains an array
```csharp
class PushUpdateOperation
{
    PushUpdateOperation(string field, object[] value)
}
```
Supports removing a specific value (if present) from a document field which contains an array
```csharp
class RemoveUpdateOperation<TValue>
{
    RemoveUpdateOperation(string field, TValue value)
}
```
Note: For nested fields, use '.' as the nesting separator. For example, if you wish to set the '/address/city' field to 'Seattle', express the update as shown:
```csharp
SetUpdateOperation<string> nestedPropertySetUpdate = new SetUpdateOperation<string>("address.city", "Seattle");
```
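For illustration, several of the operation types above can be combined on a single update item; a sketch using the hypothetical field names 'loginCount' and 'tags':

```csharp
// Sketch: combine multiple update operation types on one document.
// Field names 'loginCount' and 'tags' are hypothetical.
List<UpdateOperation> operations = new List<UpdateOperation>
{
    new IncUpdateOperation<int>("loginCount", 1),              // increment a numeric field
    new PushUpdateOperation("tags", new object[] { "vip" }),   // append values to an array field
    new RemoveUpdateOperation<string>("tags", "trial"),        // remove a value from an array field
    new SetUpdateOperation<string>("address.city", "Seattle")  // set a nested field
};
UpdateItem updateItem = new UpdateItem("document-id", "partition-key-value", operations);
```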
The result of the bulk update API call is a BulkUpdateResponse that includes the following attributes:
* NumberOfDocumentsUpdated (long) - the total number of documents successfully updated.
* TotalRequestUnitsConsumed (double) - the total request units (RU) consumed by the operation.
* TotalTimeTaken (TimeSpan) - the end-to-end time taken by the bulk update.
* Initialize DocumentClient with Direct connection mode over TCP
```csharp
ConnectionPolicy connectionPolicy = new ConnectionPolicy
{
    ConnectionMode = ConnectionMode.Direct,
    ConnectionProtocol = Protocol.Tcp
};
DocumentClient client = new DocumentClient(
    new Uri(endpointUrl),
    authorizationKey,
    connectionPolicy);
```
* Initialize BulkExecutor with high retry option values for the client SDK, and then set them to 0 to hand congestion control to BulkExecutor for its lifetime
```csharp
// Set retry options high during initialization (default values).
client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;

IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
await bulkExecutor.InitializeAsync();

// Set retries to 0 to pass complete control to bulk executor.
client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
```
* Define the update items along with corresponding field update operations
```csharp
SetUpdateOperation<string> nameUpdate = new SetUpdateOperation<string>("Name", "UpdatedDoc");
UnsetUpdateOperation descriptionUpdate = new UnsetUpdateOperation("description");

List<UpdateOperation> updateOperations = new List<UpdateOperation>();
updateOperations.Add(nameUpdate);
updateOperations.Add(descriptionUpdate);

List<UpdateItem> updateItems = new List<UpdateItem>();
for (int i = 0; i < 10; i++)
{
    updateItems.Add(new UpdateItem(i.ToString(), i.ToString(), updateOperations));
}
```
* Call BulkUpdateAsync API
```csharp
BulkUpdateResponse bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(
    updateItems: updateItems,
    maxConcurrencyPerPartitionKeyRange: null,
    maxInMemorySortingBatchSize: null,
    cancellationToken: token);
```
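As with bulk import, the returned BulkUpdateResponse can be inspected; for example, a small sketch using the response attributes listed above:

```csharp
// Report update throughput using the BulkUpdateResponse attributes.
Console.WriteLine(String.Format(
    "Updated {0} docs @ {1} updates/s, {2} RU/s in {3} sec",
    bulkUpdateResponse.NumberOfDocumentsUpdated,
    Math.Round(bulkUpdateResponse.NumberOfDocumentsUpdated / bulkUpdateResponse.TotalTimeTaken.TotalSeconds),
    Math.Round(bulkUpdateResponse.TotalRequestUnitsConsumed / bulkUpdateResponse.TotalTimeTaken.TotalSeconds),
    bulkUpdateResponse.TotalTimeTaken.TotalSeconds));
```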
You can find the complete sample application consuming the bulk update API here. You can configure the application settings in appSettings here.
In the sample application, we first bulk import documents and then bulk update all the imported documents to set the Name field to a new value and unset the description field in each document.
You can download the Microsoft.Azure.CosmosDB.BulkExecutor NuGet package from here.
When the given sample application is run on a standard DS16 v3 Azure VM in East US against a Cosmos DB collection in East US with 1 million RU/s of allocated throughput, with NumberOfDocumentsToUpdate set to 25 million, NumberOfBatches set to 25 (in App.config), and default parameters for the bulk update API (as well as the bulk import API), we observe the following bulk update performance:
```
Updated 25000000 docs @ 53796 update/s, 491681 RU/s in 464.7 sec
```
The bulk update API is designed similarly to bulk import; see the implementation details of the bulk import API above for details.
The bulk delete API accepts a list of <partitionKey, documentId> tuples to delete in bulk.
```csharp
Task<BulkDeleteResponse> BulkDeleteAsync(
    List<Tuple<string, string>> pkIdTuplesToDelete,
    int? deleteBatchSize = null,
    CancellationToken cancellationToken = default(CancellationToken));
```
The result of the bulk delete API call is a BulkDeleteResponse that includes the following attributes:
* NumberOfDocumentsDeleted (long) - the total number of documents successfully deleted.
* TotalRequestUnitsConsumed (double) - the total request units (RU) consumed by the operation.
* TotalTimeTaken (TimeSpan) - the end-to-end time taken by the bulk delete.
* Initialize DocumentClient with Direct connection mode over TCP
```csharp
ConnectionPolicy connectionPolicy = new ConnectionPolicy
{
    ConnectionMode = ConnectionMode.Direct,
    ConnectionProtocol = Protocol.Tcp
};
DocumentClient client = new DocumentClient(
    new Uri(endpointUrl),
    authorizationKey,
    connectionPolicy);
```
* Initialize BulkExecutor with high retry option values for the client SDK, and then set them to 0 to hand congestion control to BulkExecutor for its lifetime
```csharp
// Set retry options high during initialization (default values).
client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;

BulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
await bulkExecutor.InitializeAsync();

// Set retries to 0 to pass complete control to bulk executor.
client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
```
* Define the list of <PartitionKey, DocumentId> tuples to delete
```csharp
List<Tuple<string, string>> pkIdTuplesToDelete = new List<Tuple<string, string>>();
for (int i = 0; i < NumberOfDocumentsToDelete; i++)
{
    pkIdTuplesToDelete.Add(new Tuple<string, string>(i.ToString(), i.ToString()));
}

BulkDeleteResponse bulkDeleteResponse = await bulkExecutor.BulkDeleteAsync(pkIdTuplesToDelete);
```
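The returned BulkDeleteResponse can be inspected similarly; for example, a small sketch using the response attributes listed above:

```csharp
// Report delete throughput using the BulkDeleteResponse attributes.
Console.WriteLine(String.Format(
    "Deleted {0} docs @ {1} deletes/s, {2} RU/s in {3} sec",
    bulkDeleteResponse.NumberOfDocumentsDeleted,
    Math.Round(bulkDeleteResponse.NumberOfDocumentsDeleted / bulkDeleteResponse.TotalTimeTaken.TotalSeconds),
    Math.Round(bulkDeleteResponse.TotalRequestUnitsConsumed / bulkDeleteResponse.TotalTimeTaken.TotalSeconds),
    bulkDeleteResponse.TotalTimeTaken.TotalSeconds));
```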
You can find the complete sample application consuming the bulk delete API here. You can configure the application settings in appSettings here.
In the sample application, we first bulk import documents and then bulk delete a portion of the imported documents.
You can download the Microsoft.Azure.CosmosDB.BulkExecutor NuGet package from here.
For better performance of large bulk operations, enable server garbage collection in App.config:

```xml
<runtime>
    <gcServer enabled="true" />
</runtime>
```
The library emits traces that can be captured in a log file and/or on the console by configuring trace listeners in App.config:

```xml
<system.diagnostics>
    <trace autoflush="false" indentsize="4">
        <listeners>
            <add name="logListener" type="System.Diagnostics.TextWriterTraceListener" initializeData="application.log" />
            <add name="consoleListener" type="System.Diagnostics.ConsoleTraceListener" />
        </listeners>
    </trace>
</system.diagnostics>
```
This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.
See CONTRIBUTING.md for contribution guidelines.
To give feedback and/or report an issue, open a GitHub Issue.