apache / camel-kafka-connector

Camel Kafka Connector allows you to use all Camel components as Kafka Connect connectors
https://camel.apache.org
Apache License 2.0

Ensure Kafka metadata is available in Sink task as in a standalone Kafka Camel source #1292

Open jaapterwoerds opened 2 years ago

jaapterwoerds commented 2 years ago

I was surprised that I don't seem to be able to use the key, offset, partition and topic as I could when using Camel standalone. Maybe I'm missing something that's already there. I think it would be nice to have the same functionality available in the Camel Kafka connector.

I've got some code lying around that lets me use this in a dynamic file name. I couldn't get the escaping of the Kafka header names to work (these constant values contain a dot), so that's why I replace the dot with an underscore here. I read somewhere that this is a known limitation in Camel, but I couldn't find the reference to that doc again.

@Override
public void put(Collection<SinkRecord> sinkRecords) {
<..>
    for (SinkRecord record : sinkRecords) {
<..>

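        // Copy the Kafka record metadata into Camel headers. The KafkaConstants
        // names contain a dot, so it is replaced with an underscore here.
        exchange.getMessage().setHeader(KafkaConstants.PARTITION.replace('.','_'), record.kafkaPartition());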
        exchange.getMessage().setHeader(KafkaConstants.TOPIC.replace('.','_'), record.topic());
        exchange.getMessage().setHeader(KafkaConstants.OFFSET.replace('.','_'), record.kafkaOffset());
        exchange.getMessage().setHeader(KafkaConstants.HEADERS.replace('.','_'), record.headers());
        exchange.getMessage().setHeader(KafkaConstants.TIMESTAMP.replace('.','_'), record.timestamp());

And in the connector configuration:

"camel.sink.path.fileName":"${header[kafka_TOPIC]}-${header[kafka_PARTITION]}-${header[kafka_OFFSET]}"
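
To make the naming concrete, here is a minimal, dependency-free sketch of how the underscore header names line up with the file name template above. It makes two assumptions: the constant values are inlined as `kafka.TOPIC`, `kafka.PARTITION` and `kafka.OFFSET` (consistent with the `kafka_TOPIC` style names in the configuration, but not taken from the connector source), and `resolve` is a hypothetical stand-in that handles only `${header[name]}` placeholders, not Camel's actual Simple language.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class KafkaHeaderNames {
    // Assumed values of the KafkaConstants fields, inlined to avoid dependencies.
    static final String TOPIC = "kafka.TOPIC";
    static final String PARTITION = "kafka.PARTITION";
    static final String OFFSET = "kafka.OFFSET";

    // Matches placeholders of the form ${header[name]}.
    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\$\\{header\\[([^\\]]+)\\]\\}");

    // Replace the dot in a header name with an underscore, as in the snippet above.
    public static String sanitize(String headerName) {
        return headerName.replace('.', '_');
    }

    // Build the header map that the sink task would attach to each exchange.
    public static Map<String, Object> metadataHeaders(String topic, int partition, long offset) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put(sanitize(TOPIC), topic);
        headers.put(sanitize(PARTITION), partition);
        headers.put(sanitize(OFFSET), offset);
        return headers;
    }

    // Hypothetical stand-in for expression evaluation: substitutes header values
    // into the template. Unknown headers are rendered as "null".
    public static String resolve(String template, Map<String, Object> headers) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = String.valueOf(headers.get(m.group(1)));
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> headers = metadataHeaders("orders", 3, 42L);
        String template = "${header[kafka_TOPIC]}-${header[kafka_PARTITION]}-${header[kafka_OFFSET]}";
        System.out.println(resolve(template, headers)); // prints orders-3-42
    }
}
```

With this naming, a record from topic `orders`, partition 3, offset 42 would end up in a file called `orders-3-42`, so each record gets a distinct file name.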

oscerd commented 2 years ago

@valdar I think this would make sense.

oscerd commented 2 years ago

@jaapterwoerds thanks for reporting this, I guess we should do something like this in the general Sink connector.

valdar commented 2 years ago

Ah, nice! +1

valdar commented 2 years ago

@jaapterwoerds would you be interested in contributing this feature (with all the guidance and support I can give)?