uber-go / cadence-client

Framework for authoring workflows and activities running on top of the Cadence orchestration engine.
https://cadenceworkflow.io
MIT License
345 stars 131 forks source link

ExecuteActivity using activity name doesn't seem to work for me in unit tests #366

Closed ryanwalls closed 6 years ago

ryanwalls commented 6 years ago

I'm probably doing something wrong.

Here's my activity registration:

package activities

import "go.uber.org/cadence/activity"

func init() {
    activity.RegisterWithOptions(RunPartProcessor, activity.RegisterOptions{Name: "RunPartProcessor"})
}

Here's my workflow:

package partprocessor

import (
    "github.com/3dsim/workflow/activities"
    "github.com/3dsim/workflow/keys"
    "go.uber.org/cadence/activity"
    "go.uber.org/cadence/workflow"
    "go.uber.org/zap"
    "reflect"
    "time"
)

func init() {
    workflow.Register(Workflow)
}

// Workflow is the workflow function for part processor
func Workflow(ctx workflow.Context, partID int) error {
    logger := workflow.GetLogger(ctx)
    // run part processor activity
    options := workflow.ActivityOptions{
        ScheduleToStartTimeout: 60 * time.Minute,
        StartToCloseTimeout:    60 * time.Minute,
        ScheduleToCloseTimeout: 60 * time.Minute,
        HeartbeatTimeout:       5 * time.Minute,
        ActivityID:             activities.GenerateID(ctx, "partprocessor"),
    }
    partProcessorContext := workflow.WithActivityOptions(ctx, options)

    completeEntityCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
        ScheduleToStartTimeout: 5 * time.Second,
        StartToCloseTimeout:    10 * time.Minute,
        ScheduleToCloseTimeout: 10 * time.Minute,
        HeartbeatTimeout:       10 * time.Minute,
        ActivityID:             activities.GenerateID(ctx, "completeentity"),
    })

    var runPartProcessorOutput activities.RunPartProcessorOutput
    err := workflow.ExecuteActivity(partProcessorContext, "RunPartProcessor", partID).Get(ctx, &runPartProcessorOutput)
    if err != nil && err != activity.ErrResultPending {
        logger.Info("Part processor returned an error", zap.Error(err))
        completeEntityErr := workflow.ExecuteActivity(completeEntityCtx, activities.CompleteEntity, keys.PartManager, partID, 0, reflect.TypeOf(err).String()).Get(ctx, nil)
        if completeEntityErr != nil {
            logger.Error("Problem completing entity", zap.Error(err))
        }
        return err
    }
    err = workflow.ExecuteActivity(completeEntityCtx, activities.CompleteEntity, keys.PartManager, partID, 0, "").Get(ctx, nil)
    if err != nil {
        logger.Error("CompleteEntity errored", zap.Error(err))
        return err
    }

    logger.Info("Part processor completed successfully")
    return nil
}

Here's my unit test:

func TestWorkflow(t *testing.T) {
    t.Run("WhenSuccessfulExpectsWorkflowCompleted", func(t *testing.T) {
        // arrange - variables
        s := &testsuite.WorkflowTestSuite{}
        env := s.NewTestWorkflowEnvironment()
        partID := 32
        expectedRunPartProcessorOutput := &activities.RunPartProcessorOutput{}

        // arrange - mock activities
        mockPartProcessorActivity := func(ctx context.Context, partID int) (*activities.RunPartProcessorOutput, error) {
            activityInfo := activity.GetInfo(ctx)
            env.RegisterDelayedCallback(func() {
                err := env.CompleteActivity(activityInfo.TaskToken, expectedRunPartProcessorOutput, nil)
                assert.NoError(t, err)
            }, time.Second)
            return nil, activity.ErrResultPending
        }

        env.OnActivity(activities.RunPartProcessor, mock.Anything, partID).Return(mockPartProcessorActivity)
        env.OnActivity(activities.CompleteEntity, mock.Anything, keys.PartManager, partID, 0, "").Return(nil)

        // arrange - setup listeners
        env.SetOnActivityCompletedListener(func(activityInfo *activity.Info, result encoded.Value, err error) {
            logger.Log.Debug("In completed listener!")
            switch activityInfo.ActivityType.Name {
            case "github.com/3dsim/workflow/activities.RunPartProcessor":
                if err != nil && err != activity.ErrResultPending {
                    assert.FailNow(t, err.Error())
                }
                assert.NotEmpty(t, activityInfo.ActivityID, "Expected voxelization to have completed")
            case "github.com/3dsim/workflow/activities.CompleteEntity":
                assert.NotEmpty(t, activityInfo.ActivityID, "Expected completeEntity to have completed")
            default:
                t.Fatal("Unknown activity type: " + activityInfo.ActivityType.Name)
            }
        })

        // act
        env.ExecuteWorkflow(Workflow, partID)

        // assert
        assert.True(t, env.IsWorkflowCompleted(), "Expected workflow to complete")
        assert.NoError(t, env.GetWorkflowError(), "Expected no errors")
        env.AssertExpectations(t)
    })
}

Error I get:

panic: unable to find activityType=RunPartProcessor

I tried with the package name is well activities.RunPartProcessor and didn't work either.

Any ideas?

yiminc-zz commented 6 years ago

i tried your code, and it worked with slightly changes. The change is in your activity completed listener, you need to switch on the custom name if your register with a custom name.

=== Activities

package helloworld

import (
    "context"
    "go.uber.org/cadence/activity"
)

func init() {
    activity.RegisterWithOptions(RunPartProcessor, activity.RegisterOptions{Name: "RunPartProcessor"})
    activity.RegisterWithOptions(CompleteEntity, activity.RegisterOptions{Name: "CompleteEntity"})
}

type RunPartProcessorOutput struct {
}

func RunPartProcessor(ctx context.Context, partId int) (*RunPartProcessorOutput, error) {
    return &RunPartProcessorOutput{}, nil
}

func CompleteEntity(ctx context.Context, partID int, xx int, errMsg string) error {
    return nil
}

=== Workflow:

package helloworld

import (
    "go.uber.org/cadence/activity"
    "go.uber.org/cadence/workflow"
    "go.uber.org/zap"
    "reflect"
    "time"
)

func init() {
    workflow.Register(PartWorkflow)
}

// Workflow is the workflow function for part processor
func PartWorkflow(ctx workflow.Context, partID int) error {
    logger := workflow.GetLogger(ctx)
    // run part processor activity
    options := workflow.ActivityOptions{
        ScheduleToStartTimeout: 60 * time.Minute,
        StartToCloseTimeout:    60 * time.Minute,
        ScheduleToCloseTimeout: 60 * time.Minute,
        HeartbeatTimeout:       5 * time.Minute,
        ActivityID:             "partprocessor",
    }
    partProcessorContext := workflow.WithActivityOptions(ctx, options)

    completeEntityCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
        ScheduleToStartTimeout: 5 * time.Second,
        StartToCloseTimeout:    10 * time.Minute,
        ScheduleToCloseTimeout: 10 * time.Minute,
        HeartbeatTimeout:       10 * time.Minute,
        ActivityID:             "completeentity",
    })

    var runPartProcessorOutput RunPartProcessorOutput
    err := workflow.ExecuteActivity(partProcessorContext, "RunPartProcessor", partID).Get(ctx, &runPartProcessorOutput)
    if err != nil && err != activity.ErrResultPending {
        logger.Info("Part processor returned an error", zap.Error(err))
        completeEntityErr := workflow.ExecuteActivity(completeEntityCtx, CompleteEntity, partID, 0, reflect.TypeOf(err).String()).Get(ctx, nil)
        if completeEntityErr != nil {
            logger.Error("Problem completing entity", zap.Error(err))
        }
        return err
    }
    err = workflow.ExecuteActivity(completeEntityCtx, CompleteEntity, partID, 0, "").Get(ctx, nil)
    if err != nil {
        logger.Error("CompleteEntity errored", zap.Error(err))
        return err
    }

    logger.Info("Part processor completed successfully")
    return nil
}

=== Tests:

package helloworld

import (
    "context"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/mock"
    "go.uber.org/cadence/activity"
    "go.uber.org/cadence/encoded"
    "go.uber.org/cadence/testsuite"
    "testing"
    "time"
)

func TestWorkflow(t *testing.T) {
    t.Run("WhenSuccessfulExpectsWorkflowCompleted", func(t *testing.T) {
        // arrange - variables
        s := &testsuite.WorkflowTestSuite{}
        env := s.NewTestWorkflowEnvironment()
        partID := 32
        expectedRunPartProcessorOutput := &RunPartProcessorOutput{}

        // arrange - mock activities
        mockPartProcessorActivity := func(ctx context.Context, partID int) (*RunPartProcessorOutput, error) {
            activityInfo := activity.GetInfo(ctx)
            env.RegisterDelayedCallback(func() {
                err := env.CompleteActivity(activityInfo.TaskToken, expectedRunPartProcessorOutput, nil)
                assert.NoError(t, err)
            }, time.Second)
            return nil, activity.ErrResultPending
        }

        env.SetTestTimeout(time.Hour)

        env.OnActivity(RunPartProcessor, mock.Anything, partID).Return(mockPartProcessorActivity)
        env.OnActivity(CompleteEntity, mock.Anything, partID, 0, "").Return(nil)

        // arrange - setup listeners
        env.SetOnActivityCompletedListener(func(activityInfo *activity.Info, result encoded.Value, err error) {
            switch activityInfo.ActivityType.Name {
            case "RunPartProcessor":
                if err != nil && err != activity.ErrResultPending {
                    assert.FailNow(t, err.Error())
                }
                assert.NotEmpty(t, activityInfo.ActivityID, "Expected voxelization to have completed")
            case "CompleteEntity":
                assert.NotEmpty(t, activityInfo.ActivityID, "Expected completeEntity to have completed")
            default:
                t.Fatal("Unknown activity type: " + activityInfo.ActivityType.Name)
            }
        })

        // act
        env.ExecuteWorkflow(PartWorkflow, partID)

        // assert
        assert.True(t, env.IsWorkflowCompleted(), "Expected workflow to complete")
        assert.NoError(t, env.GetWorkflowError(), "Expected no errors")
        env.AssertExpectations(t)
    })
}
yiminc-zz commented 6 years ago

close this for now, please reopen if you think otherwise.