apache / camel-kafka-connector

Camel Kafka Connector allows you to use all Camel components as Kafka Connect connectors
https://camel.apache.org
Apache License 2.0
152 stars, 101 forks

Idempotency support in Kafka AWS S3 source connector #1236

Closed: arjun180 closed this issue 3 years ago

arjun180 commented 3 years ago

I've been working on setting up a Kafka S3 source connector, where data needs to flow from an S3 bucket to a Kafka topic. I need to make sure that the data is retained in the S3 bucket, so I set this option in my Kafka connector: camel.component.aws2-s3.deleteAfterRead: false

Based on this page https://camel.apache.org/blog/2020/12/CKC-idempotency-070/, my Kafka connector looks like this:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: s3-source-connector
  namespace: my-kafka
  labels:
    strimzi.io/cluster: my-dev-kafka-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector
  tasksMax: 1
  config:
    topics: test-topic
    camel.source.path.bucketNameOrArn: my-kakfa-connect
    #camel.source.endpoint.useDefaultCredentialsProvider: true
    camel.component.aws2-s3.useDefaultCredentialsProvider: true
    camel.component.aws2-s3.deleteAfterRead: false
    camel.source.endpoint.region: <region>
    camel.source.endpoint.prefix: 'source_folder/'
    camel.idempotency.enabled: true
    camel.idempotency.repository.type: kafka
    camel.idempotency.expression.type: body
    camel.idempotency.kafka.topic: my.idempotency.topic
    camel.idempotency.kafka.max.cache.size: 1500
    camel.idempotency.kafka.poll.duration.ms: 150
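
With deleteAfterRead: false the objects stay in the bucket, and the Camel aws2-s3 documentation notes that the same objects are then retrieved again on every poll, which is why an idempotent filter is needed. As a rough illustration of what camel.idempotency.expression.type: body asks for, here is a minimal in-memory sketch in plain Java. It is not Camel's implementation; the BodyIdempotentFilter class and the sample bodies are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: drop messages whose body has already been seen,
// i.e. the body itself is the idempotency key.
final class BodyIdempotentFilter {
    // Bodies (keys) that have already been passed downstream.
    private final Set<String> seen = new HashSet<>();

    /** Returns only the bodies from this poll that were not seen before. */
    List<String> filter(List<String> polledBodies) {
        List<String> fresh = new ArrayList<>();
        for (String body : polledBodies) {
            // Set.add returns false when the key is already present.
            if (seen.add(body)) {
                fresh.add(body);
            }
        }
        return fresh;
    }

    public static void main(String[] args) {
        BodyIdempotentFilter filter = new BodyIdempotentFilter();
        // The objects are not deleted, so the second poll lists them again.
        System.out.println(filter.filter(List.of("body-1", "body-2")));           // prints [body-1, body-2]
        System.out.println(filter.filter(List.of("body-1", "body-2", "body-3"))); // prints [body-3]
    }
}
```

Keying on the body has a side effect worth knowing: two different objects with identical contents are treated as the same message.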

The error I get is:

2021-07-21 04:59:46,527 WARN Error processing exchange. Exchange[AB7DE03FAF78DF0-00000000000630CC]. Caused by: [java.lang.IllegalStateException - Queue full] (org.apache.camel.component.aws2.s3.AWS2S3Consumer) [Camel (camel-35) thread #67 - aws2-s3://my-kakfa-connect]
java.lang.IllegalStateException: Queue full
    at java.base/java.util.AbstractQueue.add(AbstractQueue.java:98)
    at org.apache.camel.component.seda.SedaProducer.addToQueue(SedaProducer.java:251)
    at org.apache.camel.component.seda.SedaProducer.process(SedaProducer.java:149)
    at org.apache.camel.processor.SendDynamicProcessor.lambda$process$0(SendDynamicProcessor.java:197)
    at org.apache.camel.support.cache.DefaultProducerCache.doInAsyncProducer(DefaultProducerCache.java:318)
    at org.apache.camel.processor.SendDynamicProcessor.process(SendDynamicProcessor.java:182)
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:439)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:62)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:167)
    at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:388)
    at org.apache.camel.component.aws2.s3.AWS2S3Consumer.processBatch(AWS2S3Consumer.java:289)
    at org.apache.camel.component.aws2.s3.AWS2S3Consumer.poll(AWS2S3Consumer.java:164)
    at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:190)
    at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:107)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

I couldn't find much information about this, especially since the error mentions a SEDA queue. Could anyone help with this?
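
The stack trace shows the exception coming from java.util.AbstractQueue.add, called by Camel's SedaProducer.addToQueue. In plain Java, add on a bounded queue throws IllegalStateException("Queue full") once the capacity is reached, while offer just returns false. A minimal sketch with ArrayBlockingQueue (standing in for the SEDA queue; QueueFullDemo and the capacity of 2 are arbitrary choices for illustration) reproduces the same message:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class QueueFullDemo {
    /** Adds `items` elements to a queue of `capacity`; returns the exception message, or null if all fit. */
    static String fill(int capacity, int items) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        try {
            for (int i = 0; i < items; i++) {
                queue.add(i); // throws IllegalStateException once the queue is at capacity
            }
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(fill(2, 2)); // prints null
        System.out.println(fill(2, 3)); // prints Queue full

        // offer() reports a full queue by returning false instead of throwing.
        BlockingQueue<Integer> q = new ArrayBlockingQueue<>(1);
        System.out.println(q.offer(1) + " " + q.offer(2)); // prints true false
    }
}
```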

oscerd commented 3 years ago

You need to either drain the Kafka topic used by the Kafka idempotent repository (my.idempotency.topic) or increase camel.idempotency.kafka.max.cache.size to allow more entries into the queue.
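
camel.idempotency.kafka.max.cache.size caps how many keys the repository keeps in memory. As a hedged illustration only, assuming an LRU-style cache (this is not Camel's code, and BoundedKeyCache is a hypothetical name), a size-capped key set built on LinkedHashMap.removeEldestEntry shows the trade-off: once the cap is reached the least recently used keys are forgotten, so a larger cap remembers more of the bucket's objects.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: a set of keys capped at maxSize, evicting the
// least recently used key when full (LinkedHashMap in access order).
final class BoundedKeyCache {
    private final Map<String, Boolean> map;

    BoundedKeyCache(int maxSize) {
        this.map = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                // Called after each new insertion; evicts once the cap is exceeded.
                return size() > maxSize;
            }
        };
    }

    /** Records the key; returns true if it was not already remembered. */
    boolean add(String key) {
        return map.put(key, Boolean.TRUE) == null;
    }

    boolean contains(String key) {
        return map.containsKey(key);
    }

    int size() {
        return map.size();
    }
}
```

With a cap of 2, adding a, b, a, c evicts b, so b would be treated as new if it showed up again.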

arjun180 commented 3 years ago

Thanks @oscerd. I tried increasing the cache size, and now I'm getting a strange broker connection error. I wonder whether it has something to do with the Kafka Connect plugin.

2021-07-21 05:14:45,886 INFO Kafka version: 2.8.0 (org.apache.kafka.common.utils.AppInfoParser) [task-thread-s3-source-connector-0]
2021-07-21 05:14:45,886 INFO Kafka commitId: ebb1d6e21cc92130 (org.apache.kafka.common.utils.AppInfoParser) [task-thread-s3-source-connector-0]
2021-07-21 05:14:45,886 INFO Kafka startTimeMs: 1626844485886 (org.apache.kafka.common.utils.AppInfoParser) [task-thread-s3-source-connector-0]
2021-07-21 05:14:45,887 INFO Warming up cache from topic my.idempotency.topic (org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository) [task-thread-s3-source-connector-0]
2021-07-21 05:14:45,888 INFO [Consumer clientId=consumer-7948c941-10d7-4120-82f2-e550ba0d477b-6, groupId=7948c941-10d7-4120-82f2-e550ba0d477b] Subscribed to topic(s): my.idempotency.topic (org.apache.kafka.clients.consumer.KafkaConsumer) [Camel (camel-38) thread #74 - KafkaIdempotentRepository]
2021-07-21 05:14:45,889 WARN [Producer clientId=producer-6] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) [kafka-producer-network-thread | producer-6]
2021-07-21 05:14:45,889 WARN [Producer clientId=producer-6] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient) [kafka-producer-network-thread | producer-6]
2021-07-21 05:14:45,889 WARN [Consumer clientId=consumer-7948c941-10d7-4120-82f2-e550ba0d477b-6, groupId=7948c941-10d7-4120-82f2-e550ba0d477b] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) [Camel (camel-38) thread #74 - KafkaIdempotentRepository]

Without the idempotency settings, the broker connection works fine.
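
The localhost:9092 address in these warnings, together with the KafkaIdempotentRepository thread name, is consistent with the idempotent repository creating its own Kafka producer and consumer and, when camel.idempotency.kafka.bootstrap.servers is not set, falling back to a default of localhost:9092 (as far as I know, the documented default) rather than reusing the Connect worker's bootstrap servers. A small sketch of that lookup-with-default, using a hypothetical IdempotencyBootstrap helper over the connector config map:

```java
import java.util.Map;

final class IdempotencyBootstrap {
    // Assumed default; the warnings in this thread point at localhost:9092.
    static final String DEFAULT_BOOTSTRAP = "localhost:9092";
    static final String KEY = "camel.idempotency.kafka.bootstrap.servers";

    /** Hypothetical helper: brokers for the repository's own clients, independent of the worker's bootstrap.servers. */
    static String resolve(Map<String, String> connectorConfig) {
        return connectorConfig.getOrDefault(KEY, DEFAULT_BOOTSTRAP);
    }

    public static void main(String[] args) {
        System.out.println(resolve(Map.of("camel.idempotency.enabled", "true"))); // prints localhost:9092
        System.out.println(resolve(Map.of(KEY, "my-dev.com:9093")));              // prints my-dev.com:9093
    }
}
```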

oscerd commented 3 years ago

I've never seen something like this with idempotency. Can you please share the configuration?

arjun180 commented 3 years ago

Sure, my configuration is below. I realized there is an option to set the bootstrap servers for the idempotent repository, so I added that as well:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: s3-source-connector
  namespace: my-kakfa
  labels:
    strimzi.io/cluster: my-kafka-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector
  tasksMax: 1
  config:
    topics: my-test-topic
    camel.source.path.bucketNameOrArn: my-kakfa-connect
    #camel.source.endpoint.useDefaultCredentialsProvider: true
    camel.component.aws2-s3.useDefaultCredentialsProvider: true
    camel.component.aws2-s3.deleteAfterRead: false
    camel.source.endpoint.region: <region>
    camel.source.endpoint.prefix: 'my-connect/'
    camel.idempotency.enabled: true
    camel.idempotency.repository.type: kafka
    camel.idempotency.expression.type: body
    camel.idempotency.kafka.topic: my.idempotency.topic
    camel.idempotency.kafka.max.cache.size: 30000
    camel.idempotency.kafka.poll.duration.ms: 150
    camel.idempotency.kafka.bootstrap.servers: my-dev.com:9093

The error from the connector looks like this:

    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:439)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:62)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:167)
    at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:388)
    at org.apache.camel.component.aws2.s3.AWS2S3Consumer.processBatch(AWS2S3Consumer.java:289)
    at org.apache.camel.component.aws2.s3.AWS2S3Consumer.poll(AWS2S3Consumer.java:164)
    at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:190)
    at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:107)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
2021-07-21 18:25:50,017 WARN [Producer clientId=producer-12] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) [kafka-producer-network-thread | producer-12]
2021-07-21 18:25:50,017 WARN [Producer clientId=producer-12] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient) [kafka-producer-network-thread | producer-12]
2021-07-21 18:25:50,044 WARN [Producer clientId=producer-15] Bootstrap broker my-dev.com:9093 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient) [kafka-producer-network-thread | producer-15]
2021-07-21 18:25:50,045 WARN [Consumer clientId=consumer-b3c6ac69-f646-4a35-88a3-086f914a64df-17, groupId=b3c6ac69-f646-4a35-88a3-086f914a64df] Bootstrap broker my-dev.com:9093 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient) [Camel (camel-49) thread #101 - KafkaIdempotentRepository]

It seems to be struggling to connect to my broker, but that hasn't been a problem before.

oscerd commented 3 years ago

Where is my-dev.com running? I guess it should be reachable from the Strimzi cluster.
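
One quick way to check whether the broker address is reachable from inside the Connect pod is a plain TCP connect with a timeout. A minimal sketch using java.net.Socket; BrokerReachability is a hypothetical helper, and the host and port are the ones from the configuration above. A successful connect only shows that the port is open, not that the client's security settings (for example TLS) match the listener.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class BrokerReachability {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isReachable("my-dev.com", 9093, 2000));
    }
}
```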

arjun180 commented 3 years ago

Yes, my-dev.com is just the advertised host name of my broker, defined in the Strimzi Kafka configuration.

oscerd commented 3 years ago

If you remove the bootstrap servers parameter from the configuration, does it work?

arjun180 commented 3 years ago

Once I removed the bootstrap servers parameter, it just tries to connect to localhost. The log snippet below keeps repeating:

kubectl logs -f my-kafka-connect-cluster-connect

    at org.apache.camel.component.aws2.s3.AWS2S3Consumer.processBatch(AWS2S3Consumer.java:289)
    at org.apache.camel.component.aws2.s3.AWS2S3Consumer.poll(AWS2S3Consumer.java:164)
    at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:190)
    at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:107)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
2021-07-21 18:53:03,909 ERROR Failed delivery for (MessageId: AB7DE03FAF78DF0-00000000000E39D4 on ExchangeId: AB7DE03FAF78DF0-00000000000E39D4). Exhausted after delivery attempt: 1 caught: java.lang.IllegalStateException: Queue full

Message History (complete message history is disabled)
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[route35           ] [route35           ] [from[aws2-s3://my-kakfa-connect?prefix=my-connect/&region=<region>] [        35]
    ...
[route35           ] [toD35             ] [seda:end?pollingConsumerQueueSize=1000&pollingConsumerBlockTimeout=0&pollingCo] [         0]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
 (org.apache.camel.processor.errorhandler.DefaultErrorHandler) [Camel (camel-35) thread #67 - aws2-s3://my-kakfa-connect]
java.lang.IllegalStateException: Queue full
    at java.base/java.util.AbstractQueue.add(AbstractQueue.java:98)
    at org.apache.camel.component.seda.SedaProducer.addToQueue(SedaProducer.java:251)
    at org.apache.camel.component.seda.SedaProducer.process(SedaProducer.java:149)
    at org.apache.camel.processor.SendDynamicProcessor.lambda$process$0(SendDynamicProcessor.java:197)
    at org.apache.camel.support.cache.DefaultProducerCache.doInAsyncProducer(DefaultProducerCache.java:318)
    at org.apache.camel.processor.SendDynamicProcessor.process(SendDynamicProcessor.java:182)
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:439)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:62)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:167)
    at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:388)
    at org.apache.camel.component.aws2.s3.AWS2S3Consumer.processBatch(AWS2S3Consumer.java:289)
    at org.apache.camel.component.aws2.s3.AWS2S3Consumer.poll(AWS2S3Consumer.java:164)
    at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:190)
    at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:107)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
2021-07-21 18:53:03,909 WARN [Consumer clientId=consumer-d850d938-f647-481c-a82e-315512b12218-11, groupId=d850d938-f647-481c-a82e-315512b12218] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) [Camel (camel-43) thread #89 - KafkaIdempotentRepository]
2021-07-21 18:53:03,909 WARN [Consumer clientId=consumer-d850d938-f647-481c-a82e-315512b12218-11, groupId=d850d938-f647-481c-a82e-315512b12218] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient) [Camel (camel-43) thread #89 - KafkaIdempotentRepository]
2021-07-21 18:53:03,909 WARN Error processing exchange. Exchange[AB7DE03FAF78DF0-00000000000E39D4]. Caused by: [java.lang.IllegalStateException - Queue full] (org.apache.camel.component.aws2.s3.AWS2S3Consumer) [Camel (camel-35) thread #67 - aws2-s3://my-kakfa-connect]
java.lang.IllegalStateException: Queue full

oscerd commented 3 years ago

I think you need to recreate the connector from scratch, because the queue is still sized at 1500 elements; the Camel context needs to be recreated.
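
The reasoning is that sizes like this are applied when the Camel context is built, so editing the value later does not resize anything already running. The same holds for a plain ArrayBlockingQueue, whose capacity is fixed in its constructor. A minimal sketch of that pattern; the QueueHolder class and its methods are hypothetical:

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical holder: the queue is sized once, from the configured value at construction time.
final class QueueHolder {
    private int configuredSize;
    private ArrayBlockingQueue<String> queue;

    QueueHolder(int size) {
        this.configuredSize = size;
        this.queue = new ArrayBlockingQueue<>(size);
    }

    /** Changing the setting alone does not touch the existing queue. */
    void updateConfig(int newSize) {
        this.configuredSize = newSize;
    }

    /** Recreating the queue is what actually applies the new size (queued items are dropped). */
    void rebuild() {
        this.queue = new ArrayBlockingQueue<>(configuredSize);
    }

    int capacity() {
        // Remaining capacity plus current size gives the fixed capacity.
        return queue.remainingCapacity() + queue.size();
    }
}
```

For example, new QueueHolder(1500) followed by updateConfig(30000) still reports a capacity of 1500 until rebuild() is called.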

arjun180 commented 3 years ago

Even after deleting the Kafka connector, my Kafka Connect logs keep showing a warning from the Kafka idempotent repository. Is that expected?

2021-07-21 20:22:31,638 WARN [Consumer clientId=consumer-b3c6ac69-f646-4a35-88a3-086f914a64df-17, groupId=b3c6ac69-f646-4a35-88a3-086f914a64df] Bootstrap broker my-dev:9093 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient) [Camel (camel-49) thread #101 - KafkaIdempotentRepository]

oscerd commented 3 years ago

This is the first time I've seen something like this. It's just a warning, but it's weird. Can you try the same configuration with a local cluster, not Strimzi?

arjun180 commented 3 years ago

I haven't been able to try it with a local cluster yet, but I'll let you know if I do. To get my Kafka Connect cluster back, I had to tear it down and set it up again.

arjun180 commented 3 years ago

The error went away once I changed camel.idempotency.repository.type from kafka to memory.
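
Switching to memory removes the repository's separate Kafka clients entirely. The trade-off is that an in-process store forgets its keys when the task restarts, so objects still in the bucket can be sent again after a restart. A hedged sketch of that behaviour; MemorySeenKeys is a hypothetical stand-in for an in-memory repository, not Camel's class:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical in-process store standing in for repository.type=memory.
final class MemorySeenKeys {
    private final Set<String> keys = new HashSet<>();

    /** Returns true the first time a key is seen by this instance. */
    boolean markIfNew(String key) {
        return keys.add(key);
    }

    public static void main(String[] args) {
        MemorySeenKeys store = new MemorySeenKeys();
        System.out.println(store.markIfNew("body-1")); // prints true (first poll)
        System.out.println(store.markIfNew("body-1")); // prints false (same object polled again)

        // A task restart builds a fresh store, so nothing is remembered.
        MemorySeenKeys afterRestart = new MemorySeenKeys();
        System.out.println(afterRestart.markIfNew("body-1")); // prints true
    }
}
```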