Azure / azure-functions-durable-extension

Durable Task Framework extension for Azure Functions

At-most-once support for function calls #1555

Open cgillum opened 3 years ago

cgillum commented 3 years ago

Is your feature request related to a problem? Please describe.

This is a request from one of our customers, who has built a customer-facing integration platform on top of Durable Functions. Their customers use visual tools to build workflows that integrate with 3rd-party SaaS services. It's very important to these customers that interruptions in storage availability do not result in duplicate executions without their explicit permission.

Today, we advertise that Durable Functions guarantees at-least-once execution. This means transient network or compute failures could result in functions executing more than once. Customers are thus required to make their functions idempotent, which is not always possible or practical. This proposed feature would allow customers to opt out of at-least-once and instead opt into at-most-once execution guarantees.

Describe the solution you'd like

There are two parts to the proposed solution:

  1. A programming model for declaring that a function (activity, orchestrator, or entity) must be executed with at-most-once guarantees. In other words, if we receive a message with DequeueCount > 1, then we discard it and fail the orchestration. This ensures that any function that would have been triggered by this message will never be executed more than once.

  2. An API to rewind an orchestration to its last known healthy state. We actually already have this feature, but it's in preview. We would need to finish making it production-ready.

These two features together would allow an orchestration to fail if a potential duplicate execution is detected. A support engineer (i.e. the app owner) could then investigate the failure and decide whether to resume the orchestration using the rewind API (sketched below) or to just let it stay terminated if replaying is not safe.
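
For illustration, here is a minimal sketch of the operator-driven recovery step, calling the existing (preview) rewind API from a client function. The function name and trigger wiring here are just an example, not part of the proposal:

[FunctionName("RewindFailedInstance")]
public static async Task Run(
    [HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequest req,
    [DurableClient] IDurableOrchestrationClient client)
{
    string instanceId = req.Query["instanceId"];

    // Rewind reverts a failed orchestration to its last known healthy state
    // and resumes execution from there (preview functionality today).
    await client.RewindAsync(instanceId, "Operator verified that replay is safe");
}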

Describe alternatives you've considered

As an alternative to automatically failing an orchestration when duplicates are detected (or perhaps in addition to it), we could add a new property to the context interfaces that surfaces the retry count of the current message. With this property, customer code can make its own decision about how to handle duplicate executions (see the sketch below). This could be a simpler and more flexible solution. However, it would still require a way to stop and resume the orchestration via some manual intervention.
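
To sketch that alternative (the RetryCount property and the ChargeAsync helper below are hypothetical, purely for illustration):

[FunctionName("ChargeCustomer")]
public static async Task ChargeCustomer([ActivityTrigger] IDurableActivityContext context)
{
    // Hypothetical property: 0 would mean this is the first delivery of the message.
    if (context.RetryCount > 0)
    {
        // Potential duplicate: user code decides what to do. Here we throw so the
        // failure surfaces to the orchestration and an operator can intervene.
        throw new InvalidOperationException("Possible duplicate execution detected.");
    }

    await ChargeAsync(); // hypothetical non-idempotent operation
}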

Another option to consider would be to introduce a proper suspend/resume feature. Customers have asked for a feature like this before, and it could also help us implement poison message handling. If a duplicate message is detected, instead of failing the orchestration, the orchestration could be "suspended". A new API could then be introduced that resumes a suspended orchestration. This would be overall a cleaner and potentially safer design since failed orchestrations have the downside of permanently discarding any new messages that arrive. However, it would potentially be a much larger work item.

EDIT: 5/17/2023: I think we arrived at a better/simpler solution than what I've proposed above. See this comment for the most up-to-date thinking.

davidmrdavid commented 3 years ago

Yes! Yes! Yes! I'm all in for relaxing the programming constraints.

Before I can provide any feedback though, I have a few questions about the points you brought up.

On exposing the current message's retry-count

You mentioned that we could expose the current message's retry-count to the user and let them decide how to handle repeated executions with it.

I'm trying to make sure I understand this the same way you do. So let's say I have a 2-activity orchestrator where I want to exit the program on a network failure. Do you imagine something like the snippet below?

import azure.functions as func
import azure.durable_functions as df

def orchestrator_function(context: df.DurableOrchestrationContext):

    # assume `context.retry_count` tracks the current message's dequeue count
    if context.retry_count != 0:  # I'm assuming the current message is to call Hello-Tokyo
        return -1
    yield context.call_activity("Hello", "Tokyo")

    # assume `context.retry_count` tracks the current message's dequeue count
    if context.retry_count != 0:  # I'm assuming the current message is to call Hello-Seattle
        return -1
    yield context.call_activity("Hello", "Seattle")

main = df.Orchestrator.create(orchestrator_function)

This kind of reminds me of using the is_replaying flag, but I'm not sure if that's deliberate.

On the difference between the suspend-and-resume API and the rewind API

The usage of a rewind API does make sense to me as a way to recover from failures. However, the suspend-and-resume API proposal confuses me because I do not understand how an orchestrator that's gotten into a "bad state" would recover via a temporary pause. Could you elaborate on what the expected usage of suspend-and-resume would look like? Do users have a way of redirecting their messages to a new orchestrator or something like that?

Thanks!

ConnorMcMahon commented 3 years ago

The retry count is probably more relevant for activities, to prevent duplicate executions there. Duplicate message detection for orchestrations feels decidedly less useful to me, as by design they are idempotent.

davidmrdavid commented 3 years ago

That makes sense.

I suppose that means we'd need to equip activities with an implicit "context" parameter as well, so they can use it to read their own retry count.

ConnorMcMahon commented 3 years ago

At least for C#, the default parameter actually is IDurableActivityContext, so that shouldn't be a problem (see the example below). I believe JS already has an equivalent class as well.
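
For reference, an activity bound to the context object looks like this today; a retry/dequeue count could be surfaced as a sibling member on the same interface:

[FunctionName("Hello")]
public static string Hello([ActivityTrigger] IDurableActivityContext context)
{
    // GetInput<T>() is the existing API for reading the activity's input
    // from the context instead of binding it as a direct parameter.
    string city = context.GetInput<string>();
    return $"Hello {city}!";
}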

cgillum commented 3 years ago

@davidmrdavid Connor is correct that the retry count property is mainly intended for activity triggered functions, but would also be very useful for entities.

Suspend/resume probably deserves its own issue with a detailed description, but consider the following scenario:

  1. The orchestration runs and schedules an activity function, but we fail to checkpoint the completion of the activity function due to some failure.
  2. When the message that scheduled the activity function becomes visible again (i.e. after ~5 minutes) the DequeueCount will be set to 2 (this is done automatically by the Azure Storage service).
  3. After we dequeue the message and prepare to execute the activity function, we check the policy for that function to see if it is at-most-once. If it is, we stash the message somewhere safe (blob storage? history table? Details TBD) and put the orchestration in the "Suspended" state.
  4. Any new messages that arrive for a suspended orchestration similarly get stashed away. We continue to do this until something (details on what that "something" is are also TBD) causes the orchestration to be "resumed".
  5. Once resumed, we process all the stashed messages and continue execution as normal.

Some details would still need to be worked out, but it effectively gives us a feature for "pausing" an orchestration and then resuming it if/when the app owner thinks it is safe to do so.
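
To make steps 3-5 concrete, here is a very rough sketch of the dispatch-side check (all names and helpers are hypothetical; the stash location and resume trigger are TBD as noted above):

// Hypothetical dispatcher logic: suspend rather than execute a potential duplicate.
TaskMessage message = await queue.DequeueAsync();
if (message.DequeueCount > 1 && GetPolicy(message) == ExecutionGuarantee.AtMostOnce)
{
    await StashMessageAsync(message);  // blob storage? history table? details TBD
    await SetRuntimeStatusAsync(message.OrchestrationInstance, "Suspended");
}
else
{
    await ExecuteAsync(message);
}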

olitomlinson commented 3 years ago

@cgillum

If the queue's dequeueCount property was exposed to the Activity, I would do something like this

if (activityContext.DequeueCount > 1)
{
    // Call 3rd party to see if that operation has already been performed
    var alreadyPerformed = ...;
    if (alreadyPerformed)
        return;
    PerformAction();
}
else
{
    PerformAction();
}

To abstract away the underlying queue delivery count, you could offer an extension point for user code to supply a delegate that resolves whether or not the operation can be retried.

In this delegate I could then choose how to proceed, e.g.:

[FunctionName("Orchestration")]
public async Task RunOrchestratorAsync([OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var guaranteeOptions = new ActivityGauranteeOptions()
    {
        Guarantee = ActivityGuarantee.AtMostOnce,
        AtMostOnceHandler = "ActivityA-AtMostOnceHandler"
    }

    await context.CallActivityAsync("Activity-A", guaranteeOptions, data);
}

[FunctionName("Activity-A")]
public async Task<FooResult> Run([ActivityTrigger] JObject request)
{
    // do the important operation in 3rd party system/app/datastore...

    return new FooResult{ Foo = "bar" };
}

[FunctionName("ActivityA-AtMostOnceHandler")]
public async Task<OrchestrationHint<FooResult>> Run([ActivityAtMostOnceHandlerTrigger] JObject request)
{
    bool alreadyPerformed = // Call 3rd party to see if that operation has already been done, take this result as a source of truth.

    if (alreadyPerformed) {
        return OrchestrationHint.SuspendAndKeepExistingDurableTimers; 
        return OrchestrationHint.SuspendAndInvalidateExistingDurableTimers; 
        // Put the orchestration into an indefinite suspended state, can be resumed with DF HTTP API. Need to be aware of what happens to any DurableTimers when going into suspended mode?!

        return OrchestrationHint.ContinueWithActivityResult(new FooResult{ Foo = "bar" }); 
        // or, provide the neccessary data back to the orchestration to continue, and don't try the activity again
    }

    if (!alreadyPerformed)
        return OrchestrationHint.ContinueOnce; // try the activity again once more (deliveryCount increments)
}
sebastianburckhardt commented 3 years ago

I love the idea of extending the API to give activities and/or orchestrations a way to check if the current work item (i.e. current orchestration step, current activity, or current entity operation) is a potential duplicate. This will go a long way to improve the utility of our workflows, and help us explain the guarantees to our users.

I also like the idea of just exposing "potential duplication" and then letting users decide how to handle it. This gets us all the variations with a single mechanism: at-least-once (the user ignores potential duplication), at-most-once (the user cancels execution of potential duplicates), and a wide variety of custom solutions (on duplication, the user can log a warning, stall execution, or run some application-specific deduplication check, e.g. if the API of the external service supports checking for prior operations).

I am not sure if simply exposing the dequeue count is enough to implement this, though. Two thoughts on this:

  1. Our new backends don't use Azure Storage Queues, thus there is no equivalent to the dequeue count.

  2. Duplicates can be caused upstream. For example, if an orchestration step fails after enqueueing an activity message, but before deleting the triggering message, then it may run again and enqueue the same activity message again.

cgillum commented 3 years ago

Thanks for the feedback!

Our new backends don't use Azure Storage Queues, thus there is no equivalent to the dequeue count.

I implemented my own "dequeue count" semantic in the MS SQL backend by updating a delivery count column in the same transaction as the database read. I think it's important for backends to maintain this information so that they have the opportunity to do poison message handling. I'm not sure to what extent Netherite could do this efficiently, but it would be nice if this could be a property rather than an async method. Otherwise, customers would have to introduce additional I/O into every activity call where they care about duplication, and that could potentially have a significant performance impact.

Duplicates can be caused upstream. For example, if an orchestration step fails after enqueueing an activity message, but before deleting the triggering message, then it may run again and enqueue the same activity message again.

That's a good point. This is not a problem for the MS SQL backend, since we have transactions that ensure all-or-nothing updates, but it's absolutely a problem for the Azure Storage backend. I agree with your suggestion to consider propagating dequeue counts from the orchestration message to the eventual activity/timer/sub-orchestration messages (see the sketch below). Given that different backends have different transactional guarantees, the underlying "dequeue count" calculation and propagation logic would likely need to be backend-specific.
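
To illustrate the propagation idea (everything here is hypothetical; the real mechanism would be backend-specific):

// When the orchestration work item was itself a redelivery, stamp every message it
// produces so downstream activities/timers/sub-orchestrations can see that they may
// be duplicates, even though their own message was delivered only once.
foreach (TaskMessage outbound in outboundMessages)
{
    outbound.UpstreamDequeueCount = orchestrationWorkItem.DequeueCount; // hypothetical property
}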

sebastianburckhardt commented 3 years ago

It would be nice if this could be a property rather than an async method. Otherwise customers would have to introduce additional I/O into every activity call where they care about duplication, and that could potentially have a significant performance impact.

Making it an async method does not imply that there is I/O - it just means there may be I/O if the backend requires it. The point is exactly that we don't want to pay the cost of it if it is not necessary - most code does not care about it. Making it a simple property would force some backend architectures to pay an unreasonable cost on every single work item.

For example, EventHubs has no concept of dequeue count, so in Netherite there is necessarily some I/O needed to support this feature. That's what I mean by saying this feature cannot be "free" on all architectures. Whether or not this I/O happens is backend-specific, and users should not be exposed to the details of how that even works. But since the feature is not always "free", we need to design the API in a way that does not force us to pay the cost on every single work item.

So, to support all current and future backend architectures, I would strongly oppose making this a simple property.

Technically, I would argue that offering a dequeue count is not really "free" on Azure Queues either, it's just a latency you cannot opt out of because Azure Queues internally always already pays that cost on each dequeue - in the sense that it has to durably persist something to storage before handing the client a message. Similarly, SQL has to commit a transaction on every dequeue before processing the work item. In contrast, EventHubs does not need to write to storage or commit a transaction on dequeue, since client messages are identified by a sequence number, so deduplication can happen on the client side.

the underlying "dequeue count" calculation and propagation logic would likely need to be backend-specific.

Yes, the logic would be highly specific to the backend, which makes sense and allows for optimizations in the future. For that reason, I would perhaps not call it "dequeue count", since that name is already architecture-specific (it assumes that we are counting dequeues). I would go for something that more directly relates to the guarantee we are making, such as MayBeDuplicate. Or perhaps its inverse, IsFirstAttempt, which is probably my favorite.
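
For concreteness, one possible (purely illustrative) shape for such an API; the interface name anticipates the IExecutionAttemptCounter idea mentioned later in this thread:

public interface IExecutionAttemptCounter
{
    // Async so that backends without a native dequeue count (e.g. Netherite on
    // Event Hubs) can defer any required I/O until the caller actually asks.
    ValueTask<bool> MayBeDuplicateAsync();

    // The inverse formulation: true only on a provably first attempt.
    ValueTask<bool> IsFirstAttemptAsync();
}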

sebastianburckhardt commented 3 years ago

Using an async method has several disadvantages (e.g. it does not play well outside C#). To preserve the backend-independence of the feature, we could also do one of the following:

espray commented 3 years ago

@cgillum I have the same scenario as the original customer. What can the community do to help move this forward?

sebastianburckhardt commented 3 years ago

@cgillum, FYI, with regards to this question:

I'm not sure to what extent Netherite could do this efficiently, but it would be nice if this could be a property rather than an async method.

I have a prototype implementation that maintains a dequeue count for work items in Netherite. However, for the dequeue count to be accurate, it introduces some delay: I have to wait, prior to invoking the user code, for the dequeue count to hit storage (otherwise the node could crash before saving the dequeue count to storage, and then the count would not increase next time). From a throughput perspective this is no problem (it does not generate extra I/O ops), but it introduces latency into the processing, so I would prefer that this functionality (i.e. the accurate dequeue counting) be opt-in, as opposed to being a delay that is paid on every work item.

cgillum commented 3 years ago

@espray thanks for chiming in - it's helpful to know that more folks could benefit from this. We discussed this briefly today internally and agreed to continue working on the proposal.

@sebastianburckhardt that's great to hear! To make this opt-in on a per-work-item basis for Netherite, I assume we need to store some metadata in DT.Core's TaskMessage class that signals whether a dequeue count needs to be fetched. Some other changes, like adding new optional parameters to OrchestrationContext.ScheduleTask, may also be required in order to populate this metadata.

I like your suggestion of creating an IExecutionAttemptCounter to surface this information without requiring an attribute; however, we still need some way of knowing when to inject the metadata into the TaskMessage before it gets written to the queue. An attribute, or even a new property on the existing trigger attribute (like [ActivityTrigger(FetchDequeueCount = true)]), could help because we should be able to look that up at the time the user calls CallActivityAsync(activityName) (the extension already knows the names of all the activity functions that exist in the app). Adding a property to the existing binding attribute also makes the feature accessible to non-.NET languages, since binding attribute properties can also be set in function.json.

Thoughts?

sebastianburckhardt commented 3 years ago

I think I would prefer to attach the information to the activity, not the task message, since it doesn't really have to go through storage on each message. Maybe something along these lines:

  1. extend the TaskActivity abstract class with a property

    public virtual bool RequiresPreciseDequeueCount => false;

    In DF, we can override this property to resolve to true iff there is an attribute [ActivityTrigger(PreciseDequeueCount = true)], exactly like you suggested. As a side effect of doing it this way, the functionality can also be used by DT without DF.

  2. extend the class TaskActivityWorkItem with a property

    Task DequeueCountBarrier { get; set; } = Task.CompletedTask;

    which is then implemented by the backend to complete only after there is certainty that the dequeue count is not too low. This may involve waiting for the in-flight storage update to complete, and perhaps also re-checking for lease expiration, to make sure the window for an incorrect dequeue count becomes as small as possible. By the way, important question for semantics: can we ensure that we never start a new activity execution attempt with dequeue count n before all execution attempts with count < n have crashed or completed, i.e. are not still executing somewhere? I am currently assuming no (because how would you reliably know some process has terminated), but it would be an awesome guarantee if we can make it.

  3. inside TaskActivityDispatcher, do something like

    if (taskActivity.RequiresPreciseDequeueCount)
    {
        await workItem.DequeueCountBarrier;
    }

alexsegura1 commented 3 years ago

Can you clarify?

"This means transient network or compute failures could result in functions executing more than once."

Does that mean the activity throws an exception? I thought we could utilize RetryOptions for that, or am I misunderstanding?

I have a scenario where something goes wrong and no exception is thrown; in fact, the WebJob running the orchestration crashes. When the machine comes back online, it enters the Activity again and tries again.

Would the proposed solution handle that scenario?

cgillum commented 3 years ago

Hi @alexsegura1, regarding your questions:

"This means transient network or compute failures could result in functions executing more than once."

Does that mean the activity throws an exception? I thought we could utilize RetryOptions for that, or am I misunderstanding?

Sorry, I wasn't being clear about the "transient network" failure part. The cases I'm talking about won't be covered by RetryOptions because they happen outside of the function execution. For example, suppose the activity function completes successfully but a network issue causes us to fail to write the results to Azure Storage. The orchestration is never made aware of this failure. Instead, the activity function just tries again after 5 minutes (the message visibility delay). This is the duplicate execution that we want your activity function code to be made aware of.

I have a scenario where something goes wrong and no exception is thrown, in fact the WebJob running the orchestration crashes. When the machine comes back online it enters into the Activity again and tries again.

Would the proposed solution handle that scenario?

Yes, the proposed solution is designed specifically to handle exactly the type of case you're running into.

alexsegura1 commented 3 years ago

@cgillum thank you for clarifying. I would +1 this feature; we would love to at least be able to know that an activity is being entered again. We are semi-okay with this behavior, as long as we can limit it programmatically or with some setting. An at-most-once flag would do the trick as well.

Thank you again.

sebastianburckhardt commented 2 years ago

Trying to remember where we're at here. To keep it as simple as possible, I think this could work:

All three backends have the concept of 'dequeue count'. We could expose this number (a positive integer) to activity and entity functions. The guarantee we can make is: no two attempts at executing the same activity or entity operation ever have the same dequeue count. The first attempt has dequeue count 1.

At-most-once execution can be guaranteed by testing whether the dequeue count is 1, and not doing anything if it is larger than 1.

Note that there is a subtlety here. We cannot 100% prevent multiple execution attempts from being underway concurrently on multiple nodes. However, we can still guarantee that no two of them have the same dequeue count.
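
A minimal sketch of what that guard would look like in an activity, assuming a hypothetical DequeueCount property on the activity context:

[FunctionName("SendEmail")]
public static async Task SendEmail([ActivityTrigger] IDurableActivityContext context)
{
    // Hypothetical property: the first attempt is guaranteed to see 1, and no
    // two attempts of the same activity ever observe the same value.
    if (context.DequeueCount > 1)
    {
        // A prior attempt may have run (or may still be running on another node),
        // so skip the work to get at-most-once behavior.
        return;
    }

    await SendAsync(); // placeholder for the non-idempotent operation
}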

davidmrdavid commented 2 years ago

Yep, I think we had mostly agreed this was probably the simplest implementation. I'm curious, @sebastianburckhardt - do you think any of the backends has the ability to actually provide the at-most-once guarantee, or is this simply not possible irrespective of backend?

gorillapower commented 1 year ago

I have several scenarios where this would be useful. Besides making our code idempotent, we are keeping a duplicate counter/cache to prevent duplicates where needed.

E.g., I have a durable entity that acts as a metrics counter, aggregating metrics from our app. When signalling the counter from an activity, the activity could potentially send the same signal twice (i.e. at-least-once behaviour). So to protect the counter, I am maintaining a duplicate list that makes sure the same metric is not counted more than once, using deterministic GUIDs as keys. This was a quick fix to get around the issue at first, but at-most-once support would potentially alleviate this.

Are there any more updates on the 'At-most-once' feature?

davidmrdavid commented 1 year ago

Hi @gorillapower,

Thanks for sharing your use case - that's the kind of information that helps us prioritize/advocate for these work items. At this time, this is not on our priority list, but we've discussed it recently and have some ideas on how to implement it. In general, this is a tricky item because it may require support from the Functions Host/runtime team, which adds complexity to our planning.

At this time, it would be most helpful for folks interested in this feature to communicate that in this thread, either as a comment or through a reaction/emoji on the original post. With a strong signal, we would be able to prioritize this.

olitomlinson commented 1 year ago

So to protect the counter, I am maintaining a duplicate list that makes sure the same metric is not counted more than once, using deterministic GUIDs as keys.

FWIW, we did exactly this: maintained a list of deterministic GUIDs inside the Entity to ensure the same message wasn't reprocessed when seen again. It wasn't particularly glamorous, but it was simple and effective.
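
A minimal sketch of that pattern with a class-based entity (names are illustrative; the deterministic IDs would come from context.NewGuid() in the orchestrator):

[JsonObject(MemberSerialization.OptIn)]
public class MetricsCounter
{
    [JsonProperty] public int Count { get; set; }

    // IDs already counted; deterministic, so replays/redeliveries produce the same value.
    [JsonProperty] public HashSet<Guid> Seen { get; set; } = new HashSet<Guid>();

    public void Add(Guid metricId)
    {
        if (Seen.Add(metricId)) // HashSet<T>.Add returns false for duplicates
        {
            Count++;
        }
    }

    [FunctionName(nameof(MetricsCounter))]
    public static Task Run([EntityTrigger] IDurableEntityContext ctx)
        => ctx.DispatchAsync<MetricsCounter>();
}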

gorillapower commented 2 weeks ago

Just checking in to see if this feature is something that would be considered in the near future?

Potential improvement on the workaround already suggested.

This article (https://hughfdjackson.com/engineering/counters,-idempotence-and-forgetful-bloom-filters/) suggests storing a list of idempotency keys, but to bound the size of the list in high-volume scenarios, it suggests maintaining only a moving window of keys, "so long as that window is longer than the period the client will keep re-trying the operation in."
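
A minimal sketch of the moving-window idea (illustrative only, not the article's code): keep two generations of seen keys and rotate them periodically, so memory stays bounded while any retry within the window is still detected.

public class ForgetfulKeySet
{
    private HashSet<Guid> current = new HashSet<Guid>();
    private HashSet<Guid> previous = new HashSet<Guid>();

    // Returns true if the key has not been seen within the last two windows.
    public bool TryAdd(Guid key)
    {
        if (current.Contains(key) || previous.Contains(key))
        {
            return false; // duplicate within the retention window
        }
        current.Add(key);
        return true;
    }

    // Call once per window; the window must outlast the clients' retry period.
    public void Rotate()
    {
        previous = current;
        current = new HashSet<Guid>();
    }
}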

cgillum commented 2 weeks ago

@gorillapower it's still on our radar, but we've unfortunately not been able to prioritize it yet.