redpanda-data / connect

Fancy stream processing made operationally mundane
https://docs.redpanda.com/redpanda-connect/about/

Generic AWS SDK/API processor #865

Open razor-x opened 3 years ago

razor-x commented 3 years ago

I find myself needing to call a lot of AWS services from my pipelines, but since Benthos does not support most of them, I end up deploying lots of little Lambda adapter functions and calling them with aws_lambda. (See below for a full example of how I currently have to call S3 HeadObject.)

It would be nice to have a way to call out to the AWS SDK directly, something like:

processors:
  - bloblang: |
      root = { "Key": "key", "Bucket": "bucket" }
  - aws_sdk:
      method: HeadObject

If using the AWS SDK generically like this is a problem, an alternative might be to support signed (AWS Signature Version 4) requests to the AWS HTTP APIs.
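For the signed-request alternative, here is a minimal sketch of AWS Signature Version 4 for the same HeadObject call, using only Node's built-in `crypto` module. It assumes a virtual-hosted-style S3 endpoint, an empty body, no query string, and credentials passed in explicitly (inside Lambda they are available as `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY` and `AWS_SESSION_TOKEN`). `signHeadObject`, `encodeS3Key` and their argument names are hypothetical, and the sketch ignores S3 edge cases such as bucket names containing dots, so treat it as an outline of the signing steps rather than a drop-in client.

```javascript
const crypto = require('crypto')

const sha256Hex = (data) => crypto.createHash('sha256').update(data, 'utf8').digest('hex')
const hmac = (key, data) => crypto.createHmac('sha256', key).update(data, 'utf8').digest()

// SigV4 signing key: an HMAC chain over date, region, service and "aws4_request".
function signingKey (secretKey, dateStamp, region, service) {
  const kDate = hmac('AWS4' + secretKey, dateStamp)
  const kRegion = hmac(kDate, region)
  const kService = hmac(kRegion, service)
  return hmac(kService, 'aws4_request')
}

// RFC 3986 encoding of an S3 object key, keeping '/' as the path separator.
function encodeS3Key (key) {
  return key.split('/').map((segment) =>
    encodeURIComponent(segment)
      .replace(/[!'()*]/g, (c) => '%' + c.charCodeAt(0).toString(16).toUpperCase())
  ).join('/')
}

// Hypothetical helper: returns the URL and signed headers for an S3 HeadObject request.
function signHeadObject ({ bucket, key, region, accessKeyId, secretAccessKey, sessionToken, now = new Date() }) {
  const amzDate = now.toISOString().replace(/[:-]|\.\d{3}/g, '') // e.g. 20240101T000000Z
  const dateStamp = amzDate.slice(0, 8)
  const host = `${bucket}.s3.${region}.amazonaws.com`
  const payloadHash = sha256Hex('') // HEAD requests have an empty body

  const headers = { host, 'x-amz-content-sha256': payloadHash, 'x-amz-date': amzDate }
  if (sessionToken) headers['x-amz-security-token'] = sessionToken // temporary credentials

  // Canonical request: method, URI, query, headers (each line ends in \n), signed headers, payload hash.
  const names = Object.keys(headers).sort()
  const canonicalHeaders = names.map((n) => `${n}:${headers[n]}\n`).join('')
  const signedHeaders = names.join(';')
  const path = '/' + encodeS3Key(key)
  const canonicalRequest = ['HEAD', path, '', canonicalHeaders, signedHeaders, payloadHash].join('\n')

  // String to sign, then the signature using the derived key.
  const scope = `${dateStamp}/${region}/s3/aws4_request`
  const stringToSign = ['AWS4-HMAC-SHA256', amzDate, scope, sha256Hex(canonicalRequest)].join('\n')
  const signature = hmac(signingKey(secretAccessKey, dateStamp, region, 's3'), stringToSign).toString('hex')

  headers.authorization =
    `AWS4-HMAC-SHA256 Credential=${accessKeyId}/${scope}, SignedHeaders=${signedHeaders}, Signature=${signature}`
  return { url: `https://${host}${path}`, headers }
}

module.exports = { signingKey, signHeadObject }
```

The returned `url` and `headers` could then be sent with any HTTP client, for example Node's `https.request` with `method: 'HEAD'`. The signing key depends only on the date, region and service, so it could be cached for the rest of the day rather than derived per request.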

Current HeadObject Example

First, I deploy this Lambda:

handlers/s3-head-object.js

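const { S3 } = require('aws-sdk') // aws-sdk v2, bundled with the nodejs14.x runtime

const s3 = new S3()

// The incoming event is passed straight through as the HeadObject params,
// e.g. { "Bucket": "bucket", "Key": "key" }.
module.exports.default = (event, context, callback) => {
  s3.headObject(event, callback)
}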

Serverless config:

headObject:
  runtime: nodejs14.x
  handler: handlers/s3-head-object.default
  package:
    individually: true
    include:
      - handlers/s3-head-object.js
    exclude:
      - '*'
      - '*/**'

Then I add this boilerplate to my processor resources (this input happens to come from an S3 bucket notification):

processor_resources:
  - label: head_object
    try:
      - bloblang: |
          let s3Bucket = this.s3.bucket.name
          let s3Key = this.s3.object.key
          meta "s3BucketArn" = this.s3.bucket.arn
          meta "s3Bucket" = $s3Bucket
          meta "s3Key" = $s3Key
          root = {
            "Bucket": $s3Bucket,
            "Key": $s3Key
          }
      - resource: head_object_lambda
      - resource: throw_lambda_function_error
      - bloblang: |
          meta "contentType" = this.ContentType
          meta "contentEncoding" = this.ContentEncoding
          meta "reqId" = this.Metadata."req-id"
          root = {
            "key": meta("s3Key")
          }
  - label: head_object_lambda
    aws_lambda:
      region: ${AWS_REGION}
      function: ${HEAD_OBJECT_LAMBDA:'null'}
  - label: throw_lambda_function_error
    bloblang: |
      root = if meta().exists("lambda_function_error") {
        throw("Invocation failed due to %v: %v".format(this.errorType, this.errorMessage))
      } else {
        content()
      }
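Until something like `aws_sdk` exists, one way to cut down on the per-operation adapters might be a single generic Lambda. The sketch below assumes aws-sdk v2 (as in the handler above), which exposes each API operation as a lowerCamelCase client method (`HeadObject` becomes `headObject`), and a hypothetical event shape `{ service, method, params }`. `makeGenericHandler` and `toSdkMethodName` are illustrative names, not part of Benthos or the SDK; the SDK module is injected so the dispatch logic can be exercised without AWS access.

```javascript
// aws-sdk v2 method names are the API operation names with a lowercase first letter,
// e.g. HeadObject -> headObject, ListObjectsV2 -> listObjectsV2.
function toSdkMethodName (operation) {
  return operation.charAt(0).toLowerCase() + operation.slice(1)
}

// Hypothetical generic adapter. `sdk` is the aws-sdk v2 module (or a test double).
// Expected event shape (an assumption):
//   { "service": "S3", "method": "HeadObject", "params": { "Bucket": "...", "Key": "..." } }
function makeGenericHandler (sdk) {
  const clients = {} // one client per service, reused across warm invocations

  return async (event) => {
    const { service, method, params = {} } = event

    const Client = sdk[service]
    if (typeof Client !== 'function') {
      throw new Error(`Unknown AWS service: ${service}`)
    }
    clients[service] = clients[service] || new Client()
    const client = clients[service]

    const fn = client[toSdkMethodName(method)]
    if (typeof fn !== 'function') {
      throw new Error(`Unknown ${service} operation: ${method}`)
    }

    // v2 operations return an AWS.Request; .promise() resolves with the response data.
    return fn.call(client, params).promise()
  }
}

module.exports = { toSdkMethodName, makeGenericHandler }
```

To deploy it, the handler would be exported as `makeGenericHandler(require('aws-sdk'))`, and the Bloblang step before `head_object_lambda` would emit `{ "service": "S3", "method": "HeadObject", "params": { "Bucket": $s3Bucket, "Key": $s3Key } }` instead of the bare params. The function's IAM role still bounds which calls succeed, but an explicit allowlist of services and operations is probably worth adding before relying on it.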

Some meta context:

I'm mostly running Benthos on AWS Lambda, but I think this feature would be useful in any environment that interacts with AWS. There may be similar options for other cloud providers, but the AWS SDK interfaces are fairly consistent across the different service clients, and the SDK tends to mirror the HTTP APIs one to one.

Jeffail commented 3 years ago

Hey @razor-x, I like it, might need a bit of exploration to work out the implementation.