The offending code from coordinated_worker.go, with my comments inline marked RW:
    cancel := make(chan error) // RW: Note this is unbuffered
    stopHeartbeating := make(chan struct{})
    go c.heartbeat(activityTask, stopHeartbeating, cancel)
    defer close(stopHeartbeating)
    ticks := time.NewTicker(c.tickMinInterval)
    defer ticks.Stop()
    for {
        select {
        case cause := <-cancel:
            c.cancel(activityTask, input)
            return nil, cause
        case <-ticks.C:
            cont, res, err := c.handler.Tick(activityTask, input)
            if !cont {
                return res, err
            }
            if res != nil {
                // send an activity update when the result is not null, but we are continuing
                if err := c.worker.signalUpdate(activityTask, res); err != nil {
                    Log.Printf("workflow-id=%s activity-id=%s activity-id=%s at=signal-update-error error=%q", LS(activityTask.WorkflowExecution.WorkflowId), LS(activityTask.ActivityType.Name), LS(activityTask.ActivityId), err)
                    cancel <- err // RW: We block here because sends are synchronous on unbuffered channels.
                    continue // go pick up the cancel message
                }
                Log.Printf("workflow-id=%s activity-id=%s activity-id=%s at=signal-update", LS(activityTask.WorkflowExecution.WorkflowId), LS(activityTask.ActivityType.Name), LS(activityTask.ActivityId))
            }
        }
    }
I can send a PR to fix this one; it just depends on which approach is preferred. I could either make the cancel channel buffered, or run cancel <- err in a newly spawned goroutine. Thoughts?