apache / camel-kafka-connector

Camel Kafka Connector allows you to use all Camel components as Kafka Connect connectors
https://camel.apache.org
Apache License 2.0

Connector Kafka Idempotency Crash Resilience #1128

Closed. PLCyr closed this issue 3 years ago.

PLCyr commented 3 years ago

Hi,

I'm new to Camel Kafka Connector. I configured an SFTP connector in Kubernetes with download=false and the Kafka topic idempotency feature enabled. Idempotency on the file name works fine as long as the Kafka Connect pod is running. However, after a connector restart or pod crash, every file still on the SFTP server generates a new event when the connector comes back up. I was expecting the idempotency feature to re-consume the idempotency Kafka topic at offset 0 so it could have some kind of crash resilience. I see the same behavior with the FTP, FTPS and SFTP connectors.

Is this expected behavior or a bug?

Repro steps: For my test, I uploaded a file called "Test_2.txt" to my SFTP server. The SFTP connector saw it, which resulted in one event in Topic and one in IdempotencyTopic. I deleted my Kafka Connect pod to simulate a crash. Upon restart, Topic and IdempotencyTopic each had a second event, a duplicate for Test_2.txt.

Setup:
- Windows 10 Pro with Kubernetes provided by Docker Desktop
- Confluent Kafka running in Kubernetes
- Camel Kafka Connector SFTP version 0.8.0 running in Kubernetes

SFTP Connector Config:
{
  "name": "sftp-connector",
  "connector.class": "org.apache.camel.kafkaconnector.sftp.CamelSftpSourceConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "transforms": "RemoteTransformer",
  "transforms.RemoteTransformer.type": "org.apache.camel.kafkaconnector.sftp.transformers.SftpRemoteFileTransforms",
  "camel.idempotency.enabled": "true",
  "camel.idempotency.expression.type": "header",
  "camel.idempotency.expression.header": "CamelFileNameConsumed",
  "camel.idempotency.repository.type": "kafka",
  "camel.idempotency.kafka.bootstrap.servers": "confluent-kafka-cp-kafka:9092",
  "camel.idempotency.kafka.topic": "SFTP-Persistance-Topic",
  "camel.source.endpoint.recursive": "true",
  "camel.source.path.host": "sftp.sftp",
  "camel.source.path.port": "22",
  "camel.source.endpoint.username": "foo",
  "camel.source.endpoint.password": "pass",
  "camel.source.endpoint.include": ".*.txt",
  "camel.source.path.directoryName": "data/",
  "camel.source.endpoint.download": "false",
  "camel.source.endpoint.noop": "true",
  "camel.source.endpoint.binary": "false",
  "camel.source.contentLogLevel": "INFO",
  "camel.source.endpoint.runLoggingLevel": "INFO",
  "topics": "SFTP-Topic"
}

Log at connector restart:
[2021-03-29 15:03:27,822] INFO Creating Camel route from(sftp:sftp.sftp:22/data/?binary=false&download=false&include=.*.txt&noop=true&password=RAW(pass)&recursive=true&runLoggingLevel=INFO&username=RAW(foo)) (org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain)
[2021-03-29 15:03:27,824] INFO idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository(100)).to(direct:end?pollingConsumerQueueSize=1000&pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true) (org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain)
[2021-03-29 15:03:27,943] INFO Endpoint is configured with noop=true so forcing endpoint to be idempotent as well (org.apache.camel.component.file.remote.RemoteFileEndpoint)
[2021-03-29 15:03:27,944] INFO Using default memory based idempotent repository with cache max size: 1000 (org.apache.camel.component.file.remote.RemoteFileEndpoint)
[2021-03-29 15:03:27,954] INFO ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = latest bootstrap.servers = [confluent-kafka-cp-kafka:9092] check.crcs = true client.dns.lookup = use_all_dns_ips client.id = consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4 client.rack = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = true exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = 3efa9c9e-3694-451d-a70b-9690bba7fa4f group.instance.id = null heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true internal.throw.on.fetch.stable.offset.unsupported = false isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 session.timeout.ms = 10000 socket.connection.setup.timeout.max.ms = 127000 socket.connection.setup.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.3] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.certificate.chain = null ssl.keystore.key = null ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.3 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.certificates = null ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer (org.apache.kafka.clients.consumer.ConsumerConfig)
[2021-03-29 15:03:27,975] INFO Kafka version: 6.1.0-ccs (org.apache.kafka.common.utils.AppInfoParser)
[2021-03-29 15:03:27,975] INFO Kafka commitId: 5496d92defc9bbe4 (org.apache.kafka.common.utils.AppInfoParser)
[2021-03-29 15:03:27,975] INFO Kafka startTimeMs: 1617030207975 (org.apache.kafka.common.utils.AppInfoParser)
[2021-03-29 15:03:27,975] INFO ProducerConfig values: acks = 1 batch.size = 0 bootstrap.servers = [confluent-kafka-cp-kafka:9092] buffer.memory = 33554432 client.dns.lookup = use_all_dns_ips client.id = producer-4 compression.type = none connections.max.idle.ms = 540000 delivery.timeout.ms = 120000 enable.idempotence = false interceptor.classes = [] internal.auto.downgrade.txn.commit = false key.serializer = class org.apache.kafka.common.serialization.StringSerializer linger.ms = 0 max.block.ms = 60000 max.in.flight.requests.per.connection = 5 max.request.size = 1048576 metadata.max.age.ms = 300000 metadata.max.idle.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 2147483647 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 socket.connection.setup.timeout.max.ms = 127000 socket.connection.setup.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.3] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.certificate.chain = null ssl.keystore.key = null ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.3 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.certificates = null ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.timeout.ms = 60000 transactional.id = null value.serializer = class org.apache.kafka.common.serialization.StringSerializer (org.apache.kafka.clients.producer.ProducerConfig)
[2021-03-29 15:03:27,979] INFO Kafka version: 6.1.0-ccs (org.apache.kafka.common.utils.AppInfoParser)
[2021-03-29 15:03:27,980] INFO Kafka commitId: 5496d92defc9bbe4 (org.apache.kafka.common.utils.AppInfoParser)
[2021-03-29 15:03:27,980] INFO Kafka startTimeMs: 1617030207979 (org.apache.kafka.common.utils.AppInfoParser)
[2021-03-29 15:03:27,984] INFO SourceConnectorConfig values: config.action.reload = restart connector.class = org.apache.camel.kafkaconnector.sftp.CamelSftpSourceConnector errors.log.enable = false errors.log.include.messages = false errors.retry.delay.max.ms = 60000 errors.retry.timeout = 0 errors.tolerance = none header.converter = null key.converter = class org.apache.kafka.connect.storage.StringConverter name = sftp-connector predicates = [] tasks.max = 1 topic.creation.groups = [] transforms = [RemoteTransformer] value.converter = class org.apache.kafka.connect.converters.ByteArrayConverter (org.apache.kafka.connect.runtime.SourceConnectorConfig)
[2021-03-29 15:03:27,986] INFO Warming up cache from topic SFTP-Persistance-Topic (org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository)
[2021-03-29 15:03:27,986] INFO EnrichedConnectorConfig values: config.action.reload = restart connector.class = org.apache.camel.kafkaconnector.sftp.CamelSftpSourceConnector errors.log.enable = false errors.log.include.messages = false errors.retry.delay.max.ms = 60000 errors.retry.timeout = 0 errors.tolerance = none header.converter = null key.converter = class org.apache.kafka.connect.storage.StringConverter name = sftp-connector predicates = [] tasks.max = 1 topic.creation.groups = [] transforms = [RemoteTransformer] transforms.RemoteTransformer.key = null transforms.RemoteTransformer.negate = false transforms.RemoteTransformer.predicate = transforms.RemoteTransformer.type = class org.apache.camel.kafkaconnector.sftp.transformers.SftpRemoteFileTransforms value.converter = class org.apache.kafka.connect.converters.ByteArrayConverter (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig)
[2021-03-29 15:03:27,987] INFO Setting task configurations for 1 workers. (org.apache.camel.kafkaconnector.CamelSourceConnector)
[2021-03-29 15:03:27,990] INFO [Consumer clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] Subscribed to topic(s): SFTP-Persistance-Topic (org.apache.kafka.clients.consumer.KafkaConsumer)
[2021-03-29 15:03:28,001] INFO [Consumer clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] Cluster ID: fFQtkPXrT92v270crRLidQ (org.apache.kafka.clients.Metadata)
[2021-03-29 15:03:28,001] INFO [Consumer clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] Discovered group coordinator confluent-kafka-cp-kafka-2.confluent-kafka-cp-kafka-headless.kafka:9092 (id: 2147483645 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-03-29 15:03:28,004] INFO [Producer clientId=producer-4] Cluster ID: fFQtkPXrT92v270crRLidQ (org.apache.kafka.clients.Metadata)
[2021-03-29 15:03:28,006] INFO [Consumer clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-03-29 15:03:28,074] INFO [Consumer clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-03-29 15:03:31,096] INFO [Consumer clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] Successfully joined group with generation Generation{generationId=1, memberId='consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4-124f20c5-6776-424e-abfe-ec07081e230a', protocol='range'} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-03-29 15:03:31,100] INFO [Consumer clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] Finished assignment for group at generation 1: {consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4-124f20c5-6776-424e-abfe-ec07081e230a=Assignment(partitions=[SFTP-Persistance-Topic-0])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2021-03-29 15:03:31,142] INFO [Consumer clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] Successfully synced group in generation Generation{generationId=1, memberId='consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4-124f20c5-6776-424e-abfe-ec07081e230a', protocol='range'} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-03-29 15:03:31,142] INFO [Consumer clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] Notifying assignor about the new Assignment(partitions=[SFTP-Persistance-Topic-0]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2021-03-29 15:03:31,142] INFO [Consumer clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] Adding newly assigned partitions: SFTP-Persistance-Topic-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2021-03-29 15:03:31,149] INFO [Consumer clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] Found no committed offset for partition SFTP-Persistance-Topic-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2021-03-29 15:03:31,153] INFO Cache OK (org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository)
[2021-03-29 15:03:31,164] INFO Known host file not configured, using user known host file: /home/appuser/.ssh/known_hosts (org.apache.camel.component.file.remote.SftpOperations)
[2021-03-29 15:03:31,166] INFO [Consumer clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] Resetting offset for partition SFTP-Persistance-Topic-0 to position FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[confluent-kafka-cp-kafka-1.confluent-kafka-cp-kafka-headless.kafka:9092 (id: 1 rack: null)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState)
[2021-03-29 15:03:31,713] WARN JSCH -> Permanently added 'sftp.sftp' (RSA) to the list of known hosts. (org.apache.camel.component.file.remote.SftpOperations)
[2021-03-29 15:03:31,713] WARN Server asks for confirmation (yes|no): /home/appuser/.ssh/known_hosts does not exist. Are you sure you want to create it?. Camel will answer no. (org.apache.camel.component.file.remote.SftpOperations)
[2021-03-29 15:03:32,037] INFO Routes startup summary (total:1 started:1) (org.apache.camel.impl.engine.AbstractCamelContext)
[2021-03-29 15:03:32,038] INFO Started route1 (sftp://sftp.sftp:22/data/) (org.apache.camel.impl.engine.AbstractCamelContext)
[2021-03-29 15:03:32,038] INFO Apache Camel 3.8.0 (camel-1) started in 4s306ms (build:98ms init:117ms start:4s91ms) (org.apache.camel.impl.engine.AbstractCamelContext)
[2021-03-29 15:03:32,039] INFO CamelSourceTask connector task started (org.apache.camel.kafkaconnector.CamelSourceTask)
[2021-03-29 15:03:32,040] INFO WorkerSourceTask{id=sftp-connector-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-03-29 15:03:33,035] INFO Scheduled task started on: sftp://sftp.sftp:22/data/?binary=false&download=false&include=.*.txt&noop=true&password=xxxxxx (org.apache.camel.support.ScheduledPollConsumer)
[2021-03-29 15:03:33,079] INFO SourceRecord{sourcePartition={filename=sftp://sftp.sftp:22/data/?binary=false&download=false&include=.*.txt&noop=true&password=xxxxxx}, sourceOffset={position=9F81987AE7F9AE2-0000000000000000}} ConnectRecord{topic='SFTP-Topic', kafkaPartition=null, key=null, keySchema=null, value=null, valueSchema=null, timestamp=1617030213076, headers=ConnectHeaders(headers=[ConnectHeader(key=CamelHeader.CamelFileAbsolute, value=false, schema=Schema{BOOLEAN}), ConnectHeader(key=CamelHeader.CamelFileAbsolutePath, value=data/Test_2.txt, schema=Schema{STRING}), ConnectHeader(key=CamelHeader.CamelFileHost, value=sftp.sftp, schema=Schema{STRING}), ConnectHeader(key=CamelHeader.CamelFileLastModified, value=1616182842000, schema=Schema{INT64}), ConnectHeader(key=CamelHeader.CamelFileLength, value=72, schema=Schema{INT64}), ConnectHeader(key=CamelHeader.CamelFileName, value=Test_2.txt, schema=Schema{STRING}), ConnectHeader(key=CamelHeader.CamelFileNameConsumed, value=Test_2.txt, schema=Schema{STRING}), ConnectHeader(key=CamelHeader.CamelFileNameOnly, value=Test_2.txt, schema=Schema{STRING}), ConnectHeader(key=CamelHeader.CamelFileParent, value=data, schema=Schema{STRING}), ConnectHeader(key=CamelHeader.CamelFilePath, value=data//Test_2.txt, schema=Schema{STRING}), ConnectHeader(key=CamelHeader.CamelFileRelativePath, value=Test_2.txt, schema=Schema{STRING}), ConnectHeader(key=CamelProperty.CamelBatchSize, value=1, schema=Schema{INT32}), ConnectHeader(key=CamelProperty.CamelUnitOfWorkProcessSync, value=true, schema=Schema{BOOLEAN}), ConnectHeader(key=CamelProperty.CamelBatchComplete, value=true, schema=Schema{BOOLEAN}), ConnectHeader(key=CamelProperty.CamelBatchIndex, value=0, schema=Schema{INT32}), ConnectHeader(key=CamelProperty.CamelToEndpoint, value=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000, schema=Schema{STRING})])} (org.apache.camel.kafkaconnector.CamelSourceTask)
[2021-03-29 15:03:33,080] INFO Scheduled task completed on: sftp://sftp.sftp:22/data/?binary=false&download=false&include=.*.txt&noop=true&password=xxxxxx (org.apache.camel.support.ScheduledPollConsumer)

oscerd commented 3 years ago

Try setting "camel.source.endpoint.idempotent" to false, because noop=true implies:

If true, the file is not moved or deleted in any way. This option is good for readonly data, or for ETL type requirements. If noop=true, Camel will set idempotent=true as well, to avoid consuming the same files over and over again.

So the in-memory idempotent repository takes precedence. Leave the configuration as is and set "camel.source.endpoint.idempotent" to false.
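The two duplicate checks in play can be modeled in a few lines. This is a minimal sketch of the quoted rule, not Camel code: it assumes that an explicitly set idempotent value is kept and that only an unset value is forced to true by noop=true. The function names and layer labels are hypothetical. The "endpoint:memory" layer corresponds to the "Using default memory based idempotent repository" line in the log above, and the "connector:kafka" layer to the camel.idempotency.* settings.

```python
from typing import List, Optional


def effective_endpoint_idempotent(noop: bool, idempotent: Optional[bool] = None) -> bool:
    """Model of the quoted rule: noop=true implies idempotent=true.

    Assumption: an explicitly configured idempotent value is kept as-is,
    and only an unset value is forced to true by noop=true.
    """
    if idempotent is not None:
        return idempotent  # explicit camel.source.endpoint.idempotent wins
    return noop  # unset: follows noop


def dedup_layers(noop: bool, idempotent: Optional[bool] = None) -> List[str]:
    """List which duplicate checks a file passes through in this model."""
    layers = []
    if effective_endpoint_idempotent(noop, idempotent):
        # Endpoint-level in-memory repository, lost when the pod restarts.
        layers.append("endpoint:memory")
    # Connector-level idempotentConsumer backed by the Kafka topic.
    layers.append("connector:kafka")
    return layers


print(dedup_layers(noop=True))                    # ['endpoint:memory', 'connector:kafka']
print(dedup_layers(noop=True, idempotent=False))  # ['connector:kafka']
```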

PLCyr commented 3 years ago

I do want to use Kafka idempotency, so I tested with noop=false. When I kill the pod again, the connector doesn't rebuild its idempotency state from the idempotency topic in Kafka. This could be expected behavior or a bug; I just want someone to confirm.

oscerd commented 3 years ago

You need to set it to false. That way the Kafka idempotent repository will be used; otherwise, the component's in-memory idempotent repository will be used. Just set the option I mentioned to false and it should work.
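One way to apply the suggested change is to resubmit the whole connector config through the Kafka Connect REST API (`PUT /connectors/{name}/config`). The sketch below builds that request with the standard library but does not send it. The worker address `http://localhost:8083` is an assumption (8083 is the default Connect REST port), and `build_config_update` is a hypothetical helper.

```python
import json
import urllib.request

# Connector config from the original report.
config = {
    "name": "sftp-connector",
    "connector.class": "org.apache.camel.kafkaconnector.sftp.CamelSftpSourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "transforms": "RemoteTransformer",
    "transforms.RemoteTransformer.type": "org.apache.camel.kafkaconnector.sftp.transformers.SftpRemoteFileTransforms",
    "camel.idempotency.enabled": "true",
    "camel.idempotency.expression.type": "header",
    "camel.idempotency.expression.header": "CamelFileNameConsumed",
    "camel.idempotency.repository.type": "kafka",
    "camel.idempotency.kafka.bootstrap.servers": "confluent-kafka-cp-kafka:9092",
    "camel.idempotency.kafka.topic": "SFTP-Persistance-Topic",
    "camel.source.endpoint.recursive": "true",
    "camel.source.path.host": "sftp.sftp",
    "camel.source.path.port": "22",
    "camel.source.endpoint.username": "foo",
    "camel.source.endpoint.password": "pass",
    "camel.source.endpoint.include": ".*.txt",
    "camel.source.path.directoryName": "data/",
    "camel.source.endpoint.download": "false",
    "camel.source.endpoint.noop": "true",
    "camel.source.endpoint.binary": "false",
    "camel.source.contentLogLevel": "INFO",
    "camel.source.endpoint.runLoggingLevel": "INFO",
    "topics": "SFTP-Topic",
}

# Suggested change: turn off the endpoint's own in-memory idempotent check,
# which noop=true would otherwise force on.
config["camel.source.endpoint.idempotent"] = "false"


def build_config_update(base_url: str, cfg: dict) -> urllib.request.Request:
    """Build, but do not send, a PUT /connectors/{name}/config request."""
    url = f"{base_url}/connectors/{cfg['name']}/config"
    return urllib.request.Request(
        url,
        data=json.dumps(cfg).encode("utf-8"),
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


# Assumed worker address; 8083 is the default Kafka Connect REST port.
request = build_config_update("http://localhost:8083", config)
# urllib.request.urlopen(request) would submit it to a running worker.
```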

PLCyr commented 3 years ago

Sorry, I had misread "camel.source.endpoint.idempotent" as "camel.idempotency.enabled". I just tested with "camel.source.endpoint.idempotent" set to false and noop=true: same behavior.

The log still shows a hint that the memory idempotent repository is used:
[2021-03-29 17:27:47,943] INFO Setting initial properties in Camel context: [[connector.class=org.apache.camel.kafkaconnector.sftp.CamelSftpSourceConnector, camel.source.endpoint.runLoggingLevel=INFO, camel.idempotency.enabled=true, transforms=RemoteTransformer, camel.idempotency.expression.header=CamelFileNameConsumed, camel.source.endpoint.download=false, camel.source.endpoint.noop=true, key.converter=org.apache.kafka.connect.storage.StringConverter, camel.source.contentLogLevel=INFO, transforms.RemoteTransformer.type=org.apache.camel.kafkaconnector.sftp.transformers.SftpRemoteFileTransforms, camel.source.endpoint.include=.*.txt, camel.source.endpoint.password=pass, name=sftp-connector, camel.source.path.directoryName=data/, camel.idempotency.kafka.topic=SFTP-Persistance-Topic, camel.source.endpoint.username=foo, camel.source.component=sftp, camel.source.path.host=sftp.sftp, camel.idempotency.repository.type=kafka, camel.idempotency.kafka.bootstrap.servers=confluent-kafka-cp-kafka:9092, value.converter=org.apache.kafka.connect.converters.ByteArrayConverter, camel.idempotency.expression.type=header, camel.source.endpoint.recursive=true, topics=SFTP-Topic, camel.source.path.port=22, camel.source.endpoint.idempotent=false, task.class=org.apache.camel.kafkaconnector.sftp.CamelSftpSourceTask, camel.source.endpoint.binary=false]] (org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain)
[2021-03-29 17:27:47,944] INFO [Producer clientId=connector-producer-sftp-connector-0] Cluster ID: fFQtkPXrT92v270crRLidQ (org.apache.kafka.clients.Metadata)
[2021-03-29 17:27:47,961] INFO Creating Camel route from(sftp:sftp.sftp:22/data/?binary=false&download=false&idempotent=false&include=.*.txt&noop=true&password=RAW(pass)&recursive=true&runLoggingLevel=INFO&username=RAW(foo)) (org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain)
[2021-03-29 17:27:47,963] INFO idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository(100)).to(direct:end?pollingConsumerQueueSize=1000&pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true) (org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain)

oscerd commented 3 years ago

Can you please try setting camel.source.endpoint.initialDelay to a bigger value, for example 30000 or 40000?

PLCyr commented 3 years ago

Yes, I got the same behavior. I think the most interesting line in the log is this one:

[2021-03-30 13:29:56,087] INFO [Consumer clientId=consumer-30bd6ecd-fd2f-4f07-b02f-6eb94c44a7de-6, groupId=30bd6ecd-fd2f-4f07-b02f-6eb94c44a7de] Resetting offset for partition SFTP-Persistance-Topic-0 to position FetchPosition{offset=10, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[confluent-kafka-cp-kafka-1.confluent-kafka-cp-kafka-headless.kafka:9092 (id: 1 rack: null)], epoch=2}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState)

Shouldn't the offset be reset to 0, since the connector uses a new consumer group ID each time it restarts?
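The logs show why a new group ID alone does not lead to offset 0. The warm-up consumer finds no committed offset, and its config has `auto.offset.reset = latest`, so Kafka positions it at the log end (offset=1 in the first log, offset=10 here). The sketch below is a simplified model of that documented start-position rule; `resolve_start_offset` is a hypothetical name, not a Kafka client API.

```python
from typing import Optional


def resolve_start_offset(committed: Optional[int], auto_offset_reset: str,
                         log_start: int, log_end: int) -> int:
    """Model of where a Kafka consumer starts on one partition."""
    if committed is not None:
        return committed  # a known group resumes from its committed offset
    if auto_offset_reset == "earliest":
        return log_start
    if auto_offset_reset == "latest":
        return log_end
    # auto.offset.reset=none: the real client raises an exception instead.
    raise ValueError("no committed offset and auto.offset.reset=none")


# A fresh random group.id has no committed offset, so "latest" applies.
start = resolve_start_offset(None, "latest", log_start=0, log_end=10)
records_replayed = 10 - start
print(start, records_replayed)  # 10 0
```

With a fresh group and `latest`, a warm-up that simply polls replays none of the stored keys. Only an explicit seek to the beginning, or `earliest`, makes it read from offset 0.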

oscerd commented 3 years ago

I just noticed this line in your description:

"I was expecting the idempotency feature to re-consume the idempotency Kafka topic at offset 0 so it could have some kind of crash resilience."

Basically, the idempotency we are using is a Camel concept. When you stop a Camel consumer that uses a Kafka idempotent repository, the offset is saved, and when you restart the connector it resumes from that offset. This is mainly because the SFTP component and the others work with just one consumer. Supporting multiple consumers in Camel would require work on each individual component, which is not feasible for all of them; another possibility is to use some kind of lock mechanism with the camel-master component.

Sorry I didn't notice your sentence there. The behavior is normal from a Camel perspective.

PLCyr commented 3 years ago

That was my main question !

From my perspective, if the connector could re-read all events in the idempotency topic after a crash, it would be more resilient. Otherwise, in the event of a crash, we are exposed to duplicate events if we use noop=true without renaming the files on the SFTP server.

Can you confirm that Camel Kafka Connector is not expected to have this kind of resilience?

valdar commented 3 years ago

@PLCyr what you are experiencing is not the intended behavior. We recently reviewed a fix for an issue in that area and did some additional testing, but unfortunately we cannot reproduce your issue yet.

Can you give us some details on your environment: Kafka version, how it is deployed on k8s, how the pod is defined, and how you restart it?

In addition, a log with the org.apache.camel.processor.idempotent.kafka logging level set to TRACE would help as well.
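To raise that logger to TRACE without editing files, recent Kafka Connect workers expose a `PUT /admin/loggers/{logger}` REST endpoint (added by KIP-495 in Apache Kafka 2.4). The 6.1.0-ccs build in the logs is assumed to be recent enough. The sketch builds the request without sending it. The worker address is an assumption, and `build_log_level_request` is a hypothetical helper. A level set this way applies only to the worker that receives the request and is not persisted. For a capture across a pod deletion, the level would instead have to go in the worker's log4j configuration, for example `log4j.logger.org.apache.camel.processor.idempotent.kafka=TRACE`.

```python
import json
import urllib.request

LOGGER = "org.apache.camel.processor.idempotent.kafka"


def build_log_level_request(base_url: str, logger: str, level: str) -> urllib.request.Request:
    """Build, but do not send, PUT /admin/loggers/{logger} for one worker."""
    return urllib.request.Request(
        f"{base_url}/admin/loggers/{logger}",
        data=json.dumps({"level": level}).encode("utf-8"),
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


# Assumed worker address; send with urllib.request.urlopen(request).
request = build_log_level_request("http://localhost:8083", LOGGER, "TRACE")
```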

valdar commented 3 years ago

@PLCyr it turns out the solution may be much simpler than envisioned: the fix we are talking about is not in Camel 3.8.0, which camel-kafka-connector 0.8.0 uses. We are in the process of releasing camel-kafka-connector 0.9.0, based on Camel 3.9.0, which should contain the fix.

It should be ready in a couple of days. I will ping you, and you can check whether the new version solves your problem.

valdar commented 3 years ago

@PLCyr actually, the fix should already be included in camel-kafka-connector 0.7.3 (it may seem counterintuitive, but it makes sense given how Camel versions are maintained). The SFTP connector is here: https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-sftp-kafka-connector/0.7.3/

PLCyr commented 3 years ago

I tested camel-kafka-connector 0.7.3 and the behavior is correct: the idempotent topic is re-read from offset 0 after a connector crash (pod crash) or a connector configuration change. These lines in the log show the difference:

[2021-04-01 19:20:00,722] INFO [Consumer clientId=consumer-473c5e5a-e3e9-438e-8f92-8ae811cd2214-5, groupId=473c5e5a-e3e9-438e-8f92-8ae811cd2214] Seeking to EARLIEST offset of partition SFTP-Persistance-Topic-0 (org.apache.kafka.clients.consumer.internals.SubscriptionState)
[2021-04-01 19:20:00,729] INFO [Consumer clientId=consumer-473c5e5a-e3e9-438e-8f92-8ae811cd2214-5, groupId=473c5e5a-e3e9-438e-8f92-8ae811cd2214] Resetting offset for partition SFTP-Persistance-Topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[confluent-kafka-cp-kafka-2.confluent-kafka-cp-kafka-headless.kafka:9092 (id: 2 rack: null)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState)
[2021-04-01 19:20:00,836] INFO Cache OK (org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository)
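A small simulation shows why seeking to EARLIEST removes the duplicate from the repro steps. This is a sketch, not the KafkaIdempotentRepository implementation. It assumes each record in the idempotency topic carries one idempotency key, and it replays to the current end of the log in a single step. `warm_up` and `poll` are hypothetical names.

```python
from typing import List, Set


def warm_up(topic_log: List[str], start_offset: int) -> Set[str]:
    """Rebuild the in-memory cache by replaying the topic from start_offset.

    Simplification: replays to the current end of the log in one step.
    """
    return set(topic_log[start_offset:])


def poll(files: List[str], cache: Set[str], topic_log: List[str]) -> List[str]:
    """Emit an event only for unseen files and record each new key."""
    emitted = []
    for name in files:
        if name not in cache:
            cache.add(name)
            topic_log.append(name)  # persist the key for the next warm-up
            emitted.append(name)
    return emitted


topic = ["Test_2.txt"]      # key written before the pod was deleted
on_server = ["Test_2.txt"]  # noop=true leaves the file in place

# 0.8.0 behavior in the logs: start at the log end, so the cache is empty.
log_a = list(topic)
from_latest = poll(on_server, warm_up(log_a, start_offset=len(log_a)), log_a)

# 0.7.3 behavior in the logs: seek to EARLIEST, so the key is found.
log_b = list(topic)
from_earliest = poll(on_server, warm_up(log_b, start_offset=0), log_b)

print(from_latest, from_earliest)  # ['Test_2.txt'] []
```

In the first case the simulated idempotency topic also ends up with a second `Test_2.txt` record, which matches the duplicate reported in the repro steps.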

It's a good thing this fix will be included in the upcoming camel-kafka-connector 0.9.0!

PLCyr commented 3 years ago

Another related question: can "camel.idempotency.expression.header" support a composite header expression?

oscerd commented 3 years ago

Yeah, this was something we didn't consider in the beginning. We didn't consider the LTS.

You mean combining multiple headers in an expression?

PLCyr commented 3 years ago

Yes, multiple headers, like combining CamelHeader.CamelFileAbsolutePath and CamelHeader.CamelFileLastModified for the idempotency check.

oscerd commented 3 years ago

No, it's not possible.
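Per the reply above, the connector's camel.idempotency.expression.header option takes a single header. The sketch below is only an illustration of what the requested composite key would distinguish, not a supported configuration. `composite_key` is hypothetical, and the re-upload timestamp is an invented example value. The first timestamp is taken from the log earlier in this issue.

```python
from typing import Mapping


def composite_key(headers: Mapping[str, object]) -> str:
    """Hypothetical idempotency key built from two file headers."""
    return f"{headers['CamelFileAbsolutePath']}|{headers['CamelFileLastModified']}"


first = {"CamelFileAbsolutePath": "data/Test_2.txt",
         "CamelFileLastModified": 1616182842000}  # values from the log in this issue
# Hypothetical re-upload of the same path with a later modification time.
reupload = {**first, "CamelFileLastModified": 1617030213000}

print(composite_key(first))                             # data/Test_2.txt|1616182842000
print(composite_key(first) == composite_key(reupload))  # False
```

A key like this would treat a re-uploaded file as new while still suppressing repeats of an unchanged file. The file-name-only key from the original config cannot make that distinction.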

valdar commented 3 years ago

@PLCyr I am closing this issue. Feel free to open another one to discuss the "combine header expressions" feature if you like.