qhenkart / gosqs

MIT License
40 stars 18 forks source link

Wait group for senders #6

Open lzap opened 2 years ago

lzap commented 2 years ago

Hey,

I have been digging into the codebase and testing it, and I have found a bug. Messages are sent in separate goroutines that are not synchronized, meaning that when a service is asked to shut down and the main function returns, all other goroutines die immediately, possibly before their messages have been sent.

To solve this, I've implemented a simple wait group in my copy of your code:

// client bundles the SQS connection, the target queue, and the bookkeeping
// needed to wait for asynchronous sends to finish.
type client struct {
    sqs      *sqs.Client    // SQS API client; used to call SendMessage
    queueURL string         // passed as QueueUrl on every outgoing message
    logger   log.Logger     // receives trace/warn/error events from the send path
    senderWG sync.WaitGroup // THIS: counts in-flight sender goroutines so Wait can block until they finish

    // NOTE(review): the fields below are not referenced by the sending code
    // shown here; presumably they configure the consumer side — confirm
    // against the rest of the package.
    handlers     map[string]Handler
    heartbeatSec int
    maxBeats     int
    workerPool   int
    maxMessages  int
}

// Enqueue serializes body as JSON and hands it to a background goroutine that
// publishes it to the client's queue.
//
// jobType is used both as the message group ID and as input to the default
// message attributes. extraAttributes are appended after the generated
// "dedup_id" attribute pair.
//
// The only error returned is a JSON marshalling failure. Delivery itself is
// asynchronous: send failures are logged by the sender goroutine and are not
// reported to the caller. Call Wait before exiting so pending sends are not
// cut short.
func (c *client) Enqueue(ctx context.Context, jobType string, body interface{}, extraAttributes ...string) error {
    payload, err := json.Marshal(body)
    if err != nil {
        return err
    }

    dedupID := generateDeduplicationId()

    // Appending an empty variadic is a no-op, so no length check is required.
    attrs := append([]string{"dedup_id", dedupID}, extraAttributes...)

    msg := &sqs.SendMessageInput{
        QueueUrl:               aws.String(c.queueURL),
        MessageGroupId:         aws.String(jobType),
        MessageDeduplicationId: aws.String(dedupID),
        MessageAttributes:      defaultSQSAttributes(jobType, attrs...),
        MessageBody:            aws.String(string(payload)),
    }

    // Register the sender before it starts so a concurrent Wait cannot miss it.
    c.senderWG.Add(1) // HERE
    go c.sendDirectMessage(ctx, msg)

    return nil
}

// sendDirectMessage publishes input to SQS, retrying failed sends with a
// 10-second pause between attempts until maxRetryCount attempts have been made.
//
// It is intended to run as the goroutine started by Enqueue and releases
// exactly one senderWG count when it returns, whatever the outcome.
//
// retryCount optionally sets the attempt number to start from; only its first
// element is read.
//
// The function gives up (logging at error level) when:
//   - the attempt budget is exhausted,
//   - the send error matches errDataLimit (retrying cannot help), or
//   - ctx is cancelled while waiting to retry.
func (c *client) sendDirectMessage(ctx context.Context, input *sqs.SendMessageInput, retryCount ...int) {
    // A single deferred Done covers every exit path, including panics, so
    // Wait can never hang on a leaked count.
    defer c.senderWG.Done()

    var count int
    if len(retryCount) != 0 {
        count = retryCount[0]
    }

    for ; count < maxRetryCount; count++ {
        _, err := c.sqs.SendMessage(ctx, input)
        if err == nil {
            c.logger.Log(ctx, log.LogLevelTrace, "message sent", nil)
            return
        }

        // Compared by message rather than identity, as the returned error is
        // not guaranteed to be the errDataLimit value itself.
        if err.Error() == errDataLimit.Error() {
            c.logger.Log(ctx, log.LogLevelError, "payload limit overflow, giving up", nil)
            return
        }

        c.logger.Log(ctx, log.LogLevelWarn, "error publishing, trying again in 10 seconds: "+err.Error(), nil)

        // Back off, but stop waiting as soon as ctx is cancelled so that a
        // shutdown is not held up by sends that can no longer succeed.
        backoff := time.NewTimer(10 * time.Second)
        select {
        case <-backoff.C:
        case <-ctx.Done():
            backoff.Stop()
            c.logger.Log(ctx, log.LogLevelError, "context done while waiting to retry, giving up: "+ctx.Err().Error(), nil)
            return
        }
    }

    c.logger.Log(ctx, log.LogLevelError, "too many failures, giving up", nil)
}

// Wait blocks until every sender goroutine counted on senderWG has finished.
// Call it at the end of main (or during shutdown) so that pending sends are
// not abandoned when the process exits.
func (c *client) Wait() {
    c.senderWG.Wait()
}

Then, in the main function, I call Wait() at the very end to ensure all sender goroutines have finished before the service shuts down gracefully.

Apologies for not sending a patch; my copy of your codebase has diverged quite a bit because my use case is different. Cheers!

qhenkart commented 2 years ago

@lzap I really appreciate you taking the time to come and create an issue with your findings