cschleiden / go-workflows

Embedded durable workflows for Golang similar to DTFx/Cadence/Temporal
https://cschleiden.github.io/go-workflows/
MIT License
229 stars 49 forks source link

Signal behavior #341

Closed mandeepbrar closed 5 months ago

mandeepbrar commented 5 months ago

The signal behavior in the code below seems to be a problem. I am running a workflow and want to block it until an external signal is received from a URL (implemented by a call to http://localhost:3000/bar/:token below). If the token (a UUID) is passed in from the startWorkflow function, the code runs well and the blocking behavior is correct, i.e. once the signal arrives the workflow moves on to Activity1 and executes it. But if the channel name is generated as part of the workflow execution, it doesn't work and the workflow remains stuck.

I.e., it works if the following line is present: workflow.NewSignalChannel[int](ctx, chanName).Receive(ctx)

but it doesn't work if chanName in the same line is replaced with newuid: workflow.NewSignalChannel[int](ctx, newuid).Receive(ctx)

I am passing chanName or newuid by copying the relevant signal channel name from the console and pasting it into the browser URL, e.g. http://localhost:3000/bar/d811b2b9-60e1-4bab-bcd1-69ebe92a877a

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"

    "github.com/cschleiden/go-workflows/backend"
    "github.com/cschleiden/go-workflows/backend/sqlite"
    "github.com/cschleiden/go-workflows/client"
    "github.com/cschleiden/go-workflows/diag"
    "github.com/cschleiden/go-workflows/worker"
    "github.com/cschleiden/go-workflows/workflow"
    "github.com/gin-gonic/gin"
    "github.com/google/uuid"
)

// main wires the sample together: it obtains a backend, launches the worker
// in a background goroutine, starts the workflow (and its HTTP signal
// endpoint) through a client, and then waits for an interrupt signal.
func main() {
    ctx := context.Background()
    b := GetBackend()

    // The worker polls the backend concurrently with everything below.
    go RunWorker(ctx, b)

    // The client shares the same backend as the worker.
    startWorkflow(ctx, client.New(b), b)

    // Keep the process alive until an os.Interrupt is delivered.
    interrupts := make(chan os.Signal, 1)
    signal.Notify(interrupts, os.Interrupt)
    <-interrupts
}

// GetBackend returns the backend used by both the worker and the client.
// It is created with sqlite.NewInMemoryBackend.
func GetBackend() diag.Backend {
    var b diag.Backend = sqlite.NewInMemoryBackend()
    return b
}

// startWorkflow creates a single Workflow1 instance and exposes an HTTP
// endpoint for signaling it.
//
// A fresh UUID is printed and passed to the workflow as its signal channel
// name. GET /bar/:token on port 3000 then sends the value 42 to the workflow
// on the channel named by :token, so the printed channel name must be used as
// the token to unblock the workflow.
//
// The function panics if the workflow instance cannot be created. The HTTP
// server runs in a background goroutine; b is currently only referenced by
// the commented-out diagnostics UI.
func startWorkflow(ctx context.Context, c *client.Client, b diag.Backend) {
    chanName := uuid.NewString()
    fmt.Println("channel name ", chanName)
    wf, err := c.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
        InstanceID: uuid.NewString(),
    }, Workflow1, "Hello world", chanName)
    if err != nil {
        // Keep the underlying cause so the failure can be diagnosed.
        panic(fmt.Errorf("could not start workflow: %w", err))
    }
    log.Println("Started workflow", wf.InstanceID)

    route := gin.Default()

    // The handler parameter is deliberately not called ctx, so it cannot be
    // confused with (or shadow) the outer context.
    route.GET("/bar/:token", func(gc *gin.Context) {
        token := gc.Param("token")
        log.Printf("Signaling workflow %s on channel %q", wf.InstanceID, token)

        // Use the request's context so the call is tied to the HTTP request,
        // and surface signaling failures instead of dropping them.
        if err := c.SignalWorkflow(gc.Request.Context(), wf.InstanceID, token, 42); err != nil {
            log.Printf("could not signal workflow %s on channel %q: %v", wf.InstanceID, token, err)
            return
        }

        fmt.Println("request received")
    })

    //  m.Handle("/diag/", http.StripPrefix("/diag", diag.NewServeMux(b)))

    go func() {
        // Run only returns on failure (e.g. the port is already in use).
        if err := route.Run(":3000"); err != nil {
            log.Printf("http server stopped: %v", err)
        }
    }()

    //  log.Println("Debug UI available at http://localhost:3000/diag")

}

// RunWorker creates a worker on the given backend, registers Workflow1 and
// Activity1 with it, and starts it with ctx.
//
// It panics if the worker cannot be started; the panic value wraps the error
// returned by Start so the cause is not lost.
func RunWorker(ctx context.Context, mb backend.Backend) {
    w := worker.New(mb, nil)

    w.RegisterWorkflow(Workflow1)
    w.RegisterActivity(Activity1)
    if err := w.Start(ctx); err != nil {
        panic(fmt.Errorf("could not start worker: %w", err))
    }
}

// Activity1 prints its own name to stdout and returns the sum of a and b.
// The returned error is always nil; ctx is accepted but not used.
func Activity1(ctx context.Context, a, b int) (int, error) {
    fmt.Println("Activity1")

    sum := a + b
    return sum, nil
}

// Workflow1 waits for an int signal on the channel named chanName, then runs
// Activity1(2, 4) and returns "result".
//
// msg is accepted but not used. An error is returned if the workflow-local id
// cannot be produced or if Activity1 fails.
func Workflow1(ctx workflow.Context, msg string, chanName string) (string, error) {
    // Workflow code must be deterministic. Calling uuid.NewString() directly
    // would yield a different value every time this function is re-executed,
    // so a signal channel named after it would never match the name printed
    // earlier. Generating the id inside a side effect records it once and
    // keeps it stable, which makes newuid safe to use as a channel name.
    newuid, err := workflow.SideEffect[string](ctx, func(ctx workflow.Context) string {
        return uuid.NewString()
    }).Get(ctx)
    if err != nil {
        return "", fmt.Errorf("generating workflow-local id: %w", err)
    }
    fmt.Println("Entering Workflow1  ", newuid)

    // Block until a signal arrives on the caller-provided channel name.
    workflow.NewSignalChannel[int](ctx, chanName).Receive(ctx)
    fmt.Println("received signal")

    r2, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, 2, 4).Get(ctx)
    if err != nil {
        // Report the failure through the workflow result instead of panicking.
        return "", fmt.Errorf("getting activity 1 result: %w", err)
    }

    fmt.Println("next step, wf complete", r2)

    return "result", nil
}