Azure / azure-documentdb-changefeedprocessor-dotnet

This library provides a host for distributing change feed events from a partitioned collection across multiple observers. Instances of the host can be scaled up (by adding instances) or down (by removing them) dynamically, and the load will be automatically distributed among active instances in an approximately equal way.

Have multiple partitions and hosts, but only one host handles the work and the lib says there is only one partition #137

Closed tarikguney closed 5 years ago

tarikguney commented 5 years ago

The monitored collection has a partition key that creates multiple virtual partitions. I have also created multiple hosts, but I keep getting only one partition debug message, and the work is not distributed at all.

Here is what I am seeing all the time:

[image: debug output showing only a single partition]

The code I am using is very simple:

var monitoredCollectionsInfo = new DocumentCollectionInfo
{
    Uri = new Uri(settings.ObservedDatabaseUri),
    MasterKey = settings.IncidentsCollectionReadKey,
    DatabaseName = settings.ObservedCollectionDatabaseName,
    CollectionName = settings.ObservedCollectionName
};
var leasesCollectionInfo = new DocumentCollectionInfo
{
    Uri = new Uri(settings.LeaseDatabaseUri),
    MasterKey = settings.IncidentsLeasesMasterKey,
    DatabaseName = settings.LeaseCollectionDatabaseName,
    CollectionName = settings.LeaseCollectionName,
};
var processorOptions = new ChangeFeedProcessorOptions()
{
    MaxItemCount = 100,
    CheckpointFrequency = new CheckpointFrequency
    {
        ExplicitCheckpoint = true,
    },
};
return new ChangeFeedProcessorBuilder()
    .WithHostName("LNIC-" + Guid.NewGuid().ToString())
    .WithFeedCollection(monitoredCollectionsInfo)
    .WithLeaseCollection(leasesCollectionInfo)
    .WithObserverFactory(provider.GetService<IChangeFeedObserverFactory>())
    .WithProcessorOptions(processorOptions)
    .BuildAsync().Result;

I am running the code above from within different processes. What I have observed is that there is no parallelization or work distribution among the various hosts: only one host takes on the work, no matter how many hosts and partitions there are. The only shift in work I have observed happens when a host fails and one of the other hosts takes over and starts behaving the same way, working by itself while the remaining hosts do nothing but wait. So I debugged your library, and the library creates a lease (which turns out to be a lease document in the given lease collection). However, I always see only one lease document being created.
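
The observer factory resolved from provider.GetService<IChangeFeedObserverFactory>() is not shown in the snippet above. For context, here is a minimal sketch of what such a factory and observer might look like with v2 of this library; the class names are illustrative, and because ExplicitCheckpoint = true is set in the options, the observer is expected to call context.CheckpointAsync() itself.

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;

// Illustrative names only; not taken from the issue.
public class SampleObserverFactory : IChangeFeedObserverFactory
{
    public IChangeFeedObserver CreateObserver() => new SampleObserver();
}

public class SampleObserver : IChangeFeedObserver
{
    public Task OpenAsync(IChangeFeedObserverContext context) => Task.CompletedTask;

    public Task CloseAsync(IChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason)
        => Task.CompletedTask;

    public async Task ProcessChangesAsync(
        IChangeFeedObserverContext context,
        IReadOnlyList<Document> docs,
        CancellationToken cancellationToken)
    {
        // Handle the batch of changed documents here.

        // With ExplicitCheckpoint enabled, progress is only persisted when the observer checkpoints.
        await context.CheckpointAsync();
    }
}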

As opposed to what's actually happening, here is what the documentation says: https://docs.microsoft.com/en-us/azure/cosmos-db/read-change-feed#using-the-change-feed-processor-library

The change feed processor library hides complexity and still gives you a complete control of the change feed. The library follows the observer pattern, where your processing function is called by the library. If you have a high throughput change feed, you can instantiate multiple clients to read the change feed. Because you're using change feed processor library, it will automatically divide the load among the different clients without you having to implement this logic. All the complexity is handled by the library.

From the documentation above and the source code that I analyzed, I am supposed to see multiple lease documents being created. Nevertheless, this is the response when the library calls the Cosmos DB instance to get the pkranges of the monitored collection. This seems to be the root of the problem, since the lease documents, when missing, are created based on this response.

{
    "id": "[redacted].documents.azure.com_FEY0AA==_FEY0AM3HXRs=..0",
    "_etag": "\"01005e32-0000-0100-0000-5cb9281f0000\"",
    "PartitionId": "0",
    "Owner": "LNIC-1ac24adb-db05-4ac3-ab43-b197fb5f094d",
    "ContinuationToken": "\"488\"",
    "properties": {},
    "timestamp": "2019-04-19T01:44:59.4022292Z",
    "_rid": "FEY0ALMclHEeAAAAAAAAAA==",
    "_self": "dbs/FEY0AA==/colls/FEY0ALMclHE=/docs/FEY0ALMclHEeAAAAAAAAAA==/",
    "_attachments": "attachments/",
    "_ts": 1555638303
}

Below is the place in the library's source code where the request that returns the response above is made:

private async Task<List<PartitionKeyRange>> EnumPartitionKeyRangesAsync()
{
    string partitionKeyRangesPath = string.Format(CultureInfo.InvariantCulture, "{0}/pkranges", this.collectionSelfLink);
    IFeedResponse<PartitionKeyRange> response = null;
    var partitionKeyRanges = new List<PartitionKeyRange>();
    do
    {
        var feedOptions = new FeedOptions
        {
            MaxItemCount = this.maxBatchSize,
            RequestContinuation = response?.ResponseContinuation,
        };
        response = await this.documentClient.ReadPartitionKeyRangeFeedAsync(partitionKeyRangesPath, feedOptions).ConfigureAwait(false);
        IEnumerator<PartitionKeyRange> enumerator = response.GetEnumerator();
        while (enumerator.MoveNext())
        {
            partitionKeyRanges.Add(enumerator.Current);
        }
    }
    while (!string.IsNullOrEmpty(response.ResponseContinuation));
    return partitionKeyRanges;
}

ealsur commented 5 years ago

Based on https://docs.microsoft.com/en-us/azure/cosmos-db/change-feed-processor#implementing-the-change-feed-processor-library, the work can be parallelized based on the multiplicity of partition key values (https://docs.microsoft.com/en-us/azure/cosmos-db/partition-data#logical-partitions). The greater the diversity and cardinality of your partition key values, the further the library can scale.

In your case, your container seems to be either fixed (no partition key) or to contain a single partition key value; thus you only have 1 lease. The lease is the distributable unit of work for the library. If you have X leases and 1 host, that host will own and process all leases in parallel. If you scale to more hosts, the leases will get equally distributed and the compute will be distributed as well. But it all depends on your data distribution.
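
One way to check this is to read the same pkranges feed the library enumerates when it creates leases. A minimal diagnostic sketch using the DocumentDB SDK; the endpoint, key, database, and collection names are placeholders:

using System;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;

class PartitionKeyRangeCount
{
    static async Task Main()
    {
        // Placeholders: point this at the monitored (not the lease) collection.
        var client = new DocumentClient(new Uri("https://<account>.documents.azure.com"), "<master-key>");
        Uri collectionUri = UriFactory.CreateDocumentCollectionUri("<database>", "<monitored-collection>");

        string continuation = null;
        int rangeCount = 0;
        do
        {
            // Same feed the library pages through in EnumPartitionKeyRangesAsync above.
            FeedResponse<PartitionKeyRange> page = await client.ReadPartitionKeyRangeFeedAsync(
                collectionUri,
                new FeedOptions { RequestContinuation = continuation });
            rangeCount += page.Count;
            continuation = page.ResponseContinuation;
        }
        while (!string.IsNullOrEmpty(continuation));

        // The processor creates one lease document per partition key range returned here.
        Console.WriteLine($"Partition key ranges: {rangeCount}");
    }
}

A container that has never split reports a single range, which matches the single lease document (PartitionId "0") shown earlier in this thread.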

tarikguney commented 5 years ago

I don't agree with the assumption that my container is fixed or has only one partition key value. It has more.

Also, these are the settings of the container:

[image: container settings]

ealsur commented 5 years ago

If your leases container only contains 1 document with a PartitionId (which is not really the partition, just the name used to reference the partitioned unit of work) and that value is 0, it means the monitored container only contains 1 partition key range so far. It could be that it was a fixed container, which you correctly dismissed, but it could also mean that the diversity and granularity of your partition key values is not big enough yet. The logic inside the library uses this range as the unit of parallelism, and that is what the graphic in the official docs references: each range of values can be assigned to 1 host.
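
For anyone debugging this, the lease collection can also be queried directly to see how many leases exist and which host currently owns each one. A rough sketch along the same lines, assuming the same DocumentDB SDK; account, key, database, and collection names are placeholders:

using System;
using System.Linq;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;

class LeaseInspector
{
    static void Main()
    {
        // Placeholders: point this at the lease collection used by the processor.
        var client = new DocumentClient(new Uri("https://<account>.documents.azure.com"), "<master-key>");
        Uri leaseCollectionUri = UriFactory.CreateDocumentCollectionUri("<lease-database>", "<lease-collection>");

        var leases = client.CreateDocumentQuery<Document>(
                leaseCollectionUri,
                new FeedOptions { EnableCrossPartitionQuery = true })
            .AsEnumerable();

        foreach (Document lease in leases)
        {
            // Each lease records the partition key range it covers and the host that owns it,
            // as in the lease document pasted earlier in this issue.
            Console.WriteLine(
                $"{lease.GetPropertyValue<string>("PartitionId")} -> {lease.GetPropertyValue<string>("Owner")}");
        }
    }
}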

bentmar commented 2 years ago

Seeing the same behavior on my end (using the Cosmos DB change feed trigger with consumption plan Functions in C#).

I'm guessing this is because of the difference between physical and logical (partitionId) partitions.

My collection has 5 million rows, with 73k different partition IDs. The partition key is a GUID.

A change feed processor will only manage to create 1 partition lease, for partition 0.

Can someone confirm that this is the case, or point me to some documentation that describes when you can actually parallelize workers on the same collection (not in the sense of creating workers with different lease prefixes)?

Thx!

ealsur commented 2 years ago

https://docs.microsoft.com/en-us/azure/cosmos-db/sql/change-feed-processor#components-of-the-change-feed-processor

[image: diagram of the change feed processor components from the linked documentation]

https://docs.microsoft.com/en-us/azure/cosmos-db/partitioning-overview#physical-partitions