temporalio / temporal

Temporal service
https://docs.temporal.io
MIT License

WorkflowTaskTimedOut if submitting a large number of activities within one workflow #6806

Open mikellxy opened 2 days ago

mikellxy commented 2 days ago

Hi, I apologize for using an issue to seek help.

Expected Behavior

In my workflow, I first start a timer to wait for the business data to be ready. Then I submit 1,000 activities, expecting all of them to be scheduled and run asynchronously after submission. Finally, I wait for the results of the 1,000 Future objects, in the style of a wait group.

This is a simplified version of my code:

func (w *Worker) Start(wfCtx workflow.Context, req *Req) (result string, err error) {
    // Wait for the business data to be ready.
    if err := workflow.Sleep(wfCtx, 30*time.Second); err != nil {
        return err.Error(), err
    }
    // Submit 1,000 activities without waiting on any of them.
    actCtx := w.GetActivityContext(wfCtx)
    futures := make([]workflow.Future, 0, 1000)
    for i := 0; i < 1000; i++ {
        futures = append(futures, workflow.ExecuteActivity(actCtx, w.Act, req, i))
    }
    // Wait for every activity to finish successfully.
    for _, f := range futures {
        res := new(Result)
        if err := f.Get(wfCtx, res); err != nil {
            return err.Error(), err
        }
        if res.KnownErr != "" {
            workflow.GetLogger(wfCtx).Info("known error, workflow can return directly", "error", res.KnownErr)
            return res.KnownErr, nil
        }
    }
    return "", nil
}

The activity options are as follows:

func (w *Worker) GetActivityContext(wfCtx workflow.Context) workflow.Context {
    return workflow.WithActivityOptions(wfCtx, workflow.ActivityOptions{
        ScheduleToCloseTimeout: time.Hour,
        StartToCloseTimeout:    time.Hour,
        WaitForCancellation:    false,
        RetryPolicy: &temporal.RetryPolicy{
            MaximumAttempts:        20,         
            NonRetryableErrorTypes: []string{}, 
        },
    })
}

The workflow options:

"start_workflow_options": {
    "workflow_run_timeout": "1d",
    "workflow_task_timeout": "90m",
    "retry_policy": {
        "initial_interval": "1s",
        "backoff_coefficient": 2.0,
        "maximum_interval": "1h",
        "maximum_attempts": 3
    }
}

The worker options:

"options": {
    "max_concurrent_activity_execution_size": 10
}

Actual Behavior

The workflow reports WorkflowTaskTimedOut, and none of the activities were scheduled. The timeline is as follows:

  1. WorkflowExecutionStarted
  2. WorkflowTaskScheduled
  3. WorkflowTaskStarted
  4. WorkflowTaskCompleted
  5. TimerStarted
  6. TimerFired
  7. WorkflowTaskScheduled (happened at time T)
  8. WorkflowTaskStarted
  9. WorkflowTaskTimedOut (happened at time T+2 minutes; the timeout type was StartToClose, but my activities' StartToCloseTimeout option is one hour)

P.S. If I reduce the number of activities to 100, all the activities run successfully.

bergundy commented 1 day ago

There's a hard-coded 4 MB limit for a gRPC response. If the accumulated size of the schedule activity commands goes above that limit, the RespondWorkflowTaskCompleted request will be rejected, and the rejection will manifest as a timeout.
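To get a rough feel for when that limit is hit, here is a back-of-the-envelope sketch in Go. It assumes every schedule activity command is about the same size; `bytesPerCommand` is a hypothetical estimate (serialized input plus command overhead), not a value the SDK reports, and the real encoded size will differ somewhat.

```go
package main

import "fmt"

// grpcLimitBytes is the 4 MB limit described above.
const grpcLimitBytes = 4 * 1024 * 1024

// exceedsLimit reports whether n commands of roughly bytesPerCommand bytes
// each would add up to more than the limit. bytesPerCommand is an assumed,
// hypothetical estimate of one schedule activity command's encoded size.
func exceedsLimit(n, bytesPerCommand int) bool {
	return n*bytesPerCommand > grpcLimitBytes
}

// maxBytesPerCommand returns the largest uniform per-command size that
// still fits under the limit when n commands are sent together.
func maxBytesPerCommand(n int) int {
	return grpcLimitBytes / n
}

func main() {
	fmt.Println(exceedsLimit(1000, 4200)) // true
	fmt.Println(exceedsLimit(100, 4200))  // false
	fmt.Println(maxBytesPerCommand(1000)) // 4194
}
```

Under these assumptions, 1,000 commands of about 4 KB each already cross the limit, while 100 commands of the same size stay well below it, which would be consistent with the behavior reported above.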

This is a known issue without a good mitigation today. Please confirm this theory; the Go SDK may emit some logs that help. As a workaround, you can add a sleep after every 100 activities scheduled.
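A minimal sketch of the batching logic behind that workaround, written against plain Go so it runs without the SDK. `scheduleInBatches`, `schedule`, and `pause` are hypothetical names; in a real workflow, `schedule` would presumably wrap `workflow.ExecuteActivity` and append the returned Future, and `pause` would call `workflow.Sleep` with a short duration so the current workflow task completes before the next batch is scheduled.

```go
package main

import "fmt"

// scheduleInBatches calls schedule(i) for every i in [0, total) and calls
// pause after each full batch, except after the final item. Both callbacks
// are hypothetical stand-ins for the SDK calls named in the text above.
func scheduleInBatches(total, batchSize int, schedule func(i int), pause func()) {
	if batchSize <= 0 {
		batchSize = total // treat a non-positive size as "one single batch"
	}
	for i := 0; i < total; i++ {
		schedule(i)
		if (i+1)%batchSize == 0 && i+1 < total {
			pause()
		}
	}
}

func main() {
	scheduled, pauses := 0, 0
	scheduleInBatches(1000, 100,
		func(int) { scheduled++ }, // stand-in for scheduling one activity
		func() { pauses++ },       // stand-in for a short workflow sleep
	)
	fmt.Println(scheduled, pauses) // 1000 9
}
```

With 1,000 activities and a batch size of 100, this yields nine pauses, so no single batch carries more than 100 schedule commands. The futures can still be collected across batches and awaited together at the end.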

drewhoskins-temporal commented 1 day ago

There's a feature request open for a possible fix to this issue: https://github.com/temporalio/features/issues/363