apache / camel-kafka-connector

Camel Kafka Connector allows you to use all Camel components as Kafka Connect connectors
https://camel.apache.org
Apache License 2.0
152 stars 100 forks source link

Using kinesis sink with STS assumeRole credentials? #1272

Closed duanasq closed 2 years ago

duanasq commented 2 years ago

Hi there. Any tips on how I could configure/extend the connector to use sts assumeRole credentials?

The camel kinesis component says you can "pass in a different AWSCredentialsProvider when calling createClient(…​)" (see docs here) but I can't figure out how to do that.

Another option looked like I could specify a custom awsKinesisClient with the config option camel.sink.endpoint.amazonKinesisClient but it's not clear how I can build one, as I need to use a builder, and I don't know how autowiring should work.

I also tried the instructions here to try to extend the connector somehow, but I think that is only if I wanted to add converters or transforms.

Grateful for any recommendations, thank you.

oscerd commented 2 years ago

You can use the credential provider chain of all the AWS Client, by enabling the defaultCredentialProvider option and adding the correct jars in an extended connector. There is no way of doing the assumeRole operation while building the client, the option to specify an instance of the Kinesis client is only related to a single instance (through constructor) but since the client is using builder, it's not possible to do that through options. If you need to do assumeRole operation before doing anything else, my suggestion is using plain camel with Kafka, instead of using camel-kafka-connector.

duanasq commented 2 years ago

You can use the credential provider chain of all the AWS Client, by enabling the defaultCredentialProvider option and adding the correct jars in an extended connector.

Are you saying that I could maybe use an aws credential provider that supports assumeRole and package it using an extended connector?

If you need to do assumeRole operation before doing anything else, my suggestion is using plain camel with Kafka, instead of using camel-kafka-connector.

In that case it might be easier for me to build a basic kafka consumer.

Thanks heaps for your replies!

oscerd commented 2 years ago

You can use the credential provider chain of all the AWS Client, by enabling the defaultCredentialProvider option and adding the correct jars in an extended connector.

Are you saying that I could maybe use an aws credential provider that supports assumeRole and package it using an extended connector?

No, I'm saying that you can use one of the following credentials provider https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html#credentials-chain

You cannot invoke an AssumeRole operation, through STS client, directly in the connector configuration. It's not feasible.

If you need to do assumeRole operation before doing anything else, my suggestion is using plain camel with Kafka, instead of using camel-kafka-connector.

In that case it might be easier for me to build a basic kafka consumer.

Thanks heaps for your replies!

gaddam1987 commented 2 years ago

@oscerd Can i create a pull request with a new CredentialsProvider which can handle AssumedRole. It needs to be dont on camel-aws-components and also it makes sense to add STS library to all the camel aws components

oscerd commented 2 years ago

I don't quite get where do you want to add the sts bits

gaddam1987 commented 2 years ago

We added this : - "mvn:software.amazon.awssdk:sts:2.17.152" in the dependencies of the kamlets and maintainng our version now and also we need to update the kamlet to convert the body into byte[] as there was issue converting aByteArrayInputStream (https://github.com/apache/camel/blob/main/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java#L208) as there was no default converters with in kafka which can convert InputStream to byte[].

# ---------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ---------------------------------------------------------------------------

apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
  name: aws-kinesis-source
  annotations:
    camel.apache.org/kamelet.support.level: "Stable"
    camel.apache.org/catalog.version: "main-SNAPSHOT"
    camel.apache.org/kamelet.icon: "data:image/svg+xml;base64,PHN2ZyBpZD0iTGF5ZXJfMSIgZGF0YS1uYW1lPSJMYXllciAxIiB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHZpZXdCb3g9IjAgMCAyNTYgMzA4LjIzNDAxIj48dGl0bGU+YXdzLWtpbmVzaXM8L3RpdGxlPjxwYXRoIGQ9Ik0wLDE3Mi4wODdsMTI3Ljc1NCw1OC44MSwxMjcuNzUyLTU4LjgxLTEyNy43NTItNS4yOTNaIiB0cmFuc2Zvcm09InRyYW5zbGF0ZSgwIDAuMDAwMDUpIiBmaWxsPSIjZmNiZjkyIi8+PHBhdGggZD0iTTEyOC4xNDcsMCwuMDU5LDYzLjg4MXY5MC4xMzZIMTUzLjY0OFYxMi43NTFaIiB0cmFuc2Zvcm09InRyYW5zbGF0ZSgwIDAuMDAwMDUpIiBmaWxsPSIjOWQ1MDI1Ii8+PHBhdGggZD0iTS4wNTksMjE3LjU1OWwxMjguMTYyLDkwLjY3NUwyNTYsMjE3LjU1OSwxMjcuOTQ1LDE5OC45MjZaIiB0cmFuc2Zvcm09InRyYW5zbGF0ZSgwIDAuMDAwMDUpIiBmaWxsPSIjZmNiZjkyIi8+PHBhdGggZD0iTTEyOC4xNDYsMTU0LjAxN2g2Ny41NzdWNTcuODM2TDE3NS45OSw0OS45NDMsMTI4LjE0Niw2My44OThaIiB0cmFuc2Zvcm09InRyYW5zbGF0ZSgwIDAuMDAwMDUpIiBmaWxsPSIjOWQ1MDI1Ii8+PHBhdGggZD0iTTE3NS45OSwxNTQuMDE3aDUyLjIzM1Y5MS42MzJsLTE0Ljk0LTQuNDgxLTM3LjI5Myw2LjMzWiIgdHJhbnNmb3JtPSJ0cmFuc2xhdGUoMCAwLjAwMDA1KSIgZmlsbD0iIzlkNTAyNSIvPjxwYXRoIGQ9Ik0yMTMuMjgyLDgyLjI2djcxLjc1N2g0Mi4yMjRMMjU2LDgxLjk0MWwtMTIuODI2LTUuMTI0WiIgdHJhbnNmb3JtPSJ0cmFuc2xhdGUoMCAwLjAwMDA1KSIgZmlsbD0iIzlkNTAyNSIvPjxwYXRoIGQ9Ik0xMjguMTQ3LDBWMTU0LjAxN2gyNS41VjEyLjc1MVoiIHRyYW5zZm9ybT0idHJhbnNsYXRlKDAgMC4wMDAwNSkiIGZpbGw9IiNmNjg1MzQiLz48cGF0aCBkPSJNMTk1LjcyNCw1Ny44MzZsLTE5LjczMy03Ljg5NFYxNTQuMDE3aDE5LjczMloiIHRyYW5zZm9ybT0idHJhbnNsYXRlKDAgMC4wMDAwNSkiIGZpbGw9IiNmNjg1MzQiLz48cGF0aCBkPSJNMjI4LjIyNCw5MS42MzJsLTE0Ljk0MS00LjQ4djY2Ljg2NWgxNC45NFoiIHRyYW5zZm9ybT0idHJhbnNsYXRlKDAgMC4wMDAwNSkiIGZpbGw9IiNmNjg1MzQiLz48cGF0aCBkPSJNMjQzLjE3NCwxNTQuMDE3SDI1NlY4MS45NDFsLTEyLjgyNi01LjEyNFoiIHRyYW5zZm9ybT0idHJhbnNsYXRlKDAgMC4wMDAwNSkiIGZpbGw9IiNmNjg1MzQiLz48cGF0aCBkPSJNMTI3Ljc1NCwxODQuODYzdjQ2LjAzM2wxMjcuNzUyLTMxLjg0NFYxNzIuMDg3WiIgdHJhbnNmb3JtPSJ0cmFuc2xhdGUoMCAwLjAwMDA1KSIgZmlsbD0iI2Y2ODUzNCIvPjxwYXRoIGQ9Ik0xMjcuNzU0LDI2Mi43ODF2NDUuNDUzTDI1NiwyNDQuMTE0VjIxNy41NloiIHRyYW5zZm9ybT0idHJhbnNsYXRlKDAgMC4wMDAwNSkiIGZpbGw9IiNmNjg1MzQiLz48cGF0aCBkPSJNLjA1OSwyNDQuMzlsMTI3LjY5NSw2My44NDRWMjYyLjQ0OEwuMDU4LDIxNy41NThaIiB0cmFuc2Zvcm09InRyYW5zbGF0ZSgwIDAuMDAwMDUpIiBmaWxsPSIjOWQ1MDI1Ii8+PHBhdGggZD0iTTAsMTk5LjA1MWwxMjcuNzU0LDMxLjg0NVYxODQuODYyTDAsMTcyLjA4NloiIHRyYW5zZm9ybT0idHJhbnNsYXRlKDAgMC4wMDAwNSkiIGZpbGw9IiM5ZDUwMjUiLz48L3N2Zz4="
    camel.apache.org/provider: "Apache Software Foundation"
    camel.apache.org/kamelet.group: "AWS Kinesis"
  labels:
    camel.apache.org/kamelet.type: "source"
spec:
  definition:
    title: "AWS Kinesis Source"
    description: |-
      Receive data from AWS Kinesis.

      The basic authentication method for the Kinesis service is to specify an access key and a secret key. These parameters are optional because the Kamelet provides a default credentials provider.

      If you use the default credentials provider, the Kinesis client loads the credentials through this provider and doesn't use the basic authentication method.
    required:
      - stream
      - region
    type: object
    properties:
      stream:
        title: Stream Name
        description: The Kinesis stream that you want to access. The Kinesis stream that you specify must already exist.
        type: string
      accessKey:
        title: Access Key
        description: The access key obtained from AWS.
        type: string
        format: password
        x-descriptors:
        - urn:alm:descriptor:com.tectonic.ui:password
        - urn:camel:group:credentials
      secretKey:
        title: Secret Key
        description: The secret key obtained from AWS.
        type: string
        format: password
        x-descriptors:
        - urn:alm:descriptor:com.tectonic.ui:password
        - urn:camel:group:credentials
      region:
        title: AWS Region
        description: The AWS region to access.
        type: string
        example: eu-west-1
        enum: ["af-south-1", "ap-east-1", "ap-northeast-1", "ap-northeast-2", "ap-northeast-3", "ap-south-1", "ap-southeast-1", "ap-southeast-2", "ap-southeast-3", "ca-central-1", "eu-central-1", "eu-north-1", "eu-south-1", "eu-west-1", "eu-west-2", "eu-west-3", "fips-us-east-1", "fips-us-east-2", "fips-us-west-1", "fips-us-west-2", "me-south-1", "sa-east-1", "us-east-1", "us-east-2", "us-west-1", "us-west-2", "cn-north-1", "cn-northwest-1", "us-gov-east-1", "us-gov-west-1", "us-iso-east-1", "us-iso-west-1", "us-isob-east-1"]
      useDefaultCredentialsProvider:
        title: Default Credentials Provider
        description: If true, the Kinesis client loads credentials through a default credentials provider. If false, it uses the basic authentication method (access key and secret key).
        type: boolean
        x-descriptors:
        - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
        default: false
      uriEndpointOverride:
        title: Overwrite Endpoint URI
        description: The overriding endpoint URI. To use this option, you must also select the `overrideEndpoint` option.
        type: string
      overrideEndpoint:
        title: Endpoint Overwrite
        description: Select this option to override the endpoint URI. To use this option, you must also provide a URI for the `uriEndpointOverride` option.
        type: boolean
        x-descriptors:
          - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
        default: false
      delay:
        title: Delay
        description: The number of milliseconds before the next poll of the selected stream.
        type: integer
        default: 500
  types:
    out:
      mediaType: application/octet-stream
  dependencies:
    - "camel:aws2-kinesis"
    - "camel:kamelet"
    - "camel:core"
    **_- "mvn:software.amazon.awssdk:sts:2.17.152"_**
  template:
    from:
      uri: aws2-kinesis:{{stream}}
      parameters:
        secretKey: "{{?secretKey}}"
        accessKey: "{{?accessKey}}"
        region: "{{region}}"
        useDefaultCredentialsProvider: "{{useDefaultCredentialsProvider}}"
        uriEndpointOverride: "{{?uriEndpointOverride}}"
        overrideEndpoint: "{{overrideEndpoint}}"
        delay: "{{delay}}"
      steps:
      **_- convert-body-to:
          type: byte[]_**
      - to: "kamelet:sink"
gaddam1987 commented 2 years ago

I would like to add STS lib in this file https://github.com/apache/camel/blob/main/components/camel-aws/camel-aws2-kinesis/pom.xml (and also for all the aws components) because its component which exposes the property useDefaultCredentialsProvider (https://github.com/apache/camel/blob/main/components/camel-aws/camel-aws2-kinesis/src/main/docs/aws2-kinesis-component.adoc#usage) and its the reponsibility of the component to support all the supported credentials by DefaultCredentialsProvider. I can create a pull request for tha but just want to be sure if thats okie

oscerd commented 2 years ago

No, it's not responsability of the component to support all the credentials provider. Why the component should bring another jar, if not used? The point is having the dependency in case of usage and you have all the possibility of adding it in the kamelet, in your POM, in your connector classpath, so no, definitely it's not responsability of the component. The component should support it and it is doing it.

For the byte[] payload: the component should return an inputStream because it's a good way to pass data between different endpoint. Kafka is just one of the component and camel kafka connector is one of the runtime, we cannot change the payload just because a converter for kafka runtime doesn't exist. I would write a different converter and places it somewhere.

It's not a big deal to add a dependency, but to me stating "it's responsibility of the component to support all the credentials provider" doesn't make sense. It must provide the feature, not making an uber-jar with all AWS SDKs just because potentially someone could use it.

gaddam1987 commented 2 years ago

But the problem is in the context of kafka-connect (any other application we can just add a dependency in pom for sts) as there is an uberjar provided so may be there should be way to add STS in that uber jar (Just thinking loudly). English is my second language so i agree the statement it's responsibility of the component to support all the credentials provider looks not so nice. But what i just thinking is whether is there a way to make it work without adding that additional step i would like to contribute if you have any suggestions. And also for the byte[] payload there used to be transformer (https://github.com/apache/camel-kafka-connector/blob/camel-kafka-connector-0.11.5/connectors/camel-aws2-kinesis-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2kinesis/transformers/KinesisRecordDataTransforms.java) which knows how to convert Record -> byte[] but is there a way to handle that now. We could write our custom transformer but if there is a way to contribute that back i will happy to do that.