uber / cadence

Cadence is a distributed, scalable, durable, and highly available orchestration engine to execute asynchronous long-running business logic in a scalable and resilient way.
https://cadenceworkflow.io
MIT License

Support for workflow rollbacks #1435

Closed nklijia2011 closed 7 months ago

nklijia2011 commented 5 years ago

@mfateev, as we discussed in the meeting: in the case of a non-retryable error during activity execution, or when canceling a running workflow, we would like to be able to roll back the already executed activities to clean up the system state.

This requires recording the executed activities and rolling back each one in reverse order. I should also be able to specify the parameters of the rollback activities, or at least get what was passed in to the canonical activity.

As a customer, I could define revert activities corresponding to my canonical activities. By default, the revert activity would be a no-op if no revert action is needed. I would also like to trigger the rollback explicitly in my workflow when I catch a cancellation exception or some non-retriable error.

dmetzgar commented 5 years ago

It helps to think of Cadence as a dispatcher of tasks and timers. Error handling should involve compensation (not transactional 2PC). Cadence doesn't know how to associate an activity with that activity's rollback, but it's straightforward enough to code in your workflow. For instance, let's assume my workflow looks like this:

public class AllOrNothingWorkflowImpl implements AllOrNothingWorkflow {
  @Override
  public void processFile(URL source, URL destination) {
    ActivityOptions activityOptions = new ActivityOptions.Builder().build();
    MyActivities myActivities = Workflow.newActivityStub(MyActivities.class, activityOptions);

    myActivities.activity1();
    myActivities.activity2();
  }
}

Each activity has a rollback activity and I just need to execute those on exceptions.

public class AllOrNothingWorkflowImpl implements AllOrNothingWorkflow {
  @Override
  public void processFile(URL source, URL destination) {
    ActivityOptions activityOptions = new ActivityOptions.Builder().build();
    MyActivities myActivities = Workflow.newActivityStub(MyActivities.class, activityOptions);

    try {
      myActivities.activity1();
      myActivities.activity2();
    } catch (Activity1Exception e) {
      myActivities.activity1Rollback();
    } catch (Activity2Exception e) {
      myActivities.activity2Rollback();
      myActivities.activity1Rollback();
    }
  }
}

There are many different ways of writing this. Does this answer your question?

mfateev commented 5 years ago

Let's keep it open. I think it should be possible to come up with a more generic model. The example @dmetzgar gave is too simplistic and is not going to work for complex workflows that can take a lot of code paths in the try block.

The proposal is to implement some sort of interceptor that accumulates all the calls to the activity and then executes rollback operations in reverse order on abort.
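A minimal sketch of that accumulate-then-unwind idea in plain Java, outside of Cadence. `CompensationLog` and `CompensationLogDemo` are hypothetical names used only for illustration and are not part of the Cadence client API; inside a real workflow the lambdas would presumably call activity stubs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical helper, not part of the Cadence client API.
class CompensationLog {
    // The most recently registered compensation sits at the head of the deque.
    private final Deque<Runnable> compensations = new ArrayDeque<>();

    // Runs the forward step and records its compensation only if the step succeeded.
    void run(Runnable action, Runnable compensation) {
        action.run();
        compensations.push(compensation);
    }

    // Runs the recorded compensations in reverse order of registration.
    void rollback() {
        while (!compensations.isEmpty()) {
            compensations.pop().run();
        }
    }
}

class CompensationLogDemo {
    // Executes three steps where the third fails, and returns a trace of what ran.
    static List<String> runFailingFlow() {
        List<String> trace = new ArrayList<>();
        CompensationLog log = new CompensationLog();
        try {
            log.run(() -> trace.add("step1"), () -> trace.add("undo1"));
            log.run(() -> trace.add("step2"), () -> trace.add("undo2"));
            log.run(() -> { throw new IllegalStateException("step3 failed"); },
                    () -> trace.add("undo3"));
        } catch (IllegalStateException e) {
            log.rollback();
        }
        return trace;
    }
}
```

Because a compensation is registered only after its step completes, the failed third step is never undone, and the two completed steps are undone newest first regardless of which code path produced them.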

VladimirMilenko commented 5 years ago

@mfateev What I'm doing, just to investigate Cadence, is the following (note: I'm really not a Java developer and wrote this as a plain execution-tracking idea):

import com.uber.cadence.activity.ActivityMethod;
import com.uber.cadence.workflow.Workflow;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Stack;

class CallResultEntry {
    private String methodName;
    private Object result;

    public CallResultEntry(String methodName, Object result) {
        this.methodName = methodName;
        this.result = result;
    }

    public String getMethodName() {
        return methodName;
    }

    public Object getResult() {
        return result;
    }
}

class CallProxy implements java.lang.reflect.InvocationHandler {

    private Stack<CallResultEntry> executionStack;
    private Object obj;

    public static Object newInstance(Object obj, Stack<CallResultEntry> executionStack) {
        return java.lang.reflect.Proxy.newProxyInstance(
                obj.getClass().getClassLoader(),
                obj.getClass().getInterfaces(),
                new CallProxy(obj, executionStack));
    }

    private CallProxy(Object obj, Stack<CallResultEntry> executionStack) {
        this.obj = obj;
        this.executionStack = executionStack;
    }

    public Object invoke(Object proxy, Method m, Object[] args)
            throws Throwable
    {
        Object result;
        try {
            result = m.invoke(obj, args);
            if(m.isAnnotationPresent(ActivityMethod.class)) {
                executionStack.push(new CallResultEntry(m.getName(), result));
            }
        } catch (InvocationTargetException e) {
            throw e.getTargetException();
        } catch (Exception e) {
            throw new RuntimeException("unexpected invocation exception: " +
                    e.getMessage());
        }

        return result;
    }
}

public class AdGroupCreateWorkflowImpl implements AdGroupCreateWorkflow {

    @Override
    public String createAdGroup(String name, String campaignId) throws Exception {
        final Stack<CallResultEntry> executionStack = new Stack<>();
        AdGroupActivities adGroupActivities = (AdGroupActivities) CallProxy.newInstance(Workflow.newActivityStub(AdGroupActivities.class), executionStack);
        String adGroupId;
        try {
            // Assumes createAdGroup returns the new ad group's ID as a String.
            adGroupId = adGroupActivities.createAdGroup(name);
            adGroupActivities.attachAdGroupToCampaign(name, campaignId);
        } catch (Exception e) {
            // Undo the completed activities, most recent first.
            while (!executionStack.empty()) {
                CallResultEntry callResultEntry = executionStack.pop();

                // Look up "<activity>Revert" and pass it the result of the original call.
                Method method = adGroupActivities.getClass().getMethod(callResultEntry.getMethodName() + "Revert", callResultEntry.getResult().getClass());
                method.invoke(adGroupActivities, callResultEntry.getResult());
            }

            throw new Exception("Repeat please");
        }

        return adGroupId;
    }
}

I'm just curious: how deeply can Cadence clone workflows? Will the execution stack be copied and then reconstructed in case of a worker failure?

In the code, I'm assuming that my activities always have methodA and methodARevert methods available and that they always return some value. This is just a theoretical assumption.
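One way to loosen that assumption is to treat a missing revert method as a no-op. The sketch below is plain Java reflection with hypothetical names (`DemoActivities`, `DemoActivitiesImpl`, `RevertResolver`) and does not touch Cadence. It matches the revert method by name and parameter count rather than by the exact runtime class of the result, since `getMethod` only finds a method whose declared parameter types match exactly.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical activity interface following the "<name>" / "<name>Revert" convention.
interface DemoActivities {
    String createThing(String name);
    void createThingRevert(String id);
    String tagThing(String id); // deliberately has no tagThingRevert
}

class DemoActivitiesImpl implements DemoActivities {
    final List<String> calls = new ArrayList<>();
    public String createThing(String name) { calls.add("create:" + name); return "id-" + name; }
    public void createThingRevert(String id) { calls.add("revert:" + id); }
    public String tagThing(String id) { calls.add("tag:" + id); return id; }
}

class RevertResolver {
    // Invokes "<methodName>Revert" on target with the recorded result.
    // Returns false, doing nothing, when the interface declares no such method.
    static boolean revert(Object target, Class<?> iface, String methodName, Object result) {
        for (Method m : iface.getMethods()) {
            if (m.getName().equals(methodName + "Revert") && m.getParameterCount() == 1) {
                try {
                    m.invoke(target, result);
                } catch (InvocationTargetException e) {
                    // Surface the revert method's own failure instead of the reflection wrapper.
                    Throwable cause = e.getCause();
                    if (cause instanceof RuntimeException) {
                        throw (RuntimeException) cause;
                    }
                    throw new IllegalStateException("revert failed", cause);
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException("revert method not accessible", e);
                }
                return true;
            }
        }
        return false;
    }
}
```

With this shape, `RevertResolver.revert(impl, DemoActivities.class, "tagThing", id)` returns false instead of throwing `NoSuchMethodException`, so the rollback loop can keep unwinding the rest of the stack.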

mfateev commented 5 years ago

Yes, the execution stack is reconstructed, and the code you wrote looks reasonable at first glance. See the webcast that explains how the state is reconstructed. Minutes 7 to 20 contain the explanation.
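As a rough intuition for that reconstruction, here is a toy model of history-based replay in plain Java. It is only an illustration under simplifying assumptions, not Cadence's actual implementation: `ReplayContext` and `ReplayDemo` are hypothetical names, and the "history" is just a list of recorded activity results. Re-running deterministic workflow code against the recorded history rebuilds local state, such as the execution stack, without running the activities again.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.Supplier;

// Toy model of history-based replay; not Cadence's actual implementation.
class ReplayContext {
    private final List<String> history; // activity results persisted so far
    private int position = 0;           // next history entry to consume
    int realExecutions = 0;             // how many activities actually ran

    ReplayContext(List<String> history) {
        this.history = history;
    }

    // During replay, returns the recorded result; otherwise runs the activity and records it.
    String executeActivity(Supplier<String> activity) {
        if (position < history.size()) {
            return history.get(position++);
        }
        String result = activity.get();
        realExecutions++;
        history.add(result);
        position++;
        return result;
    }
}

class ReplayDemo {
    // Deterministic "workflow code": local state depends only on activity results.
    static Deque<String> workflow(ReplayContext ctx) {
        Deque<String> executionStack = new ArrayDeque<>();
        executionStack.push(ctx.executeActivity(() -> "adGroup-1"));
        executionStack.push(ctx.executeActivity(() -> "attached"));
        return executionStack;
    }
}
```

Running `ReplayDemo.workflow` a second time with a fresh `ReplayContext` over the same history list yields the same stack contents while `realExecutions` stays at zero, which is why workflow code has to be deterministic for this approach to work.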