uber / cadence

Cadence is a distributed, scalable, durable, and highly available orchestration engine to execute asynchronous long-running business logic in a scalable and resilient way.
https://cadenceworkflow.io
MIT License
8.14k stars 786 forks source link

Getting corrupted workflow when sending workflow with high concurrency #3884

Closed hulucc closed 3 years ago

hulucc commented 3 years ago

The list view says the workflow is open, but it has actually completed.

image image

It can be easily reproduced by sending 2000 workflows to the Cadence server concurrently:

package main

import (
    "context"
    "fmt"
    "os"
    "sync"
    "time"

    "github.com/google/uuid"
    "go.uber.org/cadence"
    "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
    "go.uber.org/cadence/activity"
    "go.uber.org/cadence/client"
    "go.uber.org/cadence/worker"
    "go.uber.org/cadence/workflow"
    "go.uber.org/yarpc"
    // "go.uber.org/yarpc/api/transport"
    "go.uber.org/yarpc/transport/tchannel"
)

var cc client.Client

func init() {
    workflow.Register(SimpleWorkflow)
    workflow.Register(CronWorkflow)
    activity.Register(SimpleActivity)
}

// SimpleWorkflow executes SimpleActivity once with one-minute timeouts and a
// retry policy of at most three attempts, and returns the activity's error.
func SimpleWorkflow(ctx workflow.Context) error {
    retryPolicy := &cadence.RetryPolicy{
        InitialInterval:    time.Second,
        BackoffCoefficient: 2,
        MaximumInterval:    time.Minute,
        MaximumAttempts:    3,
        // ExpirationInterval: time.Hour,
        NonRetriableErrorReasons: []string{
            "cadenceInternal:Panic",
            "cadenceInternal:Generic",
        },
    }
    activityCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
        ScheduleToStartTimeout: time.Minute,
        ScheduleToCloseTimeout: time.Minute,
        StartToCloseTimeout:    time.Minute,
        WaitForCancellation:    false,
        RetryPolicy:            retryPolicy,
    })

    // Block until the activity finishes; its result value is not needed.
    return workflow.ExecuteActivity(activityCtx, SimpleActivity).Get(activityCtx, nil)
}

// CronWorkflow executes SimpleActivity once, sleeps for ten seconds and then
// continues as a new run of itself.
//
// The activity outcome is best-effort: a failed activity does not stop the
// loop. An error from the sleep is returned instead of being discarded, so
// that a failed or cancelled wait ends the workflow rather than silently
// scheduling another run.
func CronWorkflow(ctx workflow.Context) error {
    ao := workflow.ActivityOptions{
        ScheduleToStartTimeout: time.Minute,
        ScheduleToCloseTimeout: time.Minute,
        StartToCloseTimeout:    time.Minute,
        WaitForCancellation:    false,
        RetryPolicy: &cadence.RetryPolicy{
            InitialInterval:    time.Second,
            BackoffCoefficient: 2,
            MaximumInterval:    time.Minute,
            MaximumAttempts:    3,
            // ExpirationInterval: time.Hour,
            NonRetriableErrorReasons: []string{
                "cadenceInternal:Panic",
                "cadenceInternal:Generic",
            },
        },
    }
    ctx = workflow.WithActivityOptions(ctx, ao)

    // Intentionally ignored: one failed activity must not break the loop.
    _ = workflow.ExecuteActivity(ctx, SimpleActivity).Get(ctx, nil)

    // NOTE(review): Sleep is expected to fail when ctx is cancelled; confirm
    // against the client library version in use.
    if err := workflow.Sleep(ctx, 10*time.Second); err != nil {
        return err
    }
    return workflow.NewContinueAsNewError(ctx, CronWorkflow)
}

// SimpleActivity simulates one second of work, prints "done" with the
// built-in println and always succeeds. The context argument is unused.
func SimpleActivity(ctx context.Context) error {
    const workDuration = time.Second

    time.Sleep(workDuration)
    println("done")
    return nil
}

// main connects to the Cadence frontend over YARPC/TChannel and then acts
// according to the first command-line argument:
//
//   - no argument: runs a worker on the task list until worker.Run returns;
//   - "start": issues 2000 concurrent StartWorkflow requests for
//     SimpleWorkflow, waits for all of them to return and exits;
//   - "cron": starts the single "CronWorkflow" execution and exits;
//   - anything else: prints the accepted roles.
//
// Any setup or request failure panics, as this is a reproduction script.
func main() {
    const (
        hostPort         = "cadence-frontend.cadence:7933"
        clientName       = "simple-worker"
        domain           = "play"
        taskList         = "simple-worker"
        cadenceService   = "cadence-frontend"
        concurrentStarts = 2000
    )

    ch, err := tchannel.NewChannelTransport(tchannel.ServiceName(clientName))
    if err != nil {
        panic(err)
    }
    dispatcher := yarpc.NewDispatcher(yarpc.Config{
        Name: clientName,
        Outbounds: yarpc.Outbounds{
            cadenceService: {Unary: ch.NewSingleOutbound(hostPort)},
        },
    })
    if err := dispatcher.Start(); err != nil {
        panic(err)
    }
    // Release the transport on normal exit instead of leaving it running.
    defer func() {
        if err := dispatcher.Stop(); err != nil {
            fmt.Fprintf(os.Stderr, "stopping dispatcher: %v\n", err)
        }
    }()

    service := workflowserviceclient.New(dispatcher.ClientConfig(cadenceService))
    cc = client.NewClient(service, domain, &client.Options{})

    // start launches one SimpleWorkflow under a random ID and prints how long
    // the StartWorkflow call took.
    start := func() {
        s := time.Now()
        wf, err := cc.StartWorkflow(context.Background(), client.StartWorkflowOptions{
            ID:                           uuid.New().String(),
            TaskList:                     taskList,
            ExecutionStartToCloseTimeout: time.Hour,
        }, SimpleWorkflow)
        if err != nil {
            panic(err)
        }
        fmt.Printf("duration: %+v, start: %v, end: %v, id: %s\n", time.Since(s).Seconds(), s.Format("15:04:05"), time.Now().Format("15:04:05"), wf.ID)
    }

    // cron launches the CronWorkflow under a fixed ID with a cron schedule.
    cron := func() {
        _, err := cc.StartWorkflow(context.Background(), client.StartWorkflowOptions{
            ID:                           "CronWorkflow",
            TaskList:                     taskList,
            ExecutionStartToCloseTimeout: time.Hour,
            CronSchedule:                 "* * * * *",
        }, CronWorkflow)
        if err != nil {
            panic(err)
        }
    }

    if len(os.Args) < 2 {
        // No role given: act as the worker. The variable is named w so that it
        // does not shadow the imported worker package.
        w := worker.New(service, domain, taskList, worker.Options{})
        if err := w.Run(); err != nil {
            panic(err)
        }
        return
    }

    switch os.Args[1] {
    case "start":
        // Fire every request at once (the concurrency is the point of the
        // reproduction), then wait for all of them rather than blocking forever.
        var wg sync.WaitGroup
        wg.Add(concurrentStarts)
        for i := 0; i < concurrentStarts; i++ {
            go func() {
                defer wg.Done()
                start()
            }()
        }
        wg.Wait()
    case "cron":
        cron()
    default:
        fmt.Printf("%+v\n", "role: start/cron")
    }
}
hulucc commented 3 years ago

It's caused by system.enableVisibilitySampling. To avoid this problem, you can disable visibility sampling and use a standalone database for visibility, or use Elasticsearch.

zhangkyou commented 1 year ago

The default value of system.enableVisibilitySampling in dynamicconfig is FALSE. I didn't write this config key in my dynamicconfig file, but I still have the problem.