apache / camel-kafka-connector

Camel Kafka Connector allows you to use all Camel components as Kafka Connect connectors
https://camel.apache.org
Apache License 2.0

camel-aws-s3-sink-kafka-connector is showing class error and not pushing data to s3 #1347

Closed stuartm21 closed 2 years ago

stuartm21 commented 2 years ago

Hi @oscerd,

I am trying to archive my Strimzi Kafka data to an AWS S3 bucket, but I am unable to do this. Kubernetes version: 1.21

KafkaConnect:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: strimzi-kafka
  namespace: kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.1.0
  replicas: 1
  image: docker-hub/camel-aws-s3-kafka-connector:1.0.0
  bootstrapServers: kafka-bootstrap.myhost.com:443
  externalConfiguration:
    volumes:
      - name: aws-credentials
        secret:
          secretName: aws-credentials
  tls:
    trustedCertificates:
      - secretName: strimzi-cluster-ca-cert
        certificate: ca.crt
  authentication:
    type: tls
    certificateAndKey:
      certificate: user.crt
      key: user.key
      secretName: kafka-user
  config:
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
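
For reference, the mounted aws-credentials secret carries a properties file along these lines (the keys match the ${file:...} placeholders in the connector config below; the values are placeholders):

aws_access_key_id=AKIAXXXXXXXXXXXXXXXX
aws_secret_access_key=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx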

KafkaConnector:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: s3-sink-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: "strimzi-kafka"
spec:
  class: org.apache.camel.kafkaconnector.CamelSinkConnector
  #class: org.apache.camel.kafkaconnector.awss3sink.CamelAwss3sinkSinkConnector
  tasksMax: 1
  config:
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    topics: kafka-topic
    camel.sink.url: aws-s3://test-kafka-connect?keyName=${date:now:yyyyMMdd}/${exchangeId}
    camel.sink.maxPollDuration: 10000
    camel.component.aws-s3.configuration.autocloseBody: false
    camel.component.aws-s3.accessKey: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}
    camel.component.aws-s3.secretKey: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}
    camel.component.aws-s3.region: ca-central-1

Available Plugins:

aws-java-sdk-core-1.11.1034.jar
aws-java-sdk-s3-1.11.1034.jar
camel-api-3.15.0.jar
camel-aws-s3-kafka-connector-0.8.0.jar
camel-aws-s3-sink-kafka-connector-1.0.0.jar
camel-aws-s3-source-kafka-connector-1.0.0.jar
camel-aws2-s3-3.15.0.jar
camel-base-3.15.0.jar
camel-cloud-3.15.0.jar
camel-core-3.15.0.jar
camel-core-catalog-3.15.0.jar
camel-core-engine-3.15.0.jar
camel-core-languages-3.15.0.jar
camel-direct-3.15.0.jar
camel-file-3.15.0.jar
camel-http-3.15.0.jar
camel-http-common-3.15.0.jar
camel-jaxp-3.0.1.jar
camel-kafka-3.15.0.jar
camel-kafka-connector-1.0.0.jar
camel-kamelets-0.7.1.jar
camel-log-3.15.0.jar
camel-main-3.15.0.jar
camel-management-3.15.0.jar
camel-management-api-3.15.0.jar
camel-rest-3.15.0.jar
camel-seda-3.15.0.jar
camel-sjms-3.15.0.jar
camel-support-3.15.0.jar
camel-timer-3.15.0.jar
camel-tooling-model-3.15.0.jar
camel-util-3.15.0.jar
camel-util-json-3.15.0.jar
commons-codec-1.15.jar
commons-logging-1.2.jar
httpcore-4.4.15.jar
jackson-annotations-2.13.2.jar
jackson-core-2.13.2.jar
jackson-databind-2.13.2.jar
jackson-dataformat-cbor-2.13.2.jar
jaxb-api-2.4.0-b180830.0359.jar
jaxb-core-3.0.2.jar
jaxb-impl-3.0.2.jar
log4j-api-2.17.2.jar
log4j-core-2.17.2.jar
log4j-jcl-2.17.2.jar
log4j-slf4j-impl-2.17.2.jar

Kafka Connect Pod Logs:

2022-03-14 09:43:09,089 ERROR Uncaught exception in REST call to /connectors/s3-sink-connector/config (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper) [qtp552416003-20]
org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches org.apache.camel.kafkaconnector.CamelSinkConnector, available connectors are: PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='3.1.0', encodedVersion=3.1.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='3.1.0', encodedVersion=3.1.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorCheckpointConnector, name='org.apache.kafka.connect.mirror.MirrorCheckpointConnector', version='1', encodedVersion=1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorHeartbeatConnector, name='org.apache.kafka.connect.mirror.MirrorHeartbeatConnector', version='1', encodedVersion=1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorSourceConnector, name='org.apache.kafka.connect.mirror.MirrorSourceConnector', version='1', encodedVersion=1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='3.1.0', encodedVersion=3.1.0, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='3.1.0', encodedVersion=3.1.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='3.1.0', encodedVersion=3.1.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='3.1.0', encodedVersion=3.1.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='3.1.0', encodedVersion=3.1.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='3.1.0', encodedVersion=3.1.0, type=source, typeName='source', location='classpath'}
  at org.apache.kafka.connect.runtime.isolation.Plugins.connectorClass(Plugins.java:200)
  at org.apache.kafka.connect.runtime.isolation.Plugins.newConnector(Plugins.java:172)
  at org.apache.kafka.connect.runtime.AbstractHerder.lambda$getConnector$4(AbstractHerder.java:653)
  at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705)
  at org.apache.kafka.connect.runtime.AbstractHerder.getConnector(AbstractHerder.java:653)
  at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:426)
  at org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$2(AbstractHerder.java:362)
  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  at java.base/java.lang.Thread.run(Thread.java:829)

I noticed that there are no errors on the pod after I create the KafkaConnect, but when I create the KafkaConnector, I get the above errors on the KafkaConnect pod. Can you please tell me which plugin is causing the issue and how I can resolve it? Also, can you tell me whether any unnecessary plugins were added here that should be removed from the Docker image?

oscerd commented 2 years ago

You have multiple versions of the connectors:

camel-aws-s3-kafka-connector-0.8.0.jar
camel-aws-s3-sink-kafka-connector-1.0.0.jar
camel-aws-s3-source-kafka-connector-1.0.0.jar

If you use a version < 1.0.0, the configuration can be like the one you posted; if the version is >= 1.0.0, you have to use the new Kamelet-based parameters: https://camel.apache.org/camel-kafka-connector/1.0.x/reference/connectors/camel-aws-s3-sink-kafka-sink-connector.html

I think it is picking up the 1.0.0 version at this stage; either way, use just one of the versions, 0.8.0 or 1.0.0.
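
For illustration, the two option styles differ roughly like this (values are placeholders; check the linked reference for the exact option list):

# 0.x style: Camel component/endpoint options
camel.sink.url=aws-s3://my-bucket?keyName=...
camel.component.aws-s3.accessKey=xxxx

# 1.0.x style: Kamelet options
camel.kamelet.aws-s3-sink.bucketNameOrArn=my-bucket
camel.kamelet.aws-s3-sink.accessKey=xxxx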

stuartm21 commented 2 years ago

@oscerd The camel-aws-s3-kafka-connector-0.8.0.jar I used is the latest version of camel-aws-s3-kafka-connector; I downloaded it from the URL below. If a >= 1.0 version of camel-aws-s3-kafka-connector is available, please let me know the download URL and I can test that too.

https://mvnrepository.com/artifact/org.apache.camel.kafkaconnector/camel-aws-s3-kafka-connector

oscerd commented 2 years ago

What I'm saying is that you have multiple versions of the same connector in the plugin.path location. You need to have 0.8.0 or 1.0.0, not both. You also need to clean up the plugin.path location: you should have a single folder with the content of the downloaded archive uncompressed in it. Download the package from here: https://camel.apache.org/camel-kafka-connector/0.11.x/reference/index.html

The supported LTS version is 0.11.x, so please use that version. From 1.0 onward we changed the underlying implementation and switched to Kamelets, so the options are different, and even their names: https://camel.apache.org/camel-kafka-connector/1.0.x/reference/index.html

In the end, in your plugin.path location you should do something like:

cd /<plugin.path-location>/
tar -xzf <downloaded-connector-tar.gz>
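
After extracting, the layout should look roughly like this (one folder per connector, holding the connector JAR and its dependencies; names here are illustrative):

/<plugin.path-location>/
└── camel-aws-s3-sink-kafka-connector/
    ├── camel-aws-s3-sink-kafka-connector-1.0.0.jar
    └── ... (dependency JARs)
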
stuartm21 commented 2 years ago

Thanks @oscerd. But I would like to go forward with the 1.0.0 version, because I noticed there are some vulnerabilities in the old version, and we can move to Kamelets. I prepared the connector YAML; I am pasting it here:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: s3-sink-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: "strimzi-kafka"
spec:
  class: org.apache.camel.kafkaconnector.awss3sink.CamelAwss3sinkSinkConnector
  tasksMax: 2
  config:
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    topics: kafka-topic
    camel.kamelet.aws-s3-sink.bucketNameOrArn: test-kafka-connect
    camel.sink.maxPollDuration: 10000
    camel.kamlet.aws-s3.configuration.autocloseBody: false
    camel.kamelet.aws-s3-sink.accessKey: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}
    camel.kamelet.aws-s3-sink.secretKey: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}
    camel.kamelet.aws-s3-sink.region: ca-central-1

But I noticed that "camel-aws-s3-kafka-connector" is not in https://camel.apache.org/camel-kafka-connector/1.0.x/reference/index.html. So where can we get the camel-aws-s3-kafka-connector 1.x.x version?

oscerd commented 2 years ago

It seems you're using only the sink connector, so you should use: https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws-s3-sink-kafka-connector/0.11.0/camel-aws-s3-sink-kafka-connector-0.11.0-package.tar.gz

stuartm21 commented 2 years ago

@oscerd Page not found

oscerd commented 2 years ago

https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws-s3-sink-kafka-connector/1.0.0/camel-aws-s3-sink-kafka-connector-1.0.0-package.tar.gz

oscerd commented 2 years ago

We need to update the documentation. Sorry for the inconvenience

stuartm21 commented 2 years ago

@oscerd Awesome! Thank you very much. The issue got resolved, and it is now archiving the data to the S3 bucket. I have a few queries too. Can you please help me with those?

  1. We have multiple Kafka topics, one for each Kafka user, and we are using mTLS for communication. So how should we use KafkaConnect and KafkaConnector in our scenario? Do we need a KafkaConnect and a KafkaConnector for each user?
  2. If the Kafka topics crash, we need to restore the topics/data from the S3 bucket. So can you share the "camel-aws-s3-source-kafka-connector-1.0.0-package.tar.gz" URL here?
oscerd commented 2 years ago

Hello,

  1. Yes, you need to define multiple connectors; if a user wants to write to multiple topics, they can set the topics field to a comma-separated list of topic names (see the sketch after this list).
  2. The links have been fixed, so you'll find the correct archives on the connectors page: https://camel.apache.org/camel-kafka-connector/1.0.x/reference/index.html
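For example, a single connector consuming two of a user's topics would set (topic names are placeholders):

topics: customer-topic-a,customer-topic-b
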
stuartm21 commented 2 years ago

@oscerd Thank you. The link is not working; it still shows "PAGE NOT FOUND". Can you share the camel-aws-s3-source-kafka-connector-1.0.0 direct link here?

oscerd commented 2 years ago

I updated the link in the above comment

stuartm21 commented 2 years ago

@oscerd Yes, now I can access the link. I also need one more piece of help: how can I use the camel-aws-s3-source-kafka-connector? I mean the KafkaConnect and KafkaConnector YAMLs. Is the only change in the KafkaConnector the class, like this?

class=org.apache.camel.kafkaconnector.awss3source.CamelAwss3sourceSourceConnector

or do any other parameters need to change?

oscerd commented 2 years ago

The parameter names are different, so you should change them: https://camel.apache.org/camel-kafka-connector/1.0.x/reference/connectors/camel-aws-s3-source-kafka-source-connector.html
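
For orientation, a sketch of a source KafkaConnector, assuming the source Kamelet mirrors the sink's option naming (verify every option against the linked reference):

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: s3-source-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: "strimzi-kafka"
spec:
  class: org.apache.camel.kafkaconnector.awss3source.CamelAwss3sourceSourceConnector
  tasksMax: 1
  config:
    topics: kafka-topic
    camel.kamelet.aws-s3-source.bucketNameOrArn: test-kafka-connect
    camel.kamelet.aws-s3-source.accessKey: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}
    camel.kamelet.aws-s3-source.secretKey: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}
    camel.kamelet.aws-s3-source.region: ca-central-1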

oscerd commented 2 years ago

If you have other troubles re-open.

stuartm21 commented 2 years ago

@oscerd Can you please re-open this ticket for me? I have a few requirements here. As of now the sink connector is working and storing the data to the S3 bucket, but the problem is that it is not organized. How can we organize it like below?

test-s3-bucket tree
.
└── topics
    ├── customer-1
    │   ├── 2021
    │   │   ├── 01
    │   │   │   ├── 01-02
    │   │   │   │   └── customer-1+0+0000000000.json.gz
    │   │   │   ├── 02-03
    │   │   │   ├── 03-04
    │   │   │   ├── 04-05
    │   │   │   ├── 05-06
    │   │   │   ├── 06-07
    │   │   │   ├── 07-08
    │   │   │   └── 08-09
    │   │   ├── 02
    │   │   ├── 03
    │   │   ├── 04
    │   │   ├── 05
    │   │   ├── 06
    │   │   ├── 07
    │   │   ├── 08
    │   │   ├── 09
    │   │   ├── 10
    │   │   ├── 11
    │   │   └── 12
    │   └── 2022
    │       ├── 01
    │       ├── 02
    │       ├── 03
    │       ├── 04
    │       ├── 05
    │       ├── 06
    │       ├── 07
    │       ├── 08
    │       ├── 09
    │       ├── 10
    │       ├── 11
    │       └── 12
    ├── customer-2
    │   ├── 2021
    │   └── 2022
    └── customer-3
        ├── 2021
        └── 2022

The order is: test-s3-bucket -> topics -> customers -> years -> months -> days -> hours -> then the customer topic data is stored. How can we store the topics in the S3 bucket this way?

oscerd commented 2 years ago

You could do something like:

camel.sink.endpoint.keyName=topics/${headers.customerName}/${date:now:yyyy}${date:now:MM}//${headers.interval}/{headers.name}

And with each record you're sending, you'll need to set two headers:

echo "hello there" | ./kafkacat -b localhost:9092 -H "CamelHeader.customerName=customer" -H "CamelHeader.interval=01-02" -t mytopic

I didn't test this stuff, but it should be a solution.
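
For reference, the equivalent of that kafkacat command from a Java producer might look like this minimal sketch (bootstrap server, topic, and header values are placeholders):

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SendWithCamelHeaders {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("mytopic", "hello there");
            // Headers that the sink's keyName expression reads via ${headers.*}
            record.headers().add("CamelHeader.customerName", "customer".getBytes(StandardCharsets.UTF_8));
            record.headers().add("CamelHeader.interval", "01-02".getBytes(StandardCharsets.UTF_8));
            producer.send(record);
        }
    }
}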

stuartm21 commented 2 years ago

@oscerd Thanks for the reply. I tried just the line below in the KafkaConnector:

camel.sink.endpoint.keyName: topics/${headers.customerName}/${date:now:yyyy}${date:now:MM}//${headers.interval}/{headers.name}

and I could see that the data is archived outside of the topics folder in S3, with a randomly generated name like this:

C04CAF97044F24C-0000000000000
oscerd commented 2 years ago

This is because you're using the 1.0.x version based on Kamelet. There is no keyName option there.

echo "hello there" | ./kafkacat -b localhost:9092 -H "CamelHeader.customerName=customer" -H "CamelHeader.interval=01-02" -t mytopic

This command is how you should send your Kafka records to your topic, before the sink connector starts consuming messages from it.

oscerd commented 2 years ago

I don't think, as of today, there is a way with the 1.0.x version to do what you're asking directly.

stuartm21 commented 2 years ago

@oscerd

-> This is because you're using the 1.0.x version based on Kamelet. There is no keyName option there.

Any solution for this?

-> echo "hello there" | ./kafkacat -b localhost:9092 -H "CamelHeader.customerName=customer" -H "CamelHeader.interval=01-02" -t mytopic

Where can I place this? Inside the KafkaConnect YAML?

So what should we do for this? Would downgrading the version work?

oscerd commented 2 years ago

It should work with 0.11.x, but you need to change the parameter names.

That's the command to send data to your Kafka topic. I don't know how you are ingesting data into your Kafka topic.

You'll need headers to create a dynamic prefix.

stuartm21 commented 2 years ago

@oscerd OK, can you share the working URL for 0.11.x here, and also the URL with the exact parameters?

oscerd commented 2 years ago

https://camel.apache.org/camel-kafka-connector/0.11.x/reference/index.html

Look for the connector there; the parameters are listed with it.

stuartm21 commented 2 years ago

@oscerd I think you shared a wrong URL: https://camel.apache.org/camel-kafka-connector/0.11.x/reference/index.html. I couldn't find "camel-aws-s3-sink-kafka-connector" at that URL; it only contains "camel-aws2-s3-kafka-connector", so I downloaded that tar file and extracted it, but the sink connector is not available there either.

oscerd commented 2 years ago

It's a single tar with both the sink and source connectors inside. The connector class names and packages are different. Look at the content and check the example configuration file.

oscerd commented 2 years ago

It's aws2 because in 1.0.x we renamed all the AWS-related connectors from the aws2 prefix to the aws prefix.

oscerd commented 2 years ago

This is a configuration extracted from an example:

name=CamelAWS2S3SinkConnector
connector.class=org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SinkConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

topics=mytopic

camel.sink.path.bucketNameOrArn=camel-kafka-connector

camel.component.aws2-s3.accessKey=xxxx
camel.component.aws2-s3.secretKey=yyyy
camel.component.aws2-s3.region=eu-west-1

camel.sink.endpoint.keyName=${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}
stuartm21 commented 2 years ago

@oscerd I am sorry, I got confused by your replies. As of now we are planning to downgrade from 1.x to 0.11.x, because we need the dates and times in the data paths. I checked the URL you previously provided, and inside "camel-aws2-s3-kafka-connector" I couldn't find separate sink or source JAR files. These are the JAR files available in "camel-aws2-s3-kafka-connector" for 0.11:

annotations-13.0.jar
annotations-2.16.88.jar
apache-client-2.16.88.jar
apicurio-registry-common-1.3.2.Final.jar
apicurio-registry-rest-client-1.3.2.Final.jar
apicurio-registry-utils-converter-1.3.2.Final.jar
apicurio-registry-utils-serde-1.3.2.Final.jar
arns-2.16.88.jar
auth-2.16.88.jar
avro-1.10.2.jar
aws-core-2.16.88.jar
aws-query-protocol-2.16.88.jar
aws-xml-protocol-2.16.88.jar
camel-api-3.11.5.jar
camel-aws2-s3-3.11.5.jar
camel-aws2-s3-kafka-connector-0.11.5.jar
camel-base-3.11.5.jar
camel-base-engine-3.11.5.jar
camel-core-engine-3.11.5.jar
camel-core-languages-3.11.5.jar
camel-core-model-3.11.5.jar
camel-core-processor-3.11.5.jar
camel-core-reifier-3.11.5.jar
camel-direct-3.11.5.jar
camel-jackson-3.11.5.jar
camel-kafka-3.11.5.jar
camel-kafka-connector-0.11.5.jar
camel-main-3.11.5.jar
camel-management-api-3.11.5.jar
camel-seda-3.11.5.jar
camel-support-3.11.5.jar
camel-util-3.11.5.jar
commons-codec-1.15.jar
commons-compress-1.20.jar
commons-logging-1.2.jar
connect-json-2.6.0.jar
converter-jackson-2.9.0.jar
eventstream-1.0.1.jar
gson-2.8.7.jar
http-client-spi-2.16.88.jar
httpclient-4.5.13.jar
httpcore-4.4.14.jar
jackson-annotations-2.12.3.jar
jackson-core-2.12.3.jar
jackson-databind-2.12.3.jar
jackson-dataformat-avro-2.12.2.jar
jackson-datatype-jdk8-2.12.2.jar
javax.annotation-api-1.3.2.jar
jboss-jaxrs-api_2.1_spec-2.0.1.Final.jar
jctools-core-3.3.0.jar
kafka-clients-2.8.0.jar
kotlin-reflect-1.3.20.jar
kotlin-stdlib-1.3.20.jar
kotlin-stdlib-common-1.3.20.jar
lz4-java-1.7.1.jar
medeia-validator-core-1.1.1.jar
medeia-validator-jackson-1.1.1.jar
metrics-spi-2.16.88.jar
netty-buffer-4.1.66.Final.jar
netty-codec-4.1.66.Final.jar
netty-codec-http-4.1.66.Final.jar
netty-codec-http2-4.1.66.Final.jar
netty-common-4.1.66.Final.jar
netty-handler-4.1.66.Final.jar
netty-nio-client-2.16.88.jar
netty-reactive-streams-2.0.5.jar
netty-reactive-streams-http-2.0.5.jar
netty-resolver-4.1.66.Final.jar
netty-transport-4.1.66.Final.jar
netty-transport-native-epoll-4.1.66.Final-linux-x86_64.jar
netty-transport-native-unix-common-4.1.66.Final.jar
okhttp-4.8.1.jar
okio-2.7.0.jar
profiles-2.16.88.jar
protobuf-java-3.13.0.jar
protocol-core-2.16.88.jar
reactive-streams-1.0.3.jar
regions-2.16.88.jar
retrofit-2.9.0.jar
s3-2.16.88.jar
sdk-core-2.16.88.jar
slf4j-api-1.7.30.jar
snappy-java-1.1.8.1.jar
utils-2.16.88.jar
zstd-jni-1.4.9-1.jar
oscerd commented 2 years ago

As I said, in that tar.gz the sink and source connectors are together; in 1.0.x the connectors are in two different tars.

In this JAR you have both the sink and source classes: camel-aws2-s3-kafka-connector-0.11.5.jar
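
For reference, the sink class inside that JAR is the one used in the example configuration above; the source class follows the same naming pattern (inferred here, so verify it against the packaged example configs):

org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SinkConnector
org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector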

stuartm21 commented 2 years ago

@oscerd The data is archiving to the S3 bucket now, with names like "20220324-062313122-FA895CFA87E6-0000000000000000". So, as we discussed before, we need to move the objects into a "topics" directory, and inside that we need to segregate them by year -> month -> day -> time as well.

KafkaConnector:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: s3-sink-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: "strimzi-kafka"
spec:
  class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SinkConnector
  tasksMax: 2
  config:
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    topics: kafka-topic
    camel.sink.path.bucketNameOrArn: test-kafka-connect
    camel.sink.endpoint.keyName: ${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}
    camel.sink.maxPollDuration: 10000
    camel.component.aws2-s3.configuration.autocloseBody: false
    camel.component.aws2-s3.accessKey: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}
    camel.component.aws2-s3.secretKey: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}
    camel.component.aws2-s3.region: ca-central-1
oscerd commented 2 years ago

Try with

camel.sink.endpoint.keyName=topics/${headers.customerName}/${date:now:yyyy}${date:now:MM}//${headers.interval}/{headers.name}

And send the headers in your Kafka topic. Another possibility is creating an SMT to enrich the message with those data and add a keyName header.

stuartm21 commented 2 years ago

@oscerd I tried with that. It's not working; at least, the data is not archiving to S3.

Will this help us: camel.sink.endpoint.headerFilterStrategy?

Also, can you explain this a bit more: "And send the headers in your Kafka topic. Another possibility is creating an SMT to enrich the message with those data and add a keyName header."

oscerd commented 2 years ago

No, it won't help.

If you need different keyNames for different customers, you'll need to provide the customer name dynamically. The only way to do that is enriching the Kafka records in your topic with headers like customerName and something like interval. Once your sink connector starts consuming records from Kafka, the headers will be used to form the name.

The other solution is having a connector for each customer.

An SMT is a Single Message Transformation: before the message reaches the sink connector, the SMT joins the game and you'll be able to modify the record, add more data, or otherwise manipulate it. There are many examples if you search.

I don't see another way except using headers or creating an SMT.
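
For illustration, a minimal sketch of such an SMT (the package, class, and the way the customer name is derived are hypothetical; adapt them to your records):

package com.example;

import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

// Adds the headers that the sink's dynamic keyName expression reads.
public class AddCamelKeyHeaders<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        // The topic name stands in for the customer name here; replace this
        // lookup with whatever identifies the customer in your data.
        record.headers().addString("CamelHeader.customerName", record.topic());
        record.headers().addString("CamelHeader.interval", "01-02"); // placeholder value
        return record;
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef(); // no options in this sketch
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // nothing to configure in this sketch
    }

    @Override
    public void close() {
        // nothing to release
    }
}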

oscerd commented 2 years ago

I can help, but I don't have time to check every possible configuration. Your use case is particular, and saying you tried and it's not working doesn't help. What is the error? Did you check the logs? Did you try anything on your side? etc.

stuartm21 commented 2 years ago

@oscerd I have made a few changes in the endpoint, like below:

camel.sink.endpoint.keyName: topics/${headers.kafka-user}/${date:now:yyyy}${date:now:MM}//${headers.interval}/{headers.name}

kafka-user is my test customer,

and the output I got in S3 looks like this:

test-kafka-connect/topics//202203//{headers.name}
oscerd commented 2 years ago

If you don't have the headers in the record, it's normal that they won't appear in the file name.

stuartm21 commented 2 years ago

@oscerd Can we use the same Docker image (which includes the camel-aws2-s3 sink plugin) to build SMT plugins? We are using Strimzi Kafka. Also, where can we find an SMT plugin to download?

https://stackoverflow.com/questions/63180653/is-there-a-way-to-add-transformers-to-kafka-strimzi-mirrormaker2
oscerd commented 2 years ago

You'll need to extend the current connector first and regenerate the archive. Then you'll need to provide the connector to your Strimzi cluster.

stuartm21 commented 2 years ago

@oscerd Thanks. I found that many people are using SMTs, but I couldn't find where to download one. Can you share a suitable link for my solution?

Also, if I build the Kafka Connect image with both the sink and the SMT, can I use both classes inside the KafkaConnector?

oscerd commented 2 years ago

You need to write your own SMT for your use case.

Yes, but you'll need to rebuild the connector tar, including your SMT class.
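
For completeness, once such an SMT class is included in the rebuilt connector archive, it is wired in with the standard Kafka Connect transforms options (the type below is the hypothetical class from the sketch above):

spec:
  class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SinkConnector
  config:
    transforms: addKeyHeaders
    transforms.addKeyHeaders.type: com.example.AddCamelKeyHeaders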