Azure / azure-cosmos-dotnet-v3

.NET SDK for Azure Cosmos DB for the core SQL API
MIT License

Bulk API: New API not dependent on CosmosClient bulk execution flag #2522

Open asketagarwal opened 3 years ago

asketagarwal commented 3 years ago

Motivation

Support workloads that do not always need Bulk and instead mix Bulk and non-Bulk operations.

Requirements

Pull API

public IAsyncEnumerable<BulkOperationResponse<TContext>> ProcessBulkOperationsAsync(
                                        IAsyncEnumerable<BulkItemOperation<TContext>> inputOperations,
                                        BulkRequestOptions requestOptions,
                                        CancellationToken ct);

New Public Contracts:

public abstract class BulkItemOperation<TContext>
{
    static BulkItemOperation<TContext> GetReadItemOperation<T, TContext>(string id, PartitionKey pk, ItemRequestOptions requestOptions, TContext context);
    static BulkItemOperation<TContext> GetReadItemStreamOperation<TContext>(string id, PartitionKey pk, ItemRequestOptions requestOptions, TContext context);
    static BulkItemOperation<TContext> GetCreateItemOperation<T, TContext>(T item, PartitionKey pk, ItemRequestOptions requestOptions, TContext context);
}
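
To make the role of `TContext` concrete, here is a minimal sketch of how a caller might consume a pull-style API. All type and method names in it (`SketchOp`, `SketchResult`, `PullApiSketch`) are hypothetical stand-ins, not SDK types, and the processing step simply echoes each operation back with a fixed status code instead of calling the service. It assumes C# 9 or later.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-ins for BulkItemOperation<TContext> and BulkOperationResponse<TContext>.
public sealed record SketchOp<TContext>(string Id, string PartitionKey, TContext Context);
public sealed record SketchResult<TContext>(int StatusCode, TContext Context);

public static class PullApiSketch
{
    // Stand-in for the proposed ProcessBulkOperationsAsync: no service call is made;
    // each operation is answered with a placeholder status and its own context.
    public static async IAsyncEnumerable<SketchResult<TContext>> ProcessAsync<TContext>(
        IAsyncEnumerable<SketchOp<TContext>> operations)
    {
        await foreach (SketchOp<TContext> op in operations)
        {
            yield return new SketchResult<TContext>(201, op.Context);
        }
    }

    // Produces operations lazily; the context here is just the loop index.
    public static async IAsyncEnumerable<SketchOp<int>> Produce(int count)
    {
        for (int i = 0; i < count; i++)
        {
            await Task.Yield();
            yield return new SketchOp<int>($"item-{i}", $"pk-{i % 2}", i);
        }
    }

    // The caller pulls results and uses the context to tell which input each one belongs to.
    public static async Task<List<int>> CollectContextsAsync(int count)
    {
        var seen = new List<int>();
        await foreach (SketchResult<int> result in ProcessAsync(Produce(count)))
        {
            seen.Add(result.Context);
        }
        return seen;
    }
}
```

A real implementation could return results in a different order than the inputs, which is why each response carries the caller's context back.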

Implementation

It will be implemented using Batch (multiple batch requests).
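
The issue does not say how operations would be split across batch requests. As one possible reading, the sketch below groups operations by partition key value and caps each group at 100 operations; both the grouping rule and the cap are assumptions made for illustration, and `BatchGroupingSketch` is a hypothetical name. It uses `Enumerable.Chunk`, so it assumes .NET 6 or later.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class BatchGroupingSketch
{
    // Assumed per-batch operation cap; not stated in the issue.
    public const int MaxOperationsPerBatch = 100;

    // Splits operations into batches so that each batch targets a single
    // partition key value and holds at most maxPerBatch operations.
    public static List<List<(string Id, string PartitionKey)>> GroupIntoBatches(
        IEnumerable<(string Id, string PartitionKey)> operations,
        int maxPerBatch = MaxOperationsPerBatch)
    {
        return operations
            .GroupBy(op => op.PartitionKey)
            .SelectMany(group => group.Chunk(maxPerBatch))
            .Select(chunk => chunk.ToList())
            .ToList();
    }
}
```

Each resulting list would then map to one batch request, and the responses would be fanned back out to the individual operations.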

j82w commented 3 years ago

How about a callback design? It avoids the need to correlate requests with responses.

public Task ProcessBulkOperationsAsync(
    IReadOnlyList<BulkItemOperation> operations,
    BulkRequestOptions bulkRequestOptions,
    CancellationToken cancellationToken);

public abstract class BulkItemOperation
{
    static BulkItemOperation ReadItemOperation<T>(
        string id, 
        PartitionKey pk = null, 
        BulkItemRequestOptions requestOptions = null,
        Func<ItemResponse<T>> responseCallBack);

    static BulkItemOperation ReadItemStreamOperation<T>(
        string id, 
        PartitionKey pk, 
        BulkItemRequestOptions requestOptions = null,
        Func<ResponseMessage> responseCallBack);

    static BulkItemOperation CreateItemOperation<T>(
        string id, 
        PartitionKey pk = null, 
        BulkItemRequestOptions requestOptions = null,
        Func<ItemResponse<T>> responseCallBack);
.......
.......
}
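
A minimal sketch of the callback idea follows. The signatures above declare the callback as `Func<ItemResponse<T>>`; for a callback that receives the response, this sketch assumes `Action<...>` instead. `SketchResponse<T>`, `SketchOperation` and `CallbackSketch` are hypothetical stand-ins, and the responses are canned values rather than service results. It assumes C# 9 or later.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for ItemResponse<T>; not an SDK type.
public sealed record SketchResponse<T>(T Resource, int StatusCode, double RequestCharge);

public sealed class SketchOperation
{
    private readonly Action deliver;

    private SketchOperation(string id, Action deliver)
    {
        this.Id = id;
        this.deliver = deliver;
    }

    public string Id { get; }

    // The callback takes the response as its argument, so the caller can read
    // per-operation details such as the status code inside the callback.
    public static SketchOperation Read<T>(
        string id, T cannedResource, Action<SketchResponse<T>> responseCallback)
    {
        // Canned placeholder values stand in for a real service response.
        return new SketchOperation(
            id,
            () => responseCallback(new SketchResponse<T>(cannedResource, 200, 1.0)));
    }

    public void Deliver() => this.deliver();
}

public static class CallbackSketch
{
    // Stand-in for ProcessBulkOperationsAsync: invokes each operation's callback in turn.
    public static void Process(IReadOnlyList<SketchOperation> operations)
    {
        foreach (SketchOperation operation in operations)
        {
            operation.Deliver();
        }
    }
}
```

Because each operation closes over its own typed callback, the list passed to `Process` can mix item types without the caller matching responses to requests.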

asketagarwal commented 3 years ago

With the callback approach, how do we expose Diagnostics, RequestCharge, etc.?

kirankumarkolli commented 1 year ago

One more alternative (summary of an offline discussion with @ealsur):

public class BulkExecutor 
{
    void Add<T>(Task<ItemResponse<T>> operation, object context);

    // Unsupported operations fail only at runtime
    void Add(Task<ResponseMessage> operation, object context);

    BulkResponse Execute(CancellationToken cancellationToken);
}

public class BulkResponse
{
    int FailCount { get; }

    // TODO: Replace with CosmosDiagnostics?
    TimeSpan GetClientElapsedTime();

    // Fails at runtime if the context is cast to the wrong response type
    ResponseMessage GetResponse(object context);
    ItemResponse<T> GetResponse<T>(object context);
}
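
The executor shape above can be sketched in a few lines. This is only an illustration under stated assumptions: `BulkExecutorSketch` is a hypothetical name, it accepts any `Task<T>` rather than SDK response types, and `Execute` returns the failure count directly instead of a `BulkResponse`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public sealed class BulkExecutorSketch
{
    // Maps the caller's context object to the task registered for it.
    private readonly Dictionary<object, Task> pending = new();

    public void Add<T>(Task<T> operation, object context) => this.pending.Add(context, operation);

    // Blocks until every registered task has finished, then reports how many failed.
    public int Execute()
    {
        try
        {
            Task.WaitAll(this.pending.Values.ToArray());
        }
        catch (AggregateException)
        {
            // Failures are reported through the count below and GetResponse.
        }

        return this.pending.Values.Count(task => task.IsFaulted || task.IsCanceled);
    }

    // Throws InvalidCastException when T does not match the type used in Add,
    // and rethrows the task's exception when that operation failed.
    public T GetResponse<T>(object context) =>
        ((Task<T>)this.pending[context]).GetAwaiter().GetResult();
}
```

Looking results up by context keeps the response type out of the executor's signature, at the cost of moving type errors from compile time to run time.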

kirankumarkolli commented 1 year ago

The above alternative fits a WaitAll() contract, or is still BLOCKING. Also updated the requirements with the below