Workflow is an event driven workflow that allows for robust, durable, and scalable sequential business logic to be executed in a deterministic manner.
To start using workflow you will need to add the workflow module to your project. You can do this by running:
go get github.com/luno/workflow
Some adapters dont come with the core workflow module such as kafkastreamer
, reflexstreamer
, sqlstore
, and sqltimeout
. If you
wish to use these you need to add them individually based on your needs or build out your own adapter.
go get github.com/luno/workflow/adapters/kafkastreamer
go get github.com/luno/workflow/adapters/reflexstreamer
go get github.com/luno/workflow/adapters/sqlstore
go get github.com/luno/workflow/adapters/sqltimeout
package usage
import (
"context"
"github.com/luno/workflow"
)
type Step int
func (s Step) String() string {
switch s {
case StepOne:
return "One"
case StepTwo:
return "Two"
case StepThree:
return "Three"
default:
return "Unknown"
}
}
const (
StepUnknown Step = 0
StepOne Step = 1
StepTwo Step = 2
StepThree Step = 3
)
type MyType struct {
Field string
}
func Workflow() *workflow.Workflow[MyType, Step] {
b := workflow.NewBuilder[MyType, Step]("my workflow name")
b.AddStep(StepOne, func(ctx context.Context, r *workflow.Run[MyType, Step]) (Step, error) {
r.Object.Field = "Hello,"
return StepTwo, nil
}, StepTwo)
b.AddStep(StepTwo, func(ctx context.Context, r *workflow.Run[MyType, Step]) (Step, error) {
r.Object.Field += " world!"
return StepThree, nil
}, StepThree)
return b.Build(...)
}
---
title: The above defined workflow creates the below Directed Acyclic Graph
---
stateDiagram-v2
direction LR
[*]-->One
One-->Two
Two-->Three
Three-->[*]
wf := usage.Workflow()
ctx := context.Background()
wf.Run(ctx)
Stop: To stop all processes and wait for them to shut down correctly call
wf.Stop()
foreignID := "82347982374982374"
runID, err := wf.Trigger(ctx, foreignID, StepOne)
if err != nil {
...
}
Awaiting results: If appropriate and desired you can wait for the workflow to complete. Using context timeout (cancellation) is advised.
foreignID := "82347982374982374"
runID, err := wf.Trigger(ctx, foreignID, StepOne)
if err != nil {
...
}
ctx, cancel := context.WithTimeout(ctx, 10 * time.Second)
defer cancel()
record, err := wf.Await(ctx, foreignID, runID, StepThree)
if err != nil {
...
}
Head on over to ./_examples to get familiar with callbacks, timeouts, testing, connectors and more about the syntax in depth 😊
RunState is the state of a Run and can only exist in one state at any given time. RunState is a finite state machine and allows for control over the Run. A Run is every instance of a triggered workflow.
---
title: Diagram the run states of a workflow
---
stateDiagram-v2
direction LR
Initiated-->Running
Running-->Completed
Running-->Paused
Paused-->Running
Running --> Cancelled
Paused --> Cancelled
state Finished {
Completed --> RequestedDataDeleted
Cancelled --> RequestedDataDeleted
DataDeleted-->RequestedDataDeleted
RequestedDataDeleted-->DataDeleted
}
Hooks allow for you to write some functionality for Runs that enter a specific RunState. For example when
using PauseAfterErrCount
the usage of the OnPause hook can be used to send a notification to a team to notify
them that a specific Run has errored to the threshold and now has been paused and should be investigated. Another
example is handling a known sentinel error in a Workflow Run and cancelling the Run by calling (where r is *Run)
r.Cancel(ctx) or if a Workflow Run is manually cancelled from a UI then a notifgication can be sent to the team for visibility.
Hooks run in an event consumer. This means that it will retry until a nil error has been returned and is durable across deploys and interruptions. At-least-once delivery is guaranteed, and it is advised to use the RunID as an idempotency key to ensure that the operation is idempotent.
Hook | Parameter(s) | Return(s) | Description | Is Event Driven? |
---|---|---|---|---|
OnPause | workflow.RunStateChangeHookFunc | error | Fired when a Run enters RunStatePaused | Yes |
OnCancelled | workflow.RunStateChangeHookFunc | error | Fired when a Run enters RunStateCancelled | Yes |
OnDataDeleted | workflow.RunStateChangeHookFunc | error | Fired when a Run enters RunStateDeleted | Yes |
OnCompleted | workflow.RunStateChangeHookFunc | error | Fired when a Run enters RunStateCompleted | Yes |
This package provides several options to configure the behavior of the workflow process. You can use these options to customize the instance count, polling frequency, error handling, lag settings, and more. Each option is defined as a function that takes a pointer to an options
struct and modifies it accordingly. Below is a description of each available option:
ParallelCount
func ParallelCount(instances int) Option
instances
: The total number of parallel instances to create.b.AddStep(
StepOne,
...,
StepTwo,
).WithOptions(
workflow.ParallelCount(5)
)
PollingFrequency
func PollingFrequency(d time.Duration) Option
d
: The polling frequency as a time.Duration
.b.AddStep(
StepOne,
...,
StepTwo,
).WithOptions(
workflow.PollingFrequency(10 * time.Second)
)
ErrBackOff
func ErrBackOff(d time.Duration) Option
d
: The backoff duration as a time.Duration
.b.AddStep(
StepOne,
...,
StepTwo,
).WithOptions(
workflow.ErrBackOff(5 * time.Minute)
)
LagAlert
func LagAlert(d time.Duration) Option
d
: The duration of the lag alert as a time.Duration
.b.AddStep(
StepOne,
...,
StepTwo,
).WithOptions(
workflow.LagAlert(15 * time.Minute),
)
ConsumeLag
func ConsumeLag(d time.Duration) Option
d
: The lag duration as a time.Duration
.b.AddStep(
StepOne,
...,
StepTwo,
).WithOptions(
workflow.ConsumeLag(10 * time.Minute),
)
PauseAfterErrCount
func PauseAfterErrCount(count int) Option
RunStatePaused
. This mechanism acts similarly to a Dead Letter Queue, preventing further processing of problematic records and allowing for investigation and retry.count
: The maximum number of errors before pausing.b.AddStep(
StepOne,
...,
StepTwo,
).WithOptions(
workflow.PauseAfterErrCount(3),
)
Term | Description |
---|---|
Builder | A struct type that facilitates the construction of workflows. It provides methods for adding steps, callbacks, timeouts, and connecting workflows. |
Callback | A method in the workflow API that can be used to trigger a callback function for a specified status. It passes data from a reader to the specified callback function. |
Consumer | A component that consumes events from an event stream. In this context, it refers to the background consumer goroutines launched by the workflow. |
EventStreamer | An interface representing a stream for workflow events. It includes methods for producing and consuming events. |
Graph | A representation of the workflow's structure, showing the relationships between different statuses and transitions. |
Hooks | An event driven process that take place on a Workflow's Run's lifecycle defined in a finite number of states called RunState. |
Producer | A component that produces events to an event stream. It is responsible for sending events to the stream. |
Record | Is the "wire format" and representation of a Run that can be stored and retrieved. The RecordStore is used for storing and retrieving records. |
RecordStore | An interface representing a store for Record(s). It defines the methods needed for storing and retrieving records. The RecordStore's underlying technology must support transactions in order to prevent dual-writes. |
RoleScheduler | An interface representing a scheduler for roles in the workflow. It is responsible for coordinating the execution of different roles. |
Run | A Run is the representation of the instance that is created and processed by the Workflow. Each time Trigger is called a new "Run" is created. |
RunState | RunState defines the finite number of states that a Run can be in. This is used to control and monitor the lifecycle of Runs. |
Topic | A method that generates a topic for producing events in the event streamer based on the workflow name and status. |
Trigger | A method in the workflow API that initiates a workflow for a specified foreignID and starting status. It returns a Run ID and allows for additional configuration options. |