cschleiden / go-workflows

Embedded durable workflows for Golang similar to DTFx/Cadence/Temporal
https://cschleiden.github.io/go-workflows/
MIT License

Cancelling a workflow instance #160

Closed by cschleiden 1 year ago

cschleiden commented 1 year ago

When a workflow with a finished timer is canceled, we run into an invalid state transition for the timer command.

We should probably remove the callback once the command is done, or check whether the command is in a cancelable state before attempting to cancel it.
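To make the second option concrete, here is a minimal sketch of a guarded cancel. It is not the library's actual code: `CommandState`, `Command`, `Cancelable`, and `TryCancel` are hypothetical names invented for illustration, and the set of states is an assumption based only on the `Done -> Canceled` transition mentioned in this issue.

```go
package main

import "fmt"

// CommandState is a hypothetical stand-in for a command's lifecycle state.
type CommandState int

const (
	Pending CommandState = iota
	Committed
	Canceled
	Done
)

// String returns a readable name for the state.
func (s CommandState) String() string {
	switch s {
	case Pending:
		return "Pending"
	case Committed:
		return "Committed"
	case Canceled:
		return "Canceled"
	case Done:
		return "Done"
	}
	return "Unknown"
}

// Command is a hypothetical command with a name and a current state.
type Command struct {
	Name  string
	State CommandState
}

// Cancelable reports whether the command may still be canceled.
// Assumption: only commands that have not completed can be canceled.
func (c *Command) Cancelable() bool {
	return c.State == Pending || c.State == Committed
}

// Finish marks the command as completed.
func (c *Command) Finish() {
	c.State = Done
}

// TryCancel cancels the command only if its state allows it.
// It returns false and leaves the state untouched otherwise,
// instead of panicking on an invalid transition.
func (c *Command) TryCancel() bool {
	if !c.Cancelable() {
		return false
	}
	c.State = Canceled
	return true
}

func main() {
	timer := &Command{Name: "ScheduleTimer", State: Committed}
	timer.Finish() // the timer fires before the workflow is canceled

	// The cancellation callback runs afterwards and becomes a no-op.
	fmt.Println(timer.TryCancel(), timer.State)
}
```

With a check like this, a cancellation callback that fires after the timer has completed simply does nothing. The first option (removing the callback once the command is done) would avoid the call entirely, at the cost of tracking callback registrations.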

daadajuan commented 1 year ago

I'm running into a similar problem. I have a MainWorkflow that starts two sub-workflows, SubWorkflow1 and SubWorkflow2 (full code below).

If I cancel MainWorkflow with CancelWorkflowInstance after SubWorkflow1 has finished but before SubWorkflow2 has finished, a panic is raised:

"invalid state transition for command ScheduleSubWorkflow: Done -> Canceled"

package main

import (
    "context"
    "sync"
    "time"

    "github.com/cschleiden/go-workflows/workflow"
    "github.com/google/uuid"
)

func MainWorkflow(ctx workflow.Context, input string) (string, error) {
    println("MainWorkflow start")

    fs := make([]workflow.Future[string], 0, 2)
    fs = append(fs, workflow.CreateSubWorkflowInstance[string](ctx, workflow.SubWorkflowOptions{InstanceID: uuid.NewString()}, SubWorkflow1, "SubWorkflow1"))
    fs = append(fs, workflow.CreateSubWorkflowInstance[string](ctx, workflow.SubWorkflowOptions{InstanceID: uuid.NewString()}, SubWorkflow2, "SubWorkflow2"))

    var wg sync.WaitGroup

    for i := range fs {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            f := fs[i]
            workflow.Select(ctx, workflow.Await(f, func(c workflow.Context, f workflow.Future[string]) {
                res, err := f.Get(ctx)
                println(res)
                println(err)
            }))
        }(i)
    }

    wg.Wait()

    return "succeed MainWorkflow", nil
}

func SubWorkflow1(ctx workflow.Context, input string) (string, error) {
    return workflow.ExecuteActivity[string](ctx, workflow.DefaultActivityOptions, Activity1).Get(ctx)
}

func SubWorkflow2(ctx workflow.Context, input string) (string, error) {
    return workflow.ExecuteActivity[string](ctx, workflow.DefaultActivityOptions, Activity2).Get(ctx)
}

func Activity1(ctx context.Context) (string, error) {
    return "Activity1 ok", nil
}

func Activity2(ctx context.Context) (string, error) {
    time.Sleep(30 * time.Second)
    return "Activity2 ok", nil
}