Nordstrom / kafka-connect-sqs

The SQS connector plugin lets you use AWS SQS queues as either a source (from an SQS queue into a Kafka topic) or a sink (out of a Kafka topic into an SQS queue).
Apache License 2.0

kafka-connect-sqs

Compatibility matrix

Connector version   Kafka Connect API   AWS SDK
1.4                 3.1.1               1.12.241
1.5                 3.3.2               1.12.409
1.6                 3.4.1               1.12.669

Building the distributable

You can build the connector with Maven using the standard lifecycle goals:

mvn clean
mvn package

Source connector

The SQS source connector reads from an AWS SQS queue and publishes to a Kafka topic.

Required properties:

Optional properties:

Sample IAM policy

When using this connector, ensure the authentication principal has privileges to read and delete messages in the SQS queue.

{
  "Version": "2012-10-17",
  "Statement": [{
    "Sid": "kafka-connect-sqs-source",
    "Effect": "Allow",
    "Action": [
      "sqs:DeleteMessage",
      "sqs:GetQueueUrl",
      "sqs:ListQueues",
      "sqs:ReceiveMessage"
    ],
    "Resource": "arn:aws:sqs:*:*:*"
  }]
}
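The policy above grants both sqs:ReceiveMessage and sqs:DeleteMessage because a source connector has to fetch messages and then remove them once they are in Kafka; in SQS, receiving a message does not delete it. The following is a minimal in-memory sketch of that receive, publish, delete cycle, not the connector's actual code. SourceLoopSketch, its Message record, and the map and list standing in for the SQS queue and the Kafka topic are all hypothetical, and the sketch assumes a message is deleted only after it has been published.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SourceLoopSketch {
    // Hypothetical stand-in for an SQS message. Real receipt handles change on every
    // receive; here the handle doubles as a stable ID to keep the model small.
    public record Message(String receiptHandle, String body) {}

    // Stand-in for the SQS queue. As in SQS, receiving does not remove a message;
    // only an explicit delete does.
    private final Map<String, String> queue = new LinkedHashMap<>();
    // Stand-in for the Kafka topic.
    private final List<String> topic = new ArrayList<>();

    public void send(String receiptHandle, String body) {
        queue.put(receiptHandle, body);
    }

    // Returns up to maxMessages messages, oldest first, without removing them.
    public List<Message> receive(int maxMessages) {
        List<Message> batch = new ArrayList<>();
        for (Map.Entry<String, String> entry : queue.entrySet()) {
            if (batch.size() == maxMessages) {
                break;
            }
            batch.add(new Message(entry.getKey(), entry.getValue()));
        }
        return batch;
    }

    // One poll cycle: receive a batch, publish each message, and delete only the
    // messages that were published. Returns the number of deleted messages.
    public int pollOnce(int maxMessages, boolean publishSucceeds) {
        int deleted = 0;
        for (Message message : receive(maxMessages)) {
            if (!publishSucceeds) {
                continue; // not deleted, so it stays in the queue for a later retry
            }
            topic.add(message.body());             // publish to Kafka
            queue.remove(message.receiptHandle()); // the sqs:DeleteMessage step
            deleted++;
        }
        return deleted;
    }

    public int queueSize() {
        return queue.size();
    }

    public List<String> topic() {
        return topic;
    }

    public static void main(String[] args) {
        SourceLoopSketch sketch = new SourceLoopSketch();
        sketch.send("r1", "{\"hello\":\"world\"}");
        sketch.send("r2", "{\"hello\":\"again\"}");
        sketch.pollOnce(10, false);
        System.out.println(sketch.queueSize()); // 2
        sketch.pollOnce(10, true);
        System.out.println(sketch.queueSize() + " " + sketch.topic().size()); // 0 2
    }
}
```

With this ordering, a failure between publishing and deleting means the message is received and published again, so consumers of the Kafka topic should be prepared for occasional duplicates.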

Sink connector

The SQS sink connector reads from a Kafka topic and publishes to an AWS SQS queue.

Required properties:

Optional properties:

Sample SQS queue policy

Define a corresponding SQS queue policy that allows the connector to send messages to the SQS queue:

{
  "Version": "2012-10-17",
  "Id": "arn:aws:sqs:us-west-2:<AWS_ACCOUNT>:my-queue/SQSDefaultPolicy",
  "Statement": [
    {
      "Sid": "kafka-connect-sqs-sink",
      "Effect": "Allow",
      "Principal": {
        "AWS": "<Your principal ARN>"
      },
      "Action": "sqs:SendMessage",
      "Resource": "arn:aws:sqs:us-west-2:<AWS_ACCOUNT>:my-queue"
    }
  ]
}
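The Resource in the queue policy is the queue's ARN, whereas SQS API calls address a queue by its URL. As a sketch, assuming the standard https://sqs.<region>.amazonaws.com/<account-id>/<queue-name> URL format and the commercial aws partition, the hypothetical QueueArn.fromUrl helper below derives one from the other. The same ARN can also replace the arn:aws:sqs:*:*:* wildcard in the sample IAM policies to limit the connector to a single queue.

```java
import java.net.URI;

public class QueueArn {
    // Converts a standard SQS queue URL of the form
    //   https://sqs.<region>.amazonaws.com/<account-id>/<queue-name>
    // into the ARN form
    //   arn:aws:sqs:<region>:<account-id>:<queue-name>
    // Assumes the commercial "aws" partition; other partitions and legacy or
    // custom endpoints are not handled by this sketch.
    public static String fromUrl(String queueUrl) {
        URI uri = URI.create(queueUrl);
        String host = uri.getHost(); // e.g. sqs.us-west-2.amazonaws.com
        String[] hostParts = host == null ? new String[0] : host.split("\\.");
        if (hostParts.length < 4 || !hostParts[0].equals("sqs")) {
            throw new IllegalArgumentException("Unrecognized SQS host: " + host);
        }
        String region = hostParts[1];
        String[] pathParts = uri.getPath().split("/"); // "", account-id, queue-name
        if (pathParts.length != 3) {
            throw new IllegalArgumentException("Expected /<account-id>/<queue-name>: " + uri.getPath());
        }
        return String.join(":", "arn", "aws", "sqs", region, pathParts[1], pathParts[2]);
    }

    public static void main(String[] args) {
        // Prints arn:aws:sqs:us-west-2:123456789012:my-queue
        System.out.println(fromUrl("https://sqs.us-west-2.amazonaws.com/123456789012/my-queue"));
    }
}
```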

Sample IAM policy

When using this connector, ensure the authentication principal has privileges to send messages to the SQS queue.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "kafka-connect-sqs-sink",
      "Effect": "Allow",
      "Action": [
        "sqs:SendMessage"
      ],
      "Resource": "arn:aws:sqs:*:*:*"
    }
  ]
}

AWS authentication

By default, the connector uses the AWS SDK DefaultAWSCredentialsProviderChain to determine the identity of the connector. This works well in simple scenarios where the connector can use the privileges granted to the Kafka Connect worker (for example, through environment variables or EC2 instance metadata).
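The default chain checks a fixed list of credential sources and uses the first one that yields credentials. For SDK v1 the documented order is environment variables, Java system properties, a web identity token, the shared credentials profile file, and finally ECS container or EC2 instance metadata. The minimal sketch below models only that first-match-wins lookup. ProviderChainSketch and its labels are hypothetical, the later sources are stubbed, and it is not the SDK's implementation, which returns AWSCredentials objects rather than strings.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

public class ProviderChainSketch {
    // Walks the providers in insertion order and returns the label of the first one
    // that yields credentials; later providers are never consulted.
    public static String resolve(Map<String, Supplier<Optional<String>>> providers) {
        for (Map.Entry<String, Supplier<Optional<String>>> provider : providers.entrySet()) {
            Optional<String> credentials = provider.getValue().get();
            if (credentials.isPresent()) {
                return provider.getKey();
            }
        }
        throw new IllegalStateException("Unable to load credentials from any provider in the chain");
    }

    public static void main(String[] args) {
        Map<String, Supplier<Optional<String>>> chain = new LinkedHashMap<>();
        // Order modeled on the SDK v1 default chain; only the first two lookups are real.
        chain.put("environment variables", () -> Optional.ofNullable(System.getenv("AWS_ACCESS_KEY_ID")));
        chain.put("Java system properties", () -> Optional.ofNullable(System.getProperty("aws.accessKeyId")));
        chain.put("web identity token", Optional::empty);
        chain.put("profile file", Optional::empty);
        chain.put("container or EC2 instance metadata", () -> Optional.of("instance-role-credentials"));
        System.out.println("Resolved from: " + resolve(chain));
    }
}
```

Because environment variables come first, credentials exported on the worker host take precedence over instance metadata, which is one reason a connector that needs its own identity is better served by an explicit provider class.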

When the connector's identity must be separate from the worker's, set sqs.credentials.provider.class to a credentials provider implementation available on the worker's classpath. This library includes two implementations:

AWSUserCredentialsProvider

Use this credentials provider to make the connector authenticate as a specific IAM user.

Required properties:

AWSAssumeRoleCredentialsProvider

Use this credentials provider to make the connector assume an IAM role.

Required properties:

Optional properties:

The IAM role must have a trust policy that allows the connector's principal to assume it. For example:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::<AWS_ACCOUNT>:root"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "my-external-id"
        }
      }
    }
  ]
}
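The Condition block means sts:AssumeRole is allowed only when the request comes from the trusted account and carries the external ID my-external-id exactly; StringEquals is case-sensitive. The sketch below is a deliberately simplified, hypothetical model of how this one statement is evaluated. Real IAM evaluation also considers explicit denies, the caller's own identity policy (which must permit sts:AssumeRole on the role), and more, none of which is modeled here.

```java
public class TrustPolicySketch {
    // Simplified model of the sample trust policy statement: allow sts:AssumeRole
    // when the caller's ARN belongs to the trusted account and the request's
    // external ID equals the required value exactly.
    public static boolean allowsAssumeRole(String trustedAccount, String requiredExternalId,
                                           String callerArn, String requestExternalId) {
        // ARN layout: arn:partition:service:region:account-id:resource
        String[] arnParts = callerArn.split(":", 6);
        if (arnParts.length != 6) {
            return false;
        }
        boolean accountMatches = arnParts[4].equals(trustedAccount);
        boolean externalIdMatches = requiredExternalId.equals(requestExternalId); // false if missing
        return accountMatches && externalIdMatches;
    }

    public static void main(String[] args) {
        String caller = "arn:aws:iam::123456789012:user/kafka-connect";
        System.out.println(allowsAssumeRole("123456789012", "my-external-id", caller, "my-external-id")); // true
        System.out.println(allowsAssumeRole("123456789012", "my-external-id", caller, null));             // false
    }
}
```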

Running the connector

This example demonstrates using the sink connector to send a message to an SQS queue from Kafka.

Start the sink connector in standalone mode, passing the worker and sink connector configuration files:

$KAFKA_HOME/bin/connect-standalone.sh \
  config/connect-worker.properties config/sink-connector.properties

Then produce a message with a header to the Kafka topic, for example with the Kafka console producer:

$KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 \
    --topic hello-sqs-sink \
    --property parse.headers=true \
    --property 'headers.delimiter=\t'
>test:abc\t{"hello":"world"}
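To show what record that input line should produce, here is a minimal sketch of the header parsing, not Kafka's own implementation. It assumes the console producer's default header separator (",") and header key separator (":"), no key parsing, and a headers delimiter that is the two characters backslash and t, matching the single-quoted 'headers.delimiter=\t' property and the literal \t typed at the prompt. ConsoleLineSketch is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ConsoleLineSketch {
    // Parsed form of one console-producer input line.
    public record ParsedLine(Map<String, String> headers, String value) {}

    // Splits a line into headers and value: everything before the headers delimiter is
    // the header list ("key:value" entries joined by ","), everything after it is the value.
    public static ParsedLine parse(String line, String headersDelimiter) {
        int split = line.indexOf(headersDelimiter);
        if (split < 0) {
            throw new IllegalArgumentException("No headers delimiter in: " + line);
        }
        Map<String, String> headers = new LinkedHashMap<>();
        String headerPart = line.substring(0, split);
        if (!headerPart.isEmpty()) {
            for (String header : headerPart.split(",")) {
                int colon = header.indexOf(':');
                if (colon < 0) {
                    headers.put(header, null); // treated as a header with no value in this sketch
                } else {
                    headers.put(header.substring(0, colon), header.substring(colon + 1));
                }
            }
        }
        return new ParsedLine(headers, line.substring(split + headersDelimiter.length()));
    }

    public static void main(String[] args) {
        // "\\t" in Java source is the two characters '\' and 't'.
        ParsedLine parsed = parse("test:abc\\t{\"hello\":\"world\"}", "\\t");
        System.out.println(parsed.headers()); // {test=abc}
        System.out.println(parsed.value());   // {"hello":"world"}
    }
}
```

The sink connector therefore receives a record from hello-sqs-sink with the header test=abc and the value {"hello":"world"}.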