apache / camel-kafka-connector

Camel Kafka Connector allows you to use all Camel components as Kafka Connect connectors
https://camel.apache.org
Apache License 2.0

Delete file in ftp after processed #1535

Closed LinzeSu closed 1 year ago

LinzeSu commented 1 year ago

Hi all,

I'd like some help with the configuration of the sftp connector. Basically I'm reading files from an sftp server and forwarding them to Kafka, and I want the files to be deleted after this process. In CAMEL-SFTP-SOURCE-KAFKA-CONNECTOR SOURCE CONFIGURATION there is no such setting. An example provided with version 0.11.5 has similar settings:

camel.source.endpoint.noop=false
camel.source.endpoint.move=.done

and these settings are still listed in CAMEL-FILE-KAFKA-CONNECTOR SOURCE CONFIGURATION. I set camel.source.endpoint.delete to true, but it does not actually work. I assume the file source connector is the more generic one and probably the underlying layer of the ftp connector, so there should be some way to pass the config through and make it work. Suggestions are appreciated.
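For context, the delete attempt described above would sit alongside the 0.11.x-style options like this; a minimal sketch using the same camel.source.endpoint prefix as the example settings (delete replaces move, since a file is either moved or removed after processing):

camel.source.endpoint.noop=false
camel.source.endpoint.delete=true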

oscerd commented 1 year ago

What version of the connector are you using?

LinzeSu commented 1 year ago

> What version of the connector are you using?

Thanks for the lightning fast reply. I'm using version 3.20.3.

oscerd commented 1 year ago

In this case, from 3.20.x we are basing the connectors on Kamelets. So the features come from: https://github.com/apache/camel-kamelets/blob/main/kamelets/sftp-source.kamelet.yaml

And there is no move or delete option in the Kamelet, so it must be added there, and eventually we'll need a new release.
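In the Kamelet-based connectors, the configurable options are exactly the properties the Kamelet declares, exposed under the camel.kamelet.sftp-source. prefix; the camel.source.endpoint.* keys from older releases are not picked up. A minimal sketch of a 3.20.x standalone worker config, with hypothetical host, credentials, and topic, and property names taken from the sftp-source Kamelet linked above:

name=sftp-source-connector
connector.class=org.apache.camel.kafkaconnector.sftpsource.CamelSftpsourceSourceConnector
topics=sftp.files
camel.kamelet.sftp-source.connectionHost=sftp.example.com
camel.kamelet.sftp-source.connectionPort=22
camel.kamelet.sftp-source.username=user
camel.kamelet.sftp-source.password=secret
camel.kamelet.sftp-source.directoryName=inbox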

LinzeSu commented 1 year ago

Thanks. This feature is greatly needed, because when the number of files in the directory the connector reads from exceeds 1000, the connector repeatedly re-reads the files, which I assume is an unwanted situation. I just came across this problem while upgrading the connector, and I found this info:

> Option to use the Idempotent Consumer EIP pattern to let Camel skip already processed files.
> Will by default use a memory based LRUCache that holds 1000 entries.
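The quoted limit is the default size of the in-memory idempotent repository. In plain Camel (outside the Kamelet-based connector) it can be raised by pointing the endpoint at a bigger repository bean; a hedged sketch, where #bigCache is a hypothetical registry entry created with MemoryIdempotentRepository.memoryIdempotentRepository(10000):

sftp://user@sftp.example.com/inbox?idempotent=true&idempotentRepository=#bigCache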

Please let me know when this feature is added in a future release.

oscerd commented 1 year ago

Idempotent is already enabled in the connector, so you shouldn't consume the same files multiple times. It should work, but it won't delete the files.

LinzeSu commented 1 year ago

Yeah, I know it's enabled by default, and I even set it to true explicitly just to avoid misunderstanding. Still, the connector won't stop sending the same file records to Kafka once the number of files is greater than 1000; with fewer than 1000 files it works fine. According to the documentation, the connector only holds the names of the latest 1000 files it processed, so newly processed filenames evict older names from that LRUCache, and the evicted files get consumed again on the next poll. Also, is there any specific version that could be used to avoid this? Maybe a version that's not based on Kamelets?

oscerd commented 1 year ago

No, there is no specific version.

oscerd commented 1 year ago

This has been added here: https://github.com/apache/camel-kamelets/issues/1507

And it will be in 3.20.6
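Assuming the new option lands as a Kamelet property named delete under the usual prefix, the 3.20.x connector config sketched earlier would then only need one extra line (a sketch against the then-unreleased 3.20.6):

camel.kamelet.sftp-source.delete=true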

LinzeSu commented 1 year ago

That's great. Thanks oscerd.