cschleiden / go-workflows

Embedded durable workflows for Golang similar to DTFx/Cadence/Temporal
https://cschleiden.github.io/go-workflows/
MIT License

workflow panic (workflow completed, but there are still pending futures) #226

Closed Robinmxc closed 1 year ago

Robinmxc commented 1 year ago

Here is my code:

func workflowRollout(ctx workflow.Context, cluster ClusterAZV2, opt StrategyOptions, pipelineID, rolloutID string) (string, error) {

// business code

cancelFlag := false
for index := range opt.Steps {
    a := workflow.ExecuteActivity[bool](ctx, activityOptions, activityExecuteStep, cluster, pipelineID, rolloutID, index, false)
    workflow.Select(
        ctx,
        workflow.Await(a, func(ctx workflow.Context, f workflow.Future[bool]) {
            _, err := f.Get(ctx)
            if err != nil {
                log.Logger().WithFields(log.Fields{"cluster": cluster, "rolloutID": rolloutID,
                    "stepSliceIndex": index}).WithError(err).Info("activityExecuteStep failed")
                cancelFlag = true
                return
            }
            log.Logger().WithFields(log.Fields{"cluster": cluster, "rolloutID": rolloutID,
                "stepSliceIndex": index}).Info("activityExecuteStep success")
        }),
        workflow.Receive(workflow.NewSignalChannel[string](ctx, signalCancel), func(ctx workflow.Context, v string, ok bool) {
            cancelFlag = true
        }),
    )

    // business code

    if !cancelFlag && pipeline.Strategy.StageInterval > 0 && index < len(opt.Steps)-1 {
        duration := time.Duration(pipeline.Strategy.StageInterval)
        t := workflow.ScheduleTimer(ctx, duration*time.Second)
        workflow.Select(
            ctx,
            workflow.Await(t, func(ctx workflow.Context, f workflow.Future[struct{}]) {
                // business code
            }),
            workflow.Receive(workflow.NewSignalChannel[string](ctx, signalCancel), func(ctx workflow.Context, v string, ok bool) {
                cancelFlag = true
            }),
        )
    }

    if cancelFlag {
        // business code

        return "", nil
    }
}

// business code

return "", nil

}

If signalCancel is sent while the workflow is waiting on the timer created by workflow.ScheduleTimer, the process panics:

2023/08/18 22:47:11 |DEBUG| Executing event workflows.instance.id=67df27c81f3a579dbdbc6e3f0794b371 workflows.event.id=2bac0b04-211b-4525-ae2c-d0d4953797dd workflows.seq_id=0 workflows.event.type=SignalReceived workflows.event.schedule_event_id=0 workflows.is_replaying=false workflows.signal.name=SIGNAL_CANCEL
2023/08/18 22:47:11 |PANIC| workflow completed, but there are still pending futures
panic: |PANIC| workflow completed, but there are still pending futures

goroutine 2383672 [running]:
log.Panicln({0xc0001ff980?, 0x1cf2eec?, 0x5?})
    /usr/local/go/src/log/log.go:399 +0x65
github.com/cschleiden/go-workflows/internal/logger.(*defaultLogger).Panic(0xc0033ea160?, {0x1d5b609?, 0x10?}, {0x0?, 0x10?, 0xc001e20000?})
    /root/go/pkg/mod/git.garena.com/shopee/devops/toc-executor-workflow@v0.16.12/internal/logger/logger.go:34 +0x45
github.com/cschleiden/go-workflows/internal/workflow.(*executor).executeNewEvents(0xc0033ea160, {0xc00257ea50, 0x2, 0x2})
    /root/go/pkg/mod/git.garena.com/shopee/devops/toc-executor-workflow@v0.16.12/internal/workflow/executor.go:262 +0x118
github.com/cschleiden/go-workflows/internal/workflow.(*executor).ExecuteTask(0xc0033ea160, {0x2086208, 0xc000044118}, 0xc00252b560)
    /root/go/pkg/mod/git.garena.com/shopee/devops/toc-executor-workflow@v0.16.12/internal/workflow/executor.go:176 +0x7fa
github.com/cschleiden/go-workflows/internal/worker.(*WorkflowWorker).handleTask(0xc0001bdd50, {0x2086208, 0xc000044118}, 0xc00252b560)
    /root/go/pkg/mod/git.garena.com/shopee/devops/toc-executor-workflow@v0.16.12/internal/worker/workflow.go:200 +0x17d
github.com/cschleiden/go-workflows/internal/worker.(*WorkflowWorker).handle(0xc0001bdd50, {0x2086208, 0xc000044118}, 0xc00252b560)
    /root/go/pkg/mod/git.garena.com/shopee/devops/toc-executor-workflow@v0.16.12/internal/worker/workflow.go:157 +0x34b
github.com/cschleiden/go-workflows/internal/worker.(*WorkflowWorker).runDispatcher.func1()
    /root/go/pkg/mod/git.garena.com/shopee/devops/toc-executor-workflow@v0.16.12/internal/worker/workflow.go:126 +0x7a
created by github.com/cschleiden/go-workflows/internal/worker.(*WorkflowWorker).runDispatcher
    /root/go/pkg/mod/git.garena.com/shopee/devops/toc-executor-workflow@v0.16.12/internal/worker/workflow.go:121 +0x92


cschleiden commented 1 year ago

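I think you need to cancel the timer. Timers aren't automatically cleaned up (on purpose), so a timer that has neither fired nor been canceled when the workflow returns is still a pending future.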
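
To make the reasoning concrete, here is a minimal standard-library sketch of the rule: every scheduled timer must fire or be canceled before the workflow returns. It is a toy model, not go-workflows code. `pendingTracker`, `schedule`, `complete`, and `runStep` are hypothetical names invented for illustration. In the real library, as far as I know, the cancel function comes from scheduling the timer on a context created with `workflow.WithCancel(ctx)`; please confirm that against the README for your version.

```go
package main

import (
	"errors"
	"fmt"
)

// pendingTracker is a hypothetical stand-in for the executor's bookkeeping
// of unresolved futures. It is NOT part of go-workflows.
type pendingTracker struct {
	pending map[string]bool
}

func newPendingTracker() *pendingTracker {
	return &pendingTracker{pending: map[string]bool{}}
}

// schedule registers a timer as pending and returns a cancel function
// that removes it again.
func (p *pendingTracker) schedule(name string) (cancel func()) {
	p.pending[name] = true
	return func() { delete(p.pending, name) }
}

// complete mimics the completion check from the issue: finishing while
// anything is still pending is an error.
func (p *pendingTracker) complete() error {
	if len(p.pending) > 0 {
		return errors.New("workflow completed, but there are still pending futures")
	}
	return nil
}

// runStep models one loop iteration in which the cancel signal wins the
// select before the stage-interval timer fires.
func runStep(cancelTimerOnSignal bool) error {
	p := newPendingTracker()
	cancelTimer := p.schedule("stage-interval")

	signalReceived := true // assumption: the signal arrives first
	if signalReceived && cancelTimerOnSignal {
		cancelTimer() // resolve the timer before returning
	}
	return p.complete()
}

func main() {
	fmt.Println("without cancel:", runStep(false))
	fmt.Println("with cancel:   ", runStep(true))
}
```

Applied to the workflow above, this would mean calling the timer's cancel function inside the `workflow.Receive` callback of the second `workflow.Select`, before the `if cancelFlag` branch returns.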