google / go-cloud

The Go Cloud Development Kit (Go CDK): A library and tools for open cloud development in Go.
https://gocloud.dev/
Apache License 2.0
9.45k stars 799 forks source link

pubsub/awssnssqs: Add support for setting FIFO message metadata #3434

Closed bartventer closed 1 month ago

bartventer commented 1 month ago

Describe the bug

The gocloud.dev/pubsub package doesn't allow including the MessageGroupId parameter in the metadata (or any other metadata) when publishing to AWS SNS FIFO Topics or AWS SQS FIFO queues. This is a limitation as it's a required parameter. The current workaround, using the BeforeSend method, lacks portability. It would be beneficial to support MessageGroupId in the message metadata.

Error when publishing to an SNS FIFO topic:

Error sending message: pubsub (code=InvalidArgument): InvalidParameter: Invalid parameter: The MessageGroupId parameter is required for FIFO topics.

Error when publishing to an SQS FIFO queue:

Error sending message: pubsub (code=Unknown): InvalidParameterValue: The request must contain the parameter MessageGroupId

To Reproduce

Directory structure:

.
├── main.tf
└── main_test.go

main.tf:

terraform {
    required_version = ">= 1.7.5"

    required_providers {
      aws = {
        source  = "hashicorp/aws"
        version = "~> 5.44.0"
      }
    }
  }

  variable "region" {
    type        = string
    description = "Region to create resources in. See https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Concepts.RegionsAndAvailabilityZones.html for valid values."
    default     = "us-west-2"
  }

  provider "aws" {
    region = var.region
  }

  resource "aws_sns_topic" "fifo" {
    name                        = "gocdk-topic.fifo"
    fifo_topic                  = true
    content_based_deduplication = true
  }

  resource "aws_sqs_queue" "fifo" {
    name                        = "gocdk-queue.fifo"
    fifo_queue                  = true
    content_based_deduplication = true
  }

  output "region" {
    value       = var.region
    description = "The region in which resources were created"
  }

  output "sns_topic_arn" {
    value       = aws_sns_topic.fifo.arn
    description = "The ARN of the SNS FIFO topic"
  }

  output "sqs_queue_url" {
    value       = substr(aws_sqs_queue.fifo.url, 8, length(aws_sqs_queue.fifo.url) - 8)
    description = "The URL of the SQS FIFO queue (formatted without the protocol prefix)"
  }

main_test.go:

package main

import (
    "context"
    "testing"

    "gocloud.dev/pubsub"
    _ "gocloud.dev/pubsub/awssnssqs"
)

// Before running go test, run in this directory:
//  $ terraform init
//  $ terraform apply
// When you're done testing run the following to clean up:
//  $ terraform destroy -auto-approve

// Update these constants with the outputs from terraform apply (or terraform output after apply).
const (
    awsRegion      = "us-west-2"
    awsSNSTopicARN = "arn:aws:sns:us-west-2:252051715350:gocdk-topic.fifo"
    awsSQSQueueURL = "sqs.us-west-2.amazonaws.com/252051715350/gocdk-queue.fifo"
)

func TestFIFOPublish(t *testing.T) {
    message := pubsub.Message{
        Body: []byte("Hello, World!"),
        Metadata: map[string]string{
            // It would be nice to be able to do this
            "DeduplicationId": "1",
            "MessageGroupId":  "1",
        },
    }
    t.Run("SNSTopic", func(t *testing.T) {
        ctx := context.Background()
        topic, err := pubsub.OpenTopic(ctx, "awssns:///"+awsSNSTopicARN+"?region="+awsRegion)
        if err != nil {
            t.Error("Error opening SNS topic:", err)
        }
        defer topic.Shutdown(ctx)
        err = topic.Send(ctx, &message)
        if err != nil {
            t.Error("Error sending message:", err)
        }
    })

    t.Run("SQSQueue", func(t *testing.T) {
        ctx := context.Background()
        queue, err := pubsub.OpenTopic(ctx, "awssqs://"+awsSQSQueueURL+"?region="+awsRegion)
        if err != nil {
            t.Error("Error opening SQS queue:", err)
        }
        defer queue.Shutdown(ctx)
        err = queue.Send(ctx, &message)
        if err != nil {
            t.Error("Error sending message:", err)
        }
    })
}

Expected behavior

Ideally, the MessageGroupId should be able to be included in the message metadata, allowing the message to be published to the SNS FIFO topic and SQS FIFO queue without any errors.

Version

gocloud.dev/pubsub v0.37.0

Additional context

N/A