vmware / vmware-go-kcl-v2

vmware-go-kcl is a VMware-originated open-source implementation of the AWS Kinesis Client Library (KCL) in Go. It has been widely used by many external companies and internally by Carbon Black. vmware-go-kcl-v2 is its companion project, built on the AWS SDK for Go V2, which introduces many breaking changes. To keep the original repo clean, the V2 work lives in a separate repo, vmware-go-kcl-v2, which also improves the Go project structure.
MIT License
23 stars, 17 forks

Empty KinesisEndpoint doesn't default to generated endpoint #5

Closed: errorhandler closed this issue 1 year ago

errorhandler commented 2 years ago

Describe the bug

The comments for the AWS endpoint fields state that:

        // KinesisEndpoint is an optional endpoint URL that overrides the default generated endpoint for a Kinesis client.
        // If this is empty, the default generated endpoint will be used.
...
        // DynamoDBEndpoint is an optional endpoint URL that overrides the default generated endpoint for a DynamoDB client.
        // If this is empty, the default generated endpoint will be used.

However, if they are empty, the AWS clients are configured with an empty endpoint URL ("") rather than falling back to the default endpoint resolver.

Reproduction steps

1. Configure the KCL worker without specifying the DynamoDB and Kinesis endpoints.

Expected behavior

The default endpoint resolver is used.

Additional context

No response

pandar00 commented 2 years ago

I noticed the same thing. The DynamoDB checkpointer implementation does the empty-string check and makes sure to override the endpoint only if the configured URL is non-empty (length greater than 0):

https://github.com/vmware/vmware-go-kcl-v2/blob/main/clientlibrary/checkpoint/dynamodb-checkpointer.go#L94

            if service == dynamodb.ServiceID && len(checkpointer.kclConfig.DynamoDBEndpoint) > 0 {
                return aws.Endpoint{
                    PartitionID:   "aws",
                    URL:           checkpointer.kclConfig.DynamoDBEndpoint,
                    SigningRegion: checkpointer.kclConfig.RegionName,
                }, nil
            }
            // returning EndpointNotFoundError allows the service to fall back to its default resolution
            return aws.Endpoint{}, &aws.EndpointNotFoundError{}

But the worker, which reads from Kinesis, doesn't do this check and always returns the configured URL, even when it is empty: https://github.com/vmware/vmware-go-kcl-v2/blob/main/clientlibrary/worker/worker.go#L165

        resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
            return aws.Endpoint{
                PartitionID:   "aws",
                URL:           w.kclConfig.KinesisEndpoint,
                SigningRegion: w.regionName,
            }, nil
        })

The AWS SDK documents that a resolver should return EndpointNotFoundError to fall back to the client's default implementation: https://github.com/aws/aws-sdk-go-v2/blob/c214cb61990441aa165e216a3f7e845c50d21939/aws/endpoints.go#L187

    // EndpointResolverWithOptions is an endpoint resolver that can be used to provide or
    // override an endpoint for the given service, region, and the service client's EndpointOptions. API clients will
    // attempt to use the EndpointResolverWithOptions first to resolve an endpoint if
    // available. If the EndpointResolverWithOptions returns an EndpointNotFoundError error,
    // API clients will fallback to attempting to resolve the endpoint using its
    // internal default endpoint resolver.
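
The fallback contract above can be modeled without the SDK. This is a minimal, stdlib-only sketch: `Endpoint`, `EndpointNotFoundError`, `ResolverFunc`, `defaultEndpoint` and `resolveEndpoint` are hypothetical stand-ins for `aws.Endpoint`, `aws.EndpointNotFoundError`, `aws.EndpointResolverWithOptionsFunc` and the client's internal resolution, not the SDK's actual API, and the default URL format is illustrative. It contrasts a resolver shaped like the current worker.go code (always returns the configured URL) with one shaped like the checkpointer code (returns EndpointNotFoundError when no override is configured).

```go
package main

import (
	"errors"
	"fmt"
)

// Endpoint is a hypothetical, trimmed-down stand-in for aws.Endpoint.
type Endpoint struct {
	URL           string
	SigningRegion string
}

// EndpointNotFoundError models aws.EndpointNotFoundError: it signals
// "no override here, use the default resolution".
type EndpointNotFoundError struct{}

func (*EndpointNotFoundError) Error() string { return "endpoint not found" }

// ResolverFunc models aws.EndpointResolverWithOptionsFunc (options omitted).
type ResolverFunc func(service, region string) (Endpoint, error)

// defaultEndpoint is a hypothetical stand-in for the client's internal
// default resolver; service names are lowercase here to build the URL.
func defaultEndpoint(service, region string) Endpoint {
	return Endpoint{URL: fmt.Sprintf("https://%s.%s.amazonaws.com", service, region), SigningRegion: region}
}

// resolveEndpoint models the client: try the custom resolver first and
// fall back to the default only when it returns EndpointNotFoundError.
func resolveEndpoint(custom ResolverFunc, service, region string) (Endpoint, error) {
	ep, err := custom(service, region)
	var notFound *EndpointNotFoundError
	if errors.As(err, &notFound) {
		return defaultEndpoint(service, region), nil
	}
	return ep, err
}

// workerResolver mirrors the current worker.go logic: it returns the
// configured URL with a nil error, even when that URL is empty.
func workerResolver(configured string) ResolverFunc {
	return func(service, region string) (Endpoint, error) {
		return Endpoint{URL: configured, SigningRegion: region}, nil
	}
}

// fixedResolver mirrors the checkpointer logic: override only for the
// target service and only when the configured URL is non-empty.
func fixedResolver(target, configured string) ResolverFunc {
	return func(service, region string) (Endpoint, error) {
		if service == target && len(configured) > 0 {
			return Endpoint{URL: configured, SigningRegion: region}, nil
		}
		return Endpoint{}, &EndpointNotFoundError{}
	}
}

func main() {
	buggy, _ := resolveEndpoint(workerResolver(""), "kinesis", "us-west-2")
	fixed, _ := resolveEndpoint(fixedResolver("kinesis", ""), "kinesis", "us-west-2")
	local, _ := resolveEndpoint(fixedResolver("kinesis", "http://localhost:4566"), "kinesis", "us-west-2")
	fmt.Printf("buggy: %q\n", buggy.URL) // buggy: ""
	fmt.Printf("fixed: %q\n", fixed.URL) // fixed: "https://kinesis.us-west-2.amazonaws.com"
	fmt.Printf("local: %q\n", local.URL) // local: "http://localhost:4566"
}
```

With an empty KinesisEndpoint, the worker-style resolver yields an empty URL, which is the behavior reported in this issue, while the checkpointer-style resolver lets the default endpoint through and still honors an explicit override.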

pranay-1995 commented 2 years ago

I managed to work around this by passing in my own Kinesis client:

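import (
    "context"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/kinesis"
    "github.com/rs/zerolog/log" // assumption: the logger used here is zerolog
    cfg "github.com/vmware/vmware-go-kcl-v2/clientlibrary/config"
    wk "github.com/vmware/vmware-go-kcl-v2/clientlibrary/worker"
)

// CreateKinesisClient overrides the Kinesis endpoint only when
// kclConfig.KinesisEndpoint is set; otherwise the SDK's default resolver is used.
func CreateKinesisClient(kclConfig *cfg.KinesisClientLibConfiguration) *kinesis.Client {
    ctx := context.Background()
    resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
        if service == kinesis.ServiceID && len(kclConfig.KinesisEndpoint) > 0 {
            return aws.Endpoint{
                PartitionID:   "aws",
                URL:           kclConfig.KinesisEndpoint,
                SigningRegion: kclConfig.RegionName,
            }, nil
        }
        // returning EndpointNotFoundError allows the service to fall back to its default resolution
        return aws.Endpoint{}, &aws.EndpointNotFoundError{}
    })

    // awsCfg avoids shadowing the cfg package alias; the region comes from the
    // KCL config instead of relying on the environment.
    awsCfg, err := config.LoadDefaultConfig(ctx,
        config.WithRegion(kclConfig.RegionName),
        config.WithEndpointResolverWithOptions(resolver))
    if err != nil {
        log.Panic().Err(err).Msg("Failed loading default config for kinesis client")
    }

    return kinesis.NewFromConfig(awsCfg)
}

// In the application's setup code (RecordProcessorFactory and processor are the application's own types):
worker := wk.NewWorker(&RecordProcessorFactory{EventsProcessor: processor}, kclConfig).WithKinesis(CreateKinesisClient(kclConfig))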

Ideally, though, this check should be added to the worker code that pandar00 pointed out: https://github.com/vmware/vmware-go-kcl-v2/blob/main/clientlibrary/worker/worker.go#L165