uber-go / cadence-client

Framework for authoring workflows and activities running on top of the Cadence orchestration engine.
https://cadenceworkflow.io
MIT License

Chained Futures do not unblock selects if "source" future IsReady #490

Open Groxx opened 6 years ago

Groxx commented 6 years ago
package experiments

import (
    "testing"
    "time"

    "github.com/stretchr/testify/assert"
    "go.uber.org/cadence/activity"
    "go.uber.org/cadence/testsuite"
    "go.uber.org/cadence/workflow"
)

func init() {
    workflow.Register(selectorWork)
    activity.Register(selectorAct)
}

func selectorWork(ctx workflow.Context, sync bool) error {
    ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
        ScheduleToStartTimeout: time.Minute,
        StartToCloseTimeout:    time.Minute,
    })

    fut, set := workflow.NewFuture(ctx)
    orig := workflow.ExecuteActivity(ctx, selectorAct, 5)

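    if sync {
        // Wait for the activity to finish first, so that orig is already
        // ready by the time it is chained below. The result is not needed.
        orig.Get(ctx, nil)
    }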
    set.Chain(orig)

    s := workflow.NewSelector(ctx)
    s.AddFuture(fut, func(f workflow.Future) {
        // nothing to do, unblocking will pass the test
    })
    s.Select(ctx)
    return nil
}

func selectorAct(arg int) (int, error) {
    return arg, nil
}

func TestSelector(t *testing.T) {
    runner := func(block bool) func(t *testing.T) {
        return func(t *testing.T) {
            s := new(testsuite.WorkflowTestSuite)
            env := s.NewTestWorkflowEnvironment()

            env.ExecuteWorkflow(selectorWork, block)

            assert.True(t, env.IsWorkflowCompleted())
            assert.NoError(t, env.GetWorkflowError())
        }
    }

    t.Run("async chains work", runner(false))
    t.Run("sync chains never unblock", runner(true))
}

If you comment out either of those two t.Run calls and run the remaining one on its own, you can see that the async case passes almost immediately, while the sync case never unblocks and times out.

The cause appears to be that the IsReady branch of Chain copies the value and error but doesn't close the related channel, so the selector is never woken up? https://github.com/uber-go/cadence-client/blob/9c4380588cc1452dc0e84d672cb1f3dd758ae67a/internal/internal_workflow.go#L329

func (f *futureImpl) Chain(future Future) {
    if f.ready {
        panic("already set")
    }

    ch, ok := future.(asyncFuture)
    if !ok {
        panic("cannot chain Future that wasn't created with workflow.NewFuture")
    }
    if !ch.IsReady() {
        ch.ChainFuture(f)
        return
    }
    val, err := ch.GetValueAndError()
    f.value = val
    f.err = err
    f.ready = true
    return
}

Compare to e.g. Set:

func (f *futureImpl) Set(value interface{}, err error) {
    if f.ready {
        panic("already set")
    }
    f.value = value
    f.err = err
    f.ready = true
    f.channel.Close()
    for _, ch := range f.chained {
        ch.Set(f.value, f.err)
    }
}
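
To make the suspected difference concrete without pulling in the client, here is a minimal stand-alone sketch. Everything in it is hypothetical: toyFuture, ChainBuggy, ChainFixed and unblocked are illustrative names, and a plain Go channel stands in for the workflow channel a Selector waits on. It only models the two code paths quoted above and is not the real futureImpl, nor necessarily the fix the maintainers would choose.

```go
package main

import "fmt"

// toyFuture is a hypothetical, simplified stand-in for futureImpl.
// Closing done plays the role of f.channel.Close() in the real code.
type toyFuture struct {
	value   interface{}
	err     error
	ready   bool
	done    chan struct{}
	chained []*toyFuture
}

func newToyFuture() *toyFuture {
	return &toyFuture{done: make(chan struct{})}
}

// Set mirrors the quoted Set: store the result, close the channel,
// then forward the result to every chained future.
func (f *toyFuture) Set(value interface{}, err error) {
	if f.ready {
		panic("already set")
	}
	f.value, f.err, f.ready = value, err, true
	close(f.done)
	for _, ch := range f.chained {
		ch.Set(f.value, f.err)
	}
}

// ChainBuggy mirrors the quoted Chain: if src is already ready, the result
// is copied but done is never closed, so a waiter is never woken up.
func (f *toyFuture) ChainBuggy(src *toyFuture) {
	if f.ready {
		panic("already set")
	}
	if !src.ready {
		src.chained = append(src.chained, f)
		return
	}
	f.value, f.err, f.ready = src.value, src.err, true
}

// ChainFixed is one possible fix (an assumption, not the upstream patch):
// route the already-ready case through Set so the channel gets closed.
func (f *toyFuture) ChainFixed(src *toyFuture) {
	if f.ready {
		panic("already set")
	}
	if !src.ready {
		src.chained = append(src.chained, f)
		return
	}
	f.Set(src.value, src.err)
}

// unblocked reports whether something waiting on f.done would wake up.
func unblocked(f *toyFuture) bool {
	select {
	case <-f.done:
		return true
	default:
		return false
	}
}

func main() {
	src := newToyFuture()
	src.Set(5, nil) // the source is ready before chaining, like the sync case

	buggy := newToyFuture()
	buggy.ChainBuggy(src)
	fixed := newToyFuture()
	fixed.ChainFixed(src)

	fmt.Println("buggy:", buggy.ready, unblocked(buggy)) // buggy: true false
	fmt.Println("fixed:", fixed.ready, unblocked(fixed)) // fixed: true true
}
```

In this model the buggy future reports itself as ready but its channel stays open, which matches the symptom in the test above: anything that checks readiness sees a result, while anything that waits on the channel blocks forever.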

kotenko-anton commented 3 years ago

I've run into the same issue and came to the same conclusion. I worked around it with:

newFuture, settable := workflow.NewFuture(ctx)
...........................
selector.AddFuture(newFuture, func(f workflow.Future) {.......})
...........................
// future is the source future that would otherwise be passed to settable.Chain
var value interface{}
err := future.Get(ctx, &value)
if err != nil {
    settable.SetError(err)
} else {
    settable.SetValue(value)
}
...........................
selector.AddFuture()

Do you have plans to fix this? If you can confirm that this really is the source of the problem, I can prepare a pull request that fixes it.