aws / aws-sdk-go

AWS SDK for the Go programming language.
http://aws.amazon.com/sdk-for-go/
Apache License 2.0

SQS: ReceiveMessage get same msg repeatedly #185

Closed jostyee closed 9 years ago

jostyee commented 9 years ago

While benchmarking my SQS code, I found that the same messages were sent repeatedly and could not be deleted from the queue.

Source code:

func (c *Sqs) ReceiveBatch(max int64) ([]*sqs.Message, error) {
    out, err := c.ReceiveMessage(&sqs.ReceiveMessageInput{
        QueueURL:            aws.String(c.Url),
        MaxNumberOfMessages: &max,
    })
    if err != nil {
        return nil, err
    }

    return out.Messages, nil
}

func (c *Sqs) DeleteBatch(entries []*sqs.DeleteMessageBatchRequestEntry) (failIdx []string, err error) {
    glog.V(1).Infof("delete msgs size:%d\n", len(entries))
    out, err := c.DeleteMessageBatch(&sqs.DeleteMessageBatchInput{
        QueueURL: aws.String(c.Url),
        Entries:  entries,
    })
    if err != nil {
        return nil, err
    }

    if nil != out.Failed {
        for _, entry := range out.Failed {
            failIdx = append(failIdx, *entry.ID)
        }
    }
    return failIdx, nil
}

Benchmark code:

func BenchmarkWorker(b *testing.B) {
    for n := 0; n < b.N; n++ {
        _, err := q.Send(msgBody+strconv.Itoa(n), 0)
        if err != nil {
            b.Errorf("non-service error occurred: %v", err)
        }
    }
}

Debug logs:

...
I0407 13:11:42.500535   26032 handler.go:21] msg_id:8402aaa1-3a8c-43e8-a769-227ab8206c44, msg:Send with ♥ through SQS48
I0407 13:11:42.500570   26032 worker.go:39] worker: Spawned worker goroutine
I0407 13:11:42.500577   26032 handler.go:21] msg_id:8402aaa1-3a8c-43e8-a769-227ab8206c44, msg:Send with ♥ through SQS48
I0407 13:11:42.500663   26032 worker.go:39] worker: Spawned worker goroutine
I0407 13:11:42.500673   26032 handler.go:21] msg_id:8402aaa1-3a8c-43e8-a769-227ab8206c44, msg:Send with ♥ through SQS48
I0407 13:11:42.500719   26032 worker.go:39] worker: Spawned worker goroutine
I0407 13:11:42.500731   26032 handler.go:21] msg_id:8402aaa1-3a8c-43e8-a769-227ab8206c44, msg:Send with ♥ through SQS48
I0407 13:11:42.500799   26032 worker.go:39] worker: Spawned worker goroutine
I0407 13:11:42.500814   26032 handler.go:21] msg_id:8402aaa1-3a8c-43e8-a769-227ab8206c44, msg:Send with ♥ through SQS48
I0407 13:11:42.500857   26032 worker.go:39] worker: Spawned worker goroutine
I0407 13:11:42.500878   26032 handler.go:21] msg_id:8402aaa1-3a8c-43e8-a769-227ab8206c44, msg:Send with ♥ through SQS48
I0407 13:11:46.187016   26032 sqs.go:186] delete msgs size:10
I0407 13:11:46.272747   26032 worker.go:64] batch delete failed msgs:[]
...

lsegal commented 9 years ago

Can you provide debugging output to see if the DeleteMessageBatch operation was successful?

In general, this would not be a "concurrency" problem, as these requests are independent of one another. It's possible that there is an eventual consistency issue in the service, but this would be out of the SDK's control. The other possibility is that the SDK is simply not successfully issuing the request and then incorrectly reporting its result. A debug log would tell the full story.

jostyee commented 9 years ago

It can be split into two problems. The first is the duplicate sent messages, logs: https://gist.github.com/iyee/cc96147eaf9d167267ca ReceiveMessage get same msgs multiple times.

The second is the batch delete func not working, logs: https://gist.github.com/iyee/a942c3d6b25782b91679 After invoking it, the msg can still be received in the next receive loop. _[edited]_ I've looked into the logs myself, seems the delete action is fine, but SQS continues to produce more same msgs to consume.

lsegal commented 9 years ago

> ReceiveMessage get same msgs multiple times.

@iyee as mentioned in #174, ReceiveMessage will always continue to receive a message until it's been deleted from the queue, and marked as such by a successful 200 response from a Delete operation request (simply sending a request does not imply that it was necessarily deleted yet). My guess is you have a race condition between your producer and consumer goroutines, and this is something you will have to manage in your codebase. Or messages aren't being deleted... see below.

> I've looked into the logs myself, seems the delete action is fine, but SQS continues to produce more same msgs to consume.

If the SDK is receiving 200 OK responses then there is not much else the SDK can be doing here; the service is responsible for doing the correct handling of messages once the request has been received. In other words, we cannot control how the service behaves, only how we construct the requests. If the requests are constructed and received correctly, the issue is not with the SDK but with the operational behavior of the service or application.

That said, I'm seeing quite a few 400 responses in your second set of logs. In fact, it seems as though all of your batch delete requests are failing, which would explain why you're still receiving the same messages.

The error message seems to look something like:

AWS.SimpleQueueService.BatchEntryIdsNotDistinct: Id 96e07041-1681-464f-9bc7-1e4fddb57590 repeated.

Which implies to me that you are not constructing the correct request. You must use unique IDs when constructing the batch of items to delete. You cannot duplicate any of those IDs in the list or the request will be rejected.
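As a rough sketch of what "unique IDs" means in practice: the entry ID only has to be distinct within one batch request, so it can be derived from the entry's position rather than from the message ID. The `message` and `deleteEntry` types below are hypothetical stand-ins for the SDK's `sqs.Message` and `sqs.DeleteMessageBatchRequestEntry`, and `buildDeleteEntries` is an illustrative helper, not part of the SDK.

```go
package main

import (
	"fmt"
	"strconv"
)

// message and deleteEntry are hypothetical stand-ins for the SDK types
// sqs.Message and sqs.DeleteMessageBatchRequestEntry.
type message struct {
	MessageID     string
	ReceiptHandle string
}

type deleteEntry struct {
	ID            string
	ReceiptHandle string
}

// buildDeleteEntries returns one entry per distinct receipt handle.
// Each entry ID is the entry's position in the batch, so IDs cannot
// repeat even if the input slice contains the same message twice.
func buildDeleteEntries(msgs []message) []deleteEntry {
	seen := make(map[string]bool, len(msgs))
	entries := make([]deleteEntry, 0, len(msgs))
	for _, m := range msgs {
		if seen[m.ReceiptHandle] {
			continue // same handle already queued for deletion
		}
		seen[m.ReceiptHandle] = true
		entries = append(entries, deleteEntry{
			ID:            strconv.Itoa(len(entries)),
			ReceiptHandle: m.ReceiptHandle,
		})
	}
	return entries
}

func main() {
	msgs := []message{
		{MessageID: "a", ReceiptHandle: "rh-1"},
		{MessageID: "a", ReceiptHandle: "rh-1"}, // accidental duplicate
		{MessageID: "b", ReceiptHandle: "rh-2"},
	}
	for _, e := range buildDeleteEntries(msgs) {
		fmt.Println(e.ID, e.ReceiptHandle)
	}
}
```

If duplicates show up in the input at all, that usually points to a bug earlier in the pipeline; the de-duplication here only keeps the batch request itself well-formed.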

Are you properly handling the err response value from the delete operation? If so, can you confirm that the SDK is returning the above error? If it is not doing this, that would be a bug.

jostyee commented 9 years ago

Checked again: sending from both the aws-sdk-java and aws-sdk-go clients works normally, and the 'messages available' number equals the number sent.

But the aws-sdk-go ReceiveMessage doesn't handle them correctly and repeatedly consumes the same msgs:

I0407 18:30:43.679225   32532 worker.go:24] worker: Received 3 messages
I0407 18:30:43.679320   32532 worker.go:39] worker: Spawned worker goroutine
I0407 18:30:43.679346   32532 handler.go:21] **msg_id:0caf0f3e-17a0-4326-825c-ff0d840d7f4a**, msg:Send text from Java: 5
I0407 18:30:43.679441   32532 worker.go:39] worker: Spawned worker goroutine
I0407 18:30:43.679455   32532 handler.go:21] **msg_id:0caf0f3e-17a0-4326-825c-ff0d840d7f4a**, msg:Send text from Java: 5
I0407 18:30:43.679513   32532 worker.go:39] worker: Spawned worker goroutine
I0407 18:30:43.679534   32532 handler.go:21] **msg_id:0caf0f3e-17a0-4326-825c-ff0d840d7f4a**, msg:Send text from Java: 5

@lsegal I tried using DeleteMessage instead of DeleteMessageBatch, without luck. Code:

func handleMsg(q *gaea.Sqs, h *Handler, m *sqs.Message) error {
    if err := (*h).Handle(m); err != nil {
        return err
    }
    err := q.Delete(m.ReceiptHandle)
    if err == nil {
        glog.Infof("DeleteMessage okay: %s", *m.ReceiptHandle)
    } else {
        glog.Errorf("DeleteMessage fail: %s: %v", *m.ReceiptHandle, err)
    }
    return err
}

Output:

Log file created at: 2015/04/07 18:46:25
Running on machine: worktop
Binary: Built with gc go1.4.2 for darwin/amd64
Log line format: [IWEF]mmdd hh:mm:ss.uuuuuu threadid file:line] msg
I0407 18:46:25.951708   33239 worker.go:15] Start consumer:https://sqs.cn-north-1.amazonaws.com.cn//alerting_slack
I0407 18:46:25.976558   33239 worker.go:24] worker: Received 3 messages
I0407 18:46:25.976611   33239 worker.go:39] worker: Spawned worker goroutine
I0407 18:46:25.976624   33239 handler.go:21] msg_id:b2efa2a7-f3d4-4859-8e22-17ca5d4d6b6b, msg:Send with ♥ through SQS10
I0407 18:46:25.976903   33239 worker.go:39] worker: Spawned worker goroutine
I0407 18:46:25.976954   33239 handler.go:21] msg_id:b2efa2a7-f3d4-4859-8e22-17ca5d4d6b6b, msg:Send with ♥ through SQS10
I0407 18:47:14.838530   33239 worker.go:64] DeleteMessage okay: %!(EXTRA string=AQEBy+KwAUmjb6FwKsBq5y1JvDeeT4sWViuGmkGy3LgSfwzjwCbNt2P1HB2uWV6APl7CnHkMij175Yhm3xZR9dQoRyzZeen0LWYJqFKJyWm5JKF4rfT02c0KrjfY0TI5sEVubPMaaBdLJnH8YEmxDSMESLzvZ2SN2ZmFHd+pchBYXKXpCdw8XyjWnUgqCJybSttpP7lhf6lgUddMmdeTn5LkrABrBEokXwy/3Zwz+jmgXuN/dWIl90ksJGg9zczgfO/==)
I0407 18:47:14.977719   33239 worker.go:64] DeleteMessage okay: %!(EXTRA string=AQEBy+KwAUmjb6FwKsBq5y1JvDeeT4sWViuGmkGy3LgSfwzjwCbNt2P1HB2uWV6APl7CnHkMij175Yhm3xZR9dQoRyzZeen0LWYJqFKJyWm5JKF4rfT02c0KrjfY0TI5sEVubPMaaBdLJnH8YEmxDSMESLzvZ2SN2ZmFHd+pchBYXKXpCdw8XyjWnUgqCJybSttpP7lhf6lgUddMmdeTn5LkrABrBEokXwy/3Zwz+jmgXuN/dWIl90ksJGg9zczgfO/==)

12 sent but 31 received; no error occurred.

PS: In the SQS Management Console, I can see the messages being consumed slowly.

jostyee commented 9 years ago

> ReceiveMessage will always continue to receive a message until it's been deleted from the queue, and marked as such by a successful 200 response from a Delete operation request (simply sending a request does not imply that it was necessarily deleted yet).

@lsegal My problem is not only receiving a message multiple times, but also receiving multiple copies of the same message in a single receive.

I doubt there is any race condition, since I just start one thread that loops forever over the result of ReceiveBatch(), sends the msgs to a goroutine to consume, and then sleeps 2 seconds to avoid the undeleted msgs.

lsegal commented 9 years ago

@iyee based on the following sample log data:

I0407 18:47:43.917988   33239 worker.go:24] worker: Received 4 messages
I0407 18:47:43.918022   33239 worker.go:39] worker: Spawned worker goroutine
I0407 18:47:43.918028   33239 handler.go:21] msg_id:059e10d7-633a-421b-a927-0d660d6a9567, msg:Send with ♥ through SQS5
I0407 18:47:43.918072   33239 worker.go:39] worker: Spawned worker goroutine
I0407 18:47:43.918091   33239 handler.go:21] msg_id:059e10d7-633a-421b-a927-0d660d6a9567, msg:Send with ♥ through SQS5
I0407 18:47:43.918154   33239 worker.go:39] worker: Spawned worker goroutine
I0407 18:47:43.918163   33239 handler.go:21] msg_id:059e10d7-633a-421b-a927-0d660d6a9567, msg:Send with ♥ through SQS5
I0407 18:47:43.918199   33239 worker.go:39] worker: Spawned worker goroutine
I0407 18:47:43.918204   33239 handler.go:21] msg_id:059e10d7-633a-421b-a927-0d660d6a9567, msg:Send with ♥ through SQS5

It seems like you are processing the same msg multiple times instead of all 4, and then deleting only that single message of the 4.

Can you provide the code from handler.go that actually loops over the messages to process them? I've verified that the SDK is correctly unmarshalling all the received messages from the ReceiveMessage request, and it looks like it's pulling all of them out, and in my tests they are unique messages.

jostyee commented 9 years ago

Figured it out: I was referencing a different msg obj to consume. Invoking DeleteMessage() after each receive, one by one, is OK, but DeleteMessageBatch() would still get duplicate messages.
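The handler loop itself is never shown in this thread, so the exact mistake is unknown. As an illustration only, one common way to hand the wrong message object to a worker in Go is to let a goroutine closure capture the loop variable; in Go versions before 1.22 that variable was shared across iterations, so several goroutines could see the same message. A minimal sketch of the safer pattern, passing each message to the goroutine as an argument (`message`, `dispatch`, and `collectIDs` are hypothetical names):

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// message is a hypothetical stand-in for sqs.Message.
type message struct {
	ID   string
	Body string
}

// dispatch runs handle once per message, each in its own goroutine.
// The message is passed as an argument, so every goroutine gets its
// own copy instead of sharing a captured loop variable.
func dispatch(msgs []message, handle func(message)) {
	var wg sync.WaitGroup
	for i := range msgs {
		wg.Add(1)
		go func(m message) {
			defer wg.Done()
			handle(m)
		}(msgs[i])
	}
	wg.Wait()
}

// collectIDs records which message IDs the handlers actually saw.
func collectIDs(msgs []message) []string {
	var mu sync.Mutex
	var ids []string
	dispatch(msgs, func(m message) {
		mu.Lock()
		ids = append(ids, m.ID)
		mu.Unlock()
	})
	sort.Strings(ids) // goroutine order is not deterministic
	return ids
}

func main() {
	msgs := []message{{"id-1", "a"}, {"id-2", "b"}, {"id-3", "c"}}
	fmt.Println(collectIDs(msgs))
}
```

With this shape, each received message is handled exactly once per receive, which is the precondition for building a batch delete request whose entries are all distinct.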