google / go-cloud

The Go Cloud Development Kit (Go CDK): A library and tools for open cloud development in Go.
https://gocloud.dev/
Apache License 2.0

pubsub/all: Using long polling when consuming from a FIFO SQS queue ends up blocking requests #3456

Closed mitsos1os closed 3 months ago

mitsos1os commented 3 months ago

Describe the bug

The pubsub package has trouble consuming from a FIFO-enabled SQS queue when long polling is used with messages that share the same MessageGroupId: requests end up blocked for the wait time configured for long polling.

This happens because of the way the pubsub library is structured, combined with the basic operation mode of FIFO SQS queues.

Details about SQS FIFO queue operation

When FIFO is enabled on an SQS queue, it guarantees, as the name suggests, that messages belonging to the same group (i.e. sharing the same MessageGroupId) are delivered in order and with an exactly-once delivery guarantee. This means that until a message is acknowledged, the next one in line will not be made available when reading from the queue. A poll request sent while the last message is still unacknowledged will return 0 messages from the queue. More info here

Long polling is a mode in which you configure a wait time: a poll request waits until it either gets a message or the configured wait time expires. This is used to reduce the number of repeated requests that return 0 messages. For example, if you configure a wait time of 10 seconds and then send a poll request, it will wait up to 10 seconds, returning as soon as a message is available or after the 10 seconds have passed (again returning 0 messages). With short polling, a poll request sent while 0 messages are available returns immediately, and another request is sent right after, repeating until a message is available. It is generally advised to use long polling. More info can be found here

pubsub operation

The package has an internal auto-scaling mechanism that tries to increase message-consumption throughput based on the current speed of message processing (the time from when a message is received until it is acknowledged).

When the message rate increases, the so-called batch size of requested messages is increased, and requests are split into batches according to the limits allowed by each driver/cloud provider.

For example, SQS supports up to 10 messages returned in a single request. So, if the driver determines that we can process more than 10 messages, it will increase the batch size to e.g. 12. This has to be split into two batches, one requesting 10 (the SQS maximum) and one requesting the remaining 2. These requests are sent at the same time from the function getNextBatch, and once resolved they are passed over to the execution flow.

Now, here is the problematic part. It leads to the following results:

  1. The 1st request, for 10 messages, is served immediately, returning up to 10 messages from the queue
  2. The 2nd request, for the remaining 2, is sent at the same time
  3. However, since the 10 messages from the 1st request have not been acknowledged (acknowledgment happens after we exit this flow), the second request will not receive any messages
  4. With long polling enabled, the second request does not return immediately; it waits up to the configured WaitTime and eventually returns 0 messages because the first batch was not acked, blocking the whole flow
  5. After that, the flow continues normally (only with the messages from the 1st request) and the cycle repeats continuously

To Reproduce

We have to create an SQS queue with FIFO enabled.

Steps to reproduce the behavior:

  1. Publish a good number of messages to the FIFO SQS queue (enough to keep consumption busy), all using the same MessageGroupId
  2. Open a connection to the queue using long polling:
    
```go
subURL := <OUR_QUEUE's URL>

urlMux := &pubsub.URLMux{}

u := &awssnssqs.URLOpener{
	UseV2: true,
	SubscriptionOptions: awssnssqs.SubscriptionOptions{
		WaitTime: 10 * time.Second, // enable long-polling
		ReceiveBatcherOptions: batcher.Options{
			MaxBatchSize: 2, // we reduce batch size to speed up bug appearance
		},
	},
}
urlMux.RegisterSubscription(awssnssqs.SQSScheme, u)

// Open a pubsub.Subscription using the URL.
sub, err := urlMux.OpenSubscription(ctx, subURL)
```

3. Start consuming the messages, with some logging to make the added delay visible:
```go
for {
  now := time.Now()
  fmt.Println("Start message reception", now)
  m, err := sub.Receive(ctx)
  fmt.Println("End of message reception", time.Since(now))
  if err != nil {
    log.Print(err)
    return err
  }
  fmt.Printf("%s\n", m.Body)
  m.Ack()
}
```

Almost immediately, you will start noticing a 10-second delay (the configured WaitTime) between the start and end of message reception (due to the 2nd batch request being blocked)

Expected behavior

I would expect the batches to be processed independently so that the flow is not blocked. Since Subscription.Receive returns a single message, it could work without being blocked by independent requests

Version

v0.38.0

Additional context

The problematic code lies in the base pubsub package (that's why I used all in the issue name) and not in the awssnssqs one; but because the bug is rooted in AWS behavior, I am not sure whether it is entirely transferable to other cloud providers.