elastic / beats

:tropical_fish: Beats - Lightweight shippers for Elasticsearch & Logstash
https://www.elastic.co/products/beats

Cannot use a custom Endpoint with SQS #39706

Open strawgate opened 1 month ago

strawgate commented 1 month ago

Specifying a custom Endpoint for SQS does not work.

With the introduction of https://github.com/elastic/beats/pull/36208, we started creating an endpoint resolver that sends every SDK request, whether S3 or SQS, to the configured `endpoint`. A series of bugs is related to this. The investigation was done on 8.12.1 and main.

TL;DR: When an endpoint is specified, we ask the user to provide the S3 endpoint, but this endpoint is actually applied to all SDK calls, not just S3 calls. This is not a problem when polling an S3 bucket directly, but it breaks SQS -> S3: https://github.com/elastic/beats/pull/36208/files#diff-589195ecff44ed2c6bf097da676822765ae86c2e73fc5789aaf0a64d8bb26ff1R73-R82

Here we go. First, a couple of bugs that got us into this mess.

To start, we load the region from the provided credentials (not from the Beats config):

    awsConfig, _ := GetAWSCredentials(beatsConfig)
    if awsConfig.Region == "" {
        if beatsConfig.DefaultRegion != "" {
            awsConfig.Region = beatsConfig.DefaultRegion
        } else {
            awsConfig.Region = "us-east-1"
        }
    }

We then never apply the `region` value provided in the Beats config.

When the user specifies a queue URL that lives on a custom endpoint, we cannot parse the region from that domain. Region resolution then returns an empty string, and we overwrite our region with this invalid value:

        regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint)
        if err != nil {
            return fmt.Errorf("failed to get AWS region from queue_url: %w", err)
        }

        in.awsConfig.Region = regionName // <------- BUG

This was introduced in https://github.com/elastic/beats/pull/36034 and was fixed by @faec about three weeks ago: https://github.com/elastic/beats/pull/39327

This shouldn't matter: when no region is set, we should fall back to the default region. However, another bug causes the default region to be ignored.

We end up ignoring `default_region` entirely because we call getRegionFromQueueURL with the configured region as the third argument instead of the default region:

    if in.config.QueueURL != "" {
        regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint, in.config.RegionName) // <------ THIS SHOULD BE DEFAULT_REGION
        if err != nil && in.config.RegionName == "" {
            return fmt.Errorf("failed to get AWS region from queue_url: %w", err)
        }
        var warn regionMismatchError
        if errors.As(err, &warn) {
            // Warn of mismatch, but go ahead with configured region name.
            inputContext.Logger.Warnf("%v: using %q", err, regionName)
        }
        in.awsConfig.Region = regionName
    }

This bug was introduced in https://github.com/elastic/beats/pull/36034. It was fixed on main by @faec three weeks ago in https://github.com/elastic/beats/pull/38958
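Both bugs point toward a region precedence in which an empty value never overwrites a usable one. A minimal sketch, assuming this order: region parsed from `queue_url`, then the configured `region`, then `default_region`, then `us-east-1`. `resolveRegion` is a hypothetical helper, not the Beats implementation, and the ordering itself is an assumption.

```go
package main

import "fmt"

// resolveRegion is a hypothetical helper illustrating region precedence.
// An empty value at any level falls through to the next one, so a queue URL
// on a custom endpoint (which parses to "") cannot clobber a usable region.
func resolveRegion(urlRegion, configRegion, defaultRegion string) string {
	for _, candidate := range []string{urlRegion, configRegion, defaultRegion} {
		if candidate != "" {
			return candidate
		}
	}
	// Last resort, matching the hard-coded us-east-1 fallback quoted at the top of this issue.
	return "us-east-1"
}

func main() {
	// Custom endpoint: nothing parsed from queue_url, so default_region is honored.
	fmt.Println(resolveRegion("", "", "eu-west-1"))
	// Standard AWS queue URL: the parsed region is used.
	fmt.Println(resolveRegion("us-west-2", "", "eu-west-1"))
}
```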

The `default_region` bug covers up another bug: parsing `queue_url` can succeed but yield a region that doesn't match `default_region`, which should only cause a warning to be printed. If no region is configured at all, however, we exit with `failed to get AWS region from queue_url`. This should print the warning in the `else if` branch instead of exiting:

    if err != nil && configRegion == "" { // <--- entered when region parsing works but doesn't match default_region
        // Only report an error if we don't have a configured region
        // to fall back on.
        return fmt.Errorf("failed to get AWS region from queue_url: %w", err)
    } else if configRegion != "" && configRegion != urlRegion {
        inputContext.Logger.Warnf("configured region disagrees with queue_url region (%q != %q): using %q", configRegion, urlRegion, urlRegion)
    }

Next, part of getRegionFromQueueURL is meant to support pulling the region from the endpoint, but it also does not work: our custom endpoint includes a scheme (https://), while the parsed hostname does not. This means we can never pull the region from the endpoint field.

    func getRegionFromQueueURL(queueURL string, endpoint string) (string, error) {
        // get region from queueURL
        // Example: https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs
        url, err := url.Parse(queueURL)
        if err != nil {
            return "", fmt.Errorf(queueURL + " is not a valid URL")
        }
        if url.Scheme == "https" && url.Host != "" {
            queueHostSplit := strings.Split(url.Host, ".")
            // <------ BUG: When an endpoint is specified, it's an https:// URL, not a hostname. These two can never be equal.
            if len(queueHostSplit) > 2 && (strings.Join(queueHostSplit[2:], ".") == endpoint ||
                (endpoint == "" && queueHostSplit[2] == "amazonaws")) {
                return queueHostSplit[1], nil
            }
        }
        return "", fmt.Errorf("QueueURL is not in format: https://sqs.{REGION_ENDPOINT}.{ENDPOINT}/{ACCOUNT_NUMBER}/{QUEUE_NAME}")
    }

This bug was introduced in https://github.com/elastic/beats/pull/24861 and is still present on main: https://github.com/elastic/beats/blob/809e00139db7225d18eac26186493e6b79b4a039/x-pack/filebeat/input/awss3/sqs.go#L44-L57.
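One way to make the endpoint comparison work is to reduce the configured endpoint to a bare hostname before comparing. A sketch, assuming the endpoint may be given either as a full URL (`https://example.com`) or as a bare domain; `endpointHost` and `regionFromQueueURL` are hypothetical names, and `example.com` is a placeholder domain.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// endpointHost is a hypothetical helper that reduces a configured endpoint
// ("https://example.com", "https://example.com:4566", or "example.com") to a hostname.
func endpointHost(endpoint string) string {
	if u, err := url.Parse(endpoint); err == nil && u.Host != "" {
		return u.Hostname()
	}
	return endpoint
}

// regionFromQueueURL follows the parsing logic of getRegionFromQueueURL, but
// compares hostname to hostname instead of hostname to full URL.
func regionFromQueueURL(queueURL, endpoint string) (string, error) {
	u, err := url.Parse(queueURL)
	if err != nil {
		return "", fmt.Errorf("%s is not a valid URL", queueURL)
	}
	if u.Scheme == "https" && u.Host != "" {
		parts := strings.Split(u.Hostname(), ".")
		if len(parts) > 2 {
			rest := strings.Join(parts[2:], ".")
			if (endpoint != "" && rest == endpointHost(endpoint)) ||
				(endpoint == "" && parts[2] == "amazonaws") {
				return parts[1], nil
			}
		}
	}
	return "", fmt.Errorf("QueueURL is not in format: https://sqs.{REGION_ENDPOINT}.{ENDPOINT}/{ACCOUNT_NUMBER}/{QUEUE_NAME}")
}

func main() {
	region, err := regionFromQueueURL("https://sqs.us-east-1.example.com/123456789012/test-queue", "https://example.com")
	fmt.Println(region, err)
}
```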

So when these 8.12.1 bugs are combined, the s3sqs input does not honor `default_region`; only `region` is considered. If the user supplied an endpoint, the queue URL is almost certainly in a format we can't parse, so when we try to get the region from the URL, we clobber the region value. This broken region ensures that default endpoint resolution will fail.

So in https://github.com/elastic/beats/pull/36208, when we wanted to support custom endpoints for local testing, we applied an EndpointResolver to the S3 input, which worked around the issues above without anyone knowing they existed. Because an EndpointResolver returns an exact URL for the service, the missing region no longer mattered. However, that EndpointResolver applies to all calls, which forces everything to a single URL, so our SQS calls end up being sent to the S3 endpoint.

    if config.AWSConfig.Endpoint != "" {
        // Add a custom endpointResolver to the awsConfig so that all the requests are routed to this endpoint
        awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) {
            return awssdk.Endpoint{
                PartitionID:   "aws",
                URL:           config.AWSConfig.Endpoint, // <------- Now the S3 endpoint is used for 100% of SDK calls
                SigningRegion: awsConfig.Region,
            }, nil
        })
    }

Now, instead of receiving S3 bucket notifications from the SQS queue URL specified by the user, the EndpointResolver, when called by the SQS service, replaces the entire hostname with the value of `endpoint`, so the SQS ReceiveMessage call goes to https://s3.us-east-1.amazonaws.com/......

In short: the s3sqs input ignores `default_region` and only considers `region`; a custom endpoint yields a queue URL we can't parse, which clobbers the region and breaks default endpoint resolution; and the EndpointResolver added to work around this fixes custom S3 endpoints but sends every call, including SQS, to the S3 endpoint.

Option 1 - Fix overwriting the region with a bad value, support default_region, and fix the Endpoint Resolver

To fix custom endpoint usage with SQS, we need to scope the EndpointResolver so that it only returns an SDK endpoint when resolving the endpoint for the S3 service and the endpoint field in the config is not empty. When a non-S3 service requests an endpoint, the EndpointResolver needs to return `awssdk.Endpoint{}, &awssdk.EndpointNotFoundError{}` so that default endpoint resolution can occur (which is what actually swaps S3 for SQS when called by the SQS service).

    // Add a custom endpointResolver to the awsConfig so that only S3 requests are routed to the custom endpoint
    awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) {

        // If the service is S3 and the endpoint is set, return the custom endpoint AS-IS to the S3 service
        if service == s3.ServiceID && config.AWSConfig.Endpoint != "" {
            return awssdk.Endpoint{
                PartitionID:   "aws",
                Source:        awssdk.EndpointSourceCustom,
                URL:           config.AWSConfig.Endpoint,
                SigningRegion: awsConfig.Region,
            }, nil
        }

        // If the service is not S3, return an EndpointNotFoundError to let the SDK use the default endpoint resolver
        return awssdk.Endpoint{}, &awssdk.EndpointNotFoundError{}
    })
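The difference between the catch-all resolver and the scoped one can be shown with a stdlib-only toy model. This is not the AWS SDK: `resolve`, `errEndpointNotFound`, and the `https://{service}.{region}.amazonaws.com` default are simplified assumptions standing in for `awssdk.EndpointNotFoundError` and the SDK's built-in resolution, and the string `"S3"` stands in for `s3.ServiceID`.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// errEndpointNotFound plays the role of awssdk.EndpointNotFoundError in this toy model.
var errEndpointNotFound = errors.New("endpoint not found")

type resolverFunc func(service, region string) (string, error)

// defaultEndpoint is a simplified stand-in for the SDK's built-in resolution;
// real AWS hostnames vary by service and partition.
func defaultEndpoint(service, region string) string {
	return fmt.Sprintf("https://%s.%s.amazonaws.com", strings.ToLower(service), region)
}

// resolve consults the custom resolver first and falls back to the default
// only when the custom resolver reports "not found".
func resolve(custom resolverFunc, service, region string) (string, error) {
	u, err := custom(service, region)
	if errors.Is(err, errEndpointNotFound) {
		return defaultEndpoint(service, region), nil
	}
	return u, err
}

// catchAll models the current behavior: every service gets the custom endpoint.
func catchAll(endpoint string) resolverFunc {
	return func(service, region string) (string, error) { return endpoint, nil }
}

// s3Only models Option 1: only S3 gets the custom endpoint.
func s3Only(endpoint string) resolverFunc {
	return func(service, region string) (string, error) {
		if service == "S3" && endpoint != "" {
			return endpoint, nil
		}
		return "", errEndpointNotFound
	}
}

func main() {
	custom := "https://s3.us-east-1.amazonaws.com"
	for _, svc := range []string{"S3", "SQS"} {
		a, _ := resolve(catchAll(custom), svc, "us-east-1")
		b, _ := resolve(s3Only(custom), svc, "us-east-1")
		fmt.Printf("%s: catch-all=%s scoped=%s\n", svc, a, b)
	}
}
```

In this model the catch-all resolver sends SQS to the S3 endpoint, while the scoped resolver lets SQS fall back to its default host.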

Required:

Optional:

Option 2 - Fix parsing custom endpoints and remove the Endpoint Resolver

We remove the Endpoint Resolver and fix the other bugs that caused it to be necessary. This is a more invasive change.

Required work on Main:

Option:

Keeping the current behavior:

        if !strings.HasPrefix(endpointUri.Hostname(), "s3") {
            // Get the resolver from the endpoint url
            awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) {
                return awssdk.Endpoint{
                    PartitionID:       "aws",
                    Source:            awssdk.EndpointSourceCustom,
                    URL:               config.AWSConfig.Endpoint,
                    SigningRegion:     awsConfig.Region,
                    HostnameImmutable: true,
                }, nil
            })
        }

This means we cannot use custom domains on AWS with the AWS SDK. The functionality we implemented is basically BaseEndpoint instead of Endpoint. The right way to implement this behavior would probably be something like:

    sqsAPI := &awsSQSAPI{
        client: sqs.NewFromConfig(in.awsConfig, func(o *sqs.Options) {
            if in.config.AWSConfig.FIPSEnabled {
                o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
            }

            // "The Right Way Start"
            if in.config.AWSConfig.Endpoint != "" {
                o.EndpointResolver = sqs.EndpointResolverFromURL(in.config.AWSConfig.Endpoint, func(e *awssdk.Endpoint) {
                    e.HostnameImmutable = true
                })
            }
            // "The Right Way End"
        }),
    }

strawgate commented 1 month ago

Proposed PR: https://github.com/elastic/beats/pull/39709

elasticmachine commented 1 month ago

Pinging @elastic/obs-ds-hosted-services (Team:obs-ds-hosted-services)