apache / camel-kafka-connector

Camel Kafka Connector allows you to use all Camel components as Kafka Connect connectors
https://camel.apache.org
Apache License 2.0

Information required on Camel file sink connector configurations #1211

Open sammy445 opened 3 years ago

sammy445 commented 3 years ago

I am using the CamelFileSink connector in one of our projects, and I would like each message that the connector reads from a Kafka topic to be written to its own new file. I went through the configuration documentation and couldn't find much information on how to achieve this. Could you please let me know whether this is possible and, if so, which configuration options to tune?

orpiske commented 3 years ago

Hi, have you tried setting the CamelHeader.CamelFileName header on each message produced to your Kafka topic? For example, suppose you have 2 messages, "a" and "b". For each of those, you set that header to a different file name.

Here's an example of how we do it for the AWS S3 component: https://github.com/apache/camel-kafka-connector/blob/main/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java#L74. It's a different component, but the approach should be similar.

sammy445 commented 3 years ago

Hi @orpiske,

Sorry, I didn't fully understand the answer. Headers are present in the Kafka topic for every message. Here is the configuration of the connector that I am using. How do I set the header configuration? I need the file name to include the Kafka offset. So, what's the parameter to get the Kafka offset?

{
  "name": "camel-file-sink-connector",
  "connector.class": "org.apache.camel.kafkaconnector.file.CamelFileSinkConnector",
  "tasks.max": "1",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "topics": "camel-file-sink-kafka-topic",
  "camel.sink.contentLogLevel": "DEBUG",
  "camel.idempotency.enabled": "false",
  "camel.sink.path.directoryName": "/user/app/folder",
  "camel.sink.endpoint.charset": "UTF-8",
  "camel.sink.endpoint.fileName": "messages-${date:now:yyyyMMdd}.txt",
  "camel.sink.endpoint.forceWrites": "true",
  "camel.sink.endpoint.autoCreate": "true"
}

oscerd commented 3 years ago

You need to use a different file name for each message. You can either add a header to your messages or change the fileName option in your connector configuration. For example, you can use the exchangeId.
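
As a rough sketch of the second option, assuming the fileName option is evaluated as a Camel Simple expression (which the ${date:now:yyyyMMdd} in the configuration above suggests), only the file name entry would need to change. The "message-" prefix is just an illustrative choice, and the other settings stay as they are:

```json
{
  "camel.sink.path.directoryName": "/user/app/folder",
  "camel.sink.endpoint.fileName": "message-${exchangeId}.txt"
}
```

Since each exchange gets its own ID, each message should end up in its own file. Note that this gives a unique name rather than the Kafka offset.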