apache / camel-kafka-connector

Camel Kafka Connector allows you to use all Camel components as Kafka Connect connectors
https://camel.apache.org
Apache License 2.0
151 stars 100 forks source link

CAMEL ADLS Connector Aggregation Based on Timeout #1672

Open Shreya21Mishra opened 2 hours ago

Shreya21Mishra commented 2 hours ago

Hi Let's say we 10 messages. After a timer of 5 mins elapses the 10 messages should get aggregated and written as one single message into the ADLS. We have an aggregation logic already in SMT. But camel.aggregation.timeout=5000 does not seem to work along with the aggregation logic. I also tested it out without the SMT customisation but that does not work as well. Could you please point out if I am missing anything

oscerd commented 2 hours ago

You need to provide your configuration

Shreya21Mishra commented 2 hours ago

Connector Configuration:

{
  "name": "Connector-for-Custom-Timer",
  "config": {
    "connector.class": "org.apache.camel.kafkaconnector.azurestoragedatalakesink.CamelAzurestoragedatalakesinkSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "transforms": "sinkAsAvro",
    "errors.tolerance": "none",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "errors.retry.timeout": "300000",
    "errors.retry.delay.max.ms": "60000",
    "topics": "test-topic",
    "transforms.sinkAsAvro.type": "<classname>",
    "transforms.sinkAsAvro.batchsize": "10",
    "camel.aggregation.timeout": "360000",
    "transforms.sinkAsAvro.converter.type": "value",
    "camel.kamelet.azure-storage-datalake-sink.accountName": "",
    "camel.kamelet.azure-storage-datalake-sink.clientId": "",
    "camel.kamelet.azure-storage-datalake-sink.clientSecret": "",
    "camel.kamelet.azure-storage-datalake-sink.tenantId": "",
    "camel.kamelet.azure-storage-datalake-sink.fileSystemName": "",
    "camel.sink.header.file.extension": "avro",
    "camel.sink.header.file.location.partition.directory": "yes",
    "camel.sink.header.file.location.date.directory": "YYYY/MM/dd/HH/mm",
    "camel.sink.header.file.location.directory.path": "customDirectory1,dateDirectory,partitionDirectory",
    "camel.sink.header.file.location.custom.directory.1": "adls-offers-test-ms"
  }
}

SMT for aggragation:

@Override
    public R apply(R record) {
        LOG.trace("Entering apply method - Thread: {}", Thread.currentThread().getName());
            LOG.debug("Thread {} acquired lock for applying record.", Thread.currentThread().getName());
            LOG.debug("Received record with key {} and value {}", record.key(), record.value());
            buffer.add(record);
            LOG.debug("Buffer size after adding new record: {}", buffer.size());

            // Check if buffer has reached the batch size or timeout has elapsed
            if (buffer.size() >= batchSize) {
                LOG.debug("Buffer size met");

                R aggregatedRecord = (R) applyTransformation(buffer);
                LOG.debug("Aggregated record with value: {}", aggregatedRecord.value());
                buffer.clear();
                return aggregatedRecord;
            } 
            }

        LOG.trace("Exiting apply method - Thread: {}", Thread.currentThread().getName());
        return null; // Return null if no aggregation is performed
    }
oscerd commented 27 minutes ago

Please read the documentation: https://camel.apache.org/camel-kafka-connector/next/user-guide/aggregation.html