Azure / azure-cosmos-dotnet-v3

.NET SDK for Azure Cosmos DB for the core SQL API
MIT License

Allow consumption of Change Feed as a pull model #831

Closed: mark-manticore closed this issue 4 years ago

mark-manticore commented 4 years ago

Is your feature request related to a problem? Please describe.

We are using the ChangeFeedProcessorBuilder in our WebAPI to update an in-memory cache with recent changes to a container, as outlined in the change feed use cases documentation. We have multiple instances of our WebAPI, each with its own cache, so each instance runs a single processor and doesn't need to share updates.

Describe the solution you'd like

Allow us to bypass the need for a lease container to read from the change feed. One solution might be to make ChangeFeedProcessorBuilder.WithInMemoryLeaseContainer() public, with a way of specifying a starting continuation token if desired.

Another would be to expose a container method that allows us to make a direct request for the change feed with our own continuation token and leave it to the caller to write the timer interval and storage mechanism (if desired).

Describe alternatives you've considered

Giving each WebAPI a unique processor name does work, but it unnecessarily creates lease documents and updates them often (we are seeing ~22 requests per minute on our lease container per WebAPI, and this multiplies with the number of logical partitions). WithLeaseConfiguration() seems to allow overriding the defaults with much higher values, but there is no guidance on what appropriate values would be.

Additional context

Failing to provide a WithLeaseContainer() call on the ChangeFeedProcessorBuilder results in an exception saying to add it or to use WithInMemoryLeaseContainer, which is not accessible.

ealsur commented 4 years ago

The option to provide a Continuation Token to the Change Feed Processor would not work. A Continuation Token is limited to a particular lease, while the CFP processes all leases in parallel.

Would something like https://github.com/Azure/azure-cosmos-dotnet-v3/pull/105 (a pull model) work? Or is your CFP working as a BackgroundWorker? How are you maintaining the CFP within the Web API?

mark-manticore commented 4 years ago

A direct pull model would definitely work for my use case; it is essentially my 2nd suggestion above. Right now the CFP is initialized and started during service initialization (not in a background worker). With a direct pulling method we could put it in an IHostedService (background worker) and have it read from the feed at a defined interval.

Our cache is of documents in a container that have a short TTL so starting to read from the beginning of time doesn't matter as much but I could see that being useful for other things. Rereading an existing document in the change feed also isn't a problem.

We would probably do something like this:

  1. Perform a normal query to fetch all documents and bring them into my cache
  2. Create iterator from container.GetStandByFeedIterator() (or whatever the public version will be called) passing in the start time based on the fetch all query.
  3. Call iterator.FetchNextSetAsync() adding any changes to my cache until it has no more results
  4. Store the global continuation token from the iterator for later
  5. Wait for X seconds
  6. Go to step 2 but use the global continuation token.
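
The steps above can be sketched as a small simulation. This is not the SDK API: FakeChangeFeed, read_all, read_changes and poll_once are hypothetical names, the feed is an in-memory list, and the continuation is assumed to be a plain position in that list rather than a start time or an opaque token.

```python
from typing import Dict, List, Tuple


class FakeChangeFeed:
    """In-memory stand-in for a container's change feed (hypothetical, not an SDK type)."""

    def __init__(self) -> None:
        self._changes: List[dict] = []

    def upsert(self, doc: dict) -> None:
        # Every write appends a new version of the document to the feed.
        self._changes.append(dict(doc))

    def read_all(self) -> Tuple[List[dict], int]:
        # Step 1: a "normal query" returning the latest version of each document,
        # plus the feed position at the time of the query.
        latest = {d["id"]: d for d in self._changes}
        return list(latest.values()), len(self._changes)

    def read_changes(self, continuation: int, page_size: int = 2) -> Tuple[List[dict], int]:
        # Returns at most page_size changes after the continuation and the new continuation.
        page = self._changes[continuation:continuation + page_size]
        return page, continuation + len(page)


def poll_once(feed: FakeChangeFeed, cache: Dict[str, dict], continuation: int) -> int:
    """Steps 2-4: drain the feed from the continuation, then return the new continuation."""
    while True:
        page, continuation = feed.read_changes(continuation)
        if not page:
            return continuation
        for doc in page:
            cache[doc["id"]] = doc


feed = FakeChangeFeed()
feed.upsert({"id": "a", "value": 1})
feed.upsert({"id": "b", "value": 1})

# Step 1: seed the cache with a full read and remember where the feed was.
cache: Dict[str, dict] = {}
docs, token = feed.read_all()
for doc in docs:
    cache[doc["id"]] = doc

# Writes that arrive after the initial load.
feed.upsert({"id": "a", "value": 2})
feed.upsert({"id": "c", "value": 1})
feed.upsert({"id": "b", "value": 3})

# Steps 2-4; a real worker would sleep X seconds (step 5) and call this again (step 6).
token = poll_once(feed, cache, token)
```

Because the caller owns the continuation, it decides whether to keep it in memory or persist it somewhere; no lease documents are involved in this sketch.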

The CFP is very powerful because it makes it easy for multiple workers to process changes to documents in a container. But acquiring, updating, and releasing a lease for a single worker seems necessary only if you need resilience in processing the change feed. Allowing us to query the change feed directly gives us the option of not worrying about leases in scenarios where continuing from an absolute point in time is not a requirement. Thanks.

AndreasHassing commented 4 years ago

Just chiming in to say that this is something I am interested in as well. Identically to @mark-manticore, we want to update in-memory caches using CFP and would like to avoid polluting the Cosmos-stored leases with per-process leases (they would need to be cleaned up recurrently). The in-memory lease container enables a cleaner approach for per-process CFP (per my understanding).

@ealsur any insights into why WithInMemoryLeaseContainer and GetStandByFeedIterator are internal and not public? It does not seem like they're used anywhere besides testing. Are they in a pre-release state?

ealsur commented 4 years ago

@AndreasHassing is your requirement for a pull model mechanism, like the OP, or the in-memory processor (push model)?

AndreasHassing commented 4 years ago

@ealsur does the in-memory processor add entries to the leases collection in Cosmos? If so, what happens when the in-memory processor restarts? Is a new entry added, or does it re-use the ContinuationToken? (Am I understanding the in-memory processor correctly, even?).

Push is more ideal than polling, I'd say. But if the in-memory push mechanism does not work like I think it does, then we'd settle for polling.

ealsur commented 4 years ago

The in-memory processor maintains leases in-memory. It's a push model where the state (Continuation Tokens and so on) is stored in memory. It allows a push model to run in a single instance without needing a lease container, but if the instance is restarted, all the state is obviously lost, since it is not stored anywhere and the Push model doesn't provide any sort of continuation (there is no need to).
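
To illustrate what losing the state on restart means in practice, here is a minimal simulation. InMemoryProcessor is a hypothetical class, not the SDK's internal implementation. It assumes the only thing a new instance knows is its configured start option, so a restart either skips changes written while it was down or replays the feed from the beginning.

```python
from typing import List


class InMemoryProcessor:
    """Hypothetical single-instance processor whose checkpoint lives only in memory."""

    def __init__(self, feed: List[str], start_from_beginning: bool) -> None:
        self._feed = feed
        # No lease container: the position is derived from the start option
        # every time an instance is constructed, i.e. on every (re)start.
        self._position = 0 if start_from_beginning else len(feed)

    def deliver_pending(self) -> List[str]:
        # Hand over everything after the in-memory checkpoint and advance it.
        pending = self._feed[self._position:]
        self._position = len(self._feed)
        return pending


feed = ["c1", "c2"]

# First run, configured to start "from now on".
processor = InMemoryProcessor(feed, start_from_beginning=False)
feed.append("c3")
first_run = processor.deliver_pending()

# "c4" is written while the instance is down; the in-memory checkpoint is gone.
feed.append("c4")
restarted = InMemoryProcessor(feed, start_from_beginning=False)
after_restart = restarted.deliver_pending()

# Starting from the beginning instead replays every change.
replayed = InMemoryProcessor(feed, start_from_beginning=True).deliver_pending()
```

For a cache that is rebuilt with a full query on startup, as in the scenario discussed in this thread, the skipped change is picked up by that initial load, so the loss of state is acceptable.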

AndreasHassing commented 4 years ago

@ealsur it sounds like the in-memory processor is exactly what we need. Is there a reason for it being internal only?

ealsur commented 4 years ago

It was an experiment and we didn't receive any scenario request that needed it until now; that was mainly the reason.

AndreasHassing commented 4 years ago

Do I/we need to file a formal scenario request to get this API publicized? 😊

ealsur commented 4 years ago

Could you create a separate Issue for tracking? This one seems to focus on a Pull model request.

AndreasHassing commented 4 years ago

@ealsur before I create it: can the in-memory CFP start at the end of the feed? Or will it always start at the beginning? I'm asking because you said:

"The option to provide a Continuation Token to the Change Feed Processor would not work."

And that seems to imply that the in-memory CFP cannot start at any point other than the beginning of the change feed. If that is the case, we'll need to use the pull model instead and won't need to create a new issue 😊.

ealsur commented 4 years ago

In-memory would work like the current CFP: it can start from the beginning, from some point in time, or from now on. There is no way to pass any other state.
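
The three start options can be pictured with a small sketch over a timestamp-ordered list. start_index is a hypothetical helper, not an SDK call, and it assumes a point-in-time start includes changes stamped exactly at that time.

```python
from bisect import bisect_left
from typing import List, Optional, Tuple


def start_index(
    feed: List[Tuple[int, str]],
    start_time: Optional[int] = None,
    from_beginning: bool = False,
) -> int:
    """Return the index of the first change to deliver.

    feed is a list of (timestamp, change) pairs sorted by timestamp.
    """
    if from_beginning:
        # Start from the beginning of the feed.
        return 0
    if start_time is not None:
        # Start at some point in time: first change with timestamp >= start_time.
        timestamps = [ts for ts, _ in feed]
        return bisect_left(timestamps, start_time)
    # Default: from now on, so nothing already in the feed is delivered.
    return len(feed)


feed = [(10, "a"), (20, "b"), (30, "c")]

from_start = feed[start_index(feed, from_beginning=True):]
from_time = feed[start_index(feed, start_time=20):]
from_now = feed[start_index(feed):]
```

Each option resolves to a single starting position; none of them accepts caller-supplied state such as a previously saved continuation token, which is the limitation being described.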

WimVergouwe commented 4 years ago

I'm trying to upgrade from the V2 to the V3 SDK and am running into this missing feature. We expose a pull model feed, based on the efficient change feed. But GetStandByFeedIterator seems to be internal. Could this please be made public? I would also like to be able to read and specify the partition key ranges in order to allow parallel feed processing.

ealsur commented 4 years ago

@WimVergouwe We are working on the pull model feature, including support for parallel feed processing.

WimVergouwe commented 4 years ago

@ealsur, thanks! Any rough ETA, so that we can plan this a bit?

ealsur commented 4 years ago

@WimVergouwe please see linked PR