Azure / azure-cosmos-dotnet-v3

.NET SDK for Azure Cosmos DB for the core SQL API
MIT License

FeedRange - Scaling out support #1680

Open ealsur opened 4 years ago

ealsur commented 4 years ago

With the new Change Feed pull model approaching and its use of FeedRange, one pending item is support for scaling out running iterators.

Currently, the flow is as follows (a minimal sketch appears after the list):

  1. User gets FeedRanges
  2. User starts 1 iterator per FeedRange on each machine/instance
  3. Some time later, some machines are running hot on CPU, possibly because the ranges they handle are hotter than the others (or because those ranges were split)
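For reference, a minimal sketch of steps 1 and 2 against the v3 pull-model API (MyItem is a placeholder document type; assumes using Microsoft.Azure.Cosmos and System.Net inside an async method that has a Container):

// Step 1: enumerate the container's current FeedRanges.
IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

// Step 2: one iterator per FeedRange; in practice each runs on its own machine/instance.
foreach (FeedRange range in ranges)
{
    FeedIterator<MyItem> iterator = container.GetChangeFeedIterator<MyItem>(
        ChangeFeedStartFrom.Beginning(range),
        ChangeFeedMode.Incremental);

    while (iterator.HasMoreResults)
    {
        FeedResponse<MyItem> response = await iterator.ReadNextAsync();
        if (response.StatusCode == HttpStatusCode.NotModified)
        {
            break; // Caught up on this range; poll again later.
        }
        // Process response.Resource here.
    }
}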

How do we allow users to scale out if they want to?

Open to discussion

One idea would be a method on the FeedRange itself, possibly List<FeedRange> FeedRange.Scale(int? ranges), that attempts to split the range into the requested number of ranges. If the parameter is not passed, we can split along physical partition affinity or other limits.

Another idea would be to operate on the continuation tokens. Since this scenario arises once an iterator is already running, we have a continuation token; we could take it as input and return new continuations after the split: List<string> FeedRange.Scale(string currentContinuationToken, int? expectedRanges).
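For illustration, hypothetical call sites for both ideas (neither Scale overload exists in the SDK; someFeedRange and token are assumed to come from a running iterator):

// Idea 1: split one range into up to four narrower ranges.
List<FeedRange> narrower = someFeedRange.Scale(ranges: 4);

// Idea 2: split a running iterator's state via its continuation token.
List<string> newContinuations = someFeedRange.Scale(currentContinuationToken: token, expectedRanges: 4);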

Limits

Scale-out operations always face a limit. When EPK filtering is in place in the backend, a FeedRange cannot be split further than a single partition key value. Without EPK filtering in place, the limit is the physical partition.

In any case, the API can always return a negative result with "cannot scale out further" semantics.

bchong95 commented 4 years ago

I believe we talked about this and voted to not go for a Scale method.

Instead you can have:

bool FeedRangeFactory.TrySplit(FeedRange source, out (FeedRange target1, FeedRange target2) targets)
bool FeedRangeFactory.TryMerge((FeedRange source1, FeedRange source2) sources, out FeedRange target)

The Try pattern naturally implies to the user that the operation may fail. You can add monadic overloads that return a reason for the failure:

TryCatch<(FeedRange, FeedRange)> FeedRangeFactory.Monadic.Split(FeedRange source)
TryCatch<FeedRange> FeedRangeFactory.Monadic.Merge((FeedRange source1, FeedRange source2))
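For illustration, hypothetical call sites for both shapes (FeedRangeFactory and TryCatch belong to this proposal, not to the shipped public API):

// Try pattern: the bool tells the caller whether the split was possible.
if (FeedRangeFactory.TrySplit(source, out (FeedRange target1, FeedRange target2) targets))
{
    // Start one iterator per half; otherwise keep using the original range.
}

// Monadic overload: the failure carries a reason.
TryCatch<(FeedRange, FeedRange)> split = FeedRangeFactory.Monadic.Split(source);
if (split.Failed)
{
    // split.Exception would explain why the range cannot be split further.
}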

FabianMeiswinkel commented 4 years ago

I like the pattern Brandon proposed of just exposing Split/Merge - it provides the necessary building blocks for customers to achieve different scenarios. Any abstraction like the Scale method would just result in more confusion when scaling isn't possible - and the higher the level of abstraction, the harder it is to explain why you can scale up to 8 but not 12 right now, while a couple of hours later it works (because more physical partitions exist by then). Adding higher-level abstractions might be a next step once Split/Merge/EPK support etc. have settled some more.

j82w commented 4 years ago

Is there any scenario where a user actually needs to scale beyond a single range per physical partition? I can't think of a scenario where the user would want to narrow a range down to a single partition key value. The other question is whether the service can even handle that many connections, or whether that will hit some restriction that causes more CRIs.

My only suggestion with Brandon's proposal would be to make it a method rather than a static factory. It's easier to discover and unit test.

bool myCurrentFeedRange.TrySplit(out FeedRange target1, out FeedRange target2)
bool myCurrentFeedRange.TryMerge(FeedRange source1, FeedRange source2, out FeedRange target)

One other possibility is to have a callback method for when a split/merge happens. That way the user doesn't have to constantly check whether one or the other operation is possible.

myCurrentFeedRange.ScalingCallback(Action<FeedRange, ScaleType> scalingCallback);
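For illustration, hypothetical call sites for the instance-method shape and the callback (none of these members exist in the public SDK):

// Try pattern on the range itself.
if (myCurrentFeedRange.TrySplit(out FeedRange left, out FeedRange right))
{
    // Hand each half to a separate worker.
}

// Callback invoked when the backend splits or merges the partitions backing this range.
myCurrentFeedRange.ScalingCallback((FeedRange affected, ScaleType type) =>
{
    // Rebalance iterators/workers here.
});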

FabianMeiswinkel commented 4 years ago

I don't think a callback would address the most common scenarios. Those - IMO - are when compute is also dynamically scaling - for example, in the Spark connector the number of Spark workers could change - or when you realize that change feed processing for a certain FeedRange lags behind others and you want more workers for that FeedRange, so you just split that particular FeedRange. A callback would only be triggered when the number of physical partitions changes - but the main job often is to establish the right balance between the compute resources available and the number of FeedRanges available in the backend.

sivethe commented 4 years ago

For the Merge() API, is the expectation that customers will call it on all possible combinations to find out which ranges can be merged? Excluding the intra-partition feed range case, how can customers deterministically split/merge feed ranges to match physical layouts?

bchong95 commented 4 years ago

@sivethe Depending on the implementation, we can allow two ranges that are not adjacent to be merged together. We would just need to add a FeedRange implementation that can compose other FeedRanges. Early iterations won't support this, but that is the nice part of the Try-based API: as we get more advanced in the scale-out story, our users should see more of their TrySplit/TryMerge calls succeed.

zbynek001 commented 3 years ago

Any news here? I'm interested in a simple scenario where we want to end up with one FeedIterator per physical partition. Currently we're missing functionality to split the FeedIterator when a partition split happens. A notification that the split happened would also be nice, but isn't necessary. Is there currently any workaround for splitting it?

bchong95 commented 3 years ago

This has been solved with ChangeFeedCrossFeedRangeState:

https://github.com/Azure/azure-cosmos-dotnet-v3/blob/master/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedCrossFeedRangeState.cs

And the PR has documentation (refer to "Feed Range Scale Out")

https://github.com/Azure/azure-cosmos-dotnet-v3/pull/1978

zbynek001 commented 3 years ago

How do you use ChangeFeedCrossFeedRangeState.TrySplit when it's all internal? Am I missing something?

bchong95 commented 3 years ago

The code uses the following macro pattern - the type is declared public only when the INTERNAL compilation constant is defined:

#if INTERNAL
#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
#pragma warning disable SA1600 // Elements should be documented
#pragma warning disable SA1601 // Partial elements should be documented
    public
#else
    internal
#endif 

You can build your project with the following:

<DefineConstants>$(DefineConstants);INTERNAL</DefineConstants>
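That is, a standard MSBuild property; in the .csproj of the project being built from source it would sit in a PropertyGroup:

<PropertyGroup>
  <DefineConstants>$(DefineConstants);INTERNAL</DefineConstants>
</PropertyGroup>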

And you should have access to all these types. Note that the type isn't actually GA'd, so you may encounter build breaks in the future, but you should be able to use it as a decent stopgap solution.

j82w commented 3 years ago

This needs to be part of the public API before this issue can be resolved. Asking users to do a custom build to access an internal API is not a reasonable ask. Also, internal APIs can have breaking changes in the future, making this not a sustainable path for customers.

zbynek001 commented 3 years ago

Just checking whether there is anything new here. Is there really no plan to support scale-out scenarios for long-running change feed queries? This was possible in v2 but not in v3, because partition splits are hidden inside the FeedIterator. That is nice in most cases, but not so good for long-running change feed queries: as data grows, you end up with a few FeedIterators covering many physical partitions each and no way to split them.

j82w commented 3 years ago

@zbynek001 As a workaround, is there any reason you can't stop all the change feed iterators, then get a list of the new FeedRanges, and from that list create new change feed iterators using a start-from date?
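For illustration, a sketch of that suggested workaround (lastProcessedTime is an assumed UTC checkpoint the application tracked itself; MyItem as before):

// After stopping the old iterators, re-enumerate the ranges...
IReadOnlyList<FeedRange> newRanges = await container.GetFeedRangesAsync();

// ...then restart one iterator per new range from the checkpointed time.
foreach (FeedRange range in newRanges)
{
    FeedIterator<MyItem> iterator = container.GetChangeFeedIterator<MyItem>(
        ChangeFeedStartFrom.Time(lastProcessedTime, range),
        ChangeFeedMode.Incremental);
    // Resume the per-range read loop as before.
}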

zbynek001 commented 3 years ago

@j82w Short answer: no, that's not really possible. The application runs distributed across several servers, each one processing a set of partitions, and each partition is processed independently to some extent. As a result, the processing state of each partition is at a different position in the change feed, so we'd have to wait for everything to reach the same position. And even if we could do that, how would we translate that position into some datetime to start from?

In v2, we were able to detect partition splits via the exception with a special StatusCode/SubStatusCode, then enumerate the PartitionKeyRanges, detect which new partitions the old one was split into, and start new change feed processing.

In v3, we cannot detect the split, and even if we could, there is currently no way to, e.g., split the FeedRange/FeedIterator.

j82w commented 3 years ago

We will have an update next week with a plan to support this scenario.

ealsur commented 2 years ago

@zbynek001 @j82w Should we open a separate thread to discuss this particular scenario? This is mostly a Design Issue to track Design iterations.

@zbynek001 When you create the new Issue, could you explain the nature of your workload and how scaling out to more machines would benefit it? Is the workload CPU bound and hence it benefits from adding more and more machines? Or is it I/O bound? Would the Change Feed Processor help?

zbynek001 commented 2 years ago

@ealsur @j82w Here is the new ticket: #2759