microsoft / durabletask-java

Java SDK for Durable Functions and the Durable Task Framework
MIT License

Forceful Termination of Running Orchestrations #111

Open kanupriya15025 opened 1 year ago

kanupriya15025 commented 1 year ago

Today, the terminate API is asynchronous. If an instance is running and we call /terminate, the framework returns "Accepted". In the background, however, if an activity function is still running, the actual termination may take a while, depending on the activities (please confirm my understanding here).

I need a way to forcefully terminate an orchestration instance so that all of its running activity functions are stopped as well, or a "stop" function in activity definitions that governs how activities behave when a forceful terminate is called.

Essentially, the API might look like this: /terminate?force=true

cgillum commented 1 year ago

This is something we're looking into and interested in supporting. However, the ability to force an activity to terminate will require some changes to the Azure Functions host, which haven't been prioritized yet. This (admittedly old) item is tracking it: https://github.com/Azure/azure-functions-durable-extension/issues/506

kaibocai commented 1 year ago

Hi @kanupriya15025, to stop your activity function when the orchestration status is TERMINATED, you can spin up a new thread that polls the status of the orchestration and passes that information to your activity functions. Inside your activity functions, add logic that stops the work once it learns that the orchestration has been terminated.

For example (this is one way to achieve it; feel free to use any other approach that implements the same idea):

package com.functions;

import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.*;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

import com.microsoft.durabletask.*;
import com.microsoft.durabletask.azurefunctions.DurableActivityTrigger;
import com.microsoft.durabletask.azurefunctions.DurableClientContext;
import com.microsoft.durabletask.azurefunctions.DurableClientInput;
import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger;

/**
 * Azure Durable Functions with HTTP trigger.
 */
public class TerminateActivityWorkaround {

    private static final ExecutorService executor = Executors.newFixedThreadPool(1);
    private static final AtomicBoolean completed = new AtomicBoolean(false);
    private static final AtomicBoolean executed = new AtomicBoolean(false);
    private static final Object orchestrationLock = new Object();

    /**
     * This HTTP-triggered function starts the orchestration.
     */
    @FunctionName("StartOrchestration")
    public HttpResponseMessage startOrchestration(
            @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
            @DurableClientInput(name = "durableContext") DurableClientContext durableContext,
            final ExecutionContext context) {
        context.getLogger().info("Java HTTP trigger processed a request.");

        DurableTaskClient client = durableContext.getClient();
        String instanceId = client.scheduleNewOrchestrationInstance("Cities");
        context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);

        pollingOrchestrationStatus(instanceId, client, context);

        return durableContext.createCheckStatusResponse(request, instanceId);
    }

    private void pollingOrchestrationStatus(String instanceId, DurableTaskClient client, ExecutionContext context) {
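        // Ensure the polling logic is started only once, to avoid spinning up too many threads.
        // AtomicBoolean guarantees both atomicity and visibility of the flag across threads.
        // Note: because the flags are static, only the first instance started in this process is
        // polled; tracking more than one instance would require per-instance state.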
        if (!executed.get()) {
            synchronized (orchestrationLock) {
                if (!executed.get()) {
                    executor.submit(() -> {
                        while (true) {
                            try {
                                context.getLogger().info("Polling status for orchestration with instance ID = " + instanceId + "...");
                                OrchestrationMetadata orchestrationMetadata = client.waitForInstanceCompletion(instanceId, Duration.ofMillis(100), false);
                                if (orchestrationMetadata != null && orchestrationMetadata.isCompleted()) {
                                    context.getLogger().info("Orchestration with instance ID = " + instanceId + " has completed/terminated.");
                                    completed.set(true);
                                    return;
                                }
                            } catch (TimeoutException e) {
                                context.getLogger().info("Polling timed out. Orchestration not completed yet.  Retrying...");
                            }
                        }
                    });
                    executed.set(true);
                }
            }
        }
    }

    /**
     * This is the orchestrator function, which can schedule activity functions, create durable timers,
     * or wait for external events in a way that's completely fault-tolerant.
     */
    @FunctionName("Cities")
    public String citiesOrchestrator(
            @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
        String result = "";
        result += ctx.callActivity("Capitalize", "Austin", String.class).await();
        return result;
    }

    /**
     * This is the activity function that gets invoked by the orchestration.
     */
    @FunctionName("Capitalize")
    public String capitalize(
            @DurableActivityTrigger(name = "name") String name,
            final ExecutionContext context) {

        // Activity logic, part 1
        stopIfOrchestrationCompleted(context);

        // Activity logic, part 2
        stopIfOrchestrationCompleted(context);

        // Activity logic, part 3
        stopIfOrchestrationCompleted(context);

        return name.toUpperCase();
    }

    private void stopIfOrchestrationCompleted(ExecutionContext context) {
        if (completed.get()) {
            // Clean-up work goes here, e.g. closing resources.
            context.getLogger().info("Orchestration completed, stop the activity");
            throw new RuntimeException("Orchestration completed, stop the activity");
        }
    }
}
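
The sample above keeps a single static `completed` flag, so it can only follow one orchestration instance per process. A minimal sketch of how the same idea might be extended to several instances is shown below, using only `java.util.concurrent`. The class `CancellationRegistry` and its methods `register`, `markCompleted`, `isCompleted`, and `forget` are hypothetical names introduced for illustration and are not part of the Durable Task SDK. The sketch also assumes that the activity receives the instance ID as part of its input, which the sample above does not do, and that the activity runs in the same JVM as the polling thread.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper (not an SDK type): one completion flag per orchestration instance ID. */
final class CancellationRegistry {
    private final ConcurrentMap<String, AtomicBoolean> flags = new ConcurrentHashMap<>();

    /** Registers an instance. Returns true only for the first caller, who should start the poller. */
    boolean register(String instanceId) {
        return flags.putIfAbsent(instanceId, new AtomicBoolean(false)) == null;
    }

    /** Called by the polling thread once the instance is reported as completed or terminated. */
    void markCompleted(String instanceId) {
        AtomicBoolean flag = flags.get(instanceId);
        if (flag != null) {
            flag.set(true);
        }
    }

    /** Called by an activity between units of work; unknown instance IDs count as not completed. */
    boolean isCompleted(String instanceId) {
        AtomicBoolean flag = flags.get(instanceId);
        return flag != null && flag.get();
    }

    /** Removes the flag once no activity will check it any more, so the map does not grow forever. */
    void forget(String instanceId) {
        flags.remove(instanceId);
    }

    public static void main(String[] args) {
        CancellationRegistry registry = new CancellationRegistry();
        registry.register("instance-1");
        registry.register("instance-2");
        registry.markCompleted("instance-1");
        System.out.println("instance-1 completed: " + registry.isCompleted("instance-1"));
        System.out.println("instance-2 completed: " + registry.isCompleted("instance-2"));
    }
}
```

In the sample, `pollingOrchestrationStatus` would call `register(instanceId)` instead of checking `executed`, the polling loop would call `markCompleted(instanceId)` instead of `completed.set(true)`, and `stopIfOrchestrationCompleted` would call `isCompleted(instanceId)`. The single-thread executor would also need more threads, or a shared scheduled poller, to follow several instances at once.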

jviau commented 6 months ago

> or have a function "stop" in activity definitions that guide the behaviour of activities in case a forceful terminate is called.

This requirement is an important distinction. It is what separates forceful termination from cooperative cancellation. These two are quite different designs and implementations on our end. The former only provides a means to terminate active work, with no ability to perform reliable cleanup. The latter gives the implementer the flexibility to acknowledge cancellation on their own terms and to reliably perform cleanup actions.
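
To make the cooperative side of this distinction concrete, here is a minimal sketch in plain Java, using `java.util.concurrent` only. It is a general illustration, not the Durable Task SDK's design, and it does not imply that the SDK exposes a cancellation API. The class name `CooperativeCancellationDemo` and the method `runAndCancel` are hypothetical. The worker checks its thread's interrupt flag at points it chooses, and its `finally` block stands in for the cleanup that cooperative cancellation makes possible.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical demo class: a worker that acknowledges cancellation and cleans up. */
final class CooperativeCancellationDemo {

    /** Starts a cancellable task, requests cancellation, and reports whether its cleanup ran. */
    static boolean runAndCancel() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean cleanedUp = new AtomicBoolean(false);

        Future<?> work = pool.submit(() -> {
            try {
                started.countDown();
                // The task picks its own cancellation points by checking the interrupt flag.
                while (!Thread.currentThread().isInterrupted()) {
                    Thread.sleep(10); // stands in for one unit of real work
                }
            } catch (InterruptedException e) {
                // Interrupted while sleeping: treat it as a cancellation request.
                Thread.currentThread().interrupt();
            } finally {
                cleanedUp.set(true); // cleanup runs because the task exits on its own terms
                finished.countDown();
            }
        });

        try {
            started.await();   // wait until the worker is actually running
            work.cancel(true); // request cancellation by interrupting the worker thread
            return finished.await(5, TimeUnit.SECONDS) && cleanedUp.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("cleanup ran: " + runAndCancel());
    }
}
```

Forceful termination has no equivalent hook: if the work is stopped from the outside, for example by killing the process that hosts it, nothing guarantees that a `finally` block like the one above gets to run.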