sam-goodwin / eventual

Build scalable and durable micro-services with APIs, Messaging and Workflows
https://docs.eventual.ai
MIT License

Design: Async Activities, Timeout, and Heartbeat #60

Closed: thantos closed this issue 1 year ago.

thantos commented 1 year ago

Problem Statement:

As a developer/workflow author, I want to create activities that run for indefinite amounts of time, involve human interaction, invoke other services, or wait for the result of outside actions. I should be able to recover from inconsistencies and faults, and I should be able to rely on the service to handle partial failures idempotently.

Stories:

Strawman

workflow(async () => {
    const result = await act1();
});

const act1 = activity<{ result: string }>({
   heartbeat: { seconds: number },
   timeout: { seconds: number }
}, async (context: Context): Promise<{ result: string } | AsyncToken> => {
   ...doSomeWork...

   // hand the token to whatever will finish the work later
   await sendToQueue({ token: context.activity.token });

   return makeAsync();
});

// lambda function consuming the queue
const workflowClient = new WorkflowClient();
export const handler = async (event) => {
    await Promise.all(event.Records.map(async (record) => {
        const payload = JSON.parse(record.body);

        // complete the activity with a payload
        await workflowClient.completeActivity<typeof act1>(
            payload.token,
            { result: "done" }
        );

        // or, instead of completing it, fail the activity with an error
        await workflowClient.failActivity(payload.token, new Error("failed"));
    }));
};
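The token hand-off above can be modeled in memory to show what each side is responsible for. This is a minimal sketch, not eventual's implementation: `ASYNC_TOKEN`, `runActivity`, `InMemoryWorkflowClient`, `demo`, and the array standing in for the workflow queue are all hypothetical.

```typescript
// Hypothetical sentinel an activity returns to say "I will finish later".
const ASYNC_TOKEN = Symbol("AsyncToken");
type AsyncToken = typeof ASYNC_TOKEN;

function makeAsync(): AsyncToken {
  return ASYNC_TOKEN;
}

// Events the orchestrator would read from its workflow queue.
type WorkflowEvent =
  | { type: "ActivityCompleted"; token: string; result: unknown }
  | { type: "ActivityFailed"; token: string; error: string };

// In-memory stand-in for the workflow queue.
const workflowQueue: WorkflowEvent[] = [];

// Worker side: a returned value completes the activity immediately;
// a returned AsyncToken leaves it open for someone holding the token.
async function runActivity<T>(
  token: string,
  handler: (token: string) => Promise<T | AsyncToken>
): Promise<void> {
  const output = await handler(token);
  if (output !== ASYNC_TOKEN) {
    workflowQueue.push({ type: "ActivityCompleted", token, result: output });
  }
}

// Hypothetical client for whoever holds the token later (e.g. a queue consumer).
class InMemoryWorkflowClient {
  async completeActivity<T>(token: string, result: T): Promise<void> {
    workflowQueue.push({ type: "ActivityCompleted", token, result });
  }

  async failActivity(token: string, error: string): Promise<void> {
    workflowQueue.push({ type: "ActivityFailed", token, error });
  }
}

// One activity returns synchronously; the other goes async and is completed
// by a consumer of an (in-memory) external queue.
async function demo(): Promise<void> {
  const externalQueue: string[] = [];
  const client = new InMemoryWorkflowClient();

  await runActivity<{ result: string }>("sync-1", async () => ({ result: "immediate" }));
  await runActivity<{ result: string }>("async-1", async (token) => {
    externalQueue.push(token); // plays the role of sendToQueue({ token })
    return makeAsync();
  });

  for (const token of externalQueue) {
    await client.completeActivity(token, { result: "done" });
  }
}
```

Calling `demo()` leaves two `ActivityCompleted` events on `workflowQueue`: one pushed by the worker because the handler returned a value, and one pushed later by the client because the handler returned `makeAsync()`.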

with heartbeat

workflow(async () => {
    const result = await act1();
});

const act1 = activity<{ result: string }>({
   heartbeat: { seconds: 20 },
   timeout: { seconds: 160 }
}, async (context: Context): Promise<{ result: string } | AsyncToken> => {
   ...doSomeWork...

   await sendToQueue({ token: context.activity.token });

   await sendHeartbeat();

   return makeAsync();
});

// lambda function consuming the queue
const workflowClient = new WorkflowClient();
export const handler = async (event) => {
    await Promise.all(event.Records.map(async (record) => {
        const payload = JSON.parse(record.body);

        let done = false;
        while (!done) {
           // some long process; set done = true once the work is finished
           await workflowClient.heartbeatActivity(payload.token);
        }

        // complete the activity with a payload
        await workflowClient.completeActivity<typeof act1>(
            payload.token,
            { result: "done" }
        );
    }));
};
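Heartbeating on every loop iteration, as the consumer above does, can flood the workflow queue when iterations are fast. One option, an assumption rather than part of the proposal, is to throttle the calls to some fraction of the configured heartbeat timeout (for the 20-second heartbeat above, perhaps every 10 seconds). A minimal sketch, with `createHeartbeater` as a hypothetical helper:

```typescript
// Hypothetical helper: wraps a heartbeat call such as
// () => workflowClient.heartbeatActivity(token) so a tight work loop sends at
// most one heartbeat per `minIntervalMs`. The clock is injectable for testing.
function createHeartbeater(
  send: () => Promise<void>,
  minIntervalMs: number,
  now: () => number = Date.now
): () => Promise<boolean> {
  let lastSentAt = -Infinity; // nothing sent yet

  return async () => {
    const t = now();
    if (t - lastSentAt < minIntervalMs) {
      return false; // too soon since the last successful heartbeat; skip
    }
    await send();
    lastSentAt = t; // only recorded once the heartbeat call succeeded
    return true;
  };
}
```

Inside the consumer's loop, each chunk of work would be followed by `await beat()`, where `beat = createHeartbeater(() => workflowClient.heartbeatActivity(payload.token), 10000)`.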

with heartbeat checkpoint - FUTURE

workflow(async () => {
    const act = act1();

    act.onHeartbeat(async ({ i }) => {
        await reportProgress(i);
    });
});

const reportProgress = activity(...);

const act1 = activity<{ result: string }, { i: number } | undefined>({
   heartbeat: { seconds: 20 },
   timeout: { seconds: 160 }
}, async (context: Context): Promise<{ result: string } | AsyncToken> => {
   ...doSomeWork...

   // pass the last checkpoint along so the consumer can resume from it
   await sendToQueue({ token: context.activity.token, start: context.checkpoint });

   // should this be on the context to be typed?
   await sendHeartbeat();

   return makeAsync();
});

// lambda function consuming the queue
const workflowClient = new WorkflowClient();
export const handler = async (event) => {
    await Promise.all(event.Records.map(async (record) => {
        const payload = JSON.parse(record.body);

        const items = [...];

        // resume after the last checkpointed item, if there is one
        const start = payload.start ? payload.start.i + 1 : 0;

        for (let i = start; i < items.length; i++) {
           // some long process on items[i]
           await workflowClient.heartbeatActivity<typeof act1>(
               payload.token,
               { i }
           );
        }

        // complete the activity with a payload
        await workflowClient.completeActivity<typeof act1>(
            payload.token,
            { result: "done" }
        );
    }));
};
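The checkpoint flow can be exercised without any infrastructure. The sketch below assumes the checkpoint `{ i }` records the index of the last item that finished, so a retried attempt resumes at `i + 1`; `processWithCheckpoints` and its callbacks are hypothetical, with `heartbeat` standing in for `workflowClient.heartbeatActivity(token, { i })`.

```typescript
// Checkpoint shape from the strawman: index of the last item that finished.
type Checkpoint = { i: number } | undefined;

// Hypothetical worker loop: resumes after the last checkpoint and reports a
// new checkpoint (via `heartbeat`) after each item completes.
async function processWithCheckpoints<T>(
  items: T[],
  checkpoint: Checkpoint,
  work: (item: T) => Promise<void>,
  heartbeat: (cp: { i: number }) => Promise<void>
): Promise<void> {
  const start = checkpoint ? checkpoint.i + 1 : 0;
  for (let i = start; i < items.length; i++) {
    await work(items[i]);
    await heartbeat({ i });
  }
}
```

Because a crash can happen after an item's work but before its heartbeat, the item after the last checkpoint may run twice, so `work` should be idempotent.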
thantos commented 1 year ago

Tech Design

  1. Orchestrator
    1. Activity Scheduled
    2. Start Timeout Timer (if configured)
    3. Wait for ActivityCompleted, ActivityFailed, ActivityHeartbeat, ActivityHeartbeatTimedOut, or ActivityTimedOut
  2. Activity Worker
    1. Activity Worker Locks Activity
    2. Start Heartbeat Timer (if configured)
    3. Activity Handler Invoked
    4. If the Handler returns a value - push complete event to workflow queue
    5. If the Handler returns an AsyncToken - do nothing
  3. On Activity Heartbeat Call
    1. Send ActivityHeartbeat event to the workflow
  4. On client.completeActivity(...)
    1. Send ActivityCompleted to workflow queue
  5. On client.failActivity(...)
    1. Send ActivityFailed to workflow queue
  6. On client.heartbeatActivity(...)
    1. Send ActivityHeartbeat to workflow queue
  7. Orchestrator Wakes Up With...
    1. ActivityCompleted - if the activity has not previously timed out, completed, or failed, return the result; otherwise ignore.
    2. ActivityFailed - if the activity has not previously timed out, completed, or failed, throw the error in the workflow; otherwise ignore.
    3. ActivityHeartbeat - create an ExtendHeartbeat command, unless the activity has completed, failed, or timed out.
    4. ActivityHeartbeatTimedOut - if the activity has completed, failed, or timed out, ignore. Otherwise, throw a heartbeat error and fail the activity, unless a heartbeat event was received after (timer timestamp - heartbeat timeout).
    5. ActivityTimedOut - throw a timeout error and fail the activity, unless it has already completed, failed, or timed out.
  8. On ExtendHeartbeat command
    1. TimerClient.updateTimer() - new API which tries to update a Schedule or creates a new SQS message.
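Rules 1-5 of step 7 amount to a small state machine. The sketch below models them as a pure function; the type names, the `Outcome` shape, and the millisecond timestamps are assumptions for illustration, not eventual's code. The key detail is rule 4: a heartbeat timer that fires after a newer heartbeat has been recorded is stale and is ignored.

```typescript
// Hypothetical model of the orchestrator rules in step 7.
type ActivityStatus = "scheduled" | "completed" | "failed" | "timedOut";

interface ActivityState {
  status: ActivityStatus;
  heartbeatTimeoutMs?: number; // undefined when no heartbeat is configured
  lastHeartbeatAt?: number; // timestamp (ms) of the latest heartbeat seen
}

type ActivityEvent =
  | { type: "ActivityCompleted"; result: unknown }
  | { type: "ActivityFailed"; error: string }
  | { type: "ActivityHeartbeat"; timestamp: number }
  | { type: "ActivityHeartbeatTimedOut"; timestamp: number } // when the timer fired
  | { type: "ActivityTimedOut" };

type Outcome =
  | { kind: "ignore" }
  | { kind: "resolve"; result: unknown }
  | { kind: "throw"; error: string }
  | { kind: "extendHeartbeat"; until: number }; // the ExtendHeartbeat command

function onActivityEvent(state: ActivityState, event: ActivityEvent): [ActivityState, Outcome] {
  // Once completed, failed, or timed out, every later event is ignored.
  if (state.status !== "scheduled") {
    return [state, { kind: "ignore" }];
  }
  switch (event.type) {
    case "ActivityCompleted":
      return [{ ...state, status: "completed" }, { kind: "resolve", result: event.result }];
    case "ActivityFailed":
      return [{ ...state, status: "failed" }, { kind: "throw", error: event.error }];
    case "ActivityHeartbeat": {
      const next: ActivityState = { ...state, lastHeartbeatAt: event.timestamp };
      if (state.heartbeatTimeoutMs === undefined) {
        return [next, { kind: "ignore" }];
      }
      return [next, { kind: "extendHeartbeat", until: event.timestamp + state.heartbeatTimeoutMs }];
    }
    case "ActivityHeartbeatTimedOut": {
      // A heartbeat newer than (timer timestamp - heartbeat timeout) means this
      // timer is stale: the activity reported in after the timer was started.
      const windowStart = event.timestamp - (state.heartbeatTimeoutMs ?? 0);
      if (state.lastHeartbeatAt !== undefined && state.lastHeartbeatAt > windowStart) {
        return [state, { kind: "ignore" }];
      }
      return [{ ...state, status: "timedOut" }, { kind: "throw", error: "HeartbeatTimeout" }];
    }
    case "ActivityTimedOut":
      return [{ ...state, status: "timedOut" }, { kind: "throw", error: "Timeout" }];
  }
}
```

With a 20-second heartbeat timeout, a heartbeat at 10 s extends the deadline to 30 s, the original timer firing at 20 s is ignored, and if nothing else arrives, the timer at 30 s fails the activity; any completion after that is ignored.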
thantos commented 1 year ago

More details on timeouts for all workflows here: https://github.com/functionless/eventual/issues/63

sam-goodwin commented 1 year ago

What is deciding that the activity is async here? The declaration or the implementation?

sam-goodwin commented 1 year ago

What's the use case for heartbeat?

cfraz89 commented 1 year ago

Just spitballing, would a builder pattern make it more ergonomic?

act1 = ActivityBuilder({ heartbeat: { seconds: 20 }, timeout: { seconds: 20 } })
  .activity(async (context: Context): Promise<{ result: string } | AsyncToken> => {
   ...doSomeWork...

   await sendToQueue({ token: context.activity.token });

   return makeAsync();
});
sam-goodwin commented 1 year ago

I really don't like builder patterns for constructing a function. The bottom layer should be pure, and a builder can always be put on top.

Another consideration is how we use the activity/workflow functions as heuristics in the transformer.

thantos commented 1 year ago

What is deciding that the activity is async here? The declaration or the implementation?

The activity decides that it needs to be async, and a single activity can support both patterns (return sync when possible and go async when necessary).

The workflow decides how long it is willing to wait for the activity to complete.

Controls the Workflow Has:

  1. Heartbeat - report back every X or fail
  2. Timeout - finish within X or fail

Controls the Activity Has:

  1. Return Sync or Async
  2. Succeed or Fail
  3. Use Heartbeat to store checkpoints
  4. Use heartbeat to determine if the workflow is still alive

Another option would be an abstraction for activities that are explicitly async from the workflow's point of view, as Step Functions does, but it would be basically the same under the hood.

workflow(async () => {
   await asyncEventActivity((token) => {}); // create an event which contains a token and waits on the response
});
// or maybe a special activity type?
const myActivity = eventActivity<string>((token, input) => ({ type: "myEvent", token, input }));

The other way to do it would be like SFN's Activities, which provide a queue that workers can poll from anywhere.

Again, this could just be a special activity type that the workflow calls like any other activity.

const myActivity = queueActivity<string>(myQueue); // a queue that contains activity requests to resolve using the token they contain.
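An SFN-style queue activity could look like the following in-memory sketch: the workflow enqueues a request carrying a token, and any worker that can reach the queue polls it and resolves the request with that token. `ActivityTaskQueue`, `ActivityRequest`, and `drain` are hypothetical names; a real deployment would use a durable queue rather than an array.

```typescript
// Hypothetical request the workflow would enqueue when it calls a queue-backed activity.
interface ActivityRequest<I> {
  token: string;
  input: I;
}

// In-memory stand-in for the queue that workers poll.
class ActivityTaskQueue<I> {
  private readonly requests: ActivityRequest<I>[] = [];

  enqueue(request: ActivityRequest<I>): void {
    this.requests.push(request);
  }

  // Returns the next request, or undefined when there is nothing to do.
  poll(): ActivityRequest<I> | undefined {
    return this.requests.shift();
  }
}

// A worker drains the queue and resolves each request through its token,
// e.g. by calling workflowClient.completeActivity(token, output).
function drain<I, O>(
  queue: ActivityTaskQueue<I>,
  work: (input: I) => O,
  complete: (token: string, output: O) => void
): number {
  let handled = 0;
  for (let request = queue.poll(); request !== undefined; request = queue.poll()) {
    complete(request.token, work(request.input));
    handled++;
  }
  return handled;
}
```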
thantos commented 1 year ago

What's the use case for heartbeat?

Heartbeat is important in durable systems. Let's say you have a long-running activity that may take up to a week, so you set its timeout to 2 weeks just in case. That means if something goes wrong and the message is lost, the workflow won't wake up for 2 weeks, only to find that the activity failed. Instead, you could set an hourly or daily heartbeat, which lets the activity's system report back to the workflow that it is still alive.

Yehuda emphasized how important this is in his systems when long-running processes are involved.

From Temporal's Docs:

An Activity Heartbeat is a ping from the Worker that is executing the Activity to the Temporal Cluster. Each ping informs the Temporal Cluster that the Activity Execution is making progress and the Worker has not crashed. For long-running Activities, we recommend using a relatively short Heartbeat Timeout and a frequent Heartbeat. That way if a Worker fails it can be handled in a timely manner.

Step Functions:

It's a good practice to set a timeout value and a heartbeat interval for long-running activities. This can be done by specifying the timeout and heartbeat values, or by setting them dynamically.
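To put numbers on the two-week scenario above: without a heartbeat, the only signal is the overall timeout; with a heartbeat, the workflow also learns of a failure one heartbeat timeout after the last heartbeat. A small worked sketch, assuming the last heartbeat arrives one day in and the heartbeat timeout equals the hourly interval (`failureNoticedBy` is a hypothetical helper):

```typescript
const HOUR_MS = 60 * 60 * 1000;
const DAY_MS = 24 * HOUR_MS;

// Latest time at which the workflow learns that an activity silently died,
// assuming nothing arrives after `lastHeartbeatAt` (e.g. the message was lost).
function failureNoticedBy(
  startedAt: number,
  timeoutMs: number,
  heartbeatTimeoutMs?: number,
  lastHeartbeatAt?: number
): number {
  const timeoutDeadline = startedAt + timeoutMs;
  if (heartbeatTimeoutMs === undefined) {
    return timeoutDeadline; // only the overall timeout can fire
  }
  const heartbeatDeadline = (lastHeartbeatAt ?? startedAt) + heartbeatTimeoutMs;
  return Math.min(timeoutDeadline, heartbeatDeadline);
}

// 2-week timeout; the work is lost after the heartbeat at the one-day mark.
const withoutHeartbeat = failureNoticedBy(0, 14 * DAY_MS);
const withHourlyHeartbeat = failureNoticedBy(0, 14 * DAY_MS, HOUR_MS, DAY_MS);
```

That is 14 days versus 25 hours until the workflow can react.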

thantos commented 1 year ago

Use Cases:

  1. Health Ping from Activity to Workflow
  2. Activity checking if the workflow is still alive - Future use case
  3. Checkpointing (activity can save partial data in the heartbeat) - Future use case
sam-goodwin commented 1 year ago

I could have been clearer: I do know why they are important, I'm just not sure why they're important right now.

thantos commented 1 year ago

Yehuda will ask about them and I think we can get the basic impl done quickly.

sam-goodwin commented 1 year ago

Yehuda will ask about them and I think we can get the basic impl done quickly.

So low effort, high ROI? Sounds good. Let's try to think of some examples when we implement it and add them to the test app.

I may be being pedantic; I'm just trying to learn the lesson of Functionless and focus on examples and features, not just features.

thantos commented 1 year ago

Totally agree. From chats with people, timeout and heartbeat are important parts of long-running workflows that would make the service look more legit/complete. Because timers are already implemented for sleep, creating timeouts is easy now, and the only new part of heartbeat is adding the client operation.

Will start to work on this and if it proves to be high effort, will push off.

thantos commented 1 year ago

Was looking at how to avoid the context argument.

Option 1: context method

activity(async (...args) => {
    const { asyncToken } = getActivityContext(); // fails when called from outside of an activity.
    await sqs.send(new SendMessageCommand({ /* ... asyncToken ... */ }));
    return makeAsync();
});

Option 2: token is provided by the makeAsync function via a callback.

activity((...args) => {
    return makeAsync(async (token) => {
       await sqs.send(new SendMessageCommand({ /* ... token ... */ }));
    });
});

Option 3: context parameter

activity(async (input, context) => {
    await sqs.send(new SendMessageCommand({ /* ... context.asyncToken ... */ }));

    return makeAsync();
});
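Option 1 can be sketched with a module-level variable that the worker sets around each invocation, and the same object can be passed as a second parameter for Option 3. The names (`ActivityContext`, `invokeActivity`, `executionId`) are hypothetical, and the sketch assumes one activity runs at a time per process, as in a single Lambda invocation; concurrent activities in one process would need something like Node's `AsyncLocalStorage` instead.

```typescript
// Hypothetical shape of the activity context discussed in this thread.
interface ActivityContext {
  asyncToken: string;
  executionId: string;
}

let currentContext: ActivityContext | undefined;

// Option 1: an intrinsic that only works while an activity is running.
function getActivityContext(): ActivityContext {
  if (currentContext === undefined) {
    throw new Error("getActivityContext() called outside of an activity");
  }
  return currentContext;
}

// Hypothetical worker wrapper. Assumes one activity at a time per process.
async function invokeActivity<I, O>(
  handler: (input: I, context: ActivityContext) => Promise<O>,
  input: I,
  context: ActivityContext
): Promise<O> {
  currentContext = context;
  try {
    // Option 3: the same object is also passed explicitly as the second parameter.
    return await handler(input, context);
  } finally {
    currentContext = undefined; // clear it so later calls outside an activity fail
  }
}
```

The explicit parameter is the more discoverable of the two, since it shows up in the handler's type signature; the intrinsic only fails at runtime when misused.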
sam-goodwin commented 1 year ago

What's wrong with the context parameter? I think we wanted to update activities to only allow a single input argument, just like a workflow, so that it aligns with a Lambda contract. There was another reason too, I think, but I can't remember it.

Context argument is preferable because it's discoverable.

sam-goodwin commented 1 year ago

While writing the documentation, I found myself confused about why heartbeat is a global. Have we closed on our decision to change activities to be single-argument only and then add a context parameter? We could then provide the heartbeat function on the context parameter instead.

thantos commented 1 year ago

How do you decide what is a context method and what is an intrinsic? Is the difference that heartbeat is specific to activities (and systems acting on behalf of an async activity)?

Would we apply the same logic to workflow-only things like sleep, Promise.*, signal, etc.?

Heartbeat for an activity can be done by anything with access to the token. I see it as the same as completeActivity and failActivity, an operation performed by an activity or by something acting as an activity. For example, when an activity is async, a workflow, event handler, or some random lambda using the client will need to call heartbeat.

Options:

  1. No intrinsic - Move all intrinsics (sleep, heartbeat, etc) to their respective objects and/or context variables
  2. Move only heartbeat for activities to context (add activity.heartbeat, keep workflowclient.heartbeatActivity)
  3. Rename to heartbeatActivity, add intrinsic for completeActivity and failActivity.
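Option 2 can be layered on the client call, so an activity's own `heartbeat()` and an outside system holding the token produce the same `ActivityHeartbeat` event. This is a sketch: `WorkflowClientSketch`, `makeActivityContext`, and the `details` field are hypothetical.

```typescript
// Hypothetical event sent to the workflow queue on a heartbeat.
interface HeartbeatEvent {
  type: "ActivityHeartbeat";
  token: string;
  details?: unknown;
}

// Stand-in for the workflow client used by anything holding a token.
class WorkflowClientSketch {
  readonly sent: HeartbeatEvent[] = [];

  async heartbeatActivity(token: string, details?: unknown): Promise<void> {
    this.sent.push({ type: "ActivityHeartbeat", token, details });
  }
}

// Option 2: the activity context exposes heartbeat() bound to its own token,
// implemented on top of the same client call that external systems use.
function makeActivityContext(token: string, client: WorkflowClientSketch) {
  return {
    asyncToken: token,
    heartbeat: (details?: unknown) => client.heartbeatActivity(token, details),
  };
}
```

Binding the token inside the context means activity code cannot accidentally heartbeat a different activity, while systems acting on the activity's behalf keep using the client with an explicit token.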
sam-goodwin commented 1 year ago

Was building something today and found myself really wanting a context variable in an activity so I can get the execution ID without having to explicitly pass it through from the workflow.